/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#include "socklnd.h"

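/* Record the peer's address/port and the local address of the connection's
 * socket in the conn structure.  Returns 0 on success or the error from
 * lnet_sock_getaddr(). */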
int
ksocknal_lib_get_conn_addrs(ksock_conn_t *conn)
{
	int rc = lnet_sock_getaddr(conn->ksnc_sock, 1, &conn->ksnc_ipaddr,
				   &conn->ksnc_port);

	/* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
	LASSERT(!conn->ksnc_closing);

	if (rc != 0) {
		CERROR("Error %d getting sock peer IP\n", rc);
		return rc;
	}

	rc = lnet_sock_getaddr(conn->ksnc_sock, 0, &conn->ksnc_myipaddr, NULL);
	if (rc != 0) {
		CERROR("Error %d getting sock local IP\n", rc);
		return rc;
	}

	return 0;
}

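/* Return non-zero if zero-copy sends may be used on this connection: the
 * peer must speak the v2.x protocol and the socket's route must support
 * scatter/gather with hardware checksumming. */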
int
ksocknal_lib_zc_capable(ksock_conn_t *conn)
{
	int caps = conn->ksnc_sock->sk->sk_route_caps;

	if (conn->ksnc_proto == &ksocknal_protocol_v1x)
		return 0;

	/* ZC if the socket supports scatter/gather and doesn't need software
	 * checksums */
	return ((caps & NETIF_F_SG) != 0 && (caps & NETIF_F_ALL_CSUM) != 0);
}

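/* Send the struct kvec fragments of tx with a single kernel_sendmsg() call,
 * checksumming the message first on a v2.x connection when checksums are
 * enabled and this is the first send attempt.  Returns the number of bytes
 * sent or a negative errno. */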
int
ksocknal_lib_send_iov(ksock_conn_t *conn, ksock_tx_t *tx)
{
	struct socket *sock = conn->ksnc_sock;
	int nob;
	int rc;

	if (*ksocknal_tunables.ksnd_enable_csum	&& /* checksum enabled */
	    conn->ksnc_proto == &ksocknal_protocol_v2x && /* V2.x connection  */
	    tx->tx_nob == tx->tx_resid		 && /* first sending    */
	    tx->tx_msg.ksm_csum == 0)		     /* not checksummed  */
		ksocknal_lib_csum_tx(tx);

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */

	{
#if SOCKNAL_SINGLE_FRAG_TX
		struct kvec scratch;
		struct kvec *scratchiov = &scratch;
		unsigned int niov = 1;
#else
		struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
		unsigned int niov = tx->tx_niov;
#endif
		struct msghdr msg = {.msg_flags = MSG_DONTWAIT};
		int i;

		for (nob = i = 0; i < niov; i++) {
			scratchiov[i] = tx->tx_iov[i];
			nob += scratchiov[i].iov_len;
		}

		if (!list_empty(&conn->ksnc_tx_queue) ||
		    nob < tx->tx_resid)
			msg.msg_flags |= MSG_MORE;

		rc = kernel_sendmsg(sock, &msg, scratchiov, niov, nob);
	}
	return rc;
}

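/* Send the kiov (page) fragments of tx.  When a zero-copy cookie is set the
 * first page is handed to the protocol's sendpage hook (or tcp_sendpage());
 * otherwise each page is kmapped into a scratch iovec and sent with
 * kernel_sendmsg().  Returns the number of bytes sent or a negative errno. */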
int
ksocknal_lib_send_kiov(ksock_conn_t *conn, ksock_tx_t *tx)
{
	struct socket *sock = conn->ksnc_sock;
	lnet_kiov_t *kiov = tx->tx_kiov;
	int rc;
	int nob;

	/* Not NOOP message */
	LASSERT(tx->tx_lnetmsg != NULL);

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */
	if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
		/* Zero copy is enabled */
		struct sock *sk = sock->sk;
		struct page *page = kiov->kiov_page;
		int offset = kiov->kiov_offset;
		int fragsize = kiov->kiov_len;
		int msgflg = MSG_DONTWAIT;

		CDEBUG(D_NET, "page %p + offset %x for %d\n",
			       page, offset, kiov->kiov_len);

		if (!list_empty(&conn->ksnc_tx_queue) ||
		    fragsize < tx->tx_resid)
			msgflg |= MSG_MORE;

		if (sk->sk_prot->sendpage != NULL) {
			rc = sk->sk_prot->sendpage(sk, page,
						   offset, fragsize, msgflg);
		} else {
			rc = tcp_sendpage(sk, page, offset, fragsize, msgflg);
		}
	} else {
#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
		struct kvec scratch;
		struct kvec *scratchiov = &scratch;
		unsigned int niov = 1;
#else
#ifdef CONFIG_HIGHMEM
#warning "XXX risk of kmap deadlock on multiple frags..."
#endif
		struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
		unsigned int niov = tx->tx_nkiov;
#endif
		struct msghdr msg = {.msg_flags = MSG_DONTWAIT};
		int i;

		for (nob = i = 0; i < niov; i++) {
			scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
						 kiov[i].kiov_offset;
			nob += scratchiov[i].iov_len = kiov[i].kiov_len;
		}

		if (!list_empty(&conn->ksnc_tx_queue) ||
		    nob < tx->tx_resid)
			msg.msg_flags |= MSG_MORE;

		rc = kernel_sendmsg(sock, &msg, (struct kvec *)scratchiov, niov, nob);

		for (i = 0; i < niov; i++)
			kunmap(kiov[i].kiov_page);
	}
	return rc;
}

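/* Set TCP_QUICKACK so the peer's zero-copy sends aren't delayed waiting for
 * a piggy-backed ACK. */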
void
ksocknal_lib_eager_ack(ksock_conn_t *conn)
{
	int opt = 1;
	struct socket *sock = conn->ksnc_sock;

	/* Remind the socket to ACK eagerly.  If I don't, the socket might
	 * think I'm about to send something it could piggy-back the ACK
	 * on, introducing delay in completing zero-copy sends in my
	 * peer. */

	kernel_setsockopt(sock, SOL_TCP, TCP_QUICKACK,
			       (char *)&opt, sizeof(opt));
}

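/* Receive into the connection's current iovec fragments and, on a v2.x
 * connection with a non-zero message checksum, fold the received bytes into
 * the running receive checksum.  Returns the number of bytes received or a
 * negative errno. */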
int
ksocknal_lib_recv_iov(ksock_conn_t *conn)
{
#if SOCKNAL_SINGLE_FRAG_RX
	struct kvec scratch;
	struct kvec *scratchiov = &scratch;
	unsigned int niov = 1;
#else
	struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
	unsigned int niov = conn->ksnc_rx_niov;
#endif
	struct kvec *iov = conn->ksnc_rx_iov;
	struct msghdr msg = {
		.msg_flags = 0
	};
	int nob;
	int i;
	int rc;
	int fragnob;
	int sum;
	__u32 saved_csum;

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */
	LASSERT(niov > 0);

	for (nob = i = 0; i < niov; i++) {
		scratchiov[i] = iov[i];
		nob += scratchiov[i].iov_len;
	}
	LASSERT(nob <= conn->ksnc_rx_nob_wanted);

	rc = kernel_recvmsg(conn->ksnc_sock, &msg,
		scratchiov, niov, nob, MSG_DONTWAIT);

	saved_csum = 0;
	if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
		saved_csum = conn->ksnc_msg.ksm_csum;
		conn->ksnc_msg.ksm_csum = 0;
	}

	if (saved_csum != 0) {
		/* accumulate checksum */
		for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
			LASSERT(i < niov);

			fragnob = iov[i].iov_len;
			if (fragnob > sum)
				fragnob = sum;

			conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
							   iov[i].iov_base, fragnob);
		}
		conn->ksnc_msg.ksm_csum = saved_csum;
	}

	return rc;
}

static void
ksocknal_lib_kiov_vunmap(void *addr)
{
	if (addr == NULL)
		return;

	vunmap(addr);
}

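/* Try to map the pages of a kiov into one contiguous virtual range so the
 * payload can be received through a single iovec.  Only attempted when
 * zero-copy receive is enabled, there are enough fragments, all but the
 * first fragment start at offset 0 and all but the last fill their page.
 * Returns the vmap'd address (released later by ksocknal_lib_kiov_vunmap())
 * or NULL if the kiov should be kmapped fragment by fragment instead. */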
static void *
ksocknal_lib_kiov_vmap(lnet_kiov_t *kiov, int niov,
		       struct kvec *iov, struct page **pages)
{
	void *addr;
	int nob;
	int i;

	if (!*ksocknal_tunables.ksnd_zc_recv || pages == NULL)
		return NULL;

	LASSERT(niov <= LNET_MAX_IOV);

	if (niov < 2 ||
	    niov < *ksocknal_tunables.ksnd_zc_recv_min_nfrags)
		return NULL;

	for (nob = i = 0; i < niov; i++) {
		if ((kiov[i].kiov_offset != 0 && i > 0) ||
		    (kiov[i].kiov_offset + kiov[i].kiov_len != PAGE_CACHE_SIZE && i < niov - 1))
			return NULL;

		pages[i] = kiov[i].kiov_page;
		nob += kiov[i].kiov_len;
	}

	addr = vmap(pages, niov, VM_MAP, PAGE_KERNEL);
	if (addr == NULL)
		return NULL;

	iov->iov_base = addr + kiov[0].kiov_offset;
	iov->iov_len = nob;

	return addr;
}

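/* Receive into the connection's current kiov (page) fragments, either
 * through a single vmap'd region or by kmapping each page, and fold the
 * received bytes into the receive checksum when one is in use.  Returns the
 * number of bytes received or a negative errno. */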
int
ksocknal_lib_recv_kiov(ksock_conn_t *conn)
{
#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
	struct kvec scratch;
	struct kvec *scratchiov = &scratch;
	struct page **pages = NULL;
	unsigned int niov = 1;
#else
#ifdef CONFIG_HIGHMEM
#warning "XXX risk of kmap deadlock on multiple frags..."
#endif
	struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
	struct page **pages = conn->ksnc_scheduler->kss_rx_scratch_pgs;
	unsigned int niov = conn->ksnc_rx_nkiov;
#endif
	lnet_kiov_t *kiov = conn->ksnc_rx_kiov;
	struct msghdr msg = {
		.msg_flags = 0
	};
	int nob;
	int i;
	int rc;
	void *base;
	void *addr;
	int sum;
	int fragnob;
	int n;

	/* NB we can't trust socket ops to either consume our iovs
	 * or leave them alone. */
	addr = ksocknal_lib_kiov_vmap(kiov, niov, scratchiov, pages);
	if (addr != NULL) {
		nob = scratchiov[0].iov_len;
		n = 1;

	} else {
		for (nob = i = 0; i < niov; i++) {
			nob += scratchiov[i].iov_len = kiov[i].kiov_len;
			scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
						 kiov[i].kiov_offset;
		}
		n = niov;
	}

	LASSERT(nob <= conn->ksnc_rx_nob_wanted);

	rc = kernel_recvmsg(conn->ksnc_sock, &msg,
			(struct kvec *)scratchiov, n, nob, MSG_DONTWAIT);

	if (conn->ksnc_msg.ksm_csum != 0) {
		for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
			LASSERT(i < niov);

			/* Dang! have to kmap again because I have nowhere to
			 * stash the mapped address.  But by doing it while the
			 * page is still mapped, the kernel just bumps the map
			 * count and returns me the address it stashed. */
			base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
			fragnob = kiov[i].kiov_len;
			if (fragnob > sum)
				fragnob = sum;

			conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
							   base, fragnob);

			kunmap(kiov[i].kiov_page);
		}
	}

	if (addr != NULL) {
		ksocknal_lib_kiov_vunmap(addr);
	} else {
		for (i = 0; i < niov; i++)
			kunmap(kiov[i].kiov_page);
	}

	return rc;
}

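/* Compute the v2.x message checksum over the header and payload of tx and
 * store it in ksm_csum; the checksum is deliberately corrupted once when the
 * inject_csum_error tunable is set (for testing). */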
void
ksocknal_lib_csum_tx(ksock_tx_t *tx)
{
	int i;
	__u32 csum;
	void *base;

	LASSERT(tx->tx_iov[0].iov_base == &tx->tx_msg);
	LASSERT(tx->tx_conn != NULL);
	LASSERT(tx->tx_conn->ksnc_proto == &ksocknal_protocol_v2x);

	tx->tx_msg.ksm_csum = 0;

	csum = ksocknal_csum(~0, tx->tx_iov[0].iov_base,
			     tx->tx_iov[0].iov_len);

	if (tx->tx_kiov != NULL) {
		for (i = 0; i < tx->tx_nkiov; i++) {
			base = kmap(tx->tx_kiov[i].kiov_page) +
			       tx->tx_kiov[i].kiov_offset;

			csum = ksocknal_csum(csum, base, tx->tx_kiov[i].kiov_len);

			kunmap(tx->tx_kiov[i].kiov_page);
		}
	} else {
		for (i = 1; i < tx->tx_niov; i++)
			csum = ksocknal_csum(csum, tx->tx_iov[i].iov_base,
					     tx->tx_iov[i].iov_len);
	}

	if (*ksocknal_tunables.ksnd_inject_csum_error) {
		csum++;
		*ksocknal_tunables.ksnd_inject_csum_error = 0;
	}

	tx->tx_msg.ksm_csum = csum;
}

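/* Report the socket's current send/receive buffer sizes and whether Nagle is
 * enabled (*nagle is 1 when Nagle is on, i.e. TCP_NODELAY is off).  Returns
 * 0 on success, -ESHUTDOWN if the connection is closing, or a negative errno
 * from the socket calls. */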
int
ksocknal_lib_get_conn_tunables(ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
{
	struct socket *sock = conn->ksnc_sock;
	int len;
	int rc;

	rc = ksocknal_connsock_addref(conn);
	if (rc != 0) {
		LASSERT(conn->ksnc_closing);
		*txmem = *rxmem = *nagle = 0;
		return -ESHUTDOWN;
	}

	rc = lnet_sock_getbuf(sock, txmem, rxmem);
	if (rc == 0) {
		len = sizeof(*nagle);
		rc = kernel_getsockopt(sock, SOL_TCP, TCP_NODELAY,
					   (char *)nagle, &len);
	}

	ksocknal_connsock_decref(conn);

	if (rc == 0)
		*nagle = !*nagle;
	else
		*txmem = *rxmem = *nagle = 0;

	return rc;
}

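/* Apply the socklnd tunables to a freshly created socket: disable lingering
 * on close, optionally disable Nagle, set the buffer sizes, and configure
 * TCP keepalive.  Returns 0 on success or a negative errno from the first
 * setsockopt that fails. */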
int
ksocknal_lib_setup_sock(struct socket *sock)
{
	int rc;
	int option;
	int keep_idle;
	int keep_intvl;
	int keep_count;
	int do_keepalive;
	struct linger linger;

	sock->sk->sk_allocation = GFP_NOFS;

	/* Ensure this socket aborts active sends immediately when we close
	 * it. */

	linger.l_onoff = 0;
	linger.l_linger = 0;

	rc = kernel_setsockopt(sock, SOL_SOCKET, SO_LINGER,
			      (char *)&linger, sizeof(linger));
	if (rc != 0) {
		CERROR("Can't set SO_LINGER: %d\n", rc);
		return rc;
	}

	option = -1;
	rc = kernel_setsockopt(sock, SOL_TCP, TCP_LINGER2,
				    (char *)&option, sizeof(option));
	if (rc != 0) {
		CERROR("Can't set TCP_LINGER2: %d\n", rc);
		return rc;
	}

	if (!*ksocknal_tunables.ksnd_nagle) {
		option = 1;

		rc = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
					    (char *)&option, sizeof(option));
		if (rc != 0) {
			CERROR("Can't disable nagle: %d\n", rc);
			return rc;
		}
	}

	rc = lnet_sock_setbuf(sock, *ksocknal_tunables.ksnd_tx_buffer_size,
			      *ksocknal_tunables.ksnd_rx_buffer_size);
	if (rc != 0) {
		CERROR("Can't set buffer tx %d, rx %d buffers: %d\n",
			*ksocknal_tunables.ksnd_tx_buffer_size,
			*ksocknal_tunables.ksnd_rx_buffer_size, rc);
		return rc;
	}

	/* TCP_BACKOFF_* sockopt tunables unsupported in stock kernels */

	/* snapshot tunables */
	keep_idle  = *ksocknal_tunables.ksnd_keepalive_idle;
	keep_count = *ksocknal_tunables.ksnd_keepalive_count;
	keep_intvl = *ksocknal_tunables.ksnd_keepalive_intvl;

	do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);

	option = (do_keepalive ? 1 : 0);
	rc = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
			      (char *)&option, sizeof(option));
	if (rc != 0) {
		CERROR("Can't set SO_KEEPALIVE: %d\n", rc);
		return rc;
	}

	if (!do_keepalive)
		return 0;

	rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
				    (char *)&keep_idle, sizeof(keep_idle));
	if (rc != 0) {
		CERROR("Can't set TCP_KEEPIDLE: %d\n", rc);
		return rc;
	}

	rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
				    (char *)&keep_intvl, sizeof(keep_intvl));
	if (rc != 0) {
		CERROR("Can't set TCP_KEEPINTVL: %d\n", rc);
		return rc;
	}

	rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
				    (char *)&keep_count, sizeof(keep_count));
	if (rc != 0) {
		CERROR("Can't set TCP_KEEPCNT: %d\n", rc);
		return rc;
	}

	return 0;
}

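/* Nudge the connection's socket to flush any data Nagle may be holding:
 * temporarily force the in-kernel nonagle state, set TCP_NODELAY to push
 * pending data, then restore the original setting. */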
void
ksocknal_lib_push_conn(ksock_conn_t *conn)
{
	struct sock *sk;
	struct tcp_sock *tp;
	int nonagle;
	int val = 1;
	int rc;

	rc = ksocknal_connsock_addref(conn);
	if (rc != 0)			    /* being shut down */
		return;

	sk = conn->ksnc_sock->sk;
	tp = tcp_sk(sk);

	lock_sock(sk);
	nonagle = tp->nonagle;
	tp->nonagle = 1;
	release_sock(sk);

	rc = kernel_setsockopt(conn->ksnc_sock, SOL_TCP, TCP_NODELAY,
				      (char *)&val, sizeof(val));
	LASSERT(rc == 0);

	lock_sock(sk);
	tp->nonagle = nonagle;
	release_sock(sk);

	ksocknal_connsock_decref(conn);
}

extern void ksocknal_read_callback(ksock_conn_t *conn);
extern void ksocknal_write_callback(ksock_conn_t *conn);

/*
 * Socket callbacks in Linux
 */
static void
ksocknal_data_ready(struct sock *sk)
{
	ksock_conn_t *conn;

	/* interleave correctly with closing sockets... */
	LASSERT(!in_irq());
	read_lock(&ksocknal_data.ksnd_global_lock);

	conn = sk->sk_user_data;
	if (conn == NULL) {	     /* raced with ksocknal_terminate_conn */
		LASSERT(sk->sk_data_ready != &ksocknal_data_ready);
		sk->sk_data_ready(sk);
	} else
		ksocknal_read_callback(conn);

	read_unlock(&ksocknal_data.ksnd_global_lock);
}

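/* sk_write_space replacement: when enough send space is available again,
 * wake the connection via ksocknal_write_callback() and clear SOCK_NOSPACE. */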
static void
ksocknal_write_space(struct sock *sk)
{
	ksock_conn_t *conn;
	int wspace;
	int min_wspace;

	/* interleave correctly with closing sockets... */
	LASSERT(!in_irq());
	read_lock(&ksocknal_data.ksnd_global_lock);

	conn = sk->sk_user_data;
	wspace = sk_stream_wspace(sk);
	min_wspace = sk_stream_min_wspace(sk);

	CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
	       sk, wspace, min_wspace, conn,
	       (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
				      " ready" : " blocked"),
	       (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
				      " scheduled" : " idle"),
	       (conn == NULL) ? "" : (list_empty(&conn->ksnc_tx_queue) ?
				      " empty" : " queued"));

	if (conn == NULL) {	     /* raced with ksocknal_terminate_conn */
		LASSERT(sk->sk_write_space != &ksocknal_write_space);
		sk->sk_write_space(sk);

		read_unlock(&ksocknal_data.ksnd_global_lock);
		return;
	}

	if (wspace >= min_wspace) {	      /* got enough space */
		ksocknal_write_callback(conn);

		/* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
		 * ENOMEM check in ksocknal_transmit is race-free (think about
		 * it). */

		clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
	}

	read_unlock(&ksocknal_data.ksnd_global_lock);
}

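/* Save, install and restore the socket's sk_data_ready/sk_write_space
 * callbacks around the lifetime of a connection. */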
void
ksocknal_lib_save_callback(struct socket *sock, ksock_conn_t *conn)
{
	conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
	conn->ksnc_saved_write_space = sock->sk->sk_write_space;
}

void
ksocknal_lib_set_callback(struct socket *sock, ksock_conn_t *conn)
{
	sock->sk->sk_user_data = conn;
	sock->sk->sk_data_ready = ksocknal_data_ready;
	sock->sk->sk_write_space = ksocknal_write_space;
}

void
ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
{
	/* Remove conn's network callbacks.
	 * NB I _have_ to restore the callback, rather than storing a noop,
	 * since the socket could survive past this module being unloaded!! */
	sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
	sock->sk->sk_write_space = conn->ksnc_saved_write_space;

	/* A callback could be in progress already; they hold a read lock
	 * on ksnd_global_lock (to serialise with me) and NOOP if
	 * sk_user_data is NULL. */
	sock->sk->sk_user_data = NULL;
}

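/* Decide how to handle an ENOMEM from the socket: if SOCK_NOSPACE and
 * ksnc_tx_ready are both clear the write_space callback will not reschedule
 * this connection, so return -ENOMEM to have the caller retry after a
 * timeout; otherwise return 0. */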
int
ksocknal_lib_memory_pressure(ksock_conn_t *conn)
{
	int rc = 0;
	ksock_sched_t *sched;

	sched = conn->ksnc_scheduler;
	spin_lock_bh(&sched->kss_lock);

	if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
	    !conn->ksnc_tx_ready) {
		/* SOCK_NOSPACE is set when the socket fills
		 * and cleared in the write_space callback
		 * (which also sets ksnc_tx_ready).  If
		 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
		 * zero, I didn't fill the socket and
		 * write_space won't reschedule me, so I
		 * return -ENOMEM to get my caller to retry
		 * after a timeout */
		rc = -ENOMEM;
	}

	spin_unlock_bh(&sched->kss_lock);

	return rc;
}