/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

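/*
 * Illustrative sketch only (not part of the code): a synchronous caller in
 * the RPC client drives the steps above roughly as follows, assuming the
 * usual rpc_execute() state machine in clnt.c:
 *
 *	xprt_reserve(task);		 grab a request slot or wait on backlog
 *	xprt_prepare_transmit(task);	 serialize access to the transport
 *	xprt_transmit(task);		 send, queue on xprt->recv, arm timer
 *	...				 reply wakes the task via xprt_complete_rqst()
 *	xprt_release(task);		 give the slot back
 */
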
#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	xprt_connect_status(struct rpc_task *task);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
static void	xprt_destroy(struct rpc_xprt *xprt);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

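/*
 * Illustrative sketch only: a transport module would normally declare an
 * xprt_class and register it from its module init routine.  The names
 * below (example_transport, xs_setup_example, XPRT_TRANSPORT_EXAMPLE) are
 * hypothetical and not part of any real transport:
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= XPRT_TRANSPORT_EXAMPLE,
 *		.setup	= xs_setup_example,
 *	};
 *
 *	static int __init init_example_xprt(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 */
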
/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:		transport successfully loaded
 * -ENOENT:	transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (strcmp(t->name, transport_name) == 0) {
			spin_unlock(&xprt_list_lock);
			goto out;
		}
	}
	spin_unlock(&xprt_list_lock);
	result = request_module("xprt%s", transport_name);
out:
	return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	int priority;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	xprt->snd_task = task;
	if (req != NULL)
		req->rq_ntrans++;

	return 1;

out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
			task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req == NULL)
		priority = RPC_PRIORITY_LOW;
	else if (!req->rq_ntrans)
		priority = RPC_PRIORITY_NORMAL;
	else
		priority = RPC_PRIORITY_HIGH;
	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	int priority;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		return 1;
	}
	if (__xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		req->rq_ntrans++;
		return 1;
	}
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req == NULL)
		priority = RPC_PRIORITY_LOW;
	else if (!req->rq_ntrans)
		priority = RPC_PRIORITY_NORMAL;
	else
		priority = RPC_PRIORITY_HIGH;
	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;
	struct rpc_rqst *req;

	req = task->tk_rqstp;
	xprt->snd_task = task;
	if (req)
		req->rq_ntrans++;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;

	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
		return;
	xprt_clear_locked(xprt);
}

static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;
	struct rpc_rqst *req;

	req = task->tk_rqstp;
	if (req == NULL) {
		xprt->snd_task = task;
		return true;
	}
	if (__xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		req->rq_ntrans++;
		return true;
	}
	return false;
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void xprt_task_clear_bytes_sent(struct rpc_task *task)
{
	if (task != NULL) {
		struct rpc_rqst *req = task->tk_rqstp;
		if (req != NULL)
			req->rq_bytes_sent = 0;
	}
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_task_clear_bytes_sent(task);
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_task_clear_bytes_sent(task);
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC task whose request has completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC task used to adjust the window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 4.4BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	-	a reply is received and
 *	-	a full number of requests are outstanding and
 *	-	the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
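
/*
 * Worked example (illustrative only, assuming the usual RPC_CWNDSHIFT of 8,
 * i.e. RPC_CWNDSCALE == 256): with cwnd == 1024 the increment above is
 * (256 * 256 + 512) / 1024 == 64, one quarter of a request slot, so four
 * successful round trips at a full window grow the window by roughly one
 * slot, while a timeout halves it to 512 in a single step.
 */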

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @action: function pointer to be executed after wait
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
	rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task) {
		dprintk("RPC:       write space: waking waiting task on "
				"xprt %p\n", xprt);
		rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
	}
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
	task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;

	task->tk_timeout = rpc_calc_rto(rtt, timer);
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
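
/*
 * Illustrative arithmetic only: if rpc_calc_rto() currently estimates an
 * RTO of, say, 200ms for this procedure's timer class, the request has been
 * retransmitted twice and rpc_ntimeo() records one prior timeout, the
 * timeout above becomes 200ms << (1 + 2) = 1.6s, capped at to_maxval.
 */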

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
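
/*
 * Illustrative arithmetic only (the numbers are not defaults taken from
 * this file): with a linear policy of to_initval = 5s, to_increment = 5s
 * and to_retries = 3, xprt_reset_majortimeo() arms the major timeout
 * 5 + 5 * 3 = 20 seconds ahead, and each minor timeout bumps rq_timeout by
 * another 5s (an exponential policy would double it instead).  Once the
 * major timeout has passed, xprt_adjust_timeout() returns -ETIMEDOUT and
 * restarts the sequence from to_initval.
 */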

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	xprt->ops->close(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	dprintk("RPC:       disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - conditionally force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)data;

	spin_lock(&xprt->transport_lock);
	if (!list_empty(&xprt->recv))
		goto out_abort;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		goto out_abort;
	spin_unlock(&xprt->transport_lock);
	queue_work(rpciod_workqueue, &xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->transport_lock);
}

bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock_bh(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	xprt_task_clear_bytes_sent(task);
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	xprt->ops->release_xprt(xprt, NULL);
out:
	spin_unlock_bh(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;

	dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
		xprt->ops->close(xprt);

	if (!xprt_connected(xprt)) {
		task->tk_rqstp->rq_bytes_sent = 0;
		task->tk_timeout = task->tk_rqstp->rq_timeout;
		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		xprt->stat.connect_start = jiffies;
		xprt->ops->connect(xprt, task);
	}
	xprt_release_write(xprt, task);
}

static void xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;

	if (task->tk_status == 0) {
		xprt->stat.connect_count++;
		xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
		dprintk("RPC: %5u xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	switch (task->tk_status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EAGAIN:
		dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
				"out\n", task->tk_pid);
		break;
	default:
		dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
				"server %s\n", task->tk_pid, -task->tk_status,
				xprt->servername);
		task->tk_status = -EIO;
	}
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	list_for_each_entry(entry, &xprt->recv, rq_list)
		if (entry->rq_xid == xid) {
			trace_xprt_lookup_rqst(xprt, xid, 0);
			return entry;
		}

	dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
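
/*
 * Usage note (descriptive, not a new requirement): transports typically
 * call xprt_lookup_rqst() from their receive/data_ready path with
 * xprt->transport_lock held, and then hand the matched request to
 * xprt_complete_rqst() below once the reply has been copied into
 * rq_private_buf.
 */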

static void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC task whose request has completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
			task->tk_pid, ntohl(req->rq_xid), copied);
	trace_xprt_complete_rqst(xprt, req->rq_xid, copied);

	xprt->stat.recvs++;
	req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime);
	if (xprt->ops->timer != NULL)
		xprt_update_rtt(task);

	list_del_init(&req->rq_list);
	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;
	dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

	spin_lock_bh(&xprt->transport_lock);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
	spin_unlock_bh(&xprt->transport_lock);
}

static inline int xprt_has_timer(struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	bool ret = false;

	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

	spin_lock_bh(&xprt->transport_lock);
	if (!req->rq_bytes_sent) {
		if (req->rq_reply_bytes_recvd) {
			task->tk_status = req->rq_reply_bytes_recvd;
			goto out_unlock;
		}
		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT)
		    && xprt_connected(xprt)
		    && req->rq_connect_cookie == xprt->connect_cookie) {
			xprt->ops->set_retrans_timeout(task);
			rpc_sleep_on(&xprt->pending, task, xprt_timer);
			goto out_unlock;
		}
	}
	if (!xprt->ops->reserve_xprt(xprt, task)) {
		task->tk_status = -EAGAIN;
		goto out_unlock;
	}
	ret = true;
out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

void xprt_end_transmit(struct rpc_task *task)
{
	xprt_release_write(task->tk_rqstp->rq_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int status, numreqs;

	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	if (!req->rq_reply_bytes_recvd) {
		if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
			/*
			 * Add to the list only if we're expecting a reply
			 */
			spin_lock_bh(&xprt->transport_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->transport_lock);
			xprt_reset_majortimeo(req);
			/* Turn off autodisconnect */
			del_singleshot_timer_sync(&xprt->timer);
		}
	} else if (!req->rq_bytes_sent)
		return;

	req->rq_xtime = ktime_get();
	status = xprt->ops->send_request(task);
	trace_xprt_transmit(xprt, req->rq_xid, status);
	if (status != 0) {
		task->tk_status = status;
		return;
	}

	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_SENT;
	spin_lock_bh(&xprt->transport_lock);

	xprt->ops->set_retrans_timeout(task);

	numreqs = atomic_read(&xprt->num_reqs);
	if (numreqs > xprt->stat.max_slots)
		xprt->stat.max_slots = numreqs;
	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;

	/* Don't race with disconnect */
	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;
	else {
		/*
		 * Sleep on the pending queue since
		 * we're expecting a reply.
		 */
		if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task))
			rpc_sleep_on(&xprt->pending, task, xprt_timer);
		req->rq_connect_cookie = xprt->connect_cookie;
	}
	spin_unlock_bh(&xprt->transport_lock);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, NULL);
}

static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
{
	if (rpc_wake_up_next(&xprt->backlog) == NULL)
		clear_bit(XPRT_CONGESTED, &xprt->state);
}

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		rpc_sleep_on(&xprt->backlog, task, NULL);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
		goto out;
	req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
	if (req != NULL)
		goto out;
	atomic_dec(&xprt->num_reqs);
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
		kfree(req);
		return true;
	}
	return false;
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC:       dynamic allocation of request slot "
				"failed! Retrying\n");
		task->tk_status = -ENOMEM;
		break;
	case -EAGAIN:
		xprt_add_backlog(xprt, task);
		dprintk("RPC:       waiting for request slot\n");
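		/* fall through - the backlogged task still reports -EAGAIN */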
	default:
		task->tk_status = -EAGAIN;
	}
	spin_unlock(&xprt->reserve_lock);
	return;
out_init_req:
	task->tk_status = 0;
	task->tk_rqstp = req;
	xprt_request_init(task, xprt);
	spin_unlock(&xprt->reserve_lock);
}
EXPORT_SYMBOL_GPL(xprt_alloc_slot);

void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	/* Note: grabbing the xprt_lock_write() ensures that we throttle
	 * new slot allocation if the transport is congested (i.e. when
	 * reconnecting a stream transport or when out of socket write
	 * buffer space).
	 */
	if (xprt_lock_write(xprt, task)) {
		xprt_alloc_slot(xprt, task);
		xprt_release_write(xprt, task);
	}
}
EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot);

static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	spin_lock(&xprt->reserve_lock);
	if (!xprt_dynamic_free_slot(xprt, req)) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		list_add(&req->rq_list, &xprt->free);
	}
	xprt_wake_up_backlog(xprt);
	spin_unlock(&xprt->reserve_lock);
}

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;
	while (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		kfree(req);
	}
}

struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	if (max_alloc > num_prealloc)
		xprt->max_reqs = max_alloc;
	else
		xprt->max_reqs = num_prealloc;
	xprt->min_reqs = num_prealloc;
	atomic_set(&xprt->num_reqs, num_prealloc);

	return xprt;

out_free:
	xprt_free(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);
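
/*
 * Illustrative sketch only: a transport's setup routine typically embeds
 * struct rpc_xprt at the start of its own private structure and sizes the
 * slot table here.  The names below are hypothetical:
 *
 *	struct example_xprt {
 *		struct rpc_xprt	xprt;
 *		...			transport-private state
 *	};
 *
 *	xprt = xprt_alloc(args->net, sizeof(struct example_xprt),
 *			  slot_table_entries, max_slot_table_entries);
 *	if (xprt == NULL)
 *		return ERR_PTR(-ENOMEM);
 */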

void xprt_free(struct rpc_xprt *xprt)
{
	put_net(xprt->xprt_net);
	xprt_free_all_slots(xprt);
	kfree(xprt);
}
EXPORT_SYMBOL_GPL(xprt_free);

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	rcu_read_lock();
	xprt = rcu_dereference(task->tk_client->cl_xprt);
	if (!xprt_throttle_congested(xprt, task))
		xprt->ops->alloc_slot(xprt, task);
	rcu_read_unlock();
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	rcu_read_lock();
	xprt = rcu_dereference(task->tk_client->cl_xprt);
	xprt->ops->alloc_slot(xprt, task);
	rcu_read_unlock();
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	return (__force __be32)xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = prandom_u32();
}

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	INIT_LIST_HEAD(&req->rq_list);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_task	= task;
	req->rq_xprt	= xprt;
	req->rq_buffer	= NULL;
	req->rq_xid	= xprt_alloc_xid(xprt);
	req->rq_connect_cookie = xprt->connect_cookie - 1;
	req->rq_bytes_sent = 0;
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_release_snd_buf = NULL;
	xprt_reset_majortimeo(req);
	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			rcu_read_lock();
			xprt = rcu_dereference(task->tk_client->cl_xprt);
			if (xprt->snd_task == task)
				xprt_release_write(xprt, task);
			rcu_read_unlock();
		}
		return;
	}

	xprt = req->rq_xprt;
	if (task->tk_ops->rpc_count_stats != NULL)
		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
	else if (task->tk_client)
		rpc_count_iostats(task, task->tk_client->cl_metrics);
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer,
				xprt->last_used + xprt->idle_timeout);
	spin_unlock_bh(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(req->rq_buffer);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	task->tk_rqstp = NULL;
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
	if (likely(!bc_prealloc(req)))
		xprt_free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	atomic_set(&xprt->count, 1);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt	*xprt;
	struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident == args->ident) {
			spin_unlock(&xprt_list_lock);
			goto found;
		}
	}
	spin_unlock(&xprt_list_lock);
	dprintk("RPC: transport (%d) not supported\n", args->ident);
	return ERR_PTR(-EIO);

found:
	xprt = t->setup(args);
	if (IS_ERR(xprt)) {
		dprintk("RPC:       xprt_create_transport: failed, %ld\n",
				-PTR_ERR(xprt));
		goto out;
	}
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		setup_timer(&xprt->timer, xprt_init_autodisconnect,
			    (unsigned long)xprt);
	else
		init_timer(&xprt->timer);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	dprintk("RPC:       created transport %p with %u slots\n", xprt,
			xprt->max_reqs);
out:
	return xprt;
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC:       destroying transport %p\n", xprt);

	/* Exclude transport connect/disconnect handlers */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	cancel_work_sync(&xprt->task_cleanup);
	kfree(xprt->servername);
	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (atomic_dec_and_test(&xprt->count))
		xprt_destroy(xprt);
}
