1/* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/net.h> 13#include <linux/skbuff.h> 14#include <linux/export.h> 15#include <net/sock.h> 16#include <net/af_rxrpc.h> 17#include "ar-internal.h" 18 19/* 20 * removal a call's user ID from the socket tree to make the user ID available 21 * again and so that it won't be seen again in association with that call 22 */ 23void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 24{ 25 _debug("RELEASE CALL %d", call->debug_id); 26 27 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 28 write_lock_bh(&rx->call_lock); 29 rb_erase(&call->sock_node, &call->socket->calls); 30 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 31 write_unlock_bh(&rx->call_lock); 32 } 33 34 read_lock_bh(&call->state_lock); 35 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 36 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 37 rxrpc_queue_call(call); 38 read_unlock_bh(&call->state_lock); 39} 40 41/* 42 * receive a message from an RxRPC socket 43 * - we need to be careful about two or more threads calling recvmsg 44 * simultaneously 45 */ 46int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 47 int flags) 48{ 49 struct rxrpc_skb_priv *sp; 50 struct rxrpc_call *call = NULL, *continue_call = NULL; 51 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 52 struct sk_buff *skb; 53 long timeo; 54 int copy, ret, ullen, offset, copied = 0; 55 u32 abort_code; 56 57 DEFINE_WAIT(wait); 58 59 _enter(",,,%zu,%d", len, flags); 60 61 if (flags & (MSG_OOB | MSG_TRUNC)) 62 return -EOPNOTSUPP; 63 64 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 65 66 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 67 msg->msg_flags |= MSG_MORE; 68 69 lock_sock(&rx->sk); 70 71 for (;;) { 72 /* return immediately if a client socket has no outstanding 73 * calls */ 74 if (RB_EMPTY_ROOT(&rx->calls)) { 75 if (copied) 76 goto out; 77 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 78 release_sock(&rx->sk); 79 if (continue_call) 80 rxrpc_put_call(continue_call); 81 return -ENODATA; 82 } 83 } 84 85 /* get the next message on the Rx queue */ 86 skb = skb_peek(&rx->sk.sk_receive_queue); 87 if (!skb) { 88 /* nothing remains on the queue */ 89 if (copied && 90 (flags & MSG_PEEK || timeo == 0)) 91 goto out; 92 93 /* wait for a message to turn up */ 94 release_sock(&rx->sk); 95 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 96 TASK_INTERRUPTIBLE); 97 ret = sock_error(&rx->sk); 98 if (ret) 99 goto wait_error; 100 101 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 102 if (signal_pending(current)) 103 goto wait_interrupted; 104 timeo = schedule_timeout(timeo); 105 } 106 finish_wait(sk_sleep(&rx->sk), &wait); 107 lock_sock(&rx->sk); 108 continue; 109 } 110 111 peek_next_packet: 112 sp = rxrpc_skb(skb); 113 call = sp->call; 114 ASSERT(call != NULL); 115 116 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 117 118 /* make sure we wait for the state to be updated in this call */ 119 spin_lock_bh(&call->lock); 120 spin_unlock_bh(&call->lock); 121 122 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 123 _debug("packet from released call"); 124 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 125 BUG(); 126 rxrpc_free_skb(skb); 127 continue; 128 } 129 130 /* determine whether to continue last data receive */ 131 if (continue_call) { 132 _debug("maybe cont"); 133 if (call != continue_call || 134 skb->mark != RXRPC_SKB_MARK_DATA) { 135 release_sock(&rx->sk); 136 rxrpc_put_call(continue_call); 137 _leave(" = %d [noncont]", copied); 138 return copied; 139 } 140 } 141 142 rxrpc_get_call(call); 143 144 /* copy the peer address and timestamp */ 145 if (!continue_call) { 146 if (msg->msg_name) { 147 size_t len = 148 sizeof(call->conn->trans->peer->srx); 149 memcpy(msg->msg_name, 150 &call->conn->trans->peer->srx, len); 151 msg->msg_namelen = len; 152 } 153 sock_recv_timestamp(msg, &rx->sk, skb); 154 } 155 156 /* receive the message */ 157 if (skb->mark != RXRPC_SKB_MARK_DATA) 158 goto receive_non_data_message; 159 160 _debug("recvmsg DATA #%u { %d, %d }", 161 ntohl(sp->hdr.seq), skb->len, sp->offset); 162 163 if (!continue_call) { 164 /* only set the control data once per recvmsg() */ 165 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 166 ullen, &call->user_call_ID); 167 if (ret < 0) 168 goto copy_error; 169 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 170 } 171 172 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 173 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 174 call->rx_data_recv = ntohl(sp->hdr.seq); 175 176 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 177 178 offset = sp->offset; 179 copy = skb->len - offset; 180 if (copy > len - copied) 181 copy = len - copied; 182 183 ret = skb_copy_datagram_msg(skb, offset, msg, copy); 184 185 if (ret < 0) 186 goto copy_error; 187 188 /* handle piecemeal consumption of data packets */ 189 _debug("copied %d+%d", copy, copied); 190 191 offset += copy; 192 copied += copy; 193 194 if (!(flags & MSG_PEEK)) 195 sp->offset = offset; 196 197 if (sp->offset < skb->len) { 198 _debug("buffer full"); 199 ASSERTCMP(copied, ==, len); 200 break; 201 } 202 203 /* we transferred the whole data packet */ 204 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 205 _debug("last"); 206 if (call->conn->out_clientflag) { 207 /* last byte of reply received */ 208 ret = copied; 209 goto terminal_message; 210 } 211 212 /* last bit of request received */ 213 if (!(flags & MSG_PEEK)) { 214 _debug("eat packet"); 215 if (skb_dequeue(&rx->sk.sk_receive_queue) != 216 skb) 217 BUG(); 218 rxrpc_free_skb(skb); 219 } 220 msg->msg_flags &= ~MSG_MORE; 221 break; 222 } 223 224 /* move on to the next data message */ 225 _debug("next"); 226 if (!continue_call) 227 continue_call = sp->call; 228 else 229 rxrpc_put_call(call); 230 call = NULL; 231 232 if (flags & MSG_PEEK) { 233 _debug("peek next"); 234 skb = skb->next; 235 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 236 break; 237 goto peek_next_packet; 238 } 239 240 _debug("eat packet"); 241 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 242 BUG(); 243 rxrpc_free_skb(skb); 244 } 245 246 /* end of non-terminal data packet reception for the moment */ 247 _debug("end rcv data"); 248out: 249 release_sock(&rx->sk); 250 if (call) 251 rxrpc_put_call(call); 252 if (continue_call) 253 rxrpc_put_call(continue_call); 254 _leave(" = %d [data]", copied); 255 return copied; 256 257 /* handle non-DATA messages such as aborts, incoming connections and 258 * final ACKs */ 259receive_non_data_message: 260 _debug("non-data"); 261 262 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 263 _debug("RECV NEW CALL"); 264 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 265 if (ret < 0) 266 goto copy_error; 267 if (!(flags & MSG_PEEK)) { 268 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 269 BUG(); 270 rxrpc_free_skb(skb); 271 } 272 goto out; 273 } 274 275 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 276 ullen, &call->user_call_ID); 277 if (ret < 0) 278 goto copy_error; 279 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 280 281 switch (skb->mark) { 282 case RXRPC_SKB_MARK_DATA: 283 BUG(); 284 case RXRPC_SKB_MARK_FINAL_ACK: 285 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 286 break; 287 case RXRPC_SKB_MARK_BUSY: 288 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 289 break; 290 case RXRPC_SKB_MARK_REMOTE_ABORT: 291 abort_code = call->abort_code; 292 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 293 break; 294 case RXRPC_SKB_MARK_NET_ERROR: 295 _debug("RECV NET ERROR %d", sp->error); 296 abort_code = sp->error; 297 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 298 break; 299 case RXRPC_SKB_MARK_LOCAL_ERROR: 300 _debug("RECV LOCAL ERROR %d", sp->error); 301 abort_code = sp->error; 302 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 303 &abort_code); 304 break; 305 default: 306 BUG(); 307 break; 308 } 309 310 if (ret < 0) 311 goto copy_error; 312 313terminal_message: 314 _debug("terminal"); 315 msg->msg_flags &= ~MSG_MORE; 316 msg->msg_flags |= MSG_EOR; 317 318 if (!(flags & MSG_PEEK)) { 319 _net("free terminal skb %p", skb); 320 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 321 BUG(); 322 rxrpc_free_skb(skb); 323 rxrpc_remove_user_ID(rx, call); 324 } 325 326 release_sock(&rx->sk); 327 rxrpc_put_call(call); 328 if (continue_call) 329 rxrpc_put_call(continue_call); 330 _leave(" = %d", ret); 331 return ret; 332 333copy_error: 334 _debug("copy error"); 335 release_sock(&rx->sk); 336 rxrpc_put_call(call); 337 if (continue_call) 338 rxrpc_put_call(continue_call); 339 _leave(" = %d", ret); 340 return ret; 341 342wait_interrupted: 343 ret = sock_intr_errno(timeo); 344wait_error: 345 finish_wait(sk_sleep(&rx->sk), &wait); 346 if (continue_call) 347 rxrpc_put_call(continue_call); 348 if (copied) 349 copied = ret; 350 _leave(" = %d [waitfail %d]", copied, ret); 351 return copied; 352 353} 354 355/** 356 * rxrpc_kernel_data_delivered - Record delivery of data message 357 * @skb: Message holding data 358 * 359 * Record the delivery of a data message. This permits RxRPC to keep its 360 * tracking correct. The socket buffer will be deleted. 361 */ 362void rxrpc_kernel_data_delivered(struct sk_buff *skb) 363{ 364 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 365 struct rxrpc_call *call = sp->call; 366 367 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 368 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 369 call->rx_data_recv = ntohl(sp->hdr.seq); 370 371 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 372 rxrpc_free_skb(skb); 373} 374 375EXPORT_SYMBOL(rxrpc_kernel_data_delivered); 376 377/** 378 * rxrpc_kernel_is_data_last - Determine if data message is last one 379 * @skb: Message holding data 380 * 381 * Determine if data message is last one for the parent call. 382 */ 383bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 384{ 385 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 386 387 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 388 389 return sp->hdr.flags & RXRPC_LAST_PACKET; 390} 391 392EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 393 394/** 395 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 396 * @skb: Message indicating an abort 397 * 398 * Get the abort code from an RxRPC abort message. 399 */ 400u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 401{ 402 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 403 404 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT); 405 406 return sp->call->abort_code; 407} 408 409EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 410 411/** 412 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 413 * @skb: Message indicating an error 414 * 415 * Get the error number from an RxRPC error message. 416 */ 417int rxrpc_kernel_get_error_number(struct sk_buff *skb) 418{ 419 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 420 421 return sp->error; 422} 423 424EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 425