1/*
2 * linux/net/sunrpc/xprtsock.c
3 *
4 * Client-side transport implementation for sockets.
5 *
6 * TCP callback races fixes (C) 1998 Red Hat
7 * TCP send fixes (C) 1998 Red Hat
8 * TCP NFS related read + write fixes
9 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10 *
11 * Rewrite of larges part of the code in order to stabilize TCP stuff.
12 * Fix behaviour when socket buffer is full.
13 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14 *
15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16 *
17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18 *   <gilles.quillard@bull.net>
19 */
20
21#include <linux/types.h>
22#include <linux/string.h>
23#include <linux/slab.h>
24#include <linux/module.h>
25#include <linux/capability.h>
26#include <linux/pagemap.h>
27#include <linux/errno.h>
28#include <linux/socket.h>
29#include <linux/in.h>
30#include <linux/net.h>
31#include <linux/mm.h>
32#include <linux/un.h>
33#include <linux/udp.h>
34#include <linux/tcp.h>
35#include <linux/sunrpc/clnt.h>
36#include <linux/sunrpc/addr.h>
37#include <linux/sunrpc/sched.h>
38#include <linux/sunrpc/svcsock.h>
39#include <linux/sunrpc/xprtsock.h>
40#include <linux/file.h>
41#ifdef CONFIG_SUNRPC_BACKCHANNEL
42#include <linux/sunrpc/bc_xprt.h>
43#endif
44
45#include <net/sock.h>
46#include <net/checksum.h>
47#include <net/udp.h>
48#include <net/tcp.h>
49
50#include <trace/events/sunrpc.h>
51
52#include "sunrpc.h"
53
54static void xs_close(struct rpc_xprt *xprt);
55
56/*
57 * xprtsock tunables
58 */
59static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
60static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
61static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
62
63static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
64static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
65
66#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
67
68#define XS_TCP_LINGER_TO	(15U * HZ)
69static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
70
71/*
72 * We can register our own files under /proc/sys/sunrpc by
73 * calling register_sysctl_table() again.  The files in that
74 * directory become the union of all files registered there.
75 *
76 * We simply need to make sure that we don't collide with
77 * someone else's file names!
78 */
79
80static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
81static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
82static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
83static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
84static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
85
86static struct ctl_table_header *sunrpc_table_header;
87
88/*
89 * FIXME: changing the UDP slot table size should also resize the UDP
90 *        socket buffers for existing UDP transports
91 */
92static struct ctl_table xs_tunables_table[] = {
93	{
94		.procname	= "udp_slot_table_entries",
95		.data		= &xprt_udp_slot_table_entries,
96		.maxlen		= sizeof(unsigned int),
97		.mode		= 0644,
98		.proc_handler	= proc_dointvec_minmax,
99		.extra1		= &min_slot_table_size,
100		.extra2		= &max_slot_table_size
101	},
102	{
103		.procname	= "tcp_slot_table_entries",
104		.data		= &xprt_tcp_slot_table_entries,
105		.maxlen		= sizeof(unsigned int),
106		.mode		= 0644,
107		.proc_handler	= proc_dointvec_minmax,
108		.extra1		= &min_slot_table_size,
109		.extra2		= &max_slot_table_size
110	},
111	{
112		.procname	= "tcp_max_slot_table_entries",
113		.data		= &xprt_max_tcp_slot_table_entries,
114		.maxlen		= sizeof(unsigned int),
115		.mode		= 0644,
116		.proc_handler	= proc_dointvec_minmax,
117		.extra1		= &min_slot_table_size,
118		.extra2		= &max_tcp_slot_table_limit
119	},
120	{
121		.procname	= "min_resvport",
122		.data		= &xprt_min_resvport,
123		.maxlen		= sizeof(unsigned int),
124		.mode		= 0644,
125		.proc_handler	= proc_dointvec_minmax,
126		.extra1		= &xprt_min_resvport_limit,
127		.extra2		= &xprt_max_resvport_limit
128	},
129	{
130		.procname	= "max_resvport",
131		.data		= &xprt_max_resvport,
132		.maxlen		= sizeof(unsigned int),
133		.mode		= 0644,
134		.proc_handler	= proc_dointvec_minmax,
135		.extra1		= &xprt_min_resvport_limit,
136		.extra2		= &xprt_max_resvport_limit
137	},
138	{
139		.procname	= "tcp_fin_timeout",
140		.data		= &xs_tcp_fin_timeout,
141		.maxlen		= sizeof(xs_tcp_fin_timeout),
142		.mode		= 0644,
143		.proc_handler	= proc_dointvec_jiffies,
144	},
145	{ },
146};
147
148static struct ctl_table sunrpc_table[] = {
149	{
150		.procname	= "sunrpc",
151		.mode		= 0555,
152		.child		= xs_tunables_table
153	},
154	{ },
155};
156
157#endif
158
159/*
160 * Wait duration for a reply from the RPC portmapper.
161 */
162#define XS_BIND_TO		(60U * HZ)
163
164/*
165 * Delay if a UDP socket connect error occurs.  This is most likely some
166 * kind of resource problem on the local host.
167 */
168#define XS_UDP_REEST_TO		(2U * HZ)
169
170/*
171 * The reestablish timeout allows clients to delay for a bit before attempting
172 * to reconnect to a server that just dropped our connection.
173 *
174 * We implement an exponential backoff when trying to reestablish a TCP
175 * transport connection with the server.  Some servers like to drop a TCP
176 * connection when they are overworked, so we start with a short timeout and
177 * increase over time if the server is down or not responding.
178 */
179#define XS_TCP_INIT_REEST_TO	(3U * HZ)
180#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
181
182/*
183 * TCP idle timeout; client drops the transport socket if it is idle
184 * for this long.  Note that we also timeout UDP sockets to prevent
185 * holding port numbers when there is no RPC traffic.
186 */
187#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
188
189#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
190# undef  RPC_DEBUG_DATA
191# define RPCDBG_FACILITY	RPCDBG_TRANS
192#endif
193
194#ifdef RPC_DEBUG_DATA
195static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
196{
197	u8 *buf = (u8 *) packet;
198	int j;
199
200	dprintk("RPC:       %s\n", msg);
201	for (j = 0; j < count && j < 128; j += 4) {
202		if (!(j & 31)) {
203			if (j)
204				dprintk("\n");
205			dprintk("0x%04x ", j);
206		}
207		dprintk("%02x%02x%02x%02x ",
208			buf[j], buf[j+1], buf[j+2], buf[j+3]);
209	}
210	dprintk("\n");
211}
212#else
213static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
214{
215	/* NOP */
216}
217#endif
218
219static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
220{
221	return (struct rpc_xprt *) sk->sk_user_data;
222}
223
224static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
225{
226	return (struct sockaddr *) &xprt->addr;
227}
228
229static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
230{
231	return (struct sockaddr_un *) &xprt->addr;
232}
233
234static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
235{
236	return (struct sockaddr_in *) &xprt->addr;
237}
238
239static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
240{
241	return (struct sockaddr_in6 *) &xprt->addr;
242}
243
244static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
245{
246	struct sockaddr *sap = xs_addr(xprt);
247	struct sockaddr_in6 *sin6;
248	struct sockaddr_in *sin;
249	struct sockaddr_un *sun;
250	char buf[128];
251
252	switch (sap->sa_family) {
253	case AF_LOCAL:
254		sun = xs_addr_un(xprt);
255		strlcpy(buf, sun->sun_path, sizeof(buf));
256		xprt->address_strings[RPC_DISPLAY_ADDR] =
257						kstrdup(buf, GFP_KERNEL);
258		break;
259	case AF_INET:
260		(void)rpc_ntop(sap, buf, sizeof(buf));
261		xprt->address_strings[RPC_DISPLAY_ADDR] =
262						kstrdup(buf, GFP_KERNEL);
263		sin = xs_addr_in(xprt);
264		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
265		break;
266	case AF_INET6:
267		(void)rpc_ntop(sap, buf, sizeof(buf));
268		xprt->address_strings[RPC_DISPLAY_ADDR] =
269						kstrdup(buf, GFP_KERNEL);
270		sin6 = xs_addr_in6(xprt);
271		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
272		break;
273	default:
274		BUG();
275	}
276
277	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
278}
279
280static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
281{
282	struct sockaddr *sap = xs_addr(xprt);
283	char buf[128];
284
285	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
286	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
287
288	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
289	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
290}
291
292static void xs_format_peer_addresses(struct rpc_xprt *xprt,
293				     const char *protocol,
294				     const char *netid)
295{
296	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
297	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
298	xs_format_common_peer_addresses(xprt);
299	xs_format_common_peer_ports(xprt);
300}
301
302static void xs_update_peer_port(struct rpc_xprt *xprt)
303{
304	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
305	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
306
307	xs_format_common_peer_ports(xprt);
308}
309
310static void xs_free_peer_addresses(struct rpc_xprt *xprt)
311{
312	unsigned int i;
313
314	for (i = 0; i < RPC_DISPLAY_MAX; i++)
315		switch (i) {
316		case RPC_DISPLAY_PROTO:
317		case RPC_DISPLAY_NETID:
318			continue;
319		default:
320			kfree(xprt->address_strings[i]);
321		}
322}
323
324#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
325
326static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
327{
328	struct msghdr msg = {
329		.msg_name	= addr,
330		.msg_namelen	= addrlen,
331		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
332	};
333	struct kvec iov = {
334		.iov_base	= vec->iov_base + base,
335		.iov_len	= vec->iov_len - base,
336	};
337
338	if (iov.iov_len != 0)
339		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
340	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
341}
342
343static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
344{
345	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
346			int offset, size_t size, int flags);
347	struct page **ppage;
348	unsigned int remainder;
349	int err;
350
351	remainder = xdr->page_len - base;
352	base += xdr->page_base;
353	ppage = xdr->pages + (base >> PAGE_SHIFT);
354	base &= ~PAGE_MASK;
355	do_sendpage = sock->ops->sendpage;
356	if (!zerocopy)
357		do_sendpage = sock_no_sendpage;
358	for(;;) {
359		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
360		int flags = XS_SENDMSG_FLAGS;
361
362		remainder -= len;
363		if (remainder != 0 || more)
364			flags |= MSG_MORE;
365		err = do_sendpage(sock, *ppage, base, len, flags);
366		if (remainder == 0 || err != len)
367			break;
368		*sent_p += err;
369		ppage++;
370		base = 0;
371	}
372	if (err > 0) {
373		*sent_p += err;
374		err = 0;
375	}
376	return err;
377}
378
379/**
380 * xs_sendpages - write pages directly to a socket
381 * @sock: socket to send on
382 * @addr: UDP only -- address of destination
383 * @addrlen: UDP only -- length of destination address
384 * @xdr: buffer containing this request
385 * @base: starting position in the buffer
386 * @zerocopy: true if it is safe to use sendpage()
387 * @sent_p: return the total number of bytes successfully queued for sending
388 *
389 */
390static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p)
391{
392	unsigned int remainder = xdr->len - base;
393	int err = 0;
394	int sent = 0;
395
396	if (unlikely(!sock))
397		return -ENOTSOCK;
398
399	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
400	if (base != 0) {
401		addr = NULL;
402		addrlen = 0;
403	}
404
405	if (base < xdr->head[0].iov_len || addr != NULL) {
406		unsigned int len = xdr->head[0].iov_len - base;
407		remainder -= len;
408		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
409		if (remainder == 0 || err != len)
410			goto out;
411		*sent_p += err;
412		base = 0;
413	} else
414		base -= xdr->head[0].iov_len;
415
416	if (base < xdr->page_len) {
417		unsigned int len = xdr->page_len - base;
418		remainder -= len;
419		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent);
420		*sent_p += sent;
421		if (remainder == 0 || sent != len)
422			goto out;
423		base = 0;
424	} else
425		base -= xdr->page_len;
426
427	if (base >= xdr->tail[0].iov_len)
428		return 0;
429	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
430out:
431	if (err > 0) {
432		*sent_p += err;
433		err = 0;
434	}
435	return err;
436}
437
438static void xs_nospace_callback(struct rpc_task *task)
439{
440	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
441
442	transport->inet->sk_write_pending--;
443	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
444}
445
446/**
447 * xs_nospace - place task on wait queue if transmit was incomplete
448 * @task: task to put to sleep
449 *
450 */
451static int xs_nospace(struct rpc_task *task)
452{
453	struct rpc_rqst *req = task->tk_rqstp;
454	struct rpc_xprt *xprt = req->rq_xprt;
455	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
456	struct sock *sk = transport->inet;
457	int ret = -EAGAIN;
458
459	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
460			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
461			req->rq_slen);
462
463	/* Protect against races with write_space */
464	spin_lock_bh(&xprt->transport_lock);
465
466	/* Don't race with disconnect */
467	if (xprt_connected(xprt)) {
468		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
469			/*
470			 * Notify TCP that we're limited by the application
471			 * window size
472			 */
473			set_bit(SOCK_NOSPACE, &transport->sock->flags);
474			sk->sk_write_pending++;
475			/* ...and wait for more buffer space */
476			xprt_wait_for_buffer_space(task, xs_nospace_callback);
477		}
478	} else {
479		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
480		ret = -ENOTCONN;
481	}
482
483	spin_unlock_bh(&xprt->transport_lock);
484
485	/* Race breaker in case memory is freed before above code is called */
486	sk->sk_write_space(sk);
487	return ret;
488}
489
490/*
491 * Construct a stream transport record marker in @buf.
492 */
493static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
494{
495	u32 reclen = buf->len - sizeof(rpc_fraghdr);
496	rpc_fraghdr *base = buf->head[0].iov_base;
497	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
498}
499
500/**
501 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
502 * @task: RPC task that manages the state of an RPC request
503 *
504 * Return values:
505 *        0:	The request has been sent
506 *   EAGAIN:	The socket was blocked, please call again later to
507 *		complete the request
508 * ENOTCONN:	Caller needs to invoke connect logic then call again
509 *    other:	Some other error occured, the request was not sent
510 */
511static int xs_local_send_request(struct rpc_task *task)
512{
513	struct rpc_rqst *req = task->tk_rqstp;
514	struct rpc_xprt *xprt = req->rq_xprt;
515	struct sock_xprt *transport =
516				container_of(xprt, struct sock_xprt, xprt);
517	struct xdr_buf *xdr = &req->rq_snd_buf;
518	int status;
519	int sent = 0;
520
521	xs_encode_stream_record_marker(&req->rq_snd_buf);
522
523	xs_pktdump("packet data:",
524			req->rq_svec->iov_base, req->rq_svec->iov_len);
525
526	status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
527			      true, &sent);
528	dprintk("RPC:       %s(%u) = %d\n",
529			__func__, xdr->len - req->rq_bytes_sent, status);
530	if (likely(sent > 0) || status == 0) {
531		req->rq_bytes_sent += sent;
532		req->rq_xmit_bytes_sent += sent;
533		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
534			req->rq_bytes_sent = 0;
535			return 0;
536		}
537		status = -EAGAIN;
538	}
539
540	switch (status) {
541	case -ENOBUFS:
542	case -EAGAIN:
543		status = xs_nospace(task);
544		break;
545	default:
546		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
547			-status);
548	case -EPIPE:
549		xs_close(xprt);
550		status = -ENOTCONN;
551	}
552
553	return status;
554}
555
556/**
557 * xs_udp_send_request - write an RPC request to a UDP socket
558 * @task: address of RPC task that manages the state of an RPC request
559 *
560 * Return values:
561 *        0:	The request has been sent
562 *   EAGAIN:	The socket was blocked, please call again later to
563 *		complete the request
564 * ENOTCONN:	Caller needs to invoke connect logic then call again
565 *    other:	Some other error occurred, the request was not sent
566 */
567static int xs_udp_send_request(struct rpc_task *task)
568{
569	struct rpc_rqst *req = task->tk_rqstp;
570	struct rpc_xprt *xprt = req->rq_xprt;
571	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
572	struct xdr_buf *xdr = &req->rq_snd_buf;
573	int sent = 0;
574	int status;
575
576	xs_pktdump("packet data:",
577				req->rq_svec->iov_base,
578				req->rq_svec->iov_len);
579
580	if (!xprt_bound(xprt))
581		return -ENOTCONN;
582	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
583			      xdr, req->rq_bytes_sent, true, &sent);
584
585	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
586			xdr->len - req->rq_bytes_sent, status);
587
588	/* firewall is blocking us, don't return -EAGAIN or we end up looping */
589	if (status == -EPERM)
590		goto process_status;
591
592	if (sent > 0 || status == 0) {
593		req->rq_xmit_bytes_sent += sent;
594		if (sent >= req->rq_slen)
595			return 0;
596		/* Still some bytes left; set up for a retry later. */
597		status = -EAGAIN;
598	}
599
600process_status:
601	switch (status) {
602	case -ENOTSOCK:
603		status = -ENOTCONN;
604		/* Should we call xs_close() here? */
605		break;
606	case -EAGAIN:
607		status = xs_nospace(task);
608		break;
609	default:
610		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
611			-status);
612	case -ENETUNREACH:
613	case -ENOBUFS:
614	case -EPIPE:
615	case -ECONNREFUSED:
616	case -EPERM:
617		/* When the server has died, an ICMP port unreachable message
618		 * prompts ECONNREFUSED. */
619		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
620	}
621
622	return status;
623}
624
625/**
626 * xs_tcp_shutdown - gracefully shut down a TCP socket
627 * @xprt: transport
628 *
629 * Initiates a graceful shutdown of the TCP socket by calling the
630 * equivalent of shutdown(SHUT_RDWR);
631 */
632static void xs_tcp_shutdown(struct rpc_xprt *xprt)
633{
634	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
635	struct socket *sock = transport->sock;
636
637	if (sock != NULL) {
638		kernel_sock_shutdown(sock, SHUT_RDWR);
639		trace_rpc_socket_shutdown(xprt, sock);
640	}
641}
642
643/**
644 * xs_tcp_send_request - write an RPC request to a TCP socket
645 * @task: address of RPC task that manages the state of an RPC request
646 *
647 * Return values:
648 *        0:	The request has been sent
649 *   EAGAIN:	The socket was blocked, please call again later to
650 *		complete the request
651 * ENOTCONN:	Caller needs to invoke connect logic then call again
652 *    other:	Some other error occurred, the request was not sent
653 *
654 * XXX: In the case of soft timeouts, should we eventually give up
655 *	if sendmsg is not able to make progress?
656 */
657static int xs_tcp_send_request(struct rpc_task *task)
658{
659	struct rpc_rqst *req = task->tk_rqstp;
660	struct rpc_xprt *xprt = req->rq_xprt;
661	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
662	struct xdr_buf *xdr = &req->rq_snd_buf;
663	bool zerocopy = true;
664	int status;
665	int sent;
666
667	xs_encode_stream_record_marker(&req->rq_snd_buf);
668
669	xs_pktdump("packet data:",
670				req->rq_svec->iov_base,
671				req->rq_svec->iov_len);
672	/* Don't use zero copy if this is a resend. If the RPC call
673	 * completes while the socket holds a reference to the pages,
674	 * then we may end up resending corrupted data.
675	 */
676	if (task->tk_flags & RPC_TASK_SENT)
677		zerocopy = false;
678
679	/* Continue transmitting the packet/record. We must be careful
680	 * to cope with writespace callbacks arriving _after_ we have
681	 * called sendmsg(). */
682	while (1) {
683		sent = 0;
684		status = xs_sendpages(transport->sock, NULL, 0, xdr,
685				      req->rq_bytes_sent, zerocopy, &sent);
686
687		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
688				xdr->len - req->rq_bytes_sent, status);
689
690		if (unlikely(sent == 0 && status < 0))
691			break;
692
693		/* If we've sent the entire packet, immediately
694		 * reset the count of bytes sent. */
695		req->rq_bytes_sent += sent;
696		req->rq_xmit_bytes_sent += sent;
697		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
698			req->rq_bytes_sent = 0;
699			return 0;
700		}
701
702		if (sent != 0)
703			continue;
704		status = -EAGAIN;
705		break;
706	}
707
708	switch (status) {
709	case -ENOTSOCK:
710		status = -ENOTCONN;
711		/* Should we call xs_close() here? */
712		break;
713	case -ENOBUFS:
714	case -EAGAIN:
715		status = xs_nospace(task);
716		break;
717	default:
718		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
719			-status);
720	case -ECONNRESET:
721	case -ECONNREFUSED:
722	case -ENOTCONN:
723	case -EADDRINUSE:
724	case -EPIPE:
725		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
726	}
727
728	return status;
729}
730
731/**
732 * xs_tcp_release_xprt - clean up after a tcp transmission
733 * @xprt: transport
734 * @task: rpc task
735 *
736 * This cleans up if an error causes us to abort the transmission of a request.
737 * In this case, the socket may need to be reset in order to avoid confusing
738 * the server.
739 */
740static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
741{
742	struct rpc_rqst *req;
743
744	if (task != xprt->snd_task)
745		return;
746	if (task == NULL)
747		goto out_release;
748	req = task->tk_rqstp;
749	if (req == NULL)
750		goto out_release;
751	if (req->rq_bytes_sent == 0)
752		goto out_release;
753	if (req->rq_bytes_sent == req->rq_snd_buf.len)
754		goto out_release;
755	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
756out_release:
757	xprt_release_xprt(xprt, task);
758}
759
760static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
761{
762	transport->old_data_ready = sk->sk_data_ready;
763	transport->old_state_change = sk->sk_state_change;
764	transport->old_write_space = sk->sk_write_space;
765	transport->old_error_report = sk->sk_error_report;
766}
767
768static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
769{
770	sk->sk_data_ready = transport->old_data_ready;
771	sk->sk_state_change = transport->old_state_change;
772	sk->sk_write_space = transport->old_write_space;
773	sk->sk_error_report = transport->old_error_report;
774}
775
776static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
777{
778	smp_mb__before_atomic();
779	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
780	clear_bit(XPRT_CLOSING, &xprt->state);
781	smp_mb__after_atomic();
782}
783
784static void xs_sock_mark_closed(struct rpc_xprt *xprt)
785{
786	xs_sock_reset_connection_flags(xprt);
787	/* Mark transport as closed and wake up all pending tasks */
788	xprt_disconnect_done(xprt);
789}
790
791/**
792 * xs_error_report - callback to handle TCP socket state errors
793 * @sk: socket
794 *
795 * Note: we don't call sock_error() since there may be a rpc_task
796 * using the socket, and so we don't want to clear sk->sk_err.
797 */
798static void xs_error_report(struct sock *sk)
799{
800	struct rpc_xprt *xprt;
801	int err;
802
803	read_lock_bh(&sk->sk_callback_lock);
804	if (!(xprt = xprt_from_sock(sk)))
805		goto out;
806
807	err = -sk->sk_err;
808	if (err == 0)
809		goto out;
810	/* Is this a reset event? */
811	if (sk->sk_state == TCP_CLOSE)
812		xs_sock_mark_closed(xprt);
813	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
814			xprt, -err);
815	trace_rpc_socket_error(xprt, sk->sk_socket, err);
816	xprt_wake_pending_tasks(xprt, err);
817 out:
818	read_unlock_bh(&sk->sk_callback_lock);
819}
820
821static void xs_reset_transport(struct sock_xprt *transport)
822{
823	struct socket *sock = transport->sock;
824	struct sock *sk = transport->inet;
825	struct rpc_xprt *xprt = &transport->xprt;
826
827	if (sk == NULL)
828		return;
829
830	write_lock_bh(&sk->sk_callback_lock);
831	transport->inet = NULL;
832	transport->sock = NULL;
833
834	sk->sk_user_data = NULL;
835
836	xs_restore_old_callbacks(transport, sk);
837	xprt_clear_connected(xprt);
838	write_unlock_bh(&sk->sk_callback_lock);
839	xs_sock_reset_connection_flags(xprt);
840
841	trace_rpc_socket_close(xprt, sock);
842	sock_release(sock);
843}
844
845/**
846 * xs_close - close a socket
847 * @xprt: transport
848 *
849 * This is used when all requests are complete; ie, no DRC state remains
850 * on the server we want to save.
851 *
852 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
853 * xs_reset_transport() zeroing the socket from underneath a writer.
854 */
855static void xs_close(struct rpc_xprt *xprt)
856{
857	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
858
859	dprintk("RPC:       xs_close xprt %p\n", xprt);
860
861	xs_reset_transport(transport);
862	xprt->reestablish_timeout = 0;
863
864	xprt_disconnect_done(xprt);
865}
866
867static void xs_xprt_free(struct rpc_xprt *xprt)
868{
869	xs_free_peer_addresses(xprt);
870	xprt_free(xprt);
871}
872
873/**
874 * xs_destroy - prepare to shutdown a transport
875 * @xprt: doomed transport
876 *
877 */
878static void xs_destroy(struct rpc_xprt *xprt)
879{
880	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
881
882	xs_close(xprt);
883	xs_xprt_free(xprt);
884	module_put(THIS_MODULE);
885}
886
887static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
888{
889	struct xdr_skb_reader desc = {
890		.skb		= skb,
891		.offset		= sizeof(rpc_fraghdr),
892		.count		= skb->len - sizeof(rpc_fraghdr),
893	};
894
895	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
896		return -1;
897	if (desc.count)
898		return -1;
899	return 0;
900}
901
902/**
903 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
904 * @sk: socket with data to read
905 * @len: how much data to read
906 *
907 * Currently this assumes we can read the whole reply in a single gulp.
908 */
909static void xs_local_data_ready(struct sock *sk)
910{
911	struct rpc_task *task;
912	struct rpc_xprt *xprt;
913	struct rpc_rqst *rovr;
914	struct sk_buff *skb;
915	int err, repsize, copied;
916	u32 _xid;
917	__be32 *xp;
918
919	read_lock_bh(&sk->sk_callback_lock);
920	dprintk("RPC:       %s...\n", __func__);
921	xprt = xprt_from_sock(sk);
922	if (xprt == NULL)
923		goto out;
924
925	skb = skb_recv_datagram(sk, 0, 1, &err);
926	if (skb == NULL)
927		goto out;
928
929	repsize = skb->len - sizeof(rpc_fraghdr);
930	if (repsize < 4) {
931		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
932		goto dropit;
933	}
934
935	/* Copy the XID from the skb... */
936	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
937	if (xp == NULL)
938		goto dropit;
939
940	/* Look up and lock the request corresponding to the given XID */
941	spin_lock(&xprt->transport_lock);
942	rovr = xprt_lookup_rqst(xprt, *xp);
943	if (!rovr)
944		goto out_unlock;
945	task = rovr->rq_task;
946
947	copied = rovr->rq_private_buf.buflen;
948	if (copied > repsize)
949		copied = repsize;
950
951	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
952		dprintk("RPC:       sk_buff copy failed\n");
953		goto out_unlock;
954	}
955
956	xprt_complete_rqst(task, copied);
957
958 out_unlock:
959	spin_unlock(&xprt->transport_lock);
960 dropit:
961	skb_free_datagram(sk, skb);
962 out:
963	read_unlock_bh(&sk->sk_callback_lock);
964}
965
966/**
967 * xs_udp_data_ready - "data ready" callback for UDP sockets
968 * @sk: socket with data to read
969 * @len: how much data to read
970 *
971 */
972static void xs_udp_data_ready(struct sock *sk)
973{
974	struct rpc_task *task;
975	struct rpc_xprt *xprt;
976	struct rpc_rqst *rovr;
977	struct sk_buff *skb;
978	int err, repsize, copied;
979	u32 _xid;
980	__be32 *xp;
981
982	read_lock_bh(&sk->sk_callback_lock);
983	dprintk("RPC:       xs_udp_data_ready...\n");
984	if (!(xprt = xprt_from_sock(sk)))
985		goto out;
986
987	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
988		goto out;
989
990	repsize = skb->len - sizeof(struct udphdr);
991	if (repsize < 4) {
992		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
993		goto dropit;
994	}
995
996	/* Copy the XID from the skb... */
997	xp = skb_header_pointer(skb, sizeof(struct udphdr),
998				sizeof(_xid), &_xid);
999	if (xp == NULL)
1000		goto dropit;
1001
1002	/* Look up and lock the request corresponding to the given XID */
1003	spin_lock(&xprt->transport_lock);
1004	rovr = xprt_lookup_rqst(xprt, *xp);
1005	if (!rovr)
1006		goto out_unlock;
1007	task = rovr->rq_task;
1008
1009	if ((copied = rovr->rq_private_buf.buflen) > repsize)
1010		copied = repsize;
1011
1012	/* Suck it into the iovec, verify checksum if not done by hw. */
1013	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1014		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
1015		goto out_unlock;
1016	}
1017
1018	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
1019
1020	xprt_adjust_cwnd(xprt, task, copied);
1021	xprt_complete_rqst(task, copied);
1022
1023 out_unlock:
1024	spin_unlock(&xprt->transport_lock);
1025 dropit:
1026	skb_free_datagram(sk, skb);
1027 out:
1028	read_unlock_bh(&sk->sk_callback_lock);
1029}
1030
1031/*
1032 * Helper function to force a TCP close if the server is sending
1033 * junk and/or it has put us in CLOSE_WAIT
1034 */
1035static void xs_tcp_force_close(struct rpc_xprt *xprt)
1036{
1037	xprt_force_disconnect(xprt);
1038}
1039
1040static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1041{
1042	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1043	size_t len, used;
1044	char *p;
1045
1046	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
1047	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
1048	used = xdr_skb_read_bits(desc, p, len);
1049	transport->tcp_offset += used;
1050	if (used != len)
1051		return;
1052
1053	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
1054	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
1055		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
1056	else
1057		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
1058	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
1059
1060	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
1061	transport->tcp_offset = 0;
1062
1063	/* Sanity check of the record length */
1064	if (unlikely(transport->tcp_reclen < 8)) {
1065		dprintk("RPC:       invalid TCP record fragment length\n");
1066		xs_tcp_force_close(xprt);
1067		return;
1068	}
1069	dprintk("RPC:       reading TCP record fragment of length %d\n",
1070			transport->tcp_reclen);
1071}
1072
1073static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1074{
1075	if (transport->tcp_offset == transport->tcp_reclen) {
1076		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1077		transport->tcp_offset = 0;
1078		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1079			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1080			transport->tcp_flags |= TCP_RCV_COPY_XID;
1081			transport->tcp_copied = 0;
1082		}
1083	}
1084}
1085
1086static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1087{
1088	size_t len, used;
1089	char *p;
1090
1091	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
1092	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
1093	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
1094	used = xdr_skb_read_bits(desc, p, len);
1095	transport->tcp_offset += used;
1096	if (used != len)
1097		return;
1098	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1099	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1100	transport->tcp_copied = 4;
1101	dprintk("RPC:       reading %s XID %08x\n",
1102			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1103							      : "request with",
1104			ntohl(transport->tcp_xid));
1105	xs_tcp_check_fraghdr(transport);
1106}
1107
1108static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1109				       struct xdr_skb_reader *desc)
1110{
1111	size_t len, used;
1112	u32 offset;
1113	char *p;
1114
1115	/*
1116	 * We want transport->tcp_offset to be 8 at the end of this routine
1117	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
1118	 * When this function is called for the first time,
1119	 * transport->tcp_offset is 4 (after having already read the xid).
1120	 */
1121	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1122	len = sizeof(transport->tcp_calldir) - offset;
1123	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1124	p = ((char *) &transport->tcp_calldir) + offset;
1125	used = xdr_skb_read_bits(desc, p, len);
1126	transport->tcp_offset += used;
1127	if (used != len)
1128		return;
1129	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1130	/*
1131	 * We don't yet have the XDR buffer, so we will write the calldir
1132	 * out after we get the buffer from the 'struct rpc_rqst'
1133	 */
1134	switch (ntohl(transport->tcp_calldir)) {
1135	case RPC_REPLY:
1136		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1137		transport->tcp_flags |= TCP_RCV_COPY_DATA;
1138		transport->tcp_flags |= TCP_RPC_REPLY;
1139		break;
1140	case RPC_CALL:
1141		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1142		transport->tcp_flags |= TCP_RCV_COPY_DATA;
1143		transport->tcp_flags &= ~TCP_RPC_REPLY;
1144		break;
1145	default:
1146		dprintk("RPC:       invalid request message type\n");
1147		xs_tcp_force_close(&transport->xprt);
1148	}
1149	xs_tcp_check_fraghdr(transport);
1150}
1151
1152static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1153				     struct xdr_skb_reader *desc,
1154				     struct rpc_rqst *req)
1155{
1156	struct sock_xprt *transport =
1157				container_of(xprt, struct sock_xprt, xprt);
1158	struct xdr_buf *rcvbuf;
1159	size_t len;
1160	ssize_t r;
1161
1162	rcvbuf = &req->rq_private_buf;
1163
1164	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1165		/*
1166		 * Save the RPC direction in the XDR buffer
1167		 */
1168		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1169			&transport->tcp_calldir,
1170			sizeof(transport->tcp_calldir));
1171		transport->tcp_copied += sizeof(transport->tcp_calldir);
1172		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1173	}
1174
1175	len = desc->count;
1176	if (len > transport->tcp_reclen - transport->tcp_offset) {
1177		struct xdr_skb_reader my_desc;
1178
1179		len = transport->tcp_reclen - transport->tcp_offset;
1180		memcpy(&my_desc, desc, sizeof(my_desc));
1181		my_desc.count = len;
1182		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1183					  &my_desc, xdr_skb_read_bits);
1184		desc->count -= r;
1185		desc->offset += r;
1186	} else
1187		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1188					  desc, xdr_skb_read_bits);
1189
1190	if (r > 0) {
1191		transport->tcp_copied += r;
1192		transport->tcp_offset += r;
1193	}
1194	if (r != len) {
1195		/* Error when copying to the receive buffer,
1196		 * usually because we weren't able to allocate
1197		 * additional buffer pages. All we can do now
1198		 * is turn off TCP_RCV_COPY_DATA, so the request
1199		 * will not receive any additional updates,
1200		 * and time out.
1201		 * Any remaining data from this record will
1202		 * be discarded.
1203		 */
1204		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1205		dprintk("RPC:       XID %08x truncated request\n",
1206				ntohl(transport->tcp_xid));
1207		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1208				"tcp_offset = %u, tcp_reclen = %u\n",
1209				xprt, transport->tcp_copied,
1210				transport->tcp_offset, transport->tcp_reclen);
1211		return;
1212	}
1213
1214	dprintk("RPC:       XID %08x read %Zd bytes\n",
1215			ntohl(transport->tcp_xid), r);
1216	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1217			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
1218			transport->tcp_offset, transport->tcp_reclen);
1219
1220	if (transport->tcp_copied == req->rq_private_buf.buflen)
1221		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1222	else if (transport->tcp_offset == transport->tcp_reclen) {
1223		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1224			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1225	}
1226}
1227
1228/*
1229 * Finds the request corresponding to the RPC xid and invokes the common
1230 * tcp read code to read the data.
1231 */
1232static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1233				    struct xdr_skb_reader *desc)
1234{
1235	struct sock_xprt *transport =
1236				container_of(xprt, struct sock_xprt, xprt);
1237	struct rpc_rqst *req;
1238
1239	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1240
1241	/* Find and lock the request corresponding to this xid */
1242	spin_lock(&xprt->transport_lock);
1243	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1244	if (!req) {
1245		dprintk("RPC:       XID %08x request not found!\n",
1246				ntohl(transport->tcp_xid));
1247		spin_unlock(&xprt->transport_lock);
1248		return -1;
1249	}
1250
1251	xs_tcp_read_common(xprt, desc, req);
1252
1253	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1254		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1255
1256	spin_unlock(&xprt->transport_lock);
1257	return 0;
1258}
1259
1260#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1261/*
1262 * Obtains an rpc_rqst previously allocated and invokes the common
1263 * tcp read code to read the data.  The result is placed in the callback
1264 * queue.
1265 * If we're unable to obtain the rpc_rqst we schedule the closing of the
1266 * connection and return -1.
1267 */
1268static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1269				       struct xdr_skb_reader *desc)
1270{
1271	struct sock_xprt *transport =
1272				container_of(xprt, struct sock_xprt, xprt);
1273	struct rpc_rqst *req;
1274
1275	/* Look up and lock the request corresponding to the given XID */
1276	spin_lock(&xprt->transport_lock);
1277	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
1278	if (req == NULL) {
1279		spin_unlock(&xprt->transport_lock);
1280		printk(KERN_WARNING "Callback slot table overflowed\n");
1281		xprt_force_disconnect(xprt);
1282		return -1;
1283	}
1284
1285	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1286	xs_tcp_read_common(xprt, desc, req);
1287
1288	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1289		xprt_complete_bc_request(req, transport->tcp_copied);
1290	spin_unlock(&xprt->transport_lock);
1291
1292	return 0;
1293}
1294
1295static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1296					struct xdr_skb_reader *desc)
1297{
1298	struct sock_xprt *transport =
1299				container_of(xprt, struct sock_xprt, xprt);
1300
1301	return (transport->tcp_flags & TCP_RPC_REPLY) ?
1302		xs_tcp_read_reply(xprt, desc) :
1303		xs_tcp_read_callback(xprt, desc);
1304}
1305#else
1306static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1307					struct xdr_skb_reader *desc)
1308{
1309	return xs_tcp_read_reply(xprt, desc);
1310}
1311#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1312
1313/*
1314 * Read data off the transport.  This can be either an RPC_CALL or an
1315 * RPC_REPLY.  Relay the processing to helper functions.
1316 */
1317static void xs_tcp_read_data(struct rpc_xprt *xprt,
1318				    struct xdr_skb_reader *desc)
1319{
1320	struct sock_xprt *transport =
1321				container_of(xprt, struct sock_xprt, xprt);
1322
1323	if (_xs_tcp_read_data(xprt, desc) == 0)
1324		xs_tcp_check_fraghdr(transport);
1325	else {
1326		/*
1327		 * The transport_lock protects the request handling.
1328		 * There's no need to hold it to update the tcp_flags.
1329		 */
1330		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1331	}
1332}
1333
1334static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1335{
1336	size_t len;
1337
1338	len = transport->tcp_reclen - transport->tcp_offset;
1339	if (len > desc->count)
1340		len = desc->count;
1341	desc->count -= len;
1342	desc->offset += len;
1343	transport->tcp_offset += len;
1344	dprintk("RPC:       discarded %Zu bytes\n", len);
1345	xs_tcp_check_fraghdr(transport);
1346}
1347
1348static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1349{
1350	struct rpc_xprt *xprt = rd_desc->arg.data;
1351	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1352	struct xdr_skb_reader desc = {
1353		.skb	= skb,
1354		.offset	= offset,
1355		.count	= len,
1356	};
1357
1358	dprintk("RPC:       xs_tcp_data_recv started\n");
1359	do {
1360		trace_xs_tcp_data_recv(transport);
1361		/* Read in a new fragment marker if necessary */
1362		/* Can we ever really expect to get completely empty fragments? */
1363		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1364			xs_tcp_read_fraghdr(xprt, &desc);
1365			continue;
1366		}
1367		/* Read in the xid if necessary */
1368		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1369			xs_tcp_read_xid(transport, &desc);
1370			continue;
1371		}
1372		/* Read in the call/reply flag */
1373		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1374			xs_tcp_read_calldir(transport, &desc);
1375			continue;
1376		}
1377		/* Read in the request data */
1378		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1379			xs_tcp_read_data(xprt, &desc);
1380			continue;
1381		}
1382		/* Skip over any trailing bytes on short reads */
1383		xs_tcp_read_discard(transport, &desc);
1384	} while (desc.count);
1385	trace_xs_tcp_data_recv(transport);
1386	dprintk("RPC:       xs_tcp_data_recv done\n");
1387	return len - desc.count;
1388}
1389
1390/**
1391 * xs_tcp_data_ready - "data ready" callback for TCP sockets
1392 * @sk: socket with data to read
1393 * @bytes: how much data to read
1394 *
1395 */
1396static void xs_tcp_data_ready(struct sock *sk)
1397{
1398	struct rpc_xprt *xprt;
1399	read_descriptor_t rd_desc;
1400	int read;
1401	unsigned long total = 0;
1402
1403	dprintk("RPC:       xs_tcp_data_ready...\n");
1404
1405	read_lock_bh(&sk->sk_callback_lock);
1406	if (!(xprt = xprt_from_sock(sk))) {
1407		read = 0;
1408		goto out;
1409	}
1410	/* Any data means we had a useful conversation, so
1411	 * the we don't need to delay the next reconnect
1412	 */
1413	if (xprt->reestablish_timeout)
1414		xprt->reestablish_timeout = 0;
1415
1416	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1417	rd_desc.arg.data = xprt;
1418	do {
1419		rd_desc.count = 65536;
1420		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1421		if (read > 0)
1422			total += read;
1423	} while (read > 0);
1424out:
1425	trace_xs_tcp_data_ready(xprt, read, total);
1426	read_unlock_bh(&sk->sk_callback_lock);
1427}
1428
1429/**
1430 * xs_tcp_state_change - callback to handle TCP socket state changes
1431 * @sk: socket whose state has changed
1432 *
1433 */
1434static void xs_tcp_state_change(struct sock *sk)
1435{
1436	struct rpc_xprt *xprt;
1437	struct sock_xprt *transport;
1438
1439	read_lock_bh(&sk->sk_callback_lock);
1440	if (!(xprt = xprt_from_sock(sk)))
1441		goto out;
1442	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1443	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
1444			sk->sk_state, xprt_connected(xprt),
1445			sock_flag(sk, SOCK_DEAD),
1446			sock_flag(sk, SOCK_ZAPPED),
1447			sk->sk_shutdown);
1448
1449	transport = container_of(xprt, struct sock_xprt, xprt);
1450	trace_rpc_socket_state_change(xprt, sk->sk_socket);
1451	switch (sk->sk_state) {
1452	case TCP_ESTABLISHED:
1453		spin_lock(&xprt->transport_lock);
1454		if (!xprt_test_and_set_connected(xprt)) {
1455
1456			/* Reset TCP record info */
1457			transport->tcp_offset = 0;
1458			transport->tcp_reclen = 0;
1459			transport->tcp_copied = 0;
1460			transport->tcp_flags =
1461				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1462			xprt->connect_cookie++;
1463			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
1464			xprt_clear_connecting(xprt);
1465
1466			xprt_wake_pending_tasks(xprt, -EAGAIN);
1467		}
1468		spin_unlock(&xprt->transport_lock);
1469		break;
1470	case TCP_FIN_WAIT1:
1471		/* The client initiated a shutdown of the socket */
1472		xprt->connect_cookie++;
1473		xprt->reestablish_timeout = 0;
1474		set_bit(XPRT_CLOSING, &xprt->state);
1475		smp_mb__before_atomic();
1476		clear_bit(XPRT_CONNECTED, &xprt->state);
1477		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1478		smp_mb__after_atomic();
1479		break;
1480	case TCP_CLOSE_WAIT:
1481		/* The server initiated a shutdown of the socket */
1482		xprt->connect_cookie++;
1483		clear_bit(XPRT_CONNECTED, &xprt->state);
1484		xs_tcp_force_close(xprt);
1485	case TCP_CLOSING:
1486		/*
1487		 * If the server closed down the connection, make sure that
1488		 * we back off before reconnecting
1489		 */
1490		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1491			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1492		break;
1493	case TCP_LAST_ACK:
1494		set_bit(XPRT_CLOSING, &xprt->state);
1495		smp_mb__before_atomic();
1496		clear_bit(XPRT_CONNECTED, &xprt->state);
1497		smp_mb__after_atomic();
1498		break;
1499	case TCP_CLOSE:
1500		if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
1501					&transport->sock_state))
1502			xprt_clear_connecting(xprt);
1503		xs_sock_mark_closed(xprt);
1504	}
1505 out:
1506	read_unlock_bh(&sk->sk_callback_lock);
1507}
1508
1509static void xs_write_space(struct sock *sk)
1510{
1511	struct socket *sock;
1512	struct rpc_xprt *xprt;
1513
1514	if (unlikely(!(sock = sk->sk_socket)))
1515		return;
1516	clear_bit(SOCK_NOSPACE, &sock->flags);
1517
1518	if (unlikely(!(xprt = xprt_from_sock(sk))))
1519		return;
1520	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1521		return;
1522
1523	xprt_write_space(xprt);
1524}
1525
1526/**
1527 * xs_udp_write_space - callback invoked when socket buffer space
1528 *                             becomes available
1529 * @sk: socket whose state has changed
1530 *
1531 * Called when more output buffer space is available for this socket.
1532 * We try not to wake our writers until they can make "significant"
1533 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1534 * with a bunch of small requests.
1535 */
1536static void xs_udp_write_space(struct sock *sk)
1537{
1538	read_lock_bh(&sk->sk_callback_lock);
1539
1540	/* from net/core/sock.c:sock_def_write_space */
1541	if (sock_writeable(sk))
1542		xs_write_space(sk);
1543
1544	read_unlock_bh(&sk->sk_callback_lock);
1545}
1546
1547/**
1548 * xs_tcp_write_space - callback invoked when socket buffer space
1549 *                             becomes available
1550 * @sk: socket whose state has changed
1551 *
1552 * Called when more output buffer space is available for this socket.
1553 * We try not to wake our writers until they can make "significant"
1554 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1555 * with a bunch of small requests.
1556 */
1557static void xs_tcp_write_space(struct sock *sk)
1558{
1559	read_lock_bh(&sk->sk_callback_lock);
1560
1561	/* from net/core/stream.c:sk_stream_write_space */
1562	if (sk_stream_is_writeable(sk))
1563		xs_write_space(sk);
1564
1565	read_unlock_bh(&sk->sk_callback_lock);
1566}
1567
1568static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1569{
1570	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1571	struct sock *sk = transport->inet;
1572
1573	if (transport->rcvsize) {
1574		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1575		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1576	}
1577	if (transport->sndsize) {
1578		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1579		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1580		sk->sk_write_space(sk);
1581	}
1582}
1583
1584/**
1585 * xs_udp_set_buffer_size - set send and receive limits
1586 * @xprt: generic transport
1587 * @sndsize: requested size of send buffer, in bytes
1588 * @rcvsize: requested size of receive buffer, in bytes
1589 *
1590 * Set socket send and receive buffer size limits.
1591 */
1592static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1593{
1594	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1595
1596	transport->sndsize = 0;
1597	if (sndsize)
1598		transport->sndsize = sndsize + 1024;
1599	transport->rcvsize = 0;
1600	if (rcvsize)
1601		transport->rcvsize = rcvsize + 1024;
1602
1603	xs_udp_do_set_buffer_size(xprt);
1604}
1605
1606/**
1607 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1608 * @task: task that timed out
1609 *
1610 * Adjust the congestion window after a retransmit timeout has occurred.
1611 */
1612static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
1613{
1614	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
1615}
1616
1617static unsigned short xs_get_random_port(void)
1618{
1619	unsigned short range = xprt_max_resvport - xprt_min_resvport;
1620	unsigned short rand = (unsigned short) prandom_u32() % range;
1621	return rand + xprt_min_resvport;
1622}
1623
1624/**
1625 * xs_set_reuseaddr_port - set the socket's port and address reuse options
1626 * @sock: socket
1627 *
1628 * Note that this function has to be called on all sockets that share the
1629 * same port, and it must be called before binding.
1630 */
1631static void xs_sock_set_reuseport(struct socket *sock)
1632{
1633	int opt = 1;
1634
1635	kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEPORT,
1636			(char *)&opt, sizeof(opt));
1637}
1638
1639static unsigned short xs_sock_getport(struct socket *sock)
1640{
1641	struct sockaddr_storage buf;
1642	int buflen;
1643	unsigned short port = 0;
1644
1645	if (kernel_getsockname(sock, (struct sockaddr *)&buf, &buflen) < 0)
1646		goto out;
1647	switch (buf.ss_family) {
1648	case AF_INET6:
1649		port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port);
1650		break;
1651	case AF_INET:
1652		port = ntohs(((struct sockaddr_in *)&buf)->sin_port);
1653	}
1654out:
1655	return port;
1656}
1657
1658/**
1659 * xs_set_port - reset the port number in the remote endpoint address
1660 * @xprt: generic transport
1661 * @port: new port number
1662 *
1663 */
1664static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1665{
1666	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1667
1668	rpc_set_port(xs_addr(xprt), port);
1669	xs_update_peer_port(xprt);
1670}
1671
1672static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
1673{
1674	if (transport->srcport == 0)
1675		transport->srcport = xs_sock_getport(sock);
1676}
1677
1678static unsigned short xs_get_srcport(struct sock_xprt *transport)
1679{
1680	unsigned short port = transport->srcport;
1681
1682	if (port == 0 && transport->xprt.resvport)
1683		port = xs_get_random_port();
1684	return port;
1685}
1686
1687static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1688{
1689	if (transport->srcport != 0)
1690		transport->srcport = 0;
1691	if (!transport->xprt.resvport)
1692		return 0;
1693	if (port <= xprt_min_resvport || port > xprt_max_resvport)
1694		return xprt_max_resvport;
1695	return --port;
1696}
1697static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1698{
1699	struct sockaddr_storage myaddr;
1700	int err, nloop = 0;
1701	unsigned short port = xs_get_srcport(transport);
1702	unsigned short last;
1703
1704	/*
1705	 * If we are asking for any ephemeral port (i.e. port == 0 &&
1706	 * transport->xprt.resvport == 0), don't bind.  Let the local
1707	 * port selection happen implicitly when the socket is used
1708	 * (for example at connect time).
1709	 *
1710	 * This ensures that we can continue to establish TCP
1711	 * connections even when all local ephemeral ports are already
1712	 * a part of some TCP connection.  This makes no difference
1713	 * for UDP sockets, but also doens't harm them.
1714	 *
1715	 * If we're asking for any reserved port (i.e. port == 0 &&
1716	 * transport->xprt.resvport == 1) xs_get_srcport above will
1717	 * ensure that port is non-zero and we will bind as needed.
1718	 */
1719	if (port == 0)
1720		return 0;
1721
1722	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1723	do {
1724		rpc_set_port((struct sockaddr *)&myaddr, port);
1725		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
1726				transport->xprt.addrlen);
1727		if (err == 0) {
1728			transport->srcport = port;
1729			break;
1730		}
1731		last = port;
1732		port = xs_next_srcport(transport, port);
1733		if (port > last)
1734			nloop++;
1735	} while (err == -EADDRINUSE && nloop != 2);
1736
1737	if (myaddr.ss_family == AF_INET)
1738		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
1739				&((struct sockaddr_in *)&myaddr)->sin_addr,
1740				port, err ? "failed" : "ok", err);
1741	else
1742		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
1743				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
1744				port, err ? "failed" : "ok", err);
1745	return err;
1746}
1747
1748/*
1749 * We don't support autobind on AF_LOCAL sockets
1750 */
1751static void xs_local_rpcbind(struct rpc_task *task)
1752{
1753	rcu_read_lock();
1754	xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
1755	rcu_read_unlock();
1756}
1757
1758static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
1759{
1760}
1761
1762#ifdef CONFIG_DEBUG_LOCK_ALLOC
1763static struct lock_class_key xs_key[2];
1764static struct lock_class_key xs_slock_key[2];
1765
1766static inline void xs_reclassify_socketu(struct socket *sock)
1767{
1768	struct sock *sk = sock->sk;
1769
1770	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
1771		&xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]);
1772}
1773
1774static inline void xs_reclassify_socket4(struct socket *sock)
1775{
1776	struct sock *sk = sock->sk;
1777
1778	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
1779		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
1780}
1781
1782static inline void xs_reclassify_socket6(struct socket *sock)
1783{
1784	struct sock *sk = sock->sk;
1785
1786	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
1787		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
1788}
1789
1790static inline void xs_reclassify_socket(int family, struct socket *sock)
1791{
1792	WARN_ON_ONCE(sock_owned_by_user(sock->sk));
1793	if (sock_owned_by_user(sock->sk))
1794		return;
1795
1796	switch (family) {
1797	case AF_LOCAL:
1798		xs_reclassify_socketu(sock);
1799		break;
1800	case AF_INET:
1801		xs_reclassify_socket4(sock);
1802		break;
1803	case AF_INET6:
1804		xs_reclassify_socket6(sock);
1805		break;
1806	}
1807}
1808#else
1809static inline void xs_reclassify_socketu(struct socket *sock)
1810{
1811}
1812
1813static inline void xs_reclassify_socket4(struct socket *sock)
1814{
1815}
1816
1817static inline void xs_reclassify_socket6(struct socket *sock)
1818{
1819}
1820
1821static inline void xs_reclassify_socket(int family, struct socket *sock)
1822{
1823}
1824#endif
1825
1826static void xs_dummy_setup_socket(struct work_struct *work)
1827{
1828}
1829
1830static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1831		struct sock_xprt *transport, int family, int type,
1832		int protocol, bool reuseport)
1833{
1834	struct socket *sock;
1835	int err;
1836
1837	err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1838	if (err < 0) {
1839		dprintk("RPC:       can't create %d transport socket (%d).\n",
1840				protocol, -err);
1841		goto out;
1842	}
1843	xs_reclassify_socket(family, sock);
1844
1845	if (reuseport)
1846		xs_sock_set_reuseport(sock);
1847
1848	err = xs_bind(transport, sock);
1849	if (err) {
1850		sock_release(sock);
1851		goto out;
1852	}
1853
1854	return sock;
1855out:
1856	return ERR_PTR(err);
1857}
1858
1859static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1860				      struct socket *sock)
1861{
1862	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1863									xprt);
1864
1865	if (!transport->inet) {
1866		struct sock *sk = sock->sk;
1867
1868		write_lock_bh(&sk->sk_callback_lock);
1869
1870		xs_save_old_callbacks(transport, sk);
1871
1872		sk->sk_user_data = xprt;
1873		sk->sk_data_ready = xs_local_data_ready;
1874		sk->sk_write_space = xs_udp_write_space;
1875		sk->sk_error_report = xs_error_report;
1876		sk->sk_allocation = GFP_ATOMIC;
1877
1878		xprt_clear_connected(xprt);
1879
1880		/* Reset to new socket */
1881		transport->sock = sock;
1882		transport->inet = sk;
1883
1884		write_unlock_bh(&sk->sk_callback_lock);
1885	}
1886
1887	/* Tell the socket layer to start connecting... */
1888	xprt->stat.connect_count++;
1889	xprt->stat.connect_start = jiffies;
1890	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1891}
1892
1893/**
1894 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
1895 * @xprt: RPC transport to connect
1896 * @transport: socket transport to connect
1897 * @create_sock: function to create a socket of the correct type
1898 */
1899static int xs_local_setup_socket(struct sock_xprt *transport)
1900{
1901	struct rpc_xprt *xprt = &transport->xprt;
1902	struct socket *sock;
1903	int status = -EIO;
1904
1905	status = __sock_create(xprt->xprt_net, AF_LOCAL,
1906					SOCK_STREAM, 0, &sock, 1);
1907	if (status < 0) {
1908		dprintk("RPC:       can't create AF_LOCAL "
1909			"transport socket (%d).\n", -status);
1910		goto out;
1911	}
1912	xs_reclassify_socketu(sock);
1913
1914	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
1915			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1916
1917	status = xs_local_finish_connecting(xprt, sock);
1918	trace_rpc_socket_connect(xprt, sock, status);
1919	switch (status) {
1920	case 0:
1921		dprintk("RPC:       xprt %p connected to %s\n",
1922				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1923		xprt_set_connected(xprt);
1924	case -ENOBUFS:
1925		break;
1926	case -ENOENT:
1927		dprintk("RPC:       xprt %p: socket %s does not exist\n",
1928				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1929		break;
1930	case -ECONNREFUSED:
1931		dprintk("RPC:       xprt %p: connection refused for %s\n",
1932				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1933		break;
1934	default:
1935		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
1936				__func__, -status,
1937				xprt->address_strings[RPC_DISPLAY_ADDR]);
1938	}
1939
1940out:
1941	xprt_clear_connecting(xprt);
1942	xprt_wake_pending_tasks(xprt, status);
1943	return status;
1944}
1945
1946static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task)
1947{
1948	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1949	int ret;
1950
1951	 if (RPC_IS_ASYNC(task)) {
1952		/*
1953		 * We want the AF_LOCAL connect to be resolved in the
1954		 * filesystem namespace of the process making the rpc
1955		 * call.  Thus we connect synchronously.
1956		 *
1957		 * If we want to support asynchronous AF_LOCAL calls,
1958		 * we'll need to figure out how to pass a namespace to
1959		 * connect.
1960		 */
1961		rpc_exit(task, -ENOTCONN);
1962		return;
1963	}
1964	ret = xs_local_setup_socket(transport);
1965	if (ret && !RPC_IS_SOFTCONN(task))
1966		msleep_interruptible(15000);
1967}
1968
1969#ifdef CONFIG_SUNRPC_SWAP
1970static void xs_set_memalloc(struct rpc_xprt *xprt)
1971{
1972	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1973			xprt);
1974
1975	if (xprt->swapper)
1976		sk_set_memalloc(transport->inet);
1977}
1978
1979/**
1980 * xs_swapper - Tag this transport as being used for swap.
1981 * @xprt: transport to tag
1982 * @enable: enable/disable
1983 *
1984 */
1985int xs_swapper(struct rpc_xprt *xprt, int enable)
1986{
1987	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1988			xprt);
1989	int err = 0;
1990
1991	if (enable) {
1992		xprt->swapper++;
1993		xs_set_memalloc(xprt);
1994	} else if (xprt->swapper) {
1995		xprt->swapper--;
1996		sk_clear_memalloc(transport->inet);
1997	}
1998
1999	return err;
2000}
2001EXPORT_SYMBOL_GPL(xs_swapper);
2002#else
2003static void xs_set_memalloc(struct rpc_xprt *xprt)
2004{
2005}
2006#endif
2007
2008static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2009{
2010	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2011
2012	if (!transport->inet) {
2013		struct sock *sk = sock->sk;
2014
2015		write_lock_bh(&sk->sk_callback_lock);
2016
2017		xs_save_old_callbacks(transport, sk);
2018
2019		sk->sk_user_data = xprt;
2020		sk->sk_data_ready = xs_udp_data_ready;
2021		sk->sk_write_space = xs_udp_write_space;
2022		sk->sk_allocation = GFP_ATOMIC;
2023
2024		xprt_set_connected(xprt);
2025
2026		/* Reset to new socket */
2027		transport->sock = sock;
2028		transport->inet = sk;
2029
2030		xs_set_memalloc(xprt);
2031
2032		write_unlock_bh(&sk->sk_callback_lock);
2033	}
2034	xs_udp_do_set_buffer_size(xprt);
2035}
2036
2037static void xs_udp_setup_socket(struct work_struct *work)
2038{
2039	struct sock_xprt *transport =
2040		container_of(work, struct sock_xprt, connect_worker.work);
2041	struct rpc_xprt *xprt = &transport->xprt;
2042	struct socket *sock = transport->sock;
2043	int status = -EIO;
2044
2045	sock = xs_create_sock(xprt, transport,
2046			xs_addr(xprt)->sa_family, SOCK_DGRAM,
2047			IPPROTO_UDP, false);
2048	if (IS_ERR(sock))
2049		goto out;
2050
2051	dprintk("RPC:       worker connecting xprt %p via %s to "
2052				"%s (port %s)\n", xprt,
2053			xprt->address_strings[RPC_DISPLAY_PROTO],
2054			xprt->address_strings[RPC_DISPLAY_ADDR],
2055			xprt->address_strings[RPC_DISPLAY_PORT]);
2056
2057	xs_udp_finish_connecting(xprt, sock);
2058	trace_rpc_socket_connect(xprt, sock, 0);
2059	status = 0;
2060out:
2061	xprt_unlock_connect(xprt, transport);
2062	xprt_clear_connecting(xprt);
2063	xprt_wake_pending_tasks(xprt, status);
2064}
2065
2066static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2067{
2068	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2069	int ret = -ENOTCONN;
2070
2071	if (!transport->inet) {
2072		struct sock *sk = sock->sk;
2073		unsigned int keepidle = xprt->timeout->to_initval / HZ;
2074		unsigned int keepcnt = xprt->timeout->to_retries + 1;
2075		unsigned int opt_on = 1;
2076
2077		/* TCP Keepalive options */
2078		kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
2079				(char *)&opt_on, sizeof(opt_on));
2080		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
2081				(char *)&keepidle, sizeof(keepidle));
2082		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
2083				(char *)&keepidle, sizeof(keepidle));
2084		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
2085				(char *)&keepcnt, sizeof(keepcnt));
2086
2087		write_lock_bh(&sk->sk_callback_lock);
2088
2089		xs_save_old_callbacks(transport, sk);
2090
2091		sk->sk_user_data = xprt;
2092		sk->sk_data_ready = xs_tcp_data_ready;
2093		sk->sk_state_change = xs_tcp_state_change;
2094		sk->sk_write_space = xs_tcp_write_space;
2095		sk->sk_error_report = xs_error_report;
2096		sk->sk_allocation = GFP_ATOMIC;
2097
2098		/* socket options */
2099		sock_reset_flag(sk, SOCK_LINGER);
2100		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
2101
2102		xprt_clear_connected(xprt);
2103
2104		/* Reset to new socket */
2105		transport->sock = sock;
2106		transport->inet = sk;
2107
2108		write_unlock_bh(&sk->sk_callback_lock);
2109	}
2110
2111	if (!xprt_bound(xprt))
2112		goto out;
2113
2114	xs_set_memalloc(xprt);
2115
2116	/* Tell the socket layer to start connecting... */
2117	xprt->stat.connect_count++;
2118	xprt->stat.connect_start = jiffies;
2119	set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
2120	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
2121	switch (ret) {
2122	case 0:
2123		xs_set_srcport(transport, sock);
2124	case -EINPROGRESS:
2125		/* SYN_SENT! */
2126		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2127			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2128	}
2129out:
2130	return ret;
2131}
2132
2133/**
2134 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
2135 * @xprt: RPC transport to connect
2136 * @transport: socket transport to connect
2137 * @create_sock: function to create a socket of the correct type
2138 *
2139 * Invoked by a work queue tasklet.
2140 */
2141static void xs_tcp_setup_socket(struct work_struct *work)
2142{
2143	struct sock_xprt *transport =
2144		container_of(work, struct sock_xprt, connect_worker.work);
2145	struct socket *sock = transport->sock;
2146	struct rpc_xprt *xprt = &transport->xprt;
2147	int status = -EIO;
2148
2149	if (!sock) {
2150		sock = xs_create_sock(xprt, transport,
2151				xs_addr(xprt)->sa_family, SOCK_STREAM,
2152				IPPROTO_TCP, true);
2153		if (IS_ERR(sock)) {
2154			status = PTR_ERR(sock);
2155			goto out;
2156		}
2157	}
2158
2159	dprintk("RPC:       worker connecting xprt %p via %s to "
2160				"%s (port %s)\n", xprt,
2161			xprt->address_strings[RPC_DISPLAY_PROTO],
2162			xprt->address_strings[RPC_DISPLAY_ADDR],
2163			xprt->address_strings[RPC_DISPLAY_PORT]);
2164
2165	status = xs_tcp_finish_connecting(xprt, sock);
2166	trace_rpc_socket_connect(xprt, sock, status);
2167	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
2168			xprt, -status, xprt_connected(xprt),
2169			sock->sk->sk_state);
2170	switch (status) {
2171	default:
2172		printk("%s: connect returned unhandled error %d\n",
2173			__func__, status);
2174	case -EADDRNOTAVAIL:
2175		/* We're probably in TIME_WAIT. Get rid of existing socket,
2176		 * and retry
2177		 */
2178		xs_tcp_force_close(xprt);
2179		break;
2180	case 0:
2181	case -EINPROGRESS:
2182	case -EALREADY:
2183		xprt_unlock_connect(xprt, transport);
2184		return;
2185	case -EINVAL:
2186		/* Happens, for instance, if the user specified a link
2187		 * local IPv6 address without a scope-id.
2188		 */
2189	case -ECONNREFUSED:
2190	case -ECONNRESET:
2191	case -ENETUNREACH:
2192	case -EADDRINUSE:
2193	case -ENOBUFS:
2194		/* retry with existing socket, after a delay */
2195		xs_tcp_force_close(xprt);
2196		goto out;
2197	}
2198	status = -EAGAIN;
2199out:
2200	xprt_unlock_connect(xprt, transport);
2201	xprt_clear_connecting(xprt);
2202	xprt_wake_pending_tasks(xprt, status);
2203}
2204
2205/**
2206 * xs_connect - connect a socket to a remote endpoint
2207 * @xprt: pointer to transport structure
2208 * @task: address of RPC task that manages state of connect request
2209 *
2210 * TCP: If the remote end dropped the connection, delay reconnecting.
2211 *
2212 * UDP socket connects are synchronous, but we use a work queue anyway
2213 * to guarantee that even unprivileged user processes can set up a
2214 * socket on a privileged port.
2215 *
2216 * If a UDP socket connect fails, the delay behavior here prevents
2217 * retry floods (hard mounts).
2218 */
2219static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
2220{
2221	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2222
2223	WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
2224
2225	if (transport->sock != NULL) {
2226		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2227				"seconds\n",
2228				xprt, xprt->reestablish_timeout / HZ);
2229
2230		/* Start by resetting any existing state */
2231		xs_reset_transport(transport);
2232
2233		queue_delayed_work(rpciod_workqueue,
2234				   &transport->connect_worker,
2235				   xprt->reestablish_timeout);
2236		xprt->reestablish_timeout <<= 1;
2237		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2238			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2239		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2240			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2241	} else {
2242		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2243		queue_delayed_work(rpciod_workqueue,
2244				   &transport->connect_worker, 0);
2245	}
2246}
2247
2248/**
2249 * xs_local_print_stats - display AF_LOCAL socket-specifc stats
2250 * @xprt: rpc_xprt struct containing statistics
2251 * @seq: output file
2252 *
2253 */
2254static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2255{
2256	long idle_time = 0;
2257
2258	if (xprt_connected(xprt))
2259		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2260
2261	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
2262			"%llu %llu %lu %llu %llu\n",
2263			xprt->stat.bind_count,
2264			xprt->stat.connect_count,
2265			xprt->stat.connect_time,
2266			idle_time,
2267			xprt->stat.sends,
2268			xprt->stat.recvs,
2269			xprt->stat.bad_xids,
2270			xprt->stat.req_u,
2271			xprt->stat.bklog_u,
2272			xprt->stat.max_slots,
2273			xprt->stat.sending_u,
2274			xprt->stat.pending_u);
2275}
2276
2277/**
2278 * xs_udp_print_stats - display UDP socket-specifc stats
2279 * @xprt: rpc_xprt struct containing statistics
2280 * @seq: output file
2281 *
2282 */
2283static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2284{
2285	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2286
2287	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu "
2288			"%lu %llu %llu\n",
2289			transport->srcport,
2290			xprt->stat.bind_count,
2291			xprt->stat.sends,
2292			xprt->stat.recvs,
2293			xprt->stat.bad_xids,
2294			xprt->stat.req_u,
2295			xprt->stat.bklog_u,
2296			xprt->stat.max_slots,
2297			xprt->stat.sending_u,
2298			xprt->stat.pending_u);
2299}
2300
2301/**
2302 * xs_tcp_print_stats - display TCP socket-specifc stats
2303 * @xprt: rpc_xprt struct containing statistics
2304 * @seq: output file
2305 *
2306 */
2307static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2308{
2309	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2310	long idle_time = 0;
2311
2312	if (xprt_connected(xprt))
2313		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2314
2315	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu "
2316			"%llu %llu %lu %llu %llu\n",
2317			transport->srcport,
2318			xprt->stat.bind_count,
2319			xprt->stat.connect_count,
2320			xprt->stat.connect_time,
2321			idle_time,
2322			xprt->stat.sends,
2323			xprt->stat.recvs,
2324			xprt->stat.bad_xids,
2325			xprt->stat.req_u,
2326			xprt->stat.bklog_u,
2327			xprt->stat.max_slots,
2328			xprt->stat.sending_u,
2329			xprt->stat.pending_u);
2330}
2331
2332/*
2333 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2334 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2335 * to use the server side send routines.
2336 */
2337static void *bc_malloc(struct rpc_task *task, size_t size)
2338{
2339	struct page *page;
2340	struct rpc_buffer *buf;
2341
2342	WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2343	if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
2344		return NULL;
2345
2346	page = alloc_page(GFP_KERNEL);
2347	if (!page)
2348		return NULL;
2349
2350	buf = page_address(page);
2351	buf->len = PAGE_SIZE;
2352
2353	return buf->data;
2354}
2355
2356/*
2357 * Free the space allocated in the bc_alloc routine
2358 */
2359static void bc_free(void *buffer)
2360{
2361	struct rpc_buffer *buf;
2362
2363	if (!buffer)
2364		return;
2365
2366	buf = container_of(buffer, struct rpc_buffer, data);
2367	free_page((unsigned long)buf);
2368}
2369
2370/*
2371 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2372 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2373 */
2374static int bc_sendto(struct rpc_rqst *req)
2375{
2376	int len;
2377	struct xdr_buf *xbufp = &req->rq_snd_buf;
2378	struct rpc_xprt *xprt = req->rq_xprt;
2379	struct sock_xprt *transport =
2380				container_of(xprt, struct sock_xprt, xprt);
2381	struct socket *sock = transport->sock;
2382	unsigned long headoff;
2383	unsigned long tailoff;
2384
2385	xs_encode_stream_record_marker(xbufp);
2386
2387	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2388	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2389	len = svc_send_common(sock, xbufp,
2390			      virt_to_page(xbufp->head[0].iov_base), headoff,
2391			      xbufp->tail[0].iov_base, tailoff);
2392
2393	if (len != xbufp->len) {
2394		printk(KERN_NOTICE "Error sending entire callback!\n");
2395		len = -EAGAIN;
2396	}
2397
2398	return len;
2399}
2400
2401/*
2402 * The send routine. Borrows from svc_send
2403 */
2404static int bc_send_request(struct rpc_task *task)
2405{
2406	struct rpc_rqst *req = task->tk_rqstp;
2407	struct svc_xprt	*xprt;
2408	u32                     len;
2409
2410	dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2411	/*
2412	 * Get the server socket associated with this callback xprt
2413	 */
2414	xprt = req->rq_xprt->bc_xprt;
2415
2416	/*
2417	 * Grab the mutex to serialize data as the connection is shared
2418	 * with the fore channel
2419	 */
2420	if (!mutex_trylock(&xprt->xpt_mutex)) {
2421		rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2422		if (!mutex_trylock(&xprt->xpt_mutex))
2423			return -EAGAIN;
2424		rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2425	}
2426	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2427		len = -ENOTCONN;
2428	else
2429		len = bc_sendto(req);
2430	mutex_unlock(&xprt->xpt_mutex);
2431
2432	if (len > 0)
2433		len = 0;
2434
2435	return len;
2436}
2437
2438/*
2439 * The close routine. Since this is client initiated, we do nothing
2440 */
2441
2442static void bc_close(struct rpc_xprt *xprt)
2443{
2444}
2445
2446/*
2447 * The xprt destroy routine. Again, because this connection is client
2448 * initiated, we do nothing
2449 */
2450
2451static void bc_destroy(struct rpc_xprt *xprt)
2452{
2453	dprintk("RPC:       bc_destroy xprt %p\n", xprt);
2454
2455	xs_xprt_free(xprt);
2456	module_put(THIS_MODULE);
2457}
2458
2459static struct rpc_xprt_ops xs_local_ops = {
2460	.reserve_xprt		= xprt_reserve_xprt,
2461	.release_xprt		= xs_tcp_release_xprt,
2462	.alloc_slot		= xprt_alloc_slot,
2463	.rpcbind		= xs_local_rpcbind,
2464	.set_port		= xs_local_set_port,
2465	.connect		= xs_local_connect,
2466	.buf_alloc		= rpc_malloc,
2467	.buf_free		= rpc_free,
2468	.send_request		= xs_local_send_request,
2469	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2470	.close			= xs_close,
2471	.destroy		= xs_destroy,
2472	.print_stats		= xs_local_print_stats,
2473};
2474
2475static struct rpc_xprt_ops xs_udp_ops = {
2476	.set_buffer_size	= xs_udp_set_buffer_size,
2477	.reserve_xprt		= xprt_reserve_xprt_cong,
2478	.release_xprt		= xprt_release_xprt_cong,
2479	.alloc_slot		= xprt_alloc_slot,
2480	.rpcbind		= rpcb_getport_async,
2481	.set_port		= xs_set_port,
2482	.connect		= xs_connect,
2483	.buf_alloc		= rpc_malloc,
2484	.buf_free		= rpc_free,
2485	.send_request		= xs_udp_send_request,
2486	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
2487	.timer			= xs_udp_timer,
2488	.release_request	= xprt_release_rqst_cong,
2489	.close			= xs_close,
2490	.destroy		= xs_destroy,
2491	.print_stats		= xs_udp_print_stats,
2492};
2493
2494static struct rpc_xprt_ops xs_tcp_ops = {
2495	.reserve_xprt		= xprt_reserve_xprt,
2496	.release_xprt		= xs_tcp_release_xprt,
2497	.alloc_slot		= xprt_lock_and_alloc_slot,
2498	.rpcbind		= rpcb_getport_async,
2499	.set_port		= xs_set_port,
2500	.connect		= xs_connect,
2501	.buf_alloc		= rpc_malloc,
2502	.buf_free		= rpc_free,
2503	.send_request		= xs_tcp_send_request,
2504	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2505	.close			= xs_tcp_shutdown,
2506	.destroy		= xs_destroy,
2507	.print_stats		= xs_tcp_print_stats,
2508};
2509
2510/*
2511 * The rpc_xprt_ops for the server backchannel
2512 */
2513
2514static struct rpc_xprt_ops bc_tcp_ops = {
2515	.reserve_xprt		= xprt_reserve_xprt,
2516	.release_xprt		= xprt_release_xprt,
2517	.alloc_slot		= xprt_alloc_slot,
2518	.buf_alloc		= bc_malloc,
2519	.buf_free		= bc_free,
2520	.send_request		= bc_send_request,
2521	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2522	.close			= bc_close,
2523	.destroy		= bc_destroy,
2524	.print_stats		= xs_tcp_print_stats,
2525};
2526
2527static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2528{
2529	static const struct sockaddr_in sin = {
2530		.sin_family		= AF_INET,
2531		.sin_addr.s_addr	= htonl(INADDR_ANY),
2532	};
2533	static const struct sockaddr_in6 sin6 = {
2534		.sin6_family		= AF_INET6,
2535		.sin6_addr		= IN6ADDR_ANY_INIT,
2536	};
2537
2538	switch (family) {
2539	case AF_LOCAL:
2540		break;
2541	case AF_INET:
2542		memcpy(sap, &sin, sizeof(sin));
2543		break;
2544	case AF_INET6:
2545		memcpy(sap, &sin6, sizeof(sin6));
2546		break;
2547	default:
2548		dprintk("RPC:       %s: Bad address family\n", __func__);
2549		return -EAFNOSUPPORT;
2550	}
2551	return 0;
2552}
2553
2554static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2555				      unsigned int slot_table_size,
2556				      unsigned int max_slot_table_size)
2557{
2558	struct rpc_xprt *xprt;
2559	struct sock_xprt *new;
2560
2561	if (args->addrlen > sizeof(xprt->addr)) {
2562		dprintk("RPC:       xs_setup_xprt: address too large\n");
2563		return ERR_PTR(-EBADF);
2564	}
2565
2566	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2567			max_slot_table_size);
2568	if (xprt == NULL) {
2569		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2570				"rpc_xprt\n");
2571		return ERR_PTR(-ENOMEM);
2572	}
2573
2574	new = container_of(xprt, struct sock_xprt, xprt);
2575	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2576	xprt->addrlen = args->addrlen;
2577	if (args->srcaddr)
2578		memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2579	else {
2580		int err;
2581		err = xs_init_anyaddr(args->dstaddr->sa_family,
2582					(struct sockaddr *)&new->srcaddr);
2583		if (err != 0) {
2584			xprt_free(xprt);
2585			return ERR_PTR(err);
2586		}
2587	}
2588
2589	return xprt;
2590}
2591
2592static const struct rpc_timeout xs_local_default_timeout = {
2593	.to_initval = 10 * HZ,
2594	.to_maxval = 10 * HZ,
2595	.to_retries = 2,
2596};
2597
2598/**
2599 * xs_setup_local - Set up transport to use an AF_LOCAL socket
2600 * @args: rpc transport creation arguments
2601 *
2602 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2603 */
2604static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2605{
2606	struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2607	struct sock_xprt *transport;
2608	struct rpc_xprt *xprt;
2609	struct rpc_xprt *ret;
2610
2611	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2612			xprt_max_tcp_slot_table_entries);
2613	if (IS_ERR(xprt))
2614		return xprt;
2615	transport = container_of(xprt, struct sock_xprt, xprt);
2616
2617	xprt->prot = 0;
2618	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2619	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2620
2621	xprt->bind_timeout = XS_BIND_TO;
2622	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2623	xprt->idle_timeout = XS_IDLE_DISC_TO;
2624
2625	xprt->ops = &xs_local_ops;
2626	xprt->timeout = &xs_local_default_timeout;
2627
2628	INIT_DELAYED_WORK(&transport->connect_worker,
2629			xs_dummy_setup_socket);
2630
2631	switch (sun->sun_family) {
2632	case AF_LOCAL:
2633		if (sun->sun_path[0] != '/') {
2634			dprintk("RPC:       bad AF_LOCAL address: %s\n",
2635					sun->sun_path);
2636			ret = ERR_PTR(-EINVAL);
2637			goto out_err;
2638		}
2639		xprt_set_bound(xprt);
2640		xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2641		ret = ERR_PTR(xs_local_setup_socket(transport));
2642		if (ret)
2643			goto out_err;
2644		break;
2645	default:
2646		ret = ERR_PTR(-EAFNOSUPPORT);
2647		goto out_err;
2648	}
2649
2650	dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2651			xprt->address_strings[RPC_DISPLAY_ADDR]);
2652
2653	if (try_module_get(THIS_MODULE))
2654		return xprt;
2655	ret = ERR_PTR(-EINVAL);
2656out_err:
2657	xs_xprt_free(xprt);
2658	return ret;
2659}
2660
2661static const struct rpc_timeout xs_udp_default_timeout = {
2662	.to_initval = 5 * HZ,
2663	.to_maxval = 30 * HZ,
2664	.to_increment = 5 * HZ,
2665	.to_retries = 5,
2666};
2667
2668/**
2669 * xs_setup_udp - Set up transport to use a UDP socket
2670 * @args: rpc transport creation arguments
2671 *
2672 */
2673static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2674{
2675	struct sockaddr *addr = args->dstaddr;
2676	struct rpc_xprt *xprt;
2677	struct sock_xprt *transport;
2678	struct rpc_xprt *ret;
2679
2680	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2681			xprt_udp_slot_table_entries);
2682	if (IS_ERR(xprt))
2683		return xprt;
2684	transport = container_of(xprt, struct sock_xprt, xprt);
2685
2686	xprt->prot = IPPROTO_UDP;
2687	xprt->tsh_size = 0;
2688	/* XXX: header size can vary due to auth type, IPv6, etc. */
2689	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2690
2691	xprt->bind_timeout = XS_BIND_TO;
2692	xprt->reestablish_timeout = XS_UDP_REEST_TO;
2693	xprt->idle_timeout = XS_IDLE_DISC_TO;
2694
2695	xprt->ops = &xs_udp_ops;
2696
2697	xprt->timeout = &xs_udp_default_timeout;
2698
2699	switch (addr->sa_family) {
2700	case AF_INET:
2701		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2702			xprt_set_bound(xprt);
2703
2704		INIT_DELAYED_WORK(&transport->connect_worker,
2705					xs_udp_setup_socket);
2706		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2707		break;
2708	case AF_INET6:
2709		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2710			xprt_set_bound(xprt);
2711
2712		INIT_DELAYED_WORK(&transport->connect_worker,
2713					xs_udp_setup_socket);
2714		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2715		break;
2716	default:
2717		ret = ERR_PTR(-EAFNOSUPPORT);
2718		goto out_err;
2719	}
2720
2721	if (xprt_bound(xprt))
2722		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2723				xprt->address_strings[RPC_DISPLAY_ADDR],
2724				xprt->address_strings[RPC_DISPLAY_PORT],
2725				xprt->address_strings[RPC_DISPLAY_PROTO]);
2726	else
2727		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2728				xprt->address_strings[RPC_DISPLAY_ADDR],
2729				xprt->address_strings[RPC_DISPLAY_PROTO]);
2730
2731	if (try_module_get(THIS_MODULE))
2732		return xprt;
2733	ret = ERR_PTR(-EINVAL);
2734out_err:
2735	xs_xprt_free(xprt);
2736	return ret;
2737}
2738
2739static const struct rpc_timeout xs_tcp_default_timeout = {
2740	.to_initval = 60 * HZ,
2741	.to_maxval = 60 * HZ,
2742	.to_retries = 2,
2743};
2744
2745/**
2746 * xs_setup_tcp - Set up transport to use a TCP socket
2747 * @args: rpc transport creation arguments
2748 *
2749 */
2750static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2751{
2752	struct sockaddr *addr = args->dstaddr;
2753	struct rpc_xprt *xprt;
2754	struct sock_xprt *transport;
2755	struct rpc_xprt *ret;
2756	unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries;
2757
2758	if (args->flags & XPRT_CREATE_INFINITE_SLOTS)
2759		max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT;
2760
2761	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2762			max_slot_table_size);
2763	if (IS_ERR(xprt))
2764		return xprt;
2765	transport = container_of(xprt, struct sock_xprt, xprt);
2766
2767	xprt->prot = IPPROTO_TCP;
2768	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2769	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2770
2771	xprt->bind_timeout = XS_BIND_TO;
2772	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2773	xprt->idle_timeout = XS_IDLE_DISC_TO;
2774
2775	xprt->ops = &xs_tcp_ops;
2776	xprt->timeout = &xs_tcp_default_timeout;
2777
2778	switch (addr->sa_family) {
2779	case AF_INET:
2780		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2781			xprt_set_bound(xprt);
2782
2783		INIT_DELAYED_WORK(&transport->connect_worker,
2784					xs_tcp_setup_socket);
2785		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2786		break;
2787	case AF_INET6:
2788		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2789			xprt_set_bound(xprt);
2790
2791		INIT_DELAYED_WORK(&transport->connect_worker,
2792					xs_tcp_setup_socket);
2793		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2794		break;
2795	default:
2796		ret = ERR_PTR(-EAFNOSUPPORT);
2797		goto out_err;
2798	}
2799
2800	if (xprt_bound(xprt))
2801		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2802				xprt->address_strings[RPC_DISPLAY_ADDR],
2803				xprt->address_strings[RPC_DISPLAY_PORT],
2804				xprt->address_strings[RPC_DISPLAY_PROTO]);
2805	else
2806		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2807				xprt->address_strings[RPC_DISPLAY_ADDR],
2808				xprt->address_strings[RPC_DISPLAY_PROTO]);
2809
2810	if (try_module_get(THIS_MODULE))
2811		return xprt;
2812	ret = ERR_PTR(-EINVAL);
2813out_err:
2814	xs_xprt_free(xprt);
2815	return ret;
2816}
2817
2818/**
2819 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2820 * @args: rpc transport creation arguments
2821 *
2822 */
2823static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2824{
2825	struct sockaddr *addr = args->dstaddr;
2826	struct rpc_xprt *xprt;
2827	struct sock_xprt *transport;
2828	struct svc_sock *bc_sock;
2829	struct rpc_xprt *ret;
2830
2831	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2832			xprt_tcp_slot_table_entries);
2833	if (IS_ERR(xprt))
2834		return xprt;
2835	transport = container_of(xprt, struct sock_xprt, xprt);
2836
2837	xprt->prot = IPPROTO_TCP;
2838	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2839	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2840	xprt->timeout = &xs_tcp_default_timeout;
2841
2842	/* backchannel */
2843	xprt_set_bound(xprt);
2844	xprt->bind_timeout = 0;
2845	xprt->reestablish_timeout = 0;
2846	xprt->idle_timeout = 0;
2847
2848	xprt->ops = &bc_tcp_ops;
2849
2850	switch (addr->sa_family) {
2851	case AF_INET:
2852		xs_format_peer_addresses(xprt, "tcp",
2853					 RPCBIND_NETID_TCP);
2854		break;
2855	case AF_INET6:
2856		xs_format_peer_addresses(xprt, "tcp",
2857				   RPCBIND_NETID_TCP6);
2858		break;
2859	default:
2860		ret = ERR_PTR(-EAFNOSUPPORT);
2861		goto out_err;
2862	}
2863
2864	dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2865			xprt->address_strings[RPC_DISPLAY_ADDR],
2866			xprt->address_strings[RPC_DISPLAY_PORT],
2867			xprt->address_strings[RPC_DISPLAY_PROTO]);
2868
2869	/*
2870	 * Once we've associated a backchannel xprt with a connection,
2871	 * we want to keep it around as long as the connection lasts,
2872	 * in case we need to start using it for a backchannel again;
2873	 * this reference won't be dropped until bc_xprt is destroyed.
2874	 */
2875	xprt_get(xprt);
2876	args->bc_xprt->xpt_bc_xprt = xprt;
2877	xprt->bc_xprt = args->bc_xprt;
2878	bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2879	transport->sock = bc_sock->sk_sock;
2880	transport->inet = bc_sock->sk_sk;
2881
2882	/*
2883	 * Since we don't want connections for the backchannel, we set
2884	 * the xprt status to connected
2885	 */
2886	xprt_set_connected(xprt);
2887
2888	if (try_module_get(THIS_MODULE))
2889		return xprt;
2890
2891	args->bc_xprt->xpt_bc_xprt = NULL;
2892	xprt_put(xprt);
2893	ret = ERR_PTR(-EINVAL);
2894out_err:
2895	xs_xprt_free(xprt);
2896	return ret;
2897}
2898
2899static struct xprt_class	xs_local_transport = {
2900	.list		= LIST_HEAD_INIT(xs_local_transport.list),
2901	.name		= "named UNIX socket",
2902	.owner		= THIS_MODULE,
2903	.ident		= XPRT_TRANSPORT_LOCAL,
2904	.setup		= xs_setup_local,
2905};
2906
2907static struct xprt_class	xs_udp_transport = {
2908	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
2909	.name		= "udp",
2910	.owner		= THIS_MODULE,
2911	.ident		= XPRT_TRANSPORT_UDP,
2912	.setup		= xs_setup_udp,
2913};
2914
2915static struct xprt_class	xs_tcp_transport = {
2916	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
2917	.name		= "tcp",
2918	.owner		= THIS_MODULE,
2919	.ident		= XPRT_TRANSPORT_TCP,
2920	.setup		= xs_setup_tcp,
2921};
2922
2923static struct xprt_class	xs_bc_tcp_transport = {
2924	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
2925	.name		= "tcp NFSv4.1 backchannel",
2926	.owner		= THIS_MODULE,
2927	.ident		= XPRT_TRANSPORT_BC_TCP,
2928	.setup		= xs_setup_bc_tcp,
2929};
2930
2931/**
2932 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2933 *
2934 */
2935int init_socket_xprt(void)
2936{
2937#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2938	if (!sunrpc_table_header)
2939		sunrpc_table_header = register_sysctl_table(sunrpc_table);
2940#endif
2941
2942	xprt_register_transport(&xs_local_transport);
2943	xprt_register_transport(&xs_udp_transport);
2944	xprt_register_transport(&xs_tcp_transport);
2945	xprt_register_transport(&xs_bc_tcp_transport);
2946
2947	return 0;
2948}
2949
2950/**
2951 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2952 *
2953 */
2954void cleanup_socket_xprt(void)
2955{
2956#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2957	if (sunrpc_table_header) {
2958		unregister_sysctl_table(sunrpc_table_header);
2959		sunrpc_table_header = NULL;
2960	}
2961#endif
2962
2963	xprt_unregister_transport(&xs_local_transport);
2964	xprt_unregister_transport(&xs_udp_transport);
2965	xprt_unregister_transport(&xs_tcp_transport);
2966	xprt_unregister_transport(&xs_bc_tcp_transport);
2967}
2968
2969static int param_set_uint_minmax(const char *val,
2970		const struct kernel_param *kp,
2971		unsigned int min, unsigned int max)
2972{
2973	unsigned int num;
2974	int ret;
2975
2976	if (!val)
2977		return -EINVAL;
2978	ret = kstrtouint(val, 0, &num);
2979	if (ret == -EINVAL || num < min || num > max)
2980		return -EINVAL;
2981	*((unsigned int *)kp->arg) = num;
2982	return 0;
2983}
2984
2985static int param_set_portnr(const char *val, const struct kernel_param *kp)
2986{
2987	return param_set_uint_minmax(val, kp,
2988			RPC_MIN_RESVPORT,
2989			RPC_MAX_RESVPORT);
2990}
2991
2992static struct kernel_param_ops param_ops_portnr = {
2993	.set = param_set_portnr,
2994	.get = param_get_uint,
2995};
2996
2997#define param_check_portnr(name, p) \
2998	__param_check(name, p, unsigned int);
2999
3000module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
3001module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
3002
3003static int param_set_slot_table_size(const char *val,
3004				     const struct kernel_param *kp)
3005{
3006	return param_set_uint_minmax(val, kp,
3007			RPC_MIN_SLOT_TABLE,
3008			RPC_MAX_SLOT_TABLE);
3009}
3010
3011static struct kernel_param_ops param_ops_slot_table_size = {
3012	.set = param_set_slot_table_size,
3013	.get = param_get_uint,
3014};
3015
3016#define param_check_slot_table_size(name, p) \
3017	__param_check(name, p, unsigned int);
3018
3019static int param_set_max_slot_table_size(const char *val,
3020				     const struct kernel_param *kp)
3021{
3022	return param_set_uint_minmax(val, kp,
3023			RPC_MIN_SLOT_TABLE,
3024			RPC_MAX_SLOT_TABLE_LIMIT);
3025}
3026
3027static struct kernel_param_ops param_ops_max_slot_table_size = {
3028	.set = param_set_max_slot_table_size,
3029	.get = param_get_uint,
3030};
3031
3032#define param_check_max_slot_table_size(name, p) \
3033	__param_check(name, p, unsigned int);
3034
3035module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
3036		   slot_table_size, 0644);
3037module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
3038		   max_slot_table_size, 0644);
3039module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
3040		   slot_table_size, 0644);
3041
3042