/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/slab.h>
#include <linux/udp.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * How long to wait before scheduling ACK generation after seeing a
 * packet with RXRPC_REQUEST_ACK set (in jiffies).
 */
unsigned rxrpc_requested_ack_delay = 1;

/*
 * How long to wait before scheduling an ACK with subtype DELAY (in jiffies).
 *
 * We use this when we've received new data packets.  If those packets aren't
 * all consumed within this time, we will send a DELAY ACK (if no ACK was
 * otherwise requested) to let the sender know it doesn't need to resend.
 */
unsigned rxrpc_soft_ack_delay = 1 * HZ;

/*
 * How long to wait before scheduling an ACK with subtype IDLE (in jiffies).
 *
 * We use this when we've consumed some previously soft-ACK'd packets but
 * further packets aren't immediately received, to decide when to send an IDLE
 * ACK to let the other end know that it can free up its Tx buffer space.
 */
unsigned rxrpc_idle_ack_delay = 0.5 * HZ;

/*
 * Receive window size in packets.  This indicates the maximum number of
 * unconsumed received packets we're willing to retain in memory.  Once this
 * limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
 * packets.
 */
unsigned rxrpc_rx_window_size = 32;

/*
 * Maximum Rx MTU size.  This indicates to the sender the maximum size of
 * jumbo packet (made by gluing normal packets together) that we're willing
 * to handle.
 */
unsigned rxrpc_rx_mtu = 5692;

/*
 * The maximum number of fragments in a received jumbo packet that we tell the
 * sender that we're willing to handle.
 */
unsigned rxrpc_rx_jumbo_max = 4;
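
/*
 * A note on where these go on the wire: when an ACK is transmitted by
 * rxrpc_process_call() below, the last three values are advertised to the
 * peer in the trailing struct rxrpc_ackinfo, roughly as:
 *
 *	ackinfo.rwind	  = htonl(rxrpc_rx_window_size);
 *	ackinfo.rxMTU	  = htonl(rxrpc_rx_mtu);
 *	ackinfo.jumbo_max = htonl(rxrpc_rx_jumbo_max);
 */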

static const char *rxrpc_acks(u8 reason)
{
	static const char *const str[] = {
		"---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY",
		"IDL", "-?-"
	};

	if (reason >= ARRAY_SIZE(str))
		reason = ARRAY_SIZE(str) - 1;
	return str[reason];
}

static const s8 rxrpc_ack_priority[] = {
	[0]				= 0,
	[RXRPC_ACK_DELAY]		= 1,
	[RXRPC_ACK_REQUESTED]		= 2,
	[RXRPC_ACK_IDLE]		= 3,
	[RXRPC_ACK_PING_RESPONSE]	= 4,
	[RXRPC_ACK_DUPLICATE]		= 5,
	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 6,
	[RXRPC_ACK_EXCEEDS_WINDOW]	= 7,
	[RXRPC_ACK_NOSPACE]		= 8,
};
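
/*
 * How the table is used by __rxrpc_propose_ACK() below: a newly proposed ACK
 * reason only displaces a pending one of strictly lower priority.  For
 * example, proposing a DUPLICATE (5) while a DELAY (1) is pending replaces
 * it, whereas proposing an IDLE (3) while a PING_RESPONSE (4) is pending is
 * discarded (at equal priority only the recorded serial number is updated,
 * and then only for priorities 1-4).
 */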

/*
 * propose an ACK be sent
 */
void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
			 __be32 serial, bool immediate)
{
	unsigned long expiry;
	s8 prior = rxrpc_ack_priority[ack_reason];

	ASSERTCMP(prior, >, 0);

	_enter("{%d},%s,%%%x,%u",
	       call->debug_id, rxrpc_acks(ack_reason), ntohl(serial),
	       immediate);

	if (prior < rxrpc_ack_priority[call->ackr_reason]) {
		if (immediate)
			goto cancel_timer;
		return;
	}

	/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
	 * numbers */
	if (prior == rxrpc_ack_priority[call->ackr_reason]) {
		if (prior <= 4)
			call->ackr_serial = serial;
		if (immediate)
			goto cancel_timer;
		return;
	}

	call->ackr_reason = ack_reason;
	call->ackr_serial = serial;

	switch (ack_reason) {
	case RXRPC_ACK_DELAY:
		_debug("run delay timer");
		expiry = rxrpc_soft_ack_delay;
		goto run_timer;

	case RXRPC_ACK_IDLE:
		if (!immediate) {
			_debug("run defer timer");
			expiry = rxrpc_idle_ack_delay;
			goto run_timer;
		}
		goto cancel_timer;

	case RXRPC_ACK_REQUESTED:
		expiry = rxrpc_requested_ack_delay;
		if (!expiry)
			goto cancel_timer;
		if (!immediate || serial == cpu_to_be32(1)) {
			_debug("run defer timer");
			goto run_timer;
		}

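		/* fall through - an immediately-requested ACK for any packet
		 * other than the first is sent straight away */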
	default:
		_debug("immediate ACK");
		goto cancel_timer;
	}

run_timer:
	expiry += jiffies;
	if (!timer_pending(&call->ack_timer) ||
	    time_after(call->ack_timer.expires, expiry))
		mod_timer(&call->ack_timer, expiry);
	return;

cancel_timer:
	_debug("cancel timer %%%u", ntohl(serial));
	try_to_del_timer_sync(&call->ack_timer);
	read_lock_bh(&call->state_lock);
	if (call->state <= RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}

/*
 * propose an ACK be sent, locking the call structure
 */
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
		       __be32 serial, bool immediate)
{
	s8 prior = rxrpc_ack_priority[ack_reason];

	if (prior > rxrpc_ack_priority[call->ackr_reason]) {
		spin_lock_bh(&call->lock);
		__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
		spin_unlock_bh(&call->lock);
	}
}

/*
 * set the resend timer
 */
static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
			     unsigned long resend_at)
{
	read_lock_bh(&call->state_lock);
	if (call->state >= RXRPC_CALL_COMPLETE)
		resend = 0;

	if (resend & 1) {
		_debug("SET RESEND");
		set_bit(RXRPC_CALL_RESEND, &call->events);
	}

	if (resend & 2) {
		_debug("MODIFY RESEND TIMER");
		set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
		mod_timer(&call->resend_timer, resend_at);
	} else {
		_debug("KILL RESEND TIMER");
		del_timer_sync(&call->resend_timer);
		clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	}
	read_unlock_bh(&call->state_lock);
}
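
/*
 * The resend argument used above and computed below is a small bitmask:
 * bit 0 means at least one packet needs resending right now (so raise the
 * RXRPC_CALL_RESEND event) and bit 1 means a future resend time was found
 * (so arm the resend timer for the earliest such resend_at); if neither bit
 * is set, the timer is killed.
 */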

/*
 * resend packets
 */
static void rxrpc_resend(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct rxrpc_header *hdr;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	bool stop;
	int loop;
	u8 resend;

	_enter("{%d,%d,%d,%d},",
	       call->acks_hard, call->acks_unacked,
	       atomic_read(&call->sequence),
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	stop = false;
	resend = 0;
	resend_at = 0;

	for (loop = call->acks_tail;
	     loop != call->acks_head && !stop;
	     loop = (loop + 1) &  (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		if (*p_txb & 1)
			continue;

		txb = (struct sk_buff *) *p_txb;
		sp = rxrpc_skb(txb);

		if (sp->need_resend) {
			sp->need_resend = false;

			/* each Tx packet has a new serial number */
			sp->hdr.serial =
				htonl(atomic_inc_return(&call->conn->serial));

			hdr = (struct rxrpc_header *) txb->head;
			hdr->serial = sp->hdr.serial;

			_proto("Tx DATA %%%u { #%d }",
			       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
			if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
				stop = true;
				sp->resend_at = jiffies + 3;
			} else {
				sp->resend_at =
					jiffies + rxrpc_resend_timeout;
			}
		}

		if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = true;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave("");
}
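
/*
 * A note on the Tx window representation walked above: each slot in
 * call->acks_window holds an sk_buff pointer cast to an unsigned long, with
 * bit 0 borrowed as a "soft-ACK'd by the peer" marker (the pointer itself is
 * always at least word-aligned, so bit 0 is otherwise zero).  That is why
 * slots are tested with "*p_txb & 1" and dereferenced with
 * "(struct sk_buff *)(*p_txb & ~1)".
 */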

/*
 * handle resend timer expiry
 */
static void rxrpc_resend_timer(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop;
	u8 resend;

	_enter("%d,%d,%d",
	       call->acks_tail, call->acks_unacked, call->acks_head);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	resend = 0;
	resend_at = 0;

	for (loop = call->acks_unacked;
	     loop != call->acks_head;
	     loop = (loop + 1) &  (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		ASSERT(!(*p_txb & 1));

		if (sp->need_resend) {
			;
		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = true;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave("");
}

/*
 * process soft ACKs of our transmitted packets
 * - these indicate packets the peer has or has not received, but hasn't yet
 *   given to the consumer, and so can still be discarded and re-requested
 */
static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
				   struct rxrpc_ackpacket *ack,
				   struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop;
	u8 sacks[RXRPC_MAXACKS], resend;

	_enter("{%d,%d},{%d},",
	       call->acks_hard,
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
	       ack->nAcks);

	if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
		goto protocol_error;

	resend = 0;
	resend_at = 0;
	for (loop = 0; loop < ack->nAcks; loop++) {
		p_txb = call->acks_window;
		p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		switch (sacks[loop]) {
		case RXRPC_ACK_TYPE_ACK:
			sp->need_resend = false;
			*p_txb |= 1;
			break;
		case RXRPC_ACK_TYPE_NACK:
			sp->need_resend = true;
			*p_txb &= ~1;
			resend = 1;
			break;
		default:
			_debug("Unsupported ACK type %d", sacks[loop]);
			goto protocol_error;
		}
	}

	smp_mb();
	call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);

	/* anything not explicitly ACK'd is implicitly NACK'd, but may just not
	 * have been received or processed yet by the far end */
	for (loop = call->acks_unacked;
	     loop != call->acks_head;
	     loop = (loop + 1) &  (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		if (*p_txb & 1) {
			/* packet must have been discarded */
			sp->need_resend = true;
			*p_txb &= ~1;
			resend |= 1;
		} else if (sp->need_resend) {
			;
		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = true;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave(" = 0");
	return 0;

protocol_error:
	_leave(" = -EPROTO");
	return -EPROTO;
}
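
/*
 * For reference, the soft-ACK array parsed above arrives on the wire as
 * ack->nAcks single-byte flags immediately following the ACK packet header
 * (the caller has already pulled the header), one flag per packet starting
 * at ack->firstPacket.  For instance, nAcks = 3 with bytes { ACK, NACK, ACK }
 * soft-ACKs the first and third of the three oldest un-hard-ACK'd packets
 * and marks the second for resending.
 */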

/*
 * discard hard-ACK'd packets from the Tx window
 */
static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
{
	unsigned long _skb;
	int tail = call->acks_tail, old_tail;
	int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);

	_enter("{%u,%u},%u", call->acks_hard, win, hard);

	ASSERTCMP(hard - call->acks_hard, <=, win);

	while (call->acks_hard < hard) {
		smp_read_barrier_depends();
		_skb = call->acks_window[tail] & ~1;
		rxrpc_free_skb((struct sk_buff *) _skb);
		old_tail = tail;
		tail = (tail + 1) & (call->acks_winsz - 1);
		call->acks_tail = tail;
		if (call->acks_unacked == old_tail)
			call->acks_unacked = tail;
		call->acks_hard++;
	}

	wake_up(&call->tx_waitq);
}

/*
 * clear the Tx window in the event of a failure
 */
static void rxrpc_clear_tx_window(struct rxrpc_call *call)
{
	rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
}

/*
 * drain the out of sequence received packet queue into the packet Rx queue
 */
static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool terminal;
	int ret;

	_enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);

	spin_lock_bh(&call->lock);

	ret = -ECONNRESET;
	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		goto socket_unavailable;

	skb = skb_dequeue(&call->rx_oos_queue);
	if (skb) {
		sp = rxrpc_skb(skb);

		_debug("drain OOS packet %d [%d]",
		       ntohl(sp->hdr.seq), call->rx_first_oos);

		if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
			skb_queue_head(&call->rx_oos_queue, skb);
			call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
			_debug("requeue %p {%u}", skb, call->rx_first_oos);
		} else {
			skb->mark = RXRPC_SKB_MARK_DATA;
			terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
				!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
			ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
			BUG_ON(ret < 0);
			_debug("drain #%u", call->rx_data_post);
			call->rx_data_post++;

			/* find out what the next packet is */
			skb = skb_peek(&call->rx_oos_queue);
			if (skb)
				call->rx_first_oos =
					ntohl(rxrpc_skb(skb)->hdr.seq);
			else
				call->rx_first_oos = 0;
			_debug("peek %p {%u}", skb, call->rx_first_oos);
		}
	}

	ret = 0;
socket_unavailable:
	spin_unlock_bh(&call->lock);
	_leave(" = %d", ret);
	return ret;
}

/*
 * insert an out of sequence packet into the buffer
 */
static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
				    struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp, *psp;
	struct sk_buff *p;
	u32 seq;

	sp = rxrpc_skb(skb);
	seq = ntohl(sp->hdr.seq);
	_enter(",,{%u}", seq);

	skb->destructor = rxrpc_packet_destructor;
	ASSERTCMP(sp->call, ==, NULL);
	sp->call = call;
	rxrpc_get_call(call);

	/* insert into the buffer in sequence order */
	spin_lock_bh(&call->lock);

	skb_queue_walk(&call->rx_oos_queue, p) {
		psp = rxrpc_skb(p);
		if (ntohl(psp->hdr.seq) > seq) {
			_debug("insert oos #%u before #%u",
			       seq, ntohl(psp->hdr.seq));
			skb_insert(p, skb, &call->rx_oos_queue);
			goto inserted;
		}
	}

	_debug("append oos #%u", seq);
	skb_queue_tail(&call->rx_oos_queue, skb);
inserted:

	/* we might now have a new front to the queue */
	if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
		call->rx_first_oos = seq;

	read_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->rx_data_post == call->rx_first_oos) {
		_debug("drain rx oos now");
		set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
	}
	read_unlock(&call->state_lock);

	spin_unlock_bh(&call->lock);
	_leave(" [stored #%u]", call->rx_first_oos);
}

/*
 * clear the Tx window on final ACK reception
 */
static void rxrpc_zap_tx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	unsigned long _skb, *acks_window;
	u8 winsz = call->acks_winsz;
	int tail;

	acks_window = call->acks_window;
	call->acks_window = NULL;

	while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
		tail = call->acks_tail;
		smp_read_barrier_depends();
		_skb = acks_window[tail] & ~1;
		smp_mb();
		call->acks_tail = (call->acks_tail + 1) & (winsz - 1);

		skb = (struct sk_buff *) _skb;
		sp = rxrpc_skb(skb);
		_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
		rxrpc_free_skb(skb);
	}

	kfree(acks_window);
}

/*
 * process the extra information that may be appended to an ACK packet
 */
static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
				  unsigned int latest, int nAcks)
{
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_peer *peer;
	unsigned int mtu;

	if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
		_leave(" [no ackinfo]");
		return;
	}

	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
	       latest,
	       ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
	       ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));

	mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));

	peer = call->conn->trans->peer;
	if (mtu < peer->maxdata) {
		spin_lock_bh(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock_bh(&peer->lock);
		_net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
	}
}
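
/*
 * In other words, the peer's usable data size is taken to be
 * min(rxMTU, maxMTU) from the ackinfo trailer, and it only ever shrinks:
 * peer->maxdata is lowered when the advertised value drops below it, but is
 * never raised here.
 */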

/*
 * process packets in the reception queue
 */
static int rxrpc_process_rx_queue(struct rxrpc_call *call,
				  u32 *_abort_code)
{
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool post_ACK;
	int latest;
	u32 hard, tx;

	_enter("");

process_further:
	skb = skb_dequeue(&call->rx_queue);
	if (!skb)
		return -EAGAIN;

	_net("deferred skb %p", skb);

	sp = rxrpc_skb(skb);

	_debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);

	post_ACK = false;

	switch (sp->hdr.type) {
		/* data packets that wind up here have been received out of
		 * order, need security processing or are jumbo packets */
	case RXRPC_PACKET_TYPE_DATA:
		_proto("OOSQ DATA %%%u { #%u }",
		       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));

		/* secured packets must be verified and possibly decrypted */
		if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
			goto protocol_error;

		rxrpc_insert_oos_packet(call, skb);
		goto process_further;

		/* partial ACK to process */
	case RXRPC_PACKET_TYPE_ACK:
		if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
			_debug("extraction failure");
			goto protocol_error;
		}
		if (!skb_pull(skb, sizeof(ack)))
			BUG();

		latest = ntohl(sp->hdr.serial);
		hard = ntohl(ack.firstPacket);
		tx = atomic_read(&call->sequence);

		_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
		       latest,
		       ntohs(ack.maxSkew),
		       hard,
		       ntohl(ack.previousPacket),
		       ntohl(ack.serial),
		       rxrpc_acks(ack.reason),
		       ack.nAcks);

		rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);

		if (ack.reason == RXRPC_ACK_PING) {
			_proto("Rx ACK %%%u PING Request", latest);
			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
					  sp->hdr.serial, true);
		}

		/* discard any out-of-order or duplicate ACKs */
		if (latest - call->acks_latest <= 0) {
			_debug("discard ACK %d <= %d",
			       latest, call->acks_latest);
			goto discard;
		}
		call->acks_latest = latest;

		if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
		    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
		    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
		    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
			goto discard;

		_debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);

		if (hard > 0) {
			if (hard - 1 > tx) {
				_debug("hard-ACK'd packet %d not transmitted"
				       " (%d top)",
				       hard - 1, tx);
				goto protocol_error;
			}

			if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
			     call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
			    hard > tx)
				goto all_acked;

			smp_rmb();
			rxrpc_rotate_tx_window(call, hard - 1);
		}

		if (ack.nAcks > 0) {
			if (hard - 1 + ack.nAcks > tx) {
				_debug("soft-ACK'd packet %d+%d not"
				       " transmitted (%d top)",
				       hard - 1, ack.nAcks, tx);
				goto protocol_error;
			}

			if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
				goto protocol_error;
		}
		goto discard;

		/* complete ACK to process */
	case RXRPC_PACKET_TYPE_ACKALL:
		goto all_acked;

		/* abort and busy are handled elsewhere */
	case RXRPC_PACKET_TYPE_BUSY:
	case RXRPC_PACKET_TYPE_ABORT:
		BUG();

		/* connection level events - also handled elsewhere */
	case RXRPC_PACKET_TYPE_CHALLENGE:
	case RXRPC_PACKET_TYPE_RESPONSE:
	case RXRPC_PACKET_TYPE_DEBUG:
		BUG();
	}

	/* if we've had a hard ACK that covers all the packets we've sent, then
	 * that ends that phase of the operation */
all_acked:
	write_lock_bh(&call->state_lock);
	_debug("ack all %d", call->state);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
		break;
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		_debug("srv complete");
		call->state = RXRPC_CALL_COMPLETE;
		post_ACK = true;
		break;
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
		goto protocol_error_unlock; /* can't occur yet */
	default:
		write_unlock_bh(&call->state_lock);
		goto discard; /* assume packet left over from earlier phase */
	}

	write_unlock_bh(&call->state_lock);

	/* if all the packets we sent are hard-ACK'd, then we can discard
	 * whatever we've got left */
	_debug("clear Tx %d",
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	del_timer_sync(&call->resend_timer);
	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);

	if (call->acks_window)
		rxrpc_zap_tx_window(call);

	if (post_ACK) {
		/* post the final ACK message for userspace to pick up */
		_debug("post ACK");
		skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
		sp->call = call;
		rxrpc_get_call(call);
		spin_lock_bh(&call->lock);
		if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
			BUG();
		spin_unlock_bh(&call->lock);
		goto process_further;
	}

discard:
	rxrpc_free_skb(skb);
	goto process_further;

protocol_error_unlock:
	write_unlock_bh(&call->state_lock);
protocol_error:
	rxrpc_free_skb(skb);
	_leave(" = -EPROTO");
	return -EPROTO;
}
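
/*
 * A note on the ACK arithmetic above: ack.firstPacket names the first packet
 * the peer has *not* yet hard-ACK'd, so "hard - 1" is the highest hard-ACK'd
 * sequence number and is what the Tx window is rotated to; the nAcks
 * soft-ACK flags then describe the packets from firstPacket onwards.
 */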

/*
 * post a message to the socket Rx queue for recvmsg() to pick up
 */
static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
			      bool fatal)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	int ret;

	_enter("{%d,%lx},%u,%u,%d",
	       call->debug_id, call->flags, mark, error, fatal);

	/* remove timers and things for fatal messages */
	if (fatal) {
		del_timer_sync(&call->resend_timer);
		del_timer_sync(&call->ack_timer);
		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	}

	if (mark != RXRPC_SKB_MARK_NEW_CALL &&
	    !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		_leave("[no userid]");
		return 0;
	}

	if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
		skb = alloc_skb(0, GFP_NOFS);
		if (!skb)
			return -ENOMEM;

		rxrpc_new_skb(skb);

		skb->mark = mark;

		sp = rxrpc_skb(skb);
		memset(sp, 0, sizeof(*sp));
		sp->error = error;
		sp->call = call;
		rxrpc_get_call(call);

		spin_lock_bh(&call->lock);
		ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
		spin_unlock_bh(&call->lock);
		BUG_ON(ret < 0);
	}

	return 0;
}
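
/*
 * In rxrpc_process_call() below, genbit records which event bit the message
 * currently being built corresponds to (ABORT, REJECT_BUSY, ACK or
 * ACK_FINAL), so that the right bit can be cleared once kernel_sendmsg() has
 * accepted the packet.
 */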

/*
 * handle background processing of incoming call packets and ACK / abort
 * generation
 */
void rxrpc_process_call(struct work_struct *work)
{
	struct rxrpc_call *call =
		container_of(work, struct rxrpc_call, processor);
	struct rxrpc_ackpacket ack;
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_header hdr;
	struct msghdr msg;
	struct kvec iov[5];
	unsigned long bits;
	__be32 data, pad;
	size_t len;
	int genbit, loop, nbit, ioc, ret, mtu;
	u32 abort_code = RX_PROTOCOL_ERROR;
	u8 *acks = NULL;

	//printk("\n--------------------\n");
	_enter("{%d,%s,%lx} [%lu]",
	       call->debug_id, rxrpc_call_states[call->state], call->events,
	       (jiffies - call->creation_jif) / (HZ / 10));

	if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
		_debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
		return;
	}

	/* there's a good chance we're going to have to send a message, so set
	 * one up in advance */
	msg.msg_name	= &call->conn->trans->peer->srx.transport.sin;
	msg.msg_namelen	= sizeof(call->conn->trans->peer->srx.transport.sin);
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	hdr.epoch	= call->conn->epoch;
	hdr.cid		= call->cid;
	hdr.callNumber	= call->call_id;
	hdr.seq		= 0;
	hdr.type	= RXRPC_PACKET_TYPE_ACK;
	hdr.flags	= call->conn->out_clientflag;
	hdr.userStatus	= 0;
	hdr.securityIndex = call->conn->security_ix;
	hdr._rsvd	= 0;
	hdr.serviceId	= call->conn->service_id;

	memset(iov, 0, sizeof(iov));
	iov[0].iov_base	= &hdr;
	iov[0].iov_len	= sizeof(hdr);

	/* deal with events of a final nature */
	if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
		rxrpc_release_call(call);
		clear_bit(RXRPC_CALL_RELEASE, &call->events);
	}

	if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
		int error;

		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_ABORT, &call->events);

		error = call->conn->trans->peer->net_error;
		_debug("post net error %d", error);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
				       error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_ABORT, &call->events);

		_debug("post conn abort");

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       call->conn->error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
		hdr.type = RXRPC_PACKET_TYPE_BUSY;
		genbit = RXRPC_CALL_REJECT_BUSY;
		goto send_message;
	}

	if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ECONNABORTED, true) < 0)
			goto no_mem;
		hdr.type = RXRPC_PACKET_TYPE_ABORT;
		data = htonl(call->abort_code);
		iov[1].iov_base = &data;
		iov[1].iov_len = sizeof(data);
		genbit = RXRPC_CALL_ABORT;
		goto send_message;
	}

	if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
		genbit = RXRPC_CALL_ACK_FINAL;

		ack.bufferSpace	= htons(8);
		ack.maxSkew	= 0;
		ack.serial	= 0;
		ack.reason	= RXRPC_ACK_IDLE;
		ack.nAcks	= 0;
		call->ackr_reason = 0;

		spin_lock_bh(&call->lock);
		ack.serial = call->ackr_serial;
		ack.previousPacket = call->ackr_prev_seq;
		ack.firstPacket = htonl(call->rx_data_eaten + 1);
		spin_unlock_bh(&call->lock);

		pad = 0;

		iov[1].iov_base = &ack;
		iov[1].iov_len	= sizeof(ack);
		iov[2].iov_base = &pad;
		iov[2].iov_len	= 3;
		iov[3].iov_base = &ackinfo;
		iov[3].iov_len	= sizeof(ackinfo);
		goto send_ACK;
	}

	if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
			    (1 << RXRPC_CALL_RCVD_ABORT))
	    ) {
		u32 mark;

		if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
			mark = RXRPC_SKB_MARK_REMOTE_ABORT;
		else
			mark = RXRPC_SKB_MARK_BUSY;

		_debug("post abort/busy");
		rxrpc_clear_tx_window(call);
		if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
			goto no_mem;

		clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
		goto kill_ACKs;
	}

	if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
		_debug("do implicit ackall");
		rxrpc_clear_tx_window(call);
	}

	if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
		write_lock_bh(&call->state_lock);
		if (call->state <= RXRPC_CALL_COMPLETE) {
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->abort_code = RX_CALL_TIMEOUT;
			set_bit(RXRPC_CALL_ABORT, &call->events);
		}
		write_unlock_bh(&call->state_lock);

		_debug("post timeout");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ETIME, true) < 0)
			goto no_mem;

		clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
		goto kill_ACKs;
	}

	/* deal with assorted inbound messages */
	if (!skb_queue_empty(&call->rx_queue)) {
		switch (rxrpc_process_rx_queue(call, &abort_code)) {
		case 0:
		case -EAGAIN:
			break;
		case -ENOMEM:
			goto no_mem;
		case -EKEYEXPIRED:
		case -EKEYREJECTED:
		case -EPROTO:
			rxrpc_abort_call(call, abort_code);
			goto kill_ACKs;
		}
	}

	/* handle resending */
	if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
		rxrpc_resend_timer(call);
	if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
		rxrpc_resend(call);

	/* consider sending an ordinary ACK */
	if (test_bit(RXRPC_CALL_ACK, &call->events)) {
		_debug("send ACK: window: %d - %d { %lx }",
		       call->rx_data_eaten, call->ackr_win_top,
		       call->ackr_window[0]);

		if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
		    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
			/* ACK by sending reply DATA packet in this state */
			clear_bit(RXRPC_CALL_ACK, &call->events);
			goto maybe_reschedule;
		}

		genbit = RXRPC_CALL_ACK;

		acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
			       GFP_NOFS);
		if (!acks)
			goto no_mem;

		//hdr.flags	= RXRPC_SLOW_START_OK;
		ack.bufferSpace	= htons(8);
		ack.maxSkew	= 0;
		ack.serial	= 0;
		ack.reason	= 0;

		spin_lock_bh(&call->lock);
		ack.reason = call->ackr_reason;
		ack.serial = call->ackr_serial;
		ack.previousPacket = call->ackr_prev_seq;
		ack.firstPacket = htonl(call->rx_data_eaten + 1);

		ack.nAcks = 0;
		for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
			nbit = loop * BITS_PER_LONG;
			for (bits = call->ackr_window[loop]; bits; bits >>= 1
			     ) {
				_debug("- l=%d n=%d b=%lx", loop, nbit, bits);
				if (bits & 1) {
					acks[nbit] = RXRPC_ACK_TYPE_ACK;
					ack.nAcks = nbit + 1;
				}
				nbit++;
			}
		}
		call->ackr_reason = 0;
		spin_unlock_bh(&call->lock);

		pad = 0;

		iov[1].iov_base = &ack;
		iov[1].iov_len	= sizeof(ack);
		iov[2].iov_base = acks;
		iov[2].iov_len	= ack.nAcks;
		iov[3].iov_base = &pad;
		iov[3].iov_len	= 3;
		iov[4].iov_base = &ackinfo;
		iov[4].iov_len	= sizeof(ackinfo);

		switch (ack.reason) {
		case RXRPC_ACK_REQUESTED:
		case RXRPC_ACK_DUPLICATE:
		case RXRPC_ACK_OUT_OF_SEQUENCE:
		case RXRPC_ACK_EXCEEDS_WINDOW:
		case RXRPC_ACK_NOSPACE:
		case RXRPC_ACK_PING:
		case RXRPC_ACK_PING_RESPONSE:
			goto send_ACK_with_skew;
		case RXRPC_ACK_DELAY:
		case RXRPC_ACK_IDLE:
			goto send_ACK;
		}
	}

	/* handle completion of security negotiations on an incoming
	 * connection */
	if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
		_debug("secured");
		spin_lock_bh(&call->lock);

		if (call->state == RXRPC_CALL_SERVER_SECURING) {
			_debug("securing");
			write_lock(&call->conn->lock);
			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
			    !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
				_debug("not released");
				call->state = RXRPC_CALL_SERVER_ACCEPTING;
				list_move_tail(&call->accept_link,
					       &call->socket->acceptq);
			}
			write_unlock(&call->conn->lock);
			read_lock(&call->state_lock);
			if (call->state < RXRPC_CALL_COMPLETE)
				set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
			read_unlock(&call->state_lock);
		}

		spin_unlock_bh(&call->lock);
		if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
			goto maybe_reschedule;
	}

	/* post a notification of an acceptable connection to the app */
	if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
		_debug("post accept");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
				       0, false) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
		goto maybe_reschedule;
	}

	/* handle incoming call acceptance */
	if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
		_debug("accepted");
		ASSERTCMP(call->rx_data_post, ==, 0);
		call->rx_data_post = 1;
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_COMPLETE)
			set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
		read_unlock_bh(&call->state_lock);
	}

	/* drain the out of sequence received packet queue into the packet Rx
	 * queue */
	if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
		while (call->rx_data_post == call->rx_first_oos)
			if (rxrpc_drain_rx_oos_queue(call) < 0)
				break;
		goto maybe_reschedule;
	}

	/* other events may have been raised since we started checking */
	goto maybe_reschedule;

send_ACK_with_skew:
	ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
			    ntohl(ack.serial));
send_ACK:
	mtu = call->conn->trans->peer->if_mtu;
	mtu -= call->conn->trans->peer->hdrsize;
	ackinfo.maxMTU	= htonl(mtu);
	ackinfo.rwind	= htonl(rxrpc_rx_window_size);

	/* permit the peer to send us jumbo packets if it wants to */
	ackinfo.rxMTU	= htonl(rxrpc_rx_mtu);
	ackinfo.jumbo_max = htonl(rxrpc_rx_jumbo_max);

	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
	_proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
	       ntohl(hdr.serial),
	       ntohs(ack.maxSkew),
	       ntohl(ack.firstPacket),
	       ntohl(ack.previousPacket),
	       ntohl(ack.serial),
	       rxrpc_acks(ack.reason),
	       ack.nAcks);

	del_timer_sync(&call->ack_timer);
	if (ack.nAcks > 0)
		set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
	goto send_message_2;

send_message:
	_debug("send message");

	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
	_proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
send_message_2:

	len = iov[0].iov_len;
	ioc = 1;
	if (iov[4].iov_len) {
		ioc = 5;
		len += iov[4].iov_len;
		len += iov[3].iov_len;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[3].iov_len) {
		ioc = 4;
		len += iov[3].iov_len;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[2].iov_len) {
		ioc = 3;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[1].iov_len) {
		ioc = 2;
		len += iov[1].iov_len;
	}

	ret = kernel_sendmsg(call->conn->trans->local->socket,
			     &msg, iov, ioc, len);
	if (ret < 0) {
		_debug("sendmsg failed: %d", ret);
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_DEAD)
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
		goto error;
	}

	switch (genbit) {
	case RXRPC_CALL_ABORT:
		clear_bit(genbit, &call->events);
		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
		goto kill_ACKs;

	case RXRPC_CALL_ACK_FINAL:
		write_lock_bh(&call->state_lock);
		if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
			call->state = RXRPC_CALL_COMPLETE;
		write_unlock_bh(&call->state_lock);
		goto kill_ACKs;

	default:
		clear_bit(genbit, &call->events);
		switch (call->state) {
		case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		case RXRPC_CALL_CLIENT_RECV_REPLY:
		case RXRPC_CALL_SERVER_RECV_REQUEST:
		case RXRPC_CALL_SERVER_ACK_REQUEST:
			_debug("start ACK timer");
			rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
					  call->ackr_serial, false);
		default:
			break;
		}
		goto maybe_reschedule;
	}

kill_ACKs:
	del_timer_sync(&call->ack_timer);
	if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
		rxrpc_put_call(call);
	clear_bit(RXRPC_CALL_ACK, &call->events);

maybe_reschedule:
	if (call->events || !skb_queue_empty(&call->rx_queue)) {
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_DEAD)
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
	}

	/* don't leave aborted connections on the accept queue */
	if (call->state >= RXRPC_CALL_COMPLETE &&
	    !list_empty(&call->accept_link)) {
		_debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
		       call, call->events, call->flags,
		       ntohl(call->conn->cid));

		read_lock_bh(&call->state_lock);
		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
		    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
	}

error:
	clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
	kfree(acks);

	/* because we don't want two CPUs both processing the work item for one
	 * call at the same time, we use a flag to note when it's busy; however
	 * this means there's a race between clearing the flag and setting the
	 * work pending bit and the work item being processed again */
	if (call->events && !work_pending(&call->processor)) {
		_debug("jumpstart %x", ntohl(call->conn->cid));
		rxrpc_queue_call(call);
	}

	_leave("");
	return;

no_mem:
	_debug("out of memory");
	goto maybe_reschedule;
}