root/net/rds/recv.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. rds_inc_init
  2. rds_inc_path_init
  3. rds_inc_addref
  4. rds_inc_put
  5. rds_recv_rcvbuf_delta
  6. rds_conn_peer_gen_update
  7. rds_recv_incoming_exthdrs
  8. rds_recv_hs_exthdrs
  9. rds_start_mprds
  10. rds_recv_incoming
  11. rds_next_incoming
  12. rds_still_queued
  13. rds_notify_queue_get
  14. rds_notify_cong
  15. rds_cmsg_recv
  16. rds_recvmsg_zcookie
  17. rds_recvmsg
  18. rds_clear_recv_queue
  19. rds_inc_info_copy
  20. rds6_inc_info_copy

   1 /*
   2  * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
   3  *
   4  * This software is available to you under a choice of one of two
   5  * licenses.  You may choose to be licensed under the terms of the GNU
   6  * General Public License (GPL) Version 2, available from the file
   7  * COPYING in the main directory of this source tree, or the
   8  * OpenIB.org BSD license below:
   9  *
  10  *     Redistribution and use in source and binary forms, with or
  11  *     without modification, are permitted provided that the following
  12  *     conditions are met:
  13  *
  14  *      - Redistributions of source code must retain the above
  15  *        copyright notice, this list of conditions and the following
  16  *        disclaimer.
  17  *
  18  *      - Redistributions in binary form must reproduce the above
  19  *        copyright notice, this list of conditions and the following
  20  *        disclaimer in the documentation and/or other materials
  21  *        provided with the distribution.
  22  *
  23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30  * SOFTWARE.
  31  *
  32  */
  33 #include <linux/kernel.h>
  34 #include <linux/slab.h>
  35 #include <net/sock.h>
  36 #include <linux/in.h>
  37 #include <linux/export.h>
  38 #include <linux/time.h>
  39 #include <linux/rds.h>
  40 
  41 #include "rds.h"
  42 
  43 void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
  44                  struct in6_addr *saddr)
  45 {
  46         refcount_set(&inc->i_refcount, 1);
  47         INIT_LIST_HEAD(&inc->i_item);
  48         inc->i_conn = conn;
  49         inc->i_saddr = *saddr;
  50         inc->i_usercopy.rdma_cookie = 0;
  51         inc->i_usercopy.rx_tstamp = ktime_set(0, 0);
  52 
  53         memset(inc->i_rx_lat_trace, 0, sizeof(inc->i_rx_lat_trace));
  54 }
  55 EXPORT_SYMBOL_GPL(rds_inc_init);
  56 
  57 void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
  58                        struct in6_addr  *saddr)
  59 {
  60         refcount_set(&inc->i_refcount, 1);
  61         INIT_LIST_HEAD(&inc->i_item);
  62         inc->i_conn = cp->cp_conn;
  63         inc->i_conn_path = cp;
  64         inc->i_saddr = *saddr;
  65         inc->i_usercopy.rdma_cookie = 0;
  66         inc->i_usercopy.rx_tstamp = ktime_set(0, 0);
  67 }
  68 EXPORT_SYMBOL_GPL(rds_inc_path_init);
  69 
  70 static void rds_inc_addref(struct rds_incoming *inc)
  71 {
  72         rdsdebug("addref inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
  73         refcount_inc(&inc->i_refcount);
  74 }
  75 
  76 void rds_inc_put(struct rds_incoming *inc)
  77 {
  78         rdsdebug("put inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
  79         if (refcount_dec_and_test(&inc->i_refcount)) {
  80                 BUG_ON(!list_empty(&inc->i_item));
  81 
  82                 inc->i_conn->c_trans->inc_free(inc);
  83         }
  84 }
  85 EXPORT_SYMBOL_GPL(rds_inc_put);
  86 
  87 static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
  88                                   struct rds_cong_map *map,
  89                                   int delta, __be16 port)
  90 {
  91         int now_congested;
  92 
  93         if (delta == 0)
  94                 return;
  95 
  96         rs->rs_rcv_bytes += delta;
  97         if (delta > 0)
  98                 rds_stats_add(s_recv_bytes_added_to_socket, delta);
  99         else
 100                 rds_stats_add(s_recv_bytes_removed_from_socket, -delta);
 101 
 102         /* loop transport doesn't send/recv congestion updates */
 103         if (rs->rs_transport->t_type == RDS_TRANS_LOOP)
 104                 return;
 105 
 106         now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);
 107 
 108         rdsdebug("rs %p (%pI6c:%u) recv bytes %d buf %d "
 109           "now_cong %d delta %d\n",
 110           rs, &rs->rs_bound_addr,
 111           ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
 112           rds_sk_rcvbuf(rs), now_congested, delta);
 113 
 114         /* wasn't -> am congested */
 115         if (!rs->rs_congested && now_congested) {
 116                 rs->rs_congested = 1;
 117                 rds_cong_set_bit(map, port);
 118                 rds_cong_queue_updates(map);
 119         }
 120         /* was -> aren't congested */
 121         /* Require more free space before reporting uncongested to prevent
 122            bouncing cong/uncong state too often */
 123         else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
 124                 rs->rs_congested = 0;
 125                 rds_cong_clear_bit(map, port);
 126                 rds_cong_queue_updates(map);
 127         }
 128 
 129         /* do nothing if no change in cong state */
 130 }
 131 
 132 static void rds_conn_peer_gen_update(struct rds_connection *conn,
 133                                      u32 peer_gen_num)
 134 {
 135         int i;
 136         struct rds_message *rm, *tmp;
 137         unsigned long flags;
 138 
 139         WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
 140         if (peer_gen_num != 0) {
 141                 if (conn->c_peer_gen_num != 0 &&
 142                     peer_gen_num != conn->c_peer_gen_num) {
 143                         for (i = 0; i < RDS_MPATH_WORKERS; i++) {
 144                                 struct rds_conn_path *cp;
 145 
 146                                 cp = &conn->c_path[i];
 147                                 spin_lock_irqsave(&cp->cp_lock, flags);
 148                                 cp->cp_next_tx_seq = 1;
 149                                 cp->cp_next_rx_seq = 0;
 150                                 list_for_each_entry_safe(rm, tmp,
 151                                                          &cp->cp_retrans,
 152                                                          m_conn_item) {
 153                                         set_bit(RDS_MSG_FLUSH, &rm->m_flags);
 154                                 }
 155                                 spin_unlock_irqrestore(&cp->cp_lock, flags);
 156                         }
 157                 }
 158                 conn->c_peer_gen_num = peer_gen_num;
 159         }
 160 }
 161 
 162 /*
 163  * Process all extension headers that come with this message.
 164  */
 165 static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
 166 {
 167         struct rds_header *hdr = &inc->i_hdr;
 168         unsigned int pos = 0, type, len;
 169         union {
 170                 struct rds_ext_header_version version;
 171                 struct rds_ext_header_rdma rdma;
 172                 struct rds_ext_header_rdma_dest rdma_dest;
 173         } buffer;
 174 
 175         while (1) {
 176                 len = sizeof(buffer);
 177                 type = rds_message_next_extension(hdr, &pos, &buffer, &len);
 178                 if (type == RDS_EXTHDR_NONE)
 179                         break;
 180                 /* Process extension header here */
 181                 switch (type) {
 182                 case RDS_EXTHDR_RDMA:
 183                         rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
 184                         break;
 185 
 186                 case RDS_EXTHDR_RDMA_DEST:
 187                         /* We ignore the size for now. We could stash it
 188                          * somewhere and use it for error checking. */
 189                         inc->i_usercopy.rdma_cookie = rds_rdma_make_cookie(
 190                                         be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
 191                                         be32_to_cpu(buffer.rdma_dest.h_rdma_offset));
 192 
 193                         break;
 194                 }
 195         }
 196 }
 197 
 198 static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 199                                 struct rds_connection *conn)
 200 {
 201         unsigned int pos = 0, type, len;
 202         union {
 203                 struct rds_ext_header_version version;
 204                 u16 rds_npaths;
 205                 u32 rds_gen_num;
 206         } buffer;
 207         u32 new_peer_gen_num = 0;
 208 
 209         while (1) {
 210                 len = sizeof(buffer);
 211                 type = rds_message_next_extension(hdr, &pos, &buffer, &len);
 212                 if (type == RDS_EXTHDR_NONE)
 213                         break;
 214                 /* Process extension header here */
 215                 switch (type) {
 216                 case RDS_EXTHDR_NPATHS:
 217                         conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
 218                                                be16_to_cpu(buffer.rds_npaths));
 219                         break;
 220                 case RDS_EXTHDR_GEN_NUM:
 221                         new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
 222                         break;
 223                 default:
 224                         pr_warn_ratelimited("ignoring unknown exthdr type "
 225                                              "0x%x\n", type);
 226                 }
 227         }
 228         /* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
 229         conn->c_npaths = max_t(int, conn->c_npaths, 1);
 230         conn->c_ping_triggered = 0;
 231         rds_conn_peer_gen_update(conn, new_peer_gen_num);
 232 }
 233 
 234 /* rds_start_mprds() will synchronously start multiple paths when appropriate.
 235  * The scheme is based on the following rules:
 236  *
 237  * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
 238  *    sender's npaths (s_npaths)
 239  * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
 240  *    sends back a probe-pong with r_npaths. After that, if rcvr is the
 241  *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
 242  *    mprds_paths.
 243  * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
 244  *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
 245  *    called after reception of the probe-pong on all mprds_paths.
 246  *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
 247  *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
 248  * 4. rds_connect_worker must only trigger a connection if laddr < faddr.
 249  * 5. sender may end up queuing the packet on the cp. will get sent out later.
 250  *    when connection is completed.
 251  */
 252 static void rds_start_mprds(struct rds_connection *conn)
 253 {
 254         int i;
 255         struct rds_conn_path *cp;
 256 
 257         if (conn->c_npaths > 1 &&
 258             rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) < 0) {
 259                 for (i = 0; i < conn->c_npaths; i++) {
 260                         cp = &conn->c_path[i];
 261                         rds_conn_path_connect_if_down(cp);
 262                 }
 263         }
 264 }
 265 
 266 /*
 267  * The transport must make sure that this is serialized against other
 268  * rx and conn reset on this specific conn.
 269  *
 270  * We currently assert that only one fragmented message will be sent
 271  * down a connection at a time.  This lets us reassemble in the conn
 272  * instead of per-flow which means that we don't have to go digging through
 273  * flows to tear down partial reassembly progress on conn failure and
 274  * we save flow lookup and locking for each frag arrival.  It does mean
 275  * that small messages will wait behind large ones.  Fragmenting at all
 276  * is only to reduce the memory consumption of pre-posted buffers.
 277  *
 278  * The caller passes in saddr and daddr instead of us getting it from the
 279  * conn.  This lets loopback, who only has one conn for both directions,
 280  * tell us which roles the addrs in the conn are playing for this message.
 281  */
 282 void rds_recv_incoming(struct rds_connection *conn, struct in6_addr *saddr,
 283                        struct in6_addr *daddr,
 284                        struct rds_incoming *inc, gfp_t gfp)
 285 {
 286         struct rds_sock *rs = NULL;
 287         struct sock *sk;
 288         unsigned long flags;
 289         struct rds_conn_path *cp;
 290 
 291         inc->i_conn = conn;
 292         inc->i_rx_jiffies = jiffies;
 293         if (conn->c_trans->t_mp_capable)
 294                 cp = inc->i_conn_path;
 295         else
 296                 cp = &conn->c_path[0];
 297 
 298         rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
 299                  "flags 0x%x rx_jiffies %lu\n", conn,
 300                  (unsigned long long)cp->cp_next_rx_seq,
 301                  inc,
 302                  (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
 303                  be32_to_cpu(inc->i_hdr.h_len),
 304                  be16_to_cpu(inc->i_hdr.h_sport),
 305                  be16_to_cpu(inc->i_hdr.h_dport),
 306                  inc->i_hdr.h_flags,
 307                  inc->i_rx_jiffies);
 308 
 309         /*
 310          * Sequence numbers should only increase.  Messages get their
 311          * sequence number as they're queued in a sending conn.  They
 312          * can be dropped, though, if the sending socket is closed before
 313          * they hit the wire.  So sequence numbers can skip forward
 314          * under normal operation.  They can also drop back in the conn
 315          * failover case as previously sent messages are resent down the
 316          * new instance of a conn.  We drop those, otherwise we have
 317          * to assume that the next valid seq does not come after a
 318          * hole in the fragment stream.
 319          *
 320          * The headers don't give us a way to realize if fragments of
 321          * a message have been dropped.  We assume that frags that arrive
 322          * to a flow are part of the current message on the flow that is
 323          * being reassembled.  This means that senders can't drop messages
 324          * from the sending conn until all their frags are sent.
 325          *
 326          * XXX we could spend more on the wire to get more robust failure
 327          * detection, arguably worth it to avoid data corruption.
 328          */
 329         if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
 330             (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
 331                 rds_stats_inc(s_recv_drop_old_seq);
 332                 goto out;
 333         }
 334         cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
 335 
 336         if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
 337                 if (inc->i_hdr.h_sport == 0) {
 338                         rdsdebug("ignore ping with 0 sport from %pI6c\n",
 339                                  saddr);
 340                         goto out;
 341                 }
 342                 rds_stats_inc(s_recv_ping);
 343                 rds_send_pong(cp, inc->i_hdr.h_sport);
 344                 /* if this is a handshake ping, start multipath if necessary */
 345                 if (RDS_HS_PROBE(be16_to_cpu(inc->i_hdr.h_sport),
 346                                  be16_to_cpu(inc->i_hdr.h_dport))) {
 347                         rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
 348                         rds_start_mprds(cp->cp_conn);
 349                 }
 350                 goto out;
 351         }
 352 
 353         if (be16_to_cpu(inc->i_hdr.h_dport) ==  RDS_FLAG_PROBE_PORT &&
 354             inc->i_hdr.h_sport == 0) {
 355                 rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
 356                 /* if this is a handshake pong, start multipath if necessary */
 357                 rds_start_mprds(cp->cp_conn);
 358                 wake_up(&cp->cp_conn->c_hs_waitq);
 359                 goto out;
 360         }
 361 
 362         rs = rds_find_bound(daddr, inc->i_hdr.h_dport, conn->c_bound_if);
 363         if (!rs) {
 364                 rds_stats_inc(s_recv_drop_no_sock);
 365                 goto out;
 366         }
 367 
 368         /* Process extension headers */
 369         rds_recv_incoming_exthdrs(inc, rs);
 370 
 371         /* We can be racing with rds_release() which marks the socket dead. */
 372         sk = rds_rs_to_sk(rs);
 373 
 374         /* serialize with rds_release -> sock_orphan */
 375         write_lock_irqsave(&rs->rs_recv_lock, flags);
 376         if (!sock_flag(sk, SOCK_DEAD)) {
 377                 rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
 378                 rds_stats_inc(s_recv_queued);
 379                 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 380                                       be32_to_cpu(inc->i_hdr.h_len),
 381                                       inc->i_hdr.h_dport);
 382                 if (sock_flag(sk, SOCK_RCVTSTAMP))
 383                         inc->i_usercopy.rx_tstamp = ktime_get_real();
 384                 rds_inc_addref(inc);
 385                 inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
 386                 list_add_tail(&inc->i_item, &rs->rs_recv_queue);
 387                 __rds_wake_sk_sleep(sk);
 388         } else {
 389                 rds_stats_inc(s_recv_drop_dead_sock);
 390         }
 391         write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 392 
 393 out:
 394         if (rs)
 395                 rds_sock_put(rs);
 396 }
 397 EXPORT_SYMBOL_GPL(rds_recv_incoming);
 398 
 399 /*
 400  * be very careful here.  This is being called as the condition in
 401  * wait_event_*() needs to cope with being called many times.
 402  */
 403 static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
 404 {
 405         unsigned long flags;
 406 
 407         if (!*inc) {
 408                 read_lock_irqsave(&rs->rs_recv_lock, flags);
 409                 if (!list_empty(&rs->rs_recv_queue)) {
 410                         *inc = list_entry(rs->rs_recv_queue.next,
 411                                           struct rds_incoming,
 412                                           i_item);
 413                         rds_inc_addref(*inc);
 414                 }
 415                 read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 416         }
 417 
 418         return *inc != NULL;
 419 }
 420 
 421 static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
 422                             int drop)
 423 {
 424         struct sock *sk = rds_rs_to_sk(rs);
 425         int ret = 0;
 426         unsigned long flags;
 427 
 428         write_lock_irqsave(&rs->rs_recv_lock, flags);
 429         if (!list_empty(&inc->i_item)) {
 430                 ret = 1;
 431                 if (drop) {
 432                         /* XXX make sure this i_conn is reliable */
 433                         rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 434                                               -be32_to_cpu(inc->i_hdr.h_len),
 435                                               inc->i_hdr.h_dport);
 436                         list_del_init(&inc->i_item);
 437                         rds_inc_put(inc);
 438                 }
 439         }
 440         write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 441 
 442         rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
 443         return ret;
 444 }
 445 
 446 /*
 447  * Pull errors off the error queue.
 448  * If msghdr is NULL, we will just purge the error queue.
 449  */
 450 int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
 451 {
 452         struct rds_notifier *notifier;
 453         struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
 454         unsigned int count = 0, max_messages = ~0U;
 455         unsigned long flags;
 456         LIST_HEAD(copy);
 457         int err = 0;
 458 
 459 
 460         /* put_cmsg copies to user space and thus may sleep. We can't do this
 461          * with rs_lock held, so first grab as many notifications as we can stuff
 462          * in the user provided cmsg buffer. We don't try to copy more, to avoid
 463          * losing notifications - except when the buffer is so small that it wouldn't
 464          * even hold a single notification. Then we give him as much of this single
 465          * msg as we can squeeze in, and set MSG_CTRUNC.
 466          */
 467         if (msghdr) {
 468                 max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
 469                 if (!max_messages)
 470                         max_messages = 1;
 471         }
 472 
 473         spin_lock_irqsave(&rs->rs_lock, flags);
 474         while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
 475                 notifier = list_entry(rs->rs_notify_queue.next,
 476                                 struct rds_notifier, n_list);
 477                 list_move(&notifier->n_list, &copy);
 478                 count++;
 479         }
 480         spin_unlock_irqrestore(&rs->rs_lock, flags);
 481 
 482         if (!count)
 483                 return 0;
 484 
 485         while (!list_empty(&copy)) {
 486                 notifier = list_entry(copy.next, struct rds_notifier, n_list);
 487 
 488                 if (msghdr) {
 489                         cmsg.user_token = notifier->n_user_token;
 490                         cmsg.status = notifier->n_status;
 491 
 492                         err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
 493                                        sizeof(cmsg), &cmsg);
 494                         if (err)
 495                                 break;
 496                 }
 497 
 498                 list_del_init(&notifier->n_list);
 499                 kfree(notifier);
 500         }
 501 
 502         /* If we bailed out because of an error in put_cmsg,
 503          * we may be left with one or more notifications that we
 504          * didn't process. Return them to the head of the list. */
 505         if (!list_empty(&copy)) {
 506                 spin_lock_irqsave(&rs->rs_lock, flags);
 507                 list_splice(&copy, &rs->rs_notify_queue);
 508                 spin_unlock_irqrestore(&rs->rs_lock, flags);
 509         }
 510 
 511         return err;
 512 }
 513 
 514 /*
 515  * Queue a congestion notification
 516  */
 517 static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
 518 {
 519         uint64_t notify = rs->rs_cong_notify;
 520         unsigned long flags;
 521         int err;
 522 
 523         err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
 524                         sizeof(notify), &notify);
 525         if (err)
 526                 return err;
 527 
 528         spin_lock_irqsave(&rs->rs_lock, flags);
 529         rs->rs_cong_notify &= ~notify;
 530         spin_unlock_irqrestore(&rs->rs_lock, flags);
 531 
 532         return 0;
 533 }
 534 
 535 /*
 536  * Receive any control messages.
 537  */
 538 static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
 539                          struct rds_sock *rs)
 540 {
 541         int ret = 0;
 542 
 543         if (inc->i_usercopy.rdma_cookie) {
 544                 ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
 545                                 sizeof(inc->i_usercopy.rdma_cookie),
 546                                 &inc->i_usercopy.rdma_cookie);
 547                 if (ret)
 548                         goto out;
 549         }
 550 
 551         if ((inc->i_usercopy.rx_tstamp != 0) &&
 552             sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
 553                 struct __kernel_old_timeval tv =
 554                         ns_to_kernel_old_timeval(inc->i_usercopy.rx_tstamp);
 555 
 556                 if (!sock_flag(rds_rs_to_sk(rs), SOCK_TSTAMP_NEW)) {
 557                         ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_OLD,
 558                                        sizeof(tv), &tv);
 559                 } else {
 560                         struct __kernel_sock_timeval sk_tv;
 561 
 562                         sk_tv.tv_sec = tv.tv_sec;
 563                         sk_tv.tv_usec = tv.tv_usec;
 564 
 565                         ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_NEW,
 566                                        sizeof(sk_tv), &sk_tv);
 567                 }
 568 
 569                 if (ret)
 570                         goto out;
 571         }
 572 
 573         if (rs->rs_rx_traces) {
 574                 struct rds_cmsg_rx_trace t;
 575                 int i, j;
 576 
 577                 memset(&t, 0, sizeof(t));
 578                 inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
 579                 t.rx_traces =  rs->rs_rx_traces;
 580                 for (i = 0; i < rs->rs_rx_traces; i++) {
 581                         j = rs->rs_rx_trace[i];
 582                         t.rx_trace_pos[i] = j;
 583                         t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
 584                                           inc->i_rx_lat_trace[j];
 585                 }
 586 
 587                 ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
 588                                sizeof(t), &t);
 589                 if (ret)
 590                         goto out;
 591         }
 592 
 593 out:
 594         return ret;
 595 }
 596 
 597 static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
 598 {
 599         struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
 600         struct rds_msg_zcopy_info *info = NULL;
 601         struct rds_zcopy_cookies *done;
 602         unsigned long flags;
 603 
 604         if (!msg->msg_control)
 605                 return false;
 606 
 607         if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
 608             msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
 609                 return false;
 610 
 611         spin_lock_irqsave(&q->lock, flags);
 612         if (!list_empty(&q->zcookie_head)) {
 613                 info = list_entry(q->zcookie_head.next,
 614                                   struct rds_msg_zcopy_info, rs_zcookie_next);
 615                 list_del(&info->rs_zcookie_next);
 616         }
 617         spin_unlock_irqrestore(&q->lock, flags);
 618         if (!info)
 619                 return false;
 620         done = &info->zcookies;
 621         if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
 622                      done)) {
 623                 spin_lock_irqsave(&q->lock, flags);
 624                 list_add(&info->rs_zcookie_next, &q->zcookie_head);
 625                 spin_unlock_irqrestore(&q->lock, flags);
 626                 return false;
 627         }
 628         kfree(info);
 629         return true;
 630 }
 631 
 632 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 633                 int msg_flags)
 634 {
 635         struct sock *sk = sock->sk;
 636         struct rds_sock *rs = rds_sk_to_rs(sk);
 637         long timeo;
 638         int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
 639         DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
 640         DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
 641         struct rds_incoming *inc = NULL;
 642 
 643         /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
 644         timeo = sock_rcvtimeo(sk, nonblock);
 645 
 646         rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);
 647 
 648         if (msg_flags & MSG_OOB)
 649                 goto out;
 650         if (msg_flags & MSG_ERRQUEUE)
 651                 return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
 652 
 653         while (1) {
 654                 /* If there are pending notifications, do those - and nothing else */
 655                 if (!list_empty(&rs->rs_notify_queue)) {
 656                         ret = rds_notify_queue_get(rs, msg);
 657                         break;
 658                 }
 659 
 660                 if (rs->rs_cong_notify) {
 661                         ret = rds_notify_cong(rs, msg);
 662                         break;
 663                 }
 664 
 665                 if (!rds_next_incoming(rs, &inc)) {
 666                         if (nonblock) {
 667                                 bool reaped = rds_recvmsg_zcookie(rs, msg);
 668 
 669                                 ret = reaped ?  0 : -EAGAIN;
 670                                 break;
 671                         }
 672 
 673                         timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
 674                                         (!list_empty(&rs->rs_notify_queue) ||
 675                                          rs->rs_cong_notify ||
 676                                          rds_next_incoming(rs, &inc)), timeo);
 677                         rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
 678                                  timeo);
 679                         if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
 680                                 continue;
 681 
 682                         ret = timeo;
 683                         if (ret == 0)
 684                                 ret = -ETIMEDOUT;
 685                         break;
 686                 }
 687 
 688                 rdsdebug("copying inc %p from %pI6c:%u to user\n", inc,
 689                          &inc->i_conn->c_faddr,
 690                          ntohs(inc->i_hdr.h_sport));
 691                 ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
 692                 if (ret < 0)
 693                         break;
 694 
 695                 /*
 696                  * if the message we just copied isn't at the head of the
 697                  * recv queue then someone else raced us to return it, try
 698                  * to get the next message.
 699                  */
 700                 if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
 701                         rds_inc_put(inc);
 702                         inc = NULL;
 703                         rds_stats_inc(s_recv_deliver_raced);
 704                         iov_iter_revert(&msg->msg_iter, ret);
 705                         continue;
 706                 }
 707 
 708                 if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
 709                         if (msg_flags & MSG_TRUNC)
 710                                 ret = be32_to_cpu(inc->i_hdr.h_len);
 711                         msg->msg_flags |= MSG_TRUNC;
 712                 }
 713 
 714                 if (rds_cmsg_recv(inc, msg, rs)) {
 715                         ret = -EFAULT;
 716                         goto out;
 717                 }
 718                 rds_recvmsg_zcookie(rs, msg);
 719 
 720                 rds_stats_inc(s_recv_delivered);
 721 
 722                 if (msg->msg_name) {
 723                         if (ipv6_addr_v4mapped(&inc->i_saddr)) {
 724                                 sin = (struct sockaddr_in *)msg->msg_name;
 725 
 726                                 sin->sin_family = AF_INET;
 727                                 sin->sin_port = inc->i_hdr.h_sport;
 728                                 sin->sin_addr.s_addr =
 729                                     inc->i_saddr.s6_addr32[3];
 730                                 memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
 731                                 msg->msg_namelen = sizeof(*sin);
 732                         } else {
 733                                 sin6 = (struct sockaddr_in6 *)msg->msg_name;
 734 
 735                                 sin6->sin6_family = AF_INET6;
 736                                 sin6->sin6_port = inc->i_hdr.h_sport;
 737                                 sin6->sin6_addr = inc->i_saddr;
 738                                 sin6->sin6_flowinfo = 0;
 739                                 sin6->sin6_scope_id = rs->rs_bound_scope_id;
 740                                 msg->msg_namelen = sizeof(*sin6);
 741                         }
 742                 }
 743                 break;
 744         }
 745 
 746         if (inc)
 747                 rds_inc_put(inc);
 748 
 749 out:
 750         return ret;
 751 }
 752 
 753 /*
 754  * The socket is being shut down and we're asked to drop messages that were
 755  * queued for recvmsg.  The caller has unbound the socket so the receive path
 756  * won't queue any more incoming fragments or messages on the socket.
 757  */
 758 void rds_clear_recv_queue(struct rds_sock *rs)
 759 {
 760         struct sock *sk = rds_rs_to_sk(rs);
 761         struct rds_incoming *inc, *tmp;
 762         unsigned long flags;
 763 
 764         write_lock_irqsave(&rs->rs_recv_lock, flags);
 765         list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
 766                 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
 767                                       -be32_to_cpu(inc->i_hdr.h_len),
 768                                       inc->i_hdr.h_dport);
 769                 list_del_init(&inc->i_item);
 770                 rds_inc_put(inc);
 771         }
 772         write_unlock_irqrestore(&rs->rs_recv_lock, flags);
 773 }
 774 
 775 /*
 776  * inc->i_saddr isn't used here because it is only set in the receive
 777  * path.
 778  */
 779 void rds_inc_info_copy(struct rds_incoming *inc,
 780                        struct rds_info_iterator *iter,
 781                        __be32 saddr, __be32 daddr, int flip)
 782 {
 783         struct rds_info_message minfo;
 784 
 785         minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
 786         minfo.len = be32_to_cpu(inc->i_hdr.h_len);
 787         minfo.tos = inc->i_conn->c_tos;
 788 
 789         if (flip) {
 790                 minfo.laddr = daddr;
 791                 minfo.faddr = saddr;
 792                 minfo.lport = inc->i_hdr.h_dport;
 793                 minfo.fport = inc->i_hdr.h_sport;
 794         } else {
 795                 minfo.laddr = saddr;
 796                 minfo.faddr = daddr;
 797                 minfo.lport = inc->i_hdr.h_sport;
 798                 minfo.fport = inc->i_hdr.h_dport;
 799         }
 800 
 801         minfo.flags = 0;
 802 
 803         rds_info_copy(iter, &minfo, sizeof(minfo));
 804 }
 805 
 806 #if IS_ENABLED(CONFIG_IPV6)
 807 void rds6_inc_info_copy(struct rds_incoming *inc,
 808                         struct rds_info_iterator *iter,
 809                         struct in6_addr *saddr, struct in6_addr *daddr,
 810                         int flip)
 811 {
 812         struct rds6_info_message minfo6;
 813 
 814         minfo6.seq = be64_to_cpu(inc->i_hdr.h_sequence);
 815         minfo6.len = be32_to_cpu(inc->i_hdr.h_len);
 816         minfo6.tos = inc->i_conn->c_tos;
 817 
 818         if (flip) {
 819                 minfo6.laddr = *daddr;
 820                 minfo6.faddr = *saddr;
 821                 minfo6.lport = inc->i_hdr.h_dport;
 822                 minfo6.fport = inc->i_hdr.h_sport;
 823         } else {
 824                 minfo6.laddr = *saddr;
 825                 minfo6.faddr = *daddr;
 826                 minfo6.lport = inc->i_hdr.h_sport;
 827                 minfo6.fport = inc->i_hdr.h_dport;
 828         }
 829 
 830         minfo6.flags = 0;
 831 
 832         rds_info_copy(iter, &minfo6, sizeof(minfo6));
 833 }
 834 #endif

/* [<][>][^][v][top][bottom][index][help] */