1/*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses.  You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
13 *
14 *      Redistributions of source code must retain the above copyright
15 *      notice, this list of conditions and the following disclaimer.
16 *
17 *      Redistributions in binary form must reproduce the above
18 *      copyright notice, this list of conditions and the following
19 *      disclaimer in the documentation and/or other materials provided
20 *      with the distribution.
21 *
22 *      Neither the name of the Network Appliance, Inc. nor the names of
23 *      its contributors may be used to endorse or promote products
24 *      derived from this software without specific prior written
25 *      permission.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 */
39
40/*
41 * verbs.c
42 *
43 * Encapsulates the major functions managing:
44 *  o adapters
45 *  o endpoints
46 *  o connections
47 *  o buffer memory
48 */
49
50#include <linux/interrupt.h>
51#include <linux/slab.h>
52#include <linux/prefetch.h>
53#include <linux/sunrpc/addr.h>
54#include <asm/bitops.h>
55
56#include "xprt_rdma.h"
57
58/*
59 * Globals/Macros
60 */
61
62#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63# define RPCDBG_FACILITY	RPCDBG_TRANS
64#endif
65
66/*
67 * internal functions
68 */
69
70/*
 * Replies are handled in tasklet (softirq) context, using a single
 * global list. The tasklet function simply invokes the registered
 * reply function for each reply on the list.
74 */
75
76static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77static LIST_HEAD(rpcrdma_tasklets_g);
78
79static void
80rpcrdma_run_tasklet(unsigned long data)
81{
82	struct rpcrdma_rep *rep;
83	void (*func)(struct rpcrdma_rep *);
84	unsigned long flags;
85
	/* the tasklet "data" argument is unused */
87	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88	while (!list_empty(&rpcrdma_tasklets_g)) {
89		rep = list_entry(rpcrdma_tasklets_g.next,
90				 struct rpcrdma_rep, rr_list);
91		list_del(&rep->rr_list);
92		func = rep->rr_func;
93		rep->rr_func = NULL;
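		/* Drop the global list lock before invoking the reply
		 * handler; the handler does real work and takes other
		 * locks, and must not run under rpcrdma_tk_lock_g.
		 */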
94		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95
96		if (func)
97			func(rep);
98		else
99			rpcrdma_recv_buffer_put(rep);
100
101		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102	}
103	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104}
105
106static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107
108static const char * const async_event[] = {
109	"CQ error",
110	"QP fatal error",
111	"QP request error",
112	"QP access error",
113	"communication established",
114	"send queue drained",
115	"path migration successful",
116	"path mig error",
117	"device fatal error",
118	"port active",
119	"port error",
120	"LID change",
121	"P_key change",
122	"SM change",
123	"SRQ error",
124	"SRQ limit reached",
125	"last WQE reached",
126	"client reregister",
127	"GID change",
128};
129
130#define ASYNC_MSG(status)					\
131	((status) < ARRAY_SIZE(async_event) ?			\
132		async_event[(status)] : "unknown async error")
133
134static void
135rpcrdma_schedule_tasklet(struct list_head *sched_list)
136{
137	unsigned long flags;
138
139	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
140	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
141	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
142	tasklet_schedule(&rpcrdma_tasklet_g);
143}
144
145static void
146rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
147{
148	struct rpcrdma_ep *ep = context;
149
150	pr_err("RPC:       %s: %s on device %s ep %p\n",
151	       __func__, ASYNC_MSG(event->event),
152		event->device->name, context);
153	if (ep->rep_connected == 1) {
154		ep->rep_connected = -EIO;
155		rpcrdma_conn_func(ep);
156		wake_up_all(&ep->rep_connect_wait);
157	}
158}
159
160static void
161rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
162{
163	struct rpcrdma_ep *ep = context;
164
165	pr_err("RPC:       %s: %s on device %s ep %p\n",
166	       __func__, ASYNC_MSG(event->event),
167		event->device->name, context);
168	if (ep->rep_connected == 1) {
169		ep->rep_connected = -EIO;
170		rpcrdma_conn_func(ep);
171		wake_up_all(&ep->rep_connect_wait);
172	}
173}
174
175static const char * const wc_status[] = {
176	"success",
177	"local length error",
178	"local QP operation error",
179	"local EE context operation error",
180	"local protection error",
181	"WR flushed",
182	"memory management operation error",
183	"bad response error",
184	"local access error",
185	"remote invalid request error",
186	"remote access error",
187	"remote operation error",
188	"transport retry counter exceeded",
189	"RNR retry counter exceeded",
190	"local RDD violation error",
	"remote invalid RD request",
192	"operation aborted",
193	"invalid EE context number",
194	"invalid EE context state",
195	"fatal error",
196	"response timeout error",
197	"general error",
198};
199
200#define COMPLETION_MSG(status)					\
201	((status) < ARRAY_SIZE(wc_status) ?			\
202		wc_status[(status)] : "unexpected completion error")
203
204static void
205rpcrdma_sendcq_process_wc(struct ib_wc *wc)
206{
207	/* WARNING: Only wr_id and status are reliable at this point */
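	/* Plain SENDs are posted with a sentinel wr_id (see
	 * rpcrdma_ep_post); registration and invalidation work
	 * requests carry a pointer to the rpcrdma_mw they operate on.
	 */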
208	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
209		if (wc->status != IB_WC_SUCCESS &&
210		    wc->status != IB_WC_WR_FLUSH_ERR)
211			pr_err("RPC:       %s: SEND: %s\n",
212			       __func__, COMPLETION_MSG(wc->status));
213	} else {
214		struct rpcrdma_mw *r;
215
216		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
217		r->mw_sendcompletion(wc);
218	}
219}
220
221static int
222rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
223{
224	struct ib_wc *wcs;
225	int budget, count, rc;
226
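	/* Reap up to RPCRDMA_POLLSIZE completions per ib_poll_cq()
	 * call, and at most RPCRDMA_WC_BUDGET per upcall, to bound
	 * the time spent in completion processing.
	 */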
227	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
228	do {
229		wcs = ep->rep_send_wcs;
230
231		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
232		if (rc <= 0)
233			return rc;
234
235		count = rc;
236		while (count-- > 0)
237			rpcrdma_sendcq_process_wc(wcs++);
238	} while (rc == RPCRDMA_POLLSIZE && --budget);
239	return 0;
240}
241
242/*
243 * Handle send, fast_reg_mr, and local_inv completions.
244 *
245 * Send events are typically suppressed and thus do not result
246 * in an upcall. Occasionally one is signaled, however. This
247 * prevents the provider's completion queue from wrapping and
248 * losing a completion.
249 */
250static void
251rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
252{
253	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
254	int rc;
255
256	rc = rpcrdma_sendcq_poll(cq, ep);
257	if (rc) {
258		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
259			__func__, rc);
260		return;
261	}
262
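	/* Re-arm the CQ. With IB_CQ_REPORT_MISSED_EVENTS,
	 * ib_req_notify_cq() returns a positive value if completions
	 * arrived while notification was disarmed; in that case the
	 * CQ is polled once more below.
	 */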
263	rc = ib_req_notify_cq(cq,
264			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
265	if (rc == 0)
266		return;
267	if (rc < 0) {
268		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
269			__func__, rc);
270		return;
271	}
272
273	rpcrdma_sendcq_poll(cq, ep);
274}
275
276static void
277rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
278{
279	struct rpcrdma_rep *rep =
280			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
281
282	/* WARNING: Only wr_id and status are reliable at this point */
283	if (wc->status != IB_WC_SUCCESS)
284		goto out_fail;
285
286	/* status == SUCCESS means all fields in wc are trustworthy */
287	if (wc->opcode != IB_WC_RECV)
288		return;
289
290	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
291		__func__, rep, wc->byte_len);
292
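	/* Make the received RPC/RDMA message visible to the CPU
	 * before the reply handler parses it.
	 */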
293	rep->rr_len = wc->byte_len;
294	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
295				   rdmab_addr(rep->rr_rdmabuf),
296				   rep->rr_len, DMA_FROM_DEVICE);
297	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
298
299out_schedule:
300	list_add_tail(&rep->rr_list, sched_list);
301	return;
302out_fail:
303	if (wc->status != IB_WC_WR_FLUSH_ERR)
304		pr_err("RPC:       %s: rep %p: %s\n",
305		       __func__, rep, COMPLETION_MSG(wc->status));
306	rep->rr_len = ~0U;
307	goto out_schedule;
308}
309
310static int
311rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
312{
313	struct list_head sched_list;
314	struct ib_wc *wcs;
315	int budget, count, rc;
316
317	INIT_LIST_HEAD(&sched_list);
318	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
319	do {
320		wcs = ep->rep_recv_wcs;
321
322		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
323		if (rc <= 0)
324			goto out_schedule;
325
326		count = rc;
327		while (count-- > 0)
328			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
329	} while (rc == RPCRDMA_POLLSIZE && --budget);
330	rc = 0;
331
332out_schedule:
333	rpcrdma_schedule_tasklet(&sched_list);
334	return rc;
335}
336
337/*
338 * Handle receive completions.
339 *
 * It is reentrant but processes completions one at a time, to
 * preserve the order of receives on which the server's credit
 * accounting depends.
342 *
343 * It is the responsibility of the scheduled tasklet to return
344 * recv buffers to the pool. NOTE: this affects synchronization of
345 * connection shutdown. That is, the structures required for
346 * the completion of the reply handler must remain intact until
347 * all memory has been reclaimed.
348 */
349static void
350rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
351{
352	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
353	int rc;
354
355	rc = rpcrdma_recvcq_poll(cq, ep);
356	if (rc) {
357		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
358			__func__, rc);
359		return;
360	}
361
362	rc = ib_req_notify_cq(cq,
363			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
364	if (rc == 0)
365		return;
366	if (rc < 0) {
367		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
368			__func__, rc);
369		return;
370	}
371
372	rpcrdma_recvcq_poll(cq, ep);
373}
374
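/* Drain any completions still queued on the send and receive CQs.
 * This is used around disconnect and reconnect so that posted
 * rpcrdma_rep buffers are recycled rather than stranded.
 */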
375static void
376rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
377{
378	struct ib_wc wc;
379	LIST_HEAD(sched_list);
380
381	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
382		rpcrdma_recvcq_process_wc(&wc, &sched_list);
383	if (!list_empty(&sched_list))
384		rpcrdma_schedule_tasklet(&sched_list);
385	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
386		rpcrdma_sendcq_process_wc(&wc);
387}
388
389#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
390static const char * const conn[] = {
391	"address resolved",
392	"address error",
393	"route resolved",
394	"route error",
395	"connect request",
396	"connect response",
397	"connect error",
398	"unreachable",
399	"rejected",
400	"established",
401	"disconnected",
402	"device removal",
403	"multicast join",
404	"multicast error",
405	"address change",
406	"timewait exit",
407};
408
409#define CONNECTION_MSG(status)						\
410	((status) < ARRAY_SIZE(conn) ?					\
411		conn[(status)] : "unrecognized connection error")
412#endif
413
414static int
415rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
416{
417	struct rpcrdma_xprt *xprt = id->context;
418	struct rpcrdma_ia *ia = &xprt->rx_ia;
419	struct rpcrdma_ep *ep = &xprt->rx_ep;
420#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
421	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
422#endif
423	struct ib_qp_attr *attr = &ia->ri_qp_attr;
424	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
425	int connstate = 0;
426
427	switch (event->event) {
428	case RDMA_CM_EVENT_ADDR_RESOLVED:
429	case RDMA_CM_EVENT_ROUTE_RESOLVED:
430		ia->ri_async_rc = 0;
431		complete(&ia->ri_done);
432		break;
433	case RDMA_CM_EVENT_ADDR_ERROR:
434		ia->ri_async_rc = -EHOSTUNREACH;
435		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
436			__func__, ep);
437		complete(&ia->ri_done);
438		break;
439	case RDMA_CM_EVENT_ROUTE_ERROR:
440		ia->ri_async_rc = -ENETUNREACH;
441		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
442			__func__, ep);
443		complete(&ia->ri_done);
444		break;
445	case RDMA_CM_EVENT_ESTABLISHED:
446		connstate = 1;
447		ib_query_qp(ia->ri_id->qp, attr,
448			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
449			    iattr);
450		dprintk("RPC:       %s: %d responder resources"
451			" (%d initiator)\n",
452			__func__, attr->max_dest_rd_atomic,
453			attr->max_rd_atomic);
454		goto connected;
455	case RDMA_CM_EVENT_CONNECT_ERROR:
456		connstate = -ENOTCONN;
457		goto connected;
458	case RDMA_CM_EVENT_UNREACHABLE:
459		connstate = -ENETDOWN;
460		goto connected;
461	case RDMA_CM_EVENT_REJECTED:
462		connstate = -ECONNREFUSED;
463		goto connected;
464	case RDMA_CM_EVENT_DISCONNECTED:
465		connstate = -ECONNABORTED;
466		goto connected;
467	case RDMA_CM_EVENT_DEVICE_REMOVAL:
468		connstate = -ENODEV;
469connected:
470		dprintk("RPC:       %s: %sconnected\n",
471					__func__, connstate > 0 ? "" : "dis");
472		ep->rep_connected = connstate;
473		rpcrdma_conn_func(ep);
474		wake_up_all(&ep->rep_connect_wait);
475		/*FALLTHROUGH*/
476	default:
477		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
478			__func__, sap, rpc_get_port(sap), ep,
479			CONNECTION_MSG(event->event));
480		break;
481	}
482
483#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
484	if (connstate == 1) {
485		int ird = attr->max_dest_rd_atomic;
486		int tird = ep->rep_remote_cma.responder_resources;
487
488		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
489			sap, rpc_get_port(sap),
490			ia->ri_id->device->name,
491			ia->ri_ops->ro_displayname,
492			xprt->rx_buf.rb_max_requests,
493			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
494	} else if (connstate < 0) {
495		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
496			sap, rpc_get_port(sap), connstate);
497	}
498#endif
499
500	return 0;
501}
502
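/* Create an RDMA CM ID for this transport and resolve its address
 * and route. Both steps complete asynchronously in
 * rpcrdma_conn_upcall(), which stores the result in ri_async_rc and
 * signals ri_done; ri_async_rc is preset to -ETIMEDOUT in case the
 * wait below expires before an upcall arrives.
 */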
503static struct rdma_cm_id *
504rpcrdma_create_id(struct rpcrdma_xprt *xprt,
505			struct rpcrdma_ia *ia, struct sockaddr *addr)
506{
507	struct rdma_cm_id *id;
508	int rc;
509
510	init_completion(&ia->ri_done);
511
512	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
513	if (IS_ERR(id)) {
514		rc = PTR_ERR(id);
515		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
516			__func__, rc);
517		return id;
518	}
519
520	ia->ri_async_rc = -ETIMEDOUT;
521	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
522	if (rc) {
523		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
524			__func__, rc);
525		goto out;
526	}
527	wait_for_completion_interruptible_timeout(&ia->ri_done,
528				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
529	rc = ia->ri_async_rc;
530	if (rc)
531		goto out;
532
533	ia->ri_async_rc = -ETIMEDOUT;
534	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
535	if (rc) {
536		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
537			__func__, rc);
538		goto out;
539	}
540	wait_for_completion_interruptible_timeout(&ia->ri_done,
541				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
542	rc = ia->ri_async_rc;
543	if (rc)
544		goto out;
545
546	return id;
547
548out:
549	rdma_destroy_id(id);
550	return ERR_PTR(rc);
551}
552
553/*
 * Drain any CQ prior to teardown.
555 */
556static void
557rpcrdma_clean_cq(struct ib_cq *cq)
558{
559	struct ib_wc wc;
560	int count = 0;
561
	while (ib_poll_cq(cq, 1, &wc) == 1)
563		++count;
564
565	if (count)
566		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
567			__func__, count, wc.opcode);
568}
569
570/*
571 * Exported functions.
572 */
573
574/*
575 * Open and initialize an Interface Adapter.
576 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and the protection domain.
578 */
579int
580rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
581{
582	int rc, mem_priv;
583	struct rpcrdma_ia *ia = &xprt->rx_ia;
584	struct ib_device_attr *devattr = &ia->ri_devattr;
585
586	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
587	if (IS_ERR(ia->ri_id)) {
588		rc = PTR_ERR(ia->ri_id);
589		goto out1;
590	}
591
592	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
593	if (IS_ERR(ia->ri_pd)) {
594		rc = PTR_ERR(ia->ri_pd);
595		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
596			__func__, rc);
597		goto out2;
598	}
599
600	rc = ib_query_device(ia->ri_id->device, devattr);
601	if (rc) {
602		dprintk("RPC:       %s: ib_query_device failed %d\n",
603			__func__, rc);
604		goto out3;
605	}
606
607	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
608		ia->ri_have_dma_lkey = 1;
609		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
610	}
611
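	/* Fall back through the registration modes the HCA cannot
	 * support: FRWR requires MEM_MGT_EXTENSIONS plus a local DMA
	 * lkey, FMR requires the provider's alloc_fmr method, and
	 * ALLPHYSICAL is the last resort.
	 */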
612	if (memreg == RPCRDMA_FRMR) {
613		/* Requires both frmr reg and local dma lkey */
614		if (((devattr->device_cap_flags &
615		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
616		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
617		      (devattr->max_fast_reg_page_list_len == 0)) {
618			dprintk("RPC:       %s: FRMR registration "
619				"not supported by HCA\n", __func__);
620			memreg = RPCRDMA_MTHCAFMR;
621		}
622	}
623	if (memreg == RPCRDMA_MTHCAFMR) {
624		if (!ia->ri_id->device->alloc_fmr) {
625			dprintk("RPC:       %s: MTHCAFMR registration "
626				"not supported by HCA\n", __func__);
627			memreg = RPCRDMA_ALLPHYSICAL;
628		}
629	}
630
631	/*
632	 * Optionally obtain an underlying physical identity mapping in
633	 * order to do a memory window-based bind. This base registration
634	 * is protected from remote access - that is enabled only by binding
635	 * for the specific bytes targeted during each RPC operation, and
636	 * revoked after the corresponding completion similar to a storage
637	 * adapter.
638	 */
639	switch (memreg) {
640	case RPCRDMA_FRMR:
641		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
642		break;
643	case RPCRDMA_ALLPHYSICAL:
644		ia->ri_ops = &rpcrdma_physical_memreg_ops;
645		mem_priv = IB_ACCESS_LOCAL_WRITE |
646				IB_ACCESS_REMOTE_WRITE |
647				IB_ACCESS_REMOTE_READ;
648		goto register_setup;
649	case RPCRDMA_MTHCAFMR:
650		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
651		if (ia->ri_have_dma_lkey)
652			break;
653		mem_priv = IB_ACCESS_LOCAL_WRITE;
654	register_setup:
655		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
656		if (IS_ERR(ia->ri_bind_mem)) {
657			printk(KERN_ALERT "%s: ib_get_dma_mr for "
658				"phys register failed with %lX\n",
659				__func__, PTR_ERR(ia->ri_bind_mem));
660			rc = -ENOMEM;
661			goto out3;
662		}
663		break;
664	default:
665		printk(KERN_ERR "RPC: Unsupported memory "
666				"registration mode: %d\n", memreg);
667		rc = -ENOMEM;
668		goto out3;
669	}
670	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
671		__func__, ia->ri_ops->ro_displayname);
672
673	/* Else will do memory reg/dereg for each chunk */
674	ia->ri_memreg_strategy = memreg;
675
676	rwlock_init(&ia->ri_qplock);
677	return 0;
678
679out3:
680	ib_dealloc_pd(ia->ri_pd);
681	ia->ri_pd = NULL;
682out2:
683	rdma_destroy_id(ia->ri_id);
684	ia->ri_id = NULL;
685out1:
686	return rc;
687}
688
689/*
690 * Clean up/close an IA.
 *   o if the DMA MR, CM ID, and PD have been initialized, free them.
692 *   o close the IA
693 */
694void
695rpcrdma_ia_close(struct rpcrdma_ia *ia)
696{
697	int rc;
698
699	dprintk("RPC:       %s: entering\n", __func__);
700	if (ia->ri_bind_mem != NULL) {
701		rc = ib_dereg_mr(ia->ri_bind_mem);
702		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
703			__func__, rc);
704	}
705	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
706		if (ia->ri_id->qp)
707			rdma_destroy_qp(ia->ri_id);
708		rdma_destroy_id(ia->ri_id);
709		ia->ri_id = NULL;
710	}
711	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
712		rc = ib_dealloc_pd(ia->ri_pd);
713		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
714			__func__, rc);
715	}
716}
717
718/*
719 * Create unconnected endpoint.
720 */
721int
722rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
723				struct rpcrdma_create_data_internal *cdata)
724{
725	struct ib_device_attr *devattr = &ia->ri_devattr;
726	struct ib_cq *sendcq, *recvcq;
727	int rc, err;
728
729	/* check provider's send/recv wr limits */
730	if (cdata->max_requests > devattr->max_qp_wr)
731		cdata->max_requests = devattr->max_qp_wr;
732
733	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
734	ep->rep_attr.qp_context = ep;
735	ep->rep_attr.srq = NULL;
736	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
737	rc = ia->ri_ops->ro_open(ia, ep, cdata);
738	if (rc)
739		return rc;
740	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
741	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
742	ep->rep_attr.cap.max_recv_sge = 1;
743	ep->rep_attr.cap.max_inline_data = 0;
744	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
745	ep->rep_attr.qp_type = IB_QPT_RC;
746	ep->rep_attr.port_num = ~0;
747
748	if (cdata->padding) {
749		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
750						      GFP_KERNEL);
751		if (IS_ERR(ep->rep_padbuf))
752			return PTR_ERR(ep->rep_padbuf);
	} else {
		ep->rep_padbuf = NULL;
	}
755
756	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
757		"iovs: send %d recv %d\n",
758		__func__,
759		ep->rep_attr.cap.max_send_wr,
760		ep->rep_attr.cap.max_recv_wr,
761		ep->rep_attr.cap.max_send_sge,
762		ep->rep_attr.cap.max_recv_sge);
763
764	/* set trigger for requesting send completion */
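	/* rep_cqinit is the number of SENDs that may be posted
	 * unsignaled before one is marked IB_SEND_SIGNALED (see
	 * DECR_CQCOUNT in rpcrdma_ep_post). It is capped at
	 * RPCRDMA_MAX_UNSIGNALED_SENDS, and disabled entirely for
	 * very shallow send queues.
	 */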
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr / 2 - 1;
766	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
767		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
768	else if (ep->rep_cqinit <= 2)
769		ep->rep_cqinit = 0;
770	INIT_CQCOUNT(ep);
771	init_waitqueue_head(&ep->rep_connect_wait);
772	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
773
774	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
775				  rpcrdma_cq_async_error_upcall, ep,
776				  ep->rep_attr.cap.max_send_wr + 1, 0);
777	if (IS_ERR(sendcq)) {
778		rc = PTR_ERR(sendcq);
779		dprintk("RPC:       %s: failed to create send CQ: %i\n",
780			__func__, rc);
781		goto out1;
782	}
783
784	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
785	if (rc) {
786		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
787			__func__, rc);
788		goto out2;
789	}
790
791	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
792				  rpcrdma_cq_async_error_upcall, ep,
793				  ep->rep_attr.cap.max_recv_wr + 1, 0);
794	if (IS_ERR(recvcq)) {
795		rc = PTR_ERR(recvcq);
796		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
797			__func__, rc);
798		goto out2;
799	}
800
801	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
802	if (rc) {
803		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
804			__func__, rc);
805		ib_destroy_cq(recvcq);
806		goto out2;
807	}
808
809	ep->rep_attr.send_cq = sendcq;
810	ep->rep_attr.recv_cq = recvcq;
811
812	/* Initialize cma parameters */
813
814	/* RPC/RDMA does not use private data */
815	ep->rep_remote_cma.private_data = NULL;
816	ep->rep_remote_cma.private_data_len = 0;
817
818	/* Client offers RDMA Read but does not initiate */
819	ep->rep_remote_cma.initiator_depth = 0;
820	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
821		ep->rep_remote_cma.responder_resources = 32;
822	else
823		ep->rep_remote_cma.responder_resources =
824						devattr->max_qp_rd_atom;
825
826	ep->rep_remote_cma.retry_count = 7;
827	ep->rep_remote_cma.flow_control = 0;
828	ep->rep_remote_cma.rnr_retry_count = 0;
829
830	return 0;
831
832out2:
833	err = ib_destroy_cq(sendcq);
834	if (err)
835		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
836			__func__, err);
837out1:
838	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
839	return rc;
840}
841
842/*
843 * rpcrdma_ep_destroy
844 *
845 * Disconnect and destroy endpoint. After this, the only
846 * valid operations on the ep are to free it (if dynamically
847 * allocated) or re-create it.
848 */
849void
850rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
851{
852	int rc;
853
854	dprintk("RPC:       %s: entering, connected is %d\n",
855		__func__, ep->rep_connected);
856
857	cancel_delayed_work_sync(&ep->rep_connect_worker);
858
859	if (ia->ri_id->qp) {
860		rpcrdma_ep_disconnect(ep, ia);
861		rdma_destroy_qp(ia->ri_id);
862		ia->ri_id->qp = NULL;
863	}
864
865	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
866
867	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
868	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
869	if (rc)
870		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
871			__func__, rc);
872
873	rpcrdma_clean_cq(ep->rep_attr.send_cq);
874	rc = ib_destroy_cq(ep->rep_attr.send_cq);
875	if (rc)
876		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
877			__func__, rc);
878}
879
880/*
881 * Connect unconnected endpoint.
882 */
883int
884rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
885{
886	struct rdma_cm_id *id, *old;
887	int rc = 0;
888	int retry_count = 0;
889
890	if (ep->rep_connected != 0) {
891		struct rpcrdma_xprt *xprt;
892retry:
893		dprintk("RPC:       %s: reconnecting...\n", __func__);
894
895		rpcrdma_ep_disconnect(ep, ia);
896		rpcrdma_flush_cqs(ep);
897
898		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
899		ia->ri_ops->ro_reset(xprt);
900
901		id = rpcrdma_create_id(xprt, ia,
902				(struct sockaddr *)&xprt->rx_data.addr);
903		if (IS_ERR(id)) {
904			rc = -EHOSTUNREACH;
905			goto out;
906		}
907		/* TEMP TEMP TEMP - fail if new device:
908		 * Deregister/remarshal *all* requests!
909		 * Close and recreate adapter, pd, etc!
910		 * Re-determine all attributes still sane!
911		 * More stuff I haven't thought of!
912		 * Rrrgh!
913		 */
914		if (ia->ri_id->device != id->device) {
			pr_err("RPC:       %s: can't reconnect on different device!\n",
			       __func__);
917			rdma_destroy_id(id);
918			rc = -ENETUNREACH;
919			goto out;
920		}
921		/* END TEMP */
922		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
923		if (rc) {
924			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
925				__func__, rc);
926			rdma_destroy_id(id);
927			rc = -ENETUNREACH;
928			goto out;
929		}
930
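		/* Publish the new ID under the QP rwlock so that
		 * readers of ri_id (for example rpcrdma_retry_local_inv
		 * and the registration ops) never see a stale or
		 * half-destroyed value.
		 */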
931		write_lock(&ia->ri_qplock);
932		old = ia->ri_id;
933		ia->ri_id = id;
934		write_unlock(&ia->ri_qplock);
935
936		rdma_destroy_qp(old);
937		rdma_destroy_id(old);
938	} else {
939		dprintk("RPC:       %s: connecting...\n", __func__);
940		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
941		if (rc) {
942			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
943				__func__, rc);
944			/* do not update ep->rep_connected */
945			return -ENETUNREACH;
946		}
947	}
948
949	ep->rep_connected = 0;
950
951	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
952	if (rc) {
953		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
954				__func__, rc);
955		goto out;
956	}
957
958	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
959
960	/*
961	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state worth
	 * retrying. All other failures indicate a transport
	 * condition for which best-effort recovery has already been
	 * attempted.
965	 */
966	if (ep->rep_connected == -ECONNREFUSED &&
967	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
968		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
969		goto retry;
970	}
971	if (ep->rep_connected <= 0) {
972		/* Sometimes, the only way to reliably connect to remote
973		 * CMs is to use same nonzero values for ORD and IRD. */
974		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
975		    (ep->rep_remote_cma.responder_resources == 0 ||
976		     ep->rep_remote_cma.initiator_depth !=
977				ep->rep_remote_cma.responder_resources)) {
978			if (ep->rep_remote_cma.responder_resources == 0)
979				ep->rep_remote_cma.responder_resources = 1;
980			ep->rep_remote_cma.initiator_depth =
981				ep->rep_remote_cma.responder_resources;
982			goto retry;
983		}
984		rc = ep->rep_connected;
985	} else {
986		dprintk("RPC:       %s: connected\n", __func__);
987	}
988
989out:
990	if (rc)
991		ep->rep_connected = rc;
992	return rc;
993}
994
995/*
996 * rpcrdma_ep_disconnect
997 *
 * This is separate from destroy to allow reconnecting without
 * recreating the endpoint.
1000 *
1001 * This call is not reentrant, and must not be made in parallel
1002 * on the same endpoint.
1003 */
1004void
1005rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1006{
1007	int rc;
1008
1009	rpcrdma_flush_cqs(ep);
1010	rc = rdma_disconnect(ia->ri_id);
1011	if (!rc) {
1012		/* returns without wait if not connected */
1013		wait_event_interruptible(ep->rep_connect_wait,
1014							ep->rep_connected != 1);
1015		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1016			(ep->rep_connected == 1) ? "still " : "dis");
1017	} else {
1018		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1019		ep->rep_connected = rc;
1020	}
1021}
1022
1023static struct rpcrdma_req *
1024rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1025{
1026	struct rpcrdma_req *req;
1027
1028	req = kzalloc(sizeof(*req), GFP_KERNEL);
1029	if (req == NULL)
1030		return ERR_PTR(-ENOMEM);
1031
1032	req->rl_buffer = &r_xprt->rx_buf;
1033	return req;
1034}
1035
1036static struct rpcrdma_rep *
1037rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1038{
1039	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1040	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1041	struct rpcrdma_rep *rep;
1042	int rc;
1043
1044	rc = -ENOMEM;
1045	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1046	if (rep == NULL)
1047		goto out;
1048
1049	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1050					       GFP_KERNEL);
1051	if (IS_ERR(rep->rr_rdmabuf)) {
1052		rc = PTR_ERR(rep->rr_rdmabuf);
1053		goto out_free;
1054	}
1055
1056	rep->rr_buffer = &r_xprt->rx_buf;
1057	return rep;
1058
1059out_free:
1060	kfree(rep);
1061out:
1062	return ERR_PTR(rc);
1063}
1064
1065int
1066rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1067{
1068	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1069	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1070	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1071	char *p;
1072	size_t len;
1073	int i, rc;
1074
1075	buf->rb_max_requests = cdata->max_requests;
1076	spin_lock_init(&buf->rb_lock);
1077
1078	/* Need to allocate:
1079	 *   1.  arrays for send and recv pointers
1080	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1081	 *   3.  array of struct rpcrdma_rep for replies
1082	 * Send/recv buffers in req/rep need to be registered
1083	 */
1084	len = buf->rb_max_requests *
1085		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1086
1087	p = kzalloc(len, GFP_KERNEL);
1088	if (p == NULL) {
1089		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1090			__func__, len);
1091		rc = -ENOMEM;
1092		goto out;
1093	}
1094	buf->rb_pool = p;	/* for freeing it later */
1095
1096	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1097	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1098	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1099	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1100
1101	rc = ia->ri_ops->ro_init(r_xprt);
1102	if (rc)
1103		goto out;
1104
1105	for (i = 0; i < buf->rb_max_requests; i++) {
1106		struct rpcrdma_req *req;
1107		struct rpcrdma_rep *rep;
1108
1109		req = rpcrdma_create_req(r_xprt);
1110		if (IS_ERR(req)) {
1111			dprintk("RPC:       %s: request buffer %d alloc"
1112				" failed\n", __func__, i);
1113			rc = PTR_ERR(req);
1114			goto out;
1115		}
1116		buf->rb_send_bufs[i] = req;
1117
1118		rep = rpcrdma_create_rep(r_xprt);
1119		if (IS_ERR(rep)) {
1120			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1121				__func__, i);
1122			rc = PTR_ERR(rep);
1123			goto out;
1124		}
1125		buf->rb_recv_bufs[i] = rep;
1126	}
1127
1128	return 0;
1129out:
1130	rpcrdma_buffer_destroy(buf);
1131	return rc;
1132}
1133
1134static void
1135rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1136{
1137	if (!rep)
1138		return;
1139
1140	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1141	kfree(rep);
1142}
1143
1144static void
1145rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1146{
1147	if (!req)
1148		return;
1149
1150	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1151	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1152	kfree(req);
1153}
1154
1155void
1156rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1157{
1158	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1159	int i;
1160
1161	/* clean up in reverse order from create
1162	 *   1.  recv mr memory (mr free, then kfree)
1163	 *   2.  send mr memory (mr free, then kfree)
1164	 *   3.  MWs
1165	 */
1166	dprintk("RPC:       %s: entering\n", __func__);
1167
1168	for (i = 0; i < buf->rb_max_requests; i++) {
1169		if (buf->rb_recv_bufs)
1170			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1171		if (buf->rb_send_bufs)
1172			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1173	}
1174
1175	ia->ri_ops->ro_destroy(buf);
1176
1177	kfree(buf->rb_pool);
1178}
1179
1180/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1181 * some req segments uninitialized.
1182 */
1183static void
1184rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1185{
1186	if (*mw) {
1187		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1188		*mw = NULL;
1189	}
1190}
1191
1192/* Cycle mw's back in reverse order, and "spin" them.
1193 * This delays and scrambles reuse as much as possible.
1194 */
1195static void
1196rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1197{
1198	struct rpcrdma_mr_seg *seg = req->rl_segments;
1199	struct rpcrdma_mr_seg *seg1 = seg;
1200	int i;
1201
1202	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1203		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1204	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1205}
1206
1207static void
1208rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1209{
1210	buf->rb_send_bufs[--buf->rb_send_index] = req;
1211	req->rl_niovs = 0;
1212	if (req->rl_reply) {
1213		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1214		req->rl_reply->rr_func = NULL;
1215		req->rl_reply = NULL;
1216	}
1217}
1218
1219/* rpcrdma_unmap_one() was already done during deregistration.
1220 * Redo only the ib_post_send().
1221 */
1222static void
1223rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1224{
1225	struct rpcrdma_xprt *r_xprt =
1226				container_of(ia, struct rpcrdma_xprt, rx_ia);
1227	struct ib_send_wr invalidate_wr, *bad_wr;
1228	int rc;
1229
1230	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1231
1232	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1233	r->r.frmr.fr_state = FRMR_IS_INVALID;
1234
1235	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1236	invalidate_wr.wr_id = (unsigned long)(void *)r;
1237	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1238	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1239	DECR_CQCOUNT(&r_xprt->rx_ep);
1240
1241	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1242		__func__, r, r->r.frmr.fr_mr->rkey);
1243
1244	read_lock(&ia->ri_qplock);
1245	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1246	read_unlock(&ia->ri_qplock);
1247	if (rc) {
1248		/* Force rpcrdma_buffer_get() to retry */
1249		r->r.frmr.fr_state = FRMR_IS_STALE;
1250		dprintk("RPC:       %s: ib_post_send failed, %i\n",
1251			__func__, rc);
1252	}
1253}
1254
1255static void
1256rpcrdma_retry_flushed_linv(struct list_head *stale,
1257			   struct rpcrdma_buffer *buf)
1258{
1259	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1260	struct list_head *pos;
1261	struct rpcrdma_mw *r;
1262	unsigned long flags;
1263
1264	list_for_each(pos, stale) {
1265		r = list_entry(pos, struct rpcrdma_mw, mw_list);
1266		rpcrdma_retry_local_inv(r, ia);
1267	}
1268
1269	spin_lock_irqsave(&buf->rb_lock, flags);
1270	list_splice_tail(stale, &buf->rb_mws);
1271	spin_unlock_irqrestore(&buf->rb_lock, flags);
1272}
1273
1274static struct rpcrdma_req *
1275rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1276			 struct list_head *stale)
1277{
1278	struct rpcrdma_mw *r;
1279	int i;
1280
1281	i = RPCRDMA_MAX_SEGS - 1;
1282	while (!list_empty(&buf->rb_mws)) {
1283		r = list_entry(buf->rb_mws.next,
1284			       struct rpcrdma_mw, mw_list);
1285		list_del(&r->mw_list);
1286		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1287			list_add(&r->mw_list, stale);
1288			continue;
1289		}
1290		req->rl_segments[i].rl_mw = r;
1291		if (unlikely(i-- == 0))
1292			return req;	/* Success */
1293	}
1294
1295	/* Not enough entries on rb_mws for this req */
1296	rpcrdma_buffer_put_sendbuf(req, buf);
1297	rpcrdma_buffer_put_mrs(req, buf);
1298	return NULL;
1299}
1300
1301static struct rpcrdma_req *
1302rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1303{
1304	struct rpcrdma_mw *r;
1305	int i;
1306
1307	i = RPCRDMA_MAX_SEGS - 1;
1308	while (!list_empty(&buf->rb_mws)) {
1309		r = list_entry(buf->rb_mws.next,
1310			       struct rpcrdma_mw, mw_list);
1311		list_del(&r->mw_list);
1312		req->rl_segments[i].rl_mw = r;
1313		if (unlikely(i-- == 0))
1314			return req;	/* Success */
1315	}
1316
1317	/* Not enough entries on rb_mws for this req */
1318	rpcrdma_buffer_put_sendbuf(req, buf);
1319	rpcrdma_buffer_put_mrs(req, buf);
1320	return NULL;
1321}
1322
1323/*
1324 * Get a set of request/reply buffers.
1325 *
1326 * Reply buffer (if needed) is attached to send buffer upon return.
1327 * Rule:
1328 *    rb_send_index and rb_recv_index MUST always be pointing to the
1329 *    *next* available buffer (non-NULL). They are incremented after
1330 *    removing buffers, and decremented *before* returning them.
1331 */
1332struct rpcrdma_req *
1333rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1334{
1335	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1336	struct list_head stale;
1337	struct rpcrdma_req *req;
1338	unsigned long flags;
1339
1340	spin_lock_irqsave(&buffers->rb_lock, flags);
1341	if (buffers->rb_send_index == buffers->rb_max_requests) {
1342		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1343		dprintk("RPC:       %s: out of request buffers\n", __func__);
		return NULL;
1345	}
1346
1347	req = buffers->rb_send_bufs[buffers->rb_send_index];
1348	if (buffers->rb_send_index < buffers->rb_recv_index) {
1349		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1350			__func__,
1351			buffers->rb_recv_index - buffers->rb_send_index);
1352		req->rl_reply = NULL;
1353	} else {
1354		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1355		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1356	}
1357	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1358
1359	INIT_LIST_HEAD(&stale);
1360	switch (ia->ri_memreg_strategy) {
1361	case RPCRDMA_FRMR:
1362		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1363		break;
1364	case RPCRDMA_MTHCAFMR:
1365		req = rpcrdma_buffer_get_fmrs(req, buffers);
1366		break;
1367	default:
1368		break;
1369	}
1370	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1371	if (!list_empty(&stale))
1372		rpcrdma_retry_flushed_linv(&stale, buffers);
1373	return req;
1374}
1375
1376/*
1377 * Put request/reply buffers back into pool.
1378 * Pre-decrement counter/array index.
1379 */
1380void
1381rpcrdma_buffer_put(struct rpcrdma_req *req)
1382{
1383	struct rpcrdma_buffer *buffers = req->rl_buffer;
1384	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1385	unsigned long flags;
1386
1387	spin_lock_irqsave(&buffers->rb_lock, flags);
1388	rpcrdma_buffer_put_sendbuf(req, buffers);
1389	switch (ia->ri_memreg_strategy) {
1390	case RPCRDMA_FRMR:
1391	case RPCRDMA_MTHCAFMR:
1392		rpcrdma_buffer_put_mrs(req, buffers);
1393		break;
1394	default:
1395		break;
1396	}
1397	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1398}
1399
1400/*
1401 * Recover reply buffers from pool.
1402 * This happens when recovering from error conditions.
1403 * Post-increment counter/array index.
1404 */
1405void
1406rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1407{
1408	struct rpcrdma_buffer *buffers = req->rl_buffer;
1409	unsigned long flags;
1410
1411	spin_lock_irqsave(&buffers->rb_lock, flags);
1412	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1413		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1414		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1415	}
1416	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1417}
1418
1419/*
1420 * Put reply buffers back into pool when not attached to
1421 * request. This happens in error conditions.
1422 */
1423void
1424rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1425{
1426	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1427	unsigned long flags;
1428
1429	rep->rr_func = NULL;
1430	spin_lock_irqsave(&buffers->rb_lock, flags);
1431	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1432	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1433}
1434
1435/*
1436 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1437 */
1438
1439void
1440rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1441{
1442	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1443		seg->mr_offset,
1444		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
1445}
1446
1447static int
1448rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1449				struct ib_mr **mrp, struct ib_sge *iov)
1450{
1451	struct ib_phys_buf ipb;
1452	struct ib_mr *mr;
1453	int rc;
1454
1455	/*
1456	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1457	 */
1458	iov->addr = ib_dma_map_single(ia->ri_id->device,
1459			va, len, DMA_BIDIRECTIONAL);
1460	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1461		return -ENOMEM;
1462
1463	iov->length = len;
1464
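	/* Prefer the device's global DMA lkey; otherwise use the DMA
	 * MR set up in rpcrdma_ia_open(); as a last resort, register
	 * just this buffer with ib_reg_phys_mr().
	 */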
1465	if (ia->ri_have_dma_lkey) {
1466		*mrp = NULL;
1467		iov->lkey = ia->ri_dma_lkey;
1468		return 0;
1469	} else if (ia->ri_bind_mem != NULL) {
1470		*mrp = NULL;
1471		iov->lkey = ia->ri_bind_mem->lkey;
1472		return 0;
1473	}
1474
1475	ipb.addr = iov->addr;
1476	ipb.size = iov->length;
1477	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1478			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1479
1480	dprintk("RPC:       %s: phys convert: 0x%llx "
1481			"registered 0x%llx length %d\n",
1482			__func__, (unsigned long long)ipb.addr,
1483			(unsigned long long)iov->addr, len);
1484
1485	if (IS_ERR(mr)) {
1486		*mrp = NULL;
1487		rc = PTR_ERR(mr);
1488		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1489	} else {
1490		*mrp = mr;
1491		iov->lkey = mr->lkey;
1492		rc = 0;
1493	}
1494
1495	return rc;
1496}
1497
1498static int
1499rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1500				struct ib_mr *mr, struct ib_sge *iov)
1501{
1502	int rc;
1503
1504	ib_dma_unmap_single(ia->ri_id->device,
1505			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1506
	if (mr == NULL)
1508		return 0;
1509
1510	rc = ib_dereg_mr(mr);
1511	if (rc)
1512		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1513	return rc;
1514}
1515
1516/**
1517 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1518 * @ia: controlling rpcrdma_ia
1519 * @size: size of buffer to be allocated, in bytes
1520 * @flags: GFP flags
1521 *
1522 * Returns pointer to private header of an area of internally
1523 * registered memory, or an ERR_PTR. The registered buffer follows
1524 * the end of the private header.
1525 *
1526 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1527 * receiving the payload of RDMA RECV operations. regbufs are not
1528 * used for RDMA READ/WRITE operations, thus are registered only for
1529 * LOCAL access.
1530 */
1531struct rpcrdma_regbuf *
1532rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1533{
1534	struct rpcrdma_regbuf *rb;
1535	int rc;
1536
1537	rc = -ENOMEM;
1538	rb = kmalloc(sizeof(*rb) + size, flags);
1539	if (rb == NULL)
1540		goto out;
1541
1542	rb->rg_size = size;
1543	rb->rg_owner = NULL;
1544	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1545				       &rb->rg_mr, &rb->rg_iov);
1546	if (rc)
1547		goto out_free;
1548
1549	return rb;
1550
1551out_free:
1552	kfree(rb);
1553out:
1554	return ERR_PTR(rc);
1555}
1556
1557/**
1558 * rpcrdma_free_regbuf - deregister and free registered buffer
1559 * @ia: controlling rpcrdma_ia
1560 * @rb: regbuf to be deregistered and freed
1561 */
1562void
1563rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1564{
1565	if (rb) {
1566		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1567		kfree(rb);
1568	}
1569}
1570
1571/*
1572 * Prepost any receive buffer, then post send.
1573 *
1574 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1575 */
1576int
1577rpcrdma_ep_post(struct rpcrdma_ia *ia,
1578		struct rpcrdma_ep *ep,
1579		struct rpcrdma_req *req)
1580{
1581	struct ib_send_wr send_wr, *send_wr_fail;
1582	struct rpcrdma_rep *rep = req->rl_reply;
1583	int rc;
1584
1585	if (rep) {
1586		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1587		if (rc)
1588			goto out;
1589		req->rl_reply = NULL;
1590	}
1591
1592	send_wr.next = NULL;
1593	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
1594	send_wr.sg_list = req->rl_send_iov;
1595	send_wr.num_sge = req->rl_niovs;
1596	send_wr.opcode = IB_WR_SEND;
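	/* DMA-sync the marshaled iovs before posting: iov[0] holds
	 * the RPC/RDMA transport header and iov[1] the RPC call
	 * message; a fourth iov is synced only when send padding is
	 * in use.
	 */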
1597	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1598		ib_dma_sync_single_for_device(ia->ri_id->device,
1599			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1600			DMA_TO_DEVICE);
1601	ib_dma_sync_single_for_device(ia->ri_id->device,
1602		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1603		DMA_TO_DEVICE);
1604	ib_dma_sync_single_for_device(ia->ri_id->device,
1605		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1606		DMA_TO_DEVICE);
1607
1608	if (DECR_CQCOUNT(ep) > 0)
1609		send_wr.send_flags = 0;
1610	else { /* Provider must take a send completion every now and then */
1611		INIT_CQCOUNT(ep);
1612		send_wr.send_flags = IB_SEND_SIGNALED;
1613	}
1614
1615	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1616	if (rc)
1617		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1618			rc);
1619out:
1620	return rc;
1621}
1622
1623/*
1624 * (Re)post a receive buffer.
1625 */
1626int
1627rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1628		     struct rpcrdma_ep *ep,
1629		     struct rpcrdma_rep *rep)
1630{
1631	struct ib_recv_wr recv_wr, *recv_wr_fail;
1632	int rc;
1633
1634	recv_wr.next = NULL;
1635	recv_wr.wr_id = (u64) (unsigned long) rep;
1636	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1637	recv_wr.num_sge = 1;
1638
1639	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1640				   rdmab_addr(rep->rr_rdmabuf),
1641				   rdmab_length(rep->rr_rdmabuf),
1642				   DMA_BIDIRECTIONAL);
1643
1644	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1645
1646	if (rc)
1647		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1648			rc);
1649	return rc;
1650}
1651
1652/* How many chunk list items fit within our inline buffers?
1653 */
1654unsigned int
1655rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1656{
1657	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1658	int bytes, segments;
1659
1660	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1661	bytes -= RPCRDMA_HDRLEN_MIN;
1662	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1663		pr_warn("RPC:       %s: inline threshold too small\n",
1664			__func__);
1665		return 0;
1666	}
1667
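	/* Round the segment count down to a power of two. For
	 * example, assuming a 1KB inline threshold, a 28-byte
	 * minimal header, and 16-byte segments, 62 raw slots become
	 * 32 usable segments.
	 */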
1668	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1669	dprintk("RPC:       %s: max chunk list size = %d segments\n",
1670		__func__, segments);
1671	return segments;
1672}
1673