/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_function)

/*
 * Mode for mapping cpus to pools.
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};
#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */
static struct svc_pool_map {
	int count;			/* How many svc_servs use us */
	int mode;			/* Note: int not enum to avoid
					 * warnings about "enumeration value
					 * not handled in switch" */
	unsigned int npools;
	unsigned int *pool_to;		/* maps pool id to cpu or node */
	unsigned int *to_pool;		/* maps cpu or node to pool id */
} svc_pool_map = {
	.count = 0,
	.mode = SVC_POOL_DEFAULT
};
static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */

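/*
 * Parse the pool_mode module parameter: one of "auto", "global",
 * "percpu" or "pernode".  The mode can only be changed while no
 * pooled service holds a reference on the pool map.
 */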
static int
param_set_pool_mode(const char *val, struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

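/*
 * Report the current pool_mode setting as a string.
 */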
static int
param_get_pool_mode(char *buf, struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;

	switch (*ip) {
	case SVC_POOL_AUTO:
		return strlcpy(buf, "auto", 20);
	case SVC_POOL_GLOBAL:
		return strlcpy(buf, "global", 20);
	case SVC_POOL_PERCPU:
		return strlcpy(buf, "percpu", 20);
	case SVC_POOL_PERNODE:
		return strlcpy(buf, "pernode", 20);
	default:
		return sprintf(buf, "%d", *ip);
	}
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (nr_online_nodes > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = first_online_node;
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
	m->to_pool = NULL;
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx >= maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx > maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
static unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools < 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	mutex_unlock(&svc_pool_map_mutex);
	return m->npools;
}


/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(void)
{
	struct svc_pool_map *m = &svc_pool_map;

	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		kfree(m->to_pool);
		m->to_pool = NULL;
		kfree(m->pool_to);
		m->pool_to = NULL;
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}


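/*
 * Return the NUMA node backing the given pool, or NUMA_NO_NODE if
 * the pool map is unused or running in global mode.
 */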
static int svc_pool_map_get_node(unsigned int pidx)
{
	const struct svc_pool_map *m = &svc_pool_map;

	if (m->count) {
		if (m->mode == SVC_POOL_PERCPU)
			return cpu_to_node(m->pool_to[pidx]);
		if (m->mode == SVC_POOL_PERNODE)
			return m->pool_to[pidx];
	}
	return NUMA_NO_NODE;
}
/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int node = m->pool_to[pidx];

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	WARN_ON_ONCE(m->count == 0);
	if (m->count == 0)
		return;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
	{
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	}
	case SVC_POOL_PERNODE:
	{
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
	}
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int pidx = 0;

	/*
	 * An uninitialised map happens in a pure client when
	 * lockd is brought up, so silently treat it the
	 * same as SVC_POOL_GLOBAL.
	 */
	if (svc_serv_is_pooled(serv)) {
		switch (m->mode) {
		case SVC_POOL_PERCPU:
			pidx = m->to_pool[cpu];
			break;
		case SVC_POOL_PERNODE:
			pidx = m->to_pool[cpu_to_node(cpu)];
			break;
		}
	}
	return &serv->sv_pools[pidx % serv->sv_nrpools];
}

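/*
 * Create the local rpcbind client for this network namespace and
 * remove any stale registrations left behind by a previous instance
 * of the service.
 */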
int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
	int err;

	err = rpcb_create_local(net);
	if (err)
		return err;

	/* Remove any stale portmap registrations */
	svc_unregister(serv, net);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

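/*
 * Unregister the service from rpcbind and release the local rpcbind
 * client for this network namespace.
 */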
void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
	svc_unregister(serv, net);
	rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

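/*
 * Returns 1 if the service offers at least one non-hidden program
 * version and therefore needs to be registered with rpcbind.
 */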
static int svc_uses_rpcbind(struct svc_serv *serv)
{
	struct svc_program	*progp;
	unsigned int		i;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden == 0)
				return 1;
		}
	}

	return 0;
}

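/*
 * Set up the rpcbind registration state for the service in the given
 * network namespace; a no-op if every version the service offers is
 * hidden.
 */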
int svc_bind(struct svc_serv *serv, struct net *net)
{
	if (!svc_uses_rpcbind(serv))
		return 0;
	return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     void (*shutdown)(struct svc_serv *serv, struct net *net))
{
	struct svc_serv	*serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	serv = kzalloc(sizeof(*serv), GFP_KERNEL);
	if (!serv)
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	serv->sv_nrthreads = 1;
	serv->sv_stats     = prog->pg_stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_shutdown  = shutdown;
	xdrsize = 0;
	while (prog) {
		prog->pg_lovers = prog->pg_nvers - 1;
		for (vers = 0; vers < prog->pg_nvers; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	init_timer(&serv->sv_temptimer);
	spin_lock_init(&serv->sv_lock);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		spin_lock_init(&pool->sp_lock);
	}

	return serv;
}

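/*
 * Create an RPC service using a single global pool of threads.
 */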
487struct svc_serv *
488svc_create(struct svc_program *prog, unsigned int bufsize,
489	   void (*shutdown)(struct svc_serv *serv, struct net *net))
490{
491	return __svc_create(prog, bufsize, /*npools*/1, shutdown);
492}
493EXPORT_SYMBOL_GPL(svc_create);
494
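/*
 * Create an RPC service whose thread pools are laid out according to
 * the current pool_mode setting; func is the thread function and mod
 * the module that provides it.
 */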
struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
		  void (*shutdown)(struct svc_serv *serv, struct net *net),
		  svc_thread_fn func, struct module *mod)
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, bufsize, npools, shutdown);
	if (!serv)
		goto out_err;

	serv->sv_function = func;
	serv->sv_module = mod;
	return serv;
out_err:
	svc_pool_map_put();
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

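/*
 * Close all of the service's transports in the given network
 * namespace and invoke the service's per-net shutdown callback, if
 * one was registered.
 */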
void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
	svc_close_net(serv, net);

	if (serv->sv_shutdown)
		serv->sv_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
	dprintk("svc: svc_destroy(%s, %d)\n",
				serv->sv_program->pg_name,
				serv->sv_nrthreads);

	if (serv->sv_nrthreads) {
		if (--(serv->sv_nrthreads) != 0) {
			svc_sock_update_bufs(serv);
			return;
		}
	} else
		printk("svc_destroy: no threads for serv=%p!\n", serv);

	del_timer_sync(&serv->sv_temptimer);

	/*
	 * The last user is gone, so all sockets should already have been
	 * destroyed by this point.  Check that this is indeed the case.
	 */
	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	if (svc_serv_is_pooled(serv))
		svc_pool_map_put();

	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned int pages, arghi;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return 1;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
				       * We assume one is at most one page
				       */
	arghi = 0;
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;
	while (pages) {
		struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
		if (!p)
			break;
		rqstp->rq_pages[arghi++] = p;
		pages--;
	}
	return pages == 0;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

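/*
 * Allocate and initialise an svc_rqst for a new server thread,
 * accounting it against the service and pool and allocating its XDR
 * and page buffers on the given NUMA node.
 */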
struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		goto out_enomem;

	serv->sv_nrthreads++;
	__set_bit(RQ_BUSY, &rqstp->rq_flags);
	spin_lock_init(&rqstp->rq_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);

	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_thread;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_thread;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_thread;

	return rqstp;
out_thread:
	svc_exit_thread(rqstp);
out_enomem:
	return ERR_PTR(-ENOMEM);
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	if (pool != NULL)
		return pool;

	return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	unsigned int i;
	struct task_struct *task = NULL;

	if (pool != NULL) {
		spin_lock_bh(&pool->sp_lock);
	} else {
		/* choose a pool in round-robin fashion */
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			spin_lock_bh(&pool->sp_lock);
			if (!list_empty(&pool->sp_all_threads))
				goto found_pool;
			spin_unlock_bh(&pool->sp_lock);
		}
		return NULL;
	}

found_pool:
	if (!list_empty(&pool->sp_all_threads)) {
		struct svc_rqst *rqstp;

		/*
		 * Remove from the pool->sp_all_threads list
		 * so we don't try to kill it again.
		 */
		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
		set_bit(RQ_VICTIM, &rqstp->rq_flags);
		list_del_rcu(&rqstp->rq_all);
		task = rqstp->rq_task;
	}
	spin_unlock_bh(&pool->sp_lock);

	return task;
}

/*
 * Create or destroy enough new threads to bring the number of threads
 * in the service (or in the given pool) up or down to the given
 * number.  If `pool' is non-NULL, this applies only to threads in that
 * pool; otherwise new threads are distributed round-robin between all
 * pools.  The caller must ensure mutual exclusion between this and
 * server startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	int error = 0;
	unsigned int state = serv->sv_nrthreads-1;
	int node;

	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads-1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	/* create new threads */
	while (nrservs > 0) {
		nrservs--;
		chosen_pool = choose_pool(serv, pool, &state);

		node = svc_pool_map_get_node(chosen_pool->sp_id);
		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (IS_ERR(rqstp)) {
			error = PTR_ERR(rqstp);
			break;
		}

		__module_get(serv->sv_module);
		task = kthread_create_on_node(serv->sv_function, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			error = PTR_ERR(task);
			module_put(serv->sv_module);
			svc_exit_thread(rqstp);
			break;
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	}
	/* destroy old threads */
	while (nrservs < 0 &&
	       (task = choose_victim(serv, pool, &state)) != NULL) {
		send_sig(SIGINT, task, 1);
		nrservs++;
	}

	return error;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	svc_release_buffer(rqstp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
		list_del_rcu(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	kfree_rcu(rqstp, rq_rcu_head);

	/* Release the server */
	if (serv)
		svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, protocol, port);

	return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
		break;
#endif
	}

	return error;
}

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	struct svc_program	*progp;
	struct svc_version	*vers;
	unsigned int		i;
	int			error = 0;

	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			vers = progp->pg_vers[i];
			if (vers == NULL)
				continue;

			dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
					progp->pg_name,
					i,
					proto == IPPROTO_UDP ? "udp" : "tcp",
					port,
					family,
					vers->vs_hidden ?
					" (but not telling portmap)" : "");

			if (vers->vs_hidden)
				continue;

			error = __svc_register(net, progp->pg_name, progp->pg_prog,
						i, family, proto, port);

			if (vers->vs_rpcb_optnl) {
				error = 0;
				continue;
			}

			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				break;
			}
		}
	}

	return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(net, program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, 0, 0);

	dprintk("svc: %s(%sv%u), error %d\n",
			__func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;

			dprintk("svc: attempting to unregister %sv%u\n",
				progp->pg_name, i);
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
	struct svc_program	*progp;
	struct svc_version	*versp = NULL;	/* compiler food */
	struct svc_procedure	*procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	kxdrproc_t		xdr;
	__be32			*statp;
	u32			prog, vers, proc;
	__be32			auth_stat, rpc_stat;
	int			auth_res;
	__be32			*reply_statp;

	rpc_stat = rpc_success;

	if (argv->iov_len < 6*4)
		goto err_short_len;

	/* Will be turned off only in gss privacy case: */
	set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
	/* Will be turned off only when NFSv4 Sessions are used */
	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	clear_bit(RQ_DROPME, &rqstp->rq_flags);

	/* Setup reply header */
	rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);

	svc_putu32(resv, rqstp->rq_xid);

	vers = svc_getnl(argv);

	/* First words of reply: */
	svc_putnl(resv, 1);		/* REPLY */

	if (vers != 2)		/* RPC version number */
		goto err_bad_rpc;

	/* Save position in case we later decide to reject: */
	reply_statp = resv->iov_base + resv->iov_len;

	svc_putnl(resv, 0);		/* ACCEPT */

	rqstp->rq_prog = prog = svc_getnl(argv);	/* program number */
	rqstp->rq_vers = vers = svc_getnl(argv);	/* version number */
	rqstp->rq_proc = proc = svc_getnl(argv);	/* procedure number */

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp, &auth_stat);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp) {
		auth_stat = rpc_autherr_badcred;
		auth_res = progp->pg_authenticate(rqstp);
	}
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage;
	case SVC_SYSERR:
		rpc_stat = rpc_system_err;
		goto err_bad;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		if (test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
			svc_close_xprt(rqstp->rq_xprt);
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	}

	if (progp == NULL)
		goto err_bad_prog;

	if (vers >= progp->pg_nvers ||
	    !(versp = progp->pg_vers[vers]))
		goto err_bad_vers;

	procp = versp->vs_proc + proc;
	if (proc >= versp->vs_nproc || !procp->pc_func)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;

	/* Build the reply header. */
	statp = resv->iov_base + resv->iov_len;
	svc_putnl(resv, RPC_SUCCESS);

	/* Bump per-procedure stats counter */
	procp->pc_count++;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argsize);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	if (!versp->vs_dispatch) {
		/* Decode arguments */
		xdr = procp->pc_decode;
		if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
			goto err_garbage;

		*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);

		/* Encode reply */
		if (test_bit(RQ_DROPME, &rqstp->rq_flags)) {
			if (procp->pc_release)
				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
			goto dropit;
		}
		if (*statp == rpc_success &&
		    (xdr = procp->pc_encode) &&
		    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
			dprintk("svc: failed to encode reply\n");
			/* serv->sv_stats->rpcsystemerr++; */
			*statp = rpc_system_err;
		}
	} else {
		dprintk("svc: calling dispatcher\n");
		if (!versp->vs_dispatch(rqstp, statp)) {
			/* Release reply info */
			if (procp->pc_release)
				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
			goto dropit;
		}
	}

	/* Check RPC status result */
	if (*statp != rpc_success)
		resv->iov_len = ((void *)statp) - resv->iov_base + 4;

	/* Release reply info */
	if (procp->pc_release)
		procp->pc_release(rqstp, NULL, rqstp->rq_resp);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto dropit;
	return 1;		/* Caller can now send it */

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %zd, dropping request\n",
			argv->iov_len);

	goto dropit;			/* drop request */

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 0);	/* RPC_MISMATCH */
	svc_putnl(resv, 2);	/* Only RPCv2 supported */
	svc_putnl(resv, 2);
	goto sendit;

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of accept status: */
	xdr_ressize_check(rqstp, reply_statp);
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 1);	/* AUTH_ERROR */
	svc_putnl(resv, ntohl(auth_stat));	/* status */
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", prog);
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_UNAVAIL);
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       vers, prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_MISMATCH);
	svc_putnl(resv, progp->pg_lovers);
	svc_putnl(resv, progp->pg_hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", proc);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROC_UNAVAIL);
	goto sendit;

err_garbage:
	svc_printk(rqstp, "failed to decode args\n");

	rpc_stat = rpc_garbage_args;
err_bad:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, ntohl(rpc_stat));
	goto sendit;
}
/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*argv = &rqstp->rq_arg.head[0];
	struct kvec		*resv = &rqstp->rq_res.head[0];
	struct svc_serv		*serv = rqstp->rq_server;
	u32			dir;

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_respages + 1;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	dir  = svc_getnl(argv);
	if (dir != 0) {
		/* direction != CALL */
		svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
		serv->sv_stats->rpcbadfmt++;
		goto out_drop;
	}

	/* Returns 1 for send, 0 for drop */
	if (likely(svc_process_common(rqstp, argv, resv))) {
		int ret = svc_send(rqstp);

		trace_svc_process(rqstp, ret);
		return ret;
	}
out_drop:
	trace_svc_process(rqstp, 0);
	svc_drop(rqstp);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
	       struct svc_rqst *rqstp)
{
	struct kvec	*argv = &rqstp->rq_arg.head[0];
	struct kvec	*resv = &rqstp->rq_res.head[0];

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xprt = serv->sv_bc_xprt;
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_server = serv;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* reset result send buffer "put" position */
	resv->iov_len = 0;

	if (rqstp->rq_prot != IPPROTO_TCP) {
		printk(KERN_ERR "No support for Non-TCP transports!\n");
		BUG();
	}

	/*
	 * Skip the next two words because they've already been
	 * processed in the transport
	 */
	svc_getu32(argv);	/* XID */
	svc_getnl(argv);	/* CALLDIR */

	/* Returns 1 for send, 0 for drop */
	if (svc_process_common(rqstp, argv, resv)) {
		memcpy(&req->rq_snd_buf, &rqstp->rq_res,
						sizeof(req->rq_snd_buf));
		return bc_send(req);
	} else {
		/* drop request */
		xprt_free_bc_request(req);
		return 0;
	}
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
