linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH net-next 0/3] rxrpc: Miscellaneous improvements
@ 2016-08-23 15:28 David Howells
  2016-08-23 15:29 ` [PATCH net-next 1/3] rxrpc: Set connection expiry on idle, not put David Howells
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: David Howells @ 2016-08-23 15:28 UTC (permalink / raw)
  To: netdev; +Cc: dhowells, linux-afs, linux-kernel



Here are some improvements that are part of the AF_RXRPC rewrite.  They
need to be applied on top of the just posted cleanups.

 (1) Set the connection expiry on the connection becoming idle when its
     last currently active call completes rather than each time put is
     called.

     This means that the connection isn't held open by retransmissions,
     pings and duplicate packets.  Future patches will limit the number of
     live connections that the kernel will support, so making sure that old
     connections don't overstay their welcome is necessary.

 (2) Calculate packet serial skew in the UDP data_ready callback rather
     than in the call processor on a work queue.  Deferring it like this
     causes the skew to be elevated by further packets coming in before we
     get to make the calculation.

 (3) Move retransmission of the terminal ACK or ABORT packet for a
     connection to the connection processor, using the terminal state
     cached in the rxrpc_connection struct.  This means that once last_call
     is set in a channel to the current call's ID, no more packets will be
     routed to that rxrpc_call struct.

The patches can be found here also:

	http://git.kernel.org/cgit/linux/kernel/git/dhowells/linux-fs.git/log/?h=rxrpc-rewrite

Tagged thusly:

	git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs.git
	rxrpc-rewrite-20160823-2

David
---
David Howells (3):
      rxrpc: Set connection expiry on idle, not put
      rxrpc: Calculate serial skew on packet reception
      rxrpc: Perform terminal call ACK/ABORT retransmission from conn processor


 net/rxrpc/ar-internal.h |   25 ++++++++--
 net/rxrpc/call_event.c  |   18 ++++---
 net/rxrpc/conn_event.c  |  113 +++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/conn_object.c |   52 +++++++++++-----------
 net/rxrpc/input.c       |   68 ++++++++++++++++++++++------
 net/rxrpc/proc.c        |    2 -
 net/rxrpc/skbuff.c      |   10 ++--
 7 files changed, 226 insertions(+), 62 deletions(-)

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [PATCH net-next 1/3] rxrpc: Set connection expiry on idle, not put
  2016-08-23 15:28 [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Howells
@ 2016-08-23 15:29 ` David Howells
  2016-08-23 15:29 ` [PATCH net-next 2/3] rxrpc: Calculate serial skew on packet reception David Howells
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: David Howells @ 2016-08-23 15:29 UTC (permalink / raw)
  To: netdev; +Cc: dhowells, linux-afs, linux-kernel

Set the connection expiry time when a connection becomes idle rather than
doing this in rxrpc_put_connection().  This makes the put path more
efficient (it is likely to be called occasionally whilst a connection has
outstanding calls because active workqueue items need to be given a ref).

The time is also preset in the connection allocator in case the connection
never gets used.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |   11 +++++++++--
 net/rxrpc/conn_object.c |   42 +++++++++++++++++-------------------------
 2 files changed, 26 insertions(+), 27 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 8cb517fbbd23..66c917077880 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -313,7 +313,7 @@ struct rxrpc_connection {
 	struct rxrpc_crypt	csum_iv;	/* packet checksum base */
 	unsigned long		flags;
 	unsigned long		events;
-	unsigned long		put_time;	/* Time at which last put */
+	unsigned long		idle_timestamp;	/* Time at which last became idle */
 	spinlock_t		state_lock;	/* state-change lock */
 	atomic_t		usage;
 	enum rxrpc_conn_proto_state state : 8;	/* current state of connection */
@@ -565,7 +565,7 @@ struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
 						   struct sk_buff *);
 void __rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
-void rxrpc_put_connection(struct rxrpc_connection *);
+void __rxrpc_put_connection(struct rxrpc_connection *);
 void __exit rxrpc_destroy_all_connections(void);
 
 static inline bool rxrpc_conn_is_client(const struct rxrpc_connection *conn)
@@ -589,6 +589,13 @@ struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *con
 	return atomic_inc_not_zero(&conn->usage) ? conn : NULL;
 }
 
+static inline void rxrpc_put_connection(struct rxrpc_connection *conn)
+{
+	if (conn && atomic_dec_return(&conn->usage) == 1)
+		__rxrpc_put_connection(conn);
+}
+
+
 static inline bool rxrpc_queue_conn(struct rxrpc_connection *conn)
 {
 	if (!rxrpc_get_connection_maybe(conn))
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 6a5a17efc538..743f0bb4aaa8 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -56,6 +56,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
 		atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
 		conn->size_align = 4;
 		conn->header_size = sizeof(struct rxrpc_wire_header);
+		conn->idle_timestamp = jiffies;
 	}
 
 	_leave(" = %p{%d}", conn, conn ? conn->debug_id : 0);
@@ -191,29 +192,16 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
 	spin_unlock(&conn->channel_lock);
 
 	call->conn = NULL;
+	conn->idle_timestamp = jiffies;
 	rxrpc_put_connection(conn);
 }
 
 /*
  * release a virtual connection
  */
-void rxrpc_put_connection(struct rxrpc_connection *conn)
+void __rxrpc_put_connection(struct rxrpc_connection *conn)
 {
-	if (!conn)
-		return;
-
-	_enter("%p{u=%d,d=%d}",
-	       conn, atomic_read(&conn->usage), conn->debug_id);
-
-	ASSERTCMP(atomic_read(&conn->usage), >, 1);
-
-	conn->put_time = ktime_get_seconds();
-	if (atomic_dec_return(&conn->usage) == 1) {
-		_debug("zombie");
-		rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
-	}
-
-	_leave("");
+	rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
 }
 
 /*
@@ -248,14 +236,14 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
 static void rxrpc_connection_reaper(struct work_struct *work)
 {
 	struct rxrpc_connection *conn, *_p;
-	unsigned long reap_older_than, earliest, put_time, now;
+	unsigned long reap_older_than, earliest, idle_timestamp, now;
 
 	LIST_HEAD(graveyard);
 
 	_enter("");
 
-	now = ktime_get_seconds();
-	reap_older_than =  now - rxrpc_connection_expiry;
+	now = jiffies;
+	reap_older_than = now - rxrpc_connection_expiry * HZ;
 	earliest = ULONG_MAX;
 
 	write_lock(&rxrpc_connection_lock);
@@ -264,10 +252,14 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 		if (likely(atomic_read(&conn->usage) > 1))
 			continue;
 
-		put_time = READ_ONCE(conn->put_time);
-		if (time_after(put_time, reap_older_than)) {
-			if (time_before(put_time, earliest))
-				earliest = put_time;
+		idle_timestamp = READ_ONCE(conn->idle_timestamp);
+		_debug("reap CONN %d { u=%d,t=%ld }",
+		       conn->debug_id, atomic_read(&conn->usage),
+		       (long)reap_older_than - (long)idle_timestamp);
+
+		if (time_after(idle_timestamp, reap_older_than)) {
+			if (time_before(idle_timestamp, earliest))
+				earliest = idle_timestamp;
 			continue;
 		}
 
@@ -288,9 +280,9 @@ static void rxrpc_connection_reaper(struct work_struct *work)
 
 	if (earliest != ULONG_MAX) {
 		_debug("reschedule reaper %ld", (long) earliest - now);
-		ASSERTCMP(earliest, >, now);
+		ASSERT(time_after(earliest, now));
 		rxrpc_queue_delayed_work(&rxrpc_connection_reap,
-					 (earliest - now) * HZ);
+					 earliest - now);
 	}
 
 	while (!list_empty(&graveyard)) {

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH net-next 2/3] rxrpc: Calculate serial skew on packet reception
  2016-08-23 15:28 [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Howells
  2016-08-23 15:29 ` [PATCH net-next 1/3] rxrpc: Set connection expiry on idle, not put David Howells
@ 2016-08-23 15:29 ` David Howells
  2016-08-23 15:29 ` [PATCH net-next 3/3] rxrpc: Perform terminal call ACK/ABORT retransmission from conn processor David Howells
  2016-08-24  0:21 ` [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: David Howells @ 2016-08-23 15:29 UTC (permalink / raw)
  To: netdev; +Cc: dhowells, linux-afs, linux-kernel

Calculate the serial number skew in the data_ready handler when a packet
has been received and a connection looked up.  The skew is cached in the
sk_buff's priority field.

The connection's highest received serial number is also updated at this time.
This can be done without locks or atomic instructions because, at this
point, the code is serialised by the socket.

This generates more accurate skew data because if the packet is offloaded
to a work queue before this is determined, more packets may come in,
bumping the highest serial number and thereby increasing the apparent skew.

This also removes some unnecessary atomic ops.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    7 ++++---
 net/rxrpc/call_event.c  |   18 ++++++++++--------
 net/rxrpc/input.c       |   37 +++++++++++++++++++++++--------------
 net/rxrpc/proc.c        |    2 +-
 net/rxrpc/skbuff.c      |   10 +++++-----
 5 files changed, 43 insertions(+), 31 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 66c917077880..c779b50135f6 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -322,7 +322,7 @@ struct rxrpc_connection {
 	int			error;		/* local error incurred */
 	int			debug_id;	/* debug ID for printks */
 	atomic_t		serial;		/* packet serial number counter */
-	atomic_t		hi_serial;	/* highest serial number received */
+	unsigned int		hi_serial;	/* highest serial number received */
 	atomic_t		avail_chans;	/* number of channels available */
 	u8			size_align;	/* data size alignment (for security) */
 	u8			header_size;	/* rxrpc + security header size */
@@ -457,6 +457,7 @@ struct rxrpc_call {
 	rxrpc_seq_t		ackr_win_top;	/* top of ACK window (rx_data_eaten is bottom) */
 	rxrpc_seq_t		ackr_prev_seq;	/* previous sequence number received */
 	u8			ackr_reason;	/* reason to ACK */
+	u16			ackr_skew;	/* skew on packet being ACK'd */
 	rxrpc_serial_t		ackr_serial;	/* serial of packet being ACK'd */
 	atomic_t		ackr_not_idle;	/* number of packets in Rx queue */
 
@@ -499,8 +500,8 @@ int rxrpc_reject_call(struct rxrpc_sock *);
 /*
  * call_event.c
  */
-void __rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool);
-void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool);
+void __rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool);
+void rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool);
 void rxrpc_process_call(struct work_struct *);
 
 /*
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 3d1267cea9ea..3d1961d82325 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -25,7 +25,7 @@
  * propose an ACK be sent
  */
 void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
-			 u32 serial, bool immediate)
+			 u16 skew, u32 serial, bool immediate)
 {
 	unsigned long expiry;
 	s8 prior = rxrpc_ack_priority[ack_reason];
@@ -44,8 +44,10 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 	/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
 	 * numbers */
 	if (prior == rxrpc_ack_priority[call->ackr_reason]) {
-		if (prior <= 4)
+		if (prior <= 4) {
+			call->ackr_skew = skew;
 			call->ackr_serial = serial;
+		}
 		if (immediate)
 			goto cancel_timer;
 		return;
@@ -103,13 +105,13 @@ cancel_timer:
  * propose an ACK be sent, locking the call structure
  */
 void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
-		       u32 serial, bool immediate)
+		       u16 skew, u32 serial, bool immediate)
 {
 	s8 prior = rxrpc_ack_priority[ack_reason];
 
 	if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 		spin_lock_bh(&call->lock);
-		__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
+		__rxrpc_propose_ACK(call, ack_reason, skew, serial, immediate);
 		spin_unlock_bh(&call->lock);
 	}
 }
@@ -628,7 +630,7 @@ process_further:
 		if (ack.reason == RXRPC_ACK_PING) {
 			_proto("Rx ACK %%%u PING Request", latest);
 			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
-					  sp->hdr.serial, true);
+					  skb->priority, sp->hdr.serial, true);
 		}
 
 		/* discard any out-of-order or duplicate ACKs */
@@ -1153,8 +1155,7 @@ skip_msg_init:
 	goto maybe_reschedule;
 
 send_ACK_with_skew:
-	ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
-			    ntohl(ack.serial));
+	ack.maxSkew = htons(call->ackr_skew);
 send_ACK:
 	mtu = call->conn->params.peer->if_mtu;
 	mtu -= call->conn->params.peer->hdrsize;
@@ -1244,7 +1245,8 @@ send_message_2:
 		case RXRPC_CALL_SERVER_ACK_REQUEST:
 			_debug("start ACK timer");
 			rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
-					  call->ackr_serial, false);
+					  call->ackr_skew, call->ackr_serial,
+					  false);
 		default:
 			break;
 		}
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 70bb77818dea..34f7431bf494 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -125,6 +125,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
 	bool terminal;
 	int ret, ackbit, ack;
 	u32 serial;
+	u16 skew;
 	u8 flags;
 
 	_enter("{%u,%u},,{%u}", call->rx_data_post, call->rx_first_oos, seq);
@@ -133,6 +134,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
 	ASSERTCMP(sp->call, ==, NULL);
 	flags = sp->hdr.flags;
 	serial = sp->hdr.serial;
+	skew = skb->priority;
 
 	spin_lock(&call->lock);
 
@@ -231,7 +233,7 @@ static int rxrpc_fast_process_data(struct rxrpc_call *call,
 
 	spin_unlock(&call->lock);
 	atomic_inc(&call->ackr_not_idle);
-	rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false);
+	rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, skew, serial, false);
 	_leave(" = 0 [posted]");
 	return 0;
 
@@ -244,7 +246,7 @@ out:
 
 discard_and_ack:
 	_debug("discard and ACK packet %p", skb);
-	__rxrpc_propose_ACK(call, ack, serial, true);
+	__rxrpc_propose_ACK(call, ack, skew, serial, true);
 discard:
 	spin_unlock(&call->lock);
 	rxrpc_free_skb(skb);
@@ -252,7 +254,7 @@ discard:
 	return 0;
 
 enqueue_and_ack:
-	__rxrpc_propose_ACK(call, ack, serial, true);
+	__rxrpc_propose_ACK(call, ack, skew, serial, true);
 enqueue_packet:
 	_net("defer skb %p", skb);
 	spin_unlock(&call->lock);
@@ -304,7 +306,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	__be32 wtmp;
-	u32 hi_serial, abort_code;
+	u32 abort_code;
 
 	_enter("%p,%p", call, skb);
 
@@ -321,18 +323,12 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
 	}
 #endif
 
-	/* track the latest serial number on this connection for ACK packet
-	 * information */
-	hi_serial = atomic_read(&call->conn->hi_serial);
-	while (sp->hdr.serial > hi_serial)
-		hi_serial = atomic_cmpxchg(&call->conn->hi_serial, hi_serial,
-					   sp->hdr.serial);
-
 	/* request ACK generation for any ACK or DATA packet that requests
 	 * it */
 	if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
 		_proto("ACK Requested on %%%u", sp->hdr.serial);
-		rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED, sp->hdr.serial, false);
+		rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED,
+				  skb->priority, sp->hdr.serial, false);
 	}
 
 	switch (sp->hdr.type) {
@@ -637,7 +633,7 @@ void rxrpc_data_ready(struct sock *sk)
 	struct rxrpc_skb_priv *sp;
 	struct rxrpc_local *local = sk->sk_user_data;
 	struct sk_buff *skb;
-	int ret;
+	int ret, skew;
 
 	_enter("%p", sk);
 
@@ -700,8 +696,21 @@ void rxrpc_data_ready(struct sock *sk)
 	rcu_read_lock();
 
 	conn = rxrpc_find_connection_rcu(local, skb);
-	if (!conn)
+	if (!conn) {
+		skb->priority = 0;
 		goto cant_route_call;
+	}
+
+	/* Note the serial number skew here */
+	skew = (int)sp->hdr.serial - (int)conn->hi_serial;
+	if (skew >= 0) {
+		if (skew > 0)
+			conn->hi_serial = sp->hdr.serial;
+		skb->priority = 0;
+	} else {
+		skew = -skew;
+		skb->priority = min(skew, 65535);
+	}
 
 	if (sp->hdr.callNumber == 0) {
 		/* Connection-level packet */
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index f92de18b5893..31b7f36a39cb 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -165,7 +165,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v)
 		   rxrpc_conn_states[conn->state],
 		   key_serial(conn->params.key),
 		   atomic_read(&conn->serial),
-		   atomic_read(&conn->hi_serial));
+		   conn->hi_serial);
 
 	return 0;
 }
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index d28058a97bc1..fbd8c74d9505 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -53,9 +53,9 @@ static void rxrpc_request_final_ACK(struct rxrpc_call *call)
 /*
  * drop the bottom ACK off of the call ACK window and advance the window
  */
-static void rxrpc_hard_ACK_data(struct rxrpc_call *call,
-				struct rxrpc_skb_priv *sp)
+static void rxrpc_hard_ACK_data(struct rxrpc_call *call, struct sk_buff *skb)
 {
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	int loop;
 	u32 seq;
 
@@ -91,8 +91,8 @@ static void rxrpc_hard_ACK_data(struct rxrpc_call *call,
 		 * its Tx bufferage.
 		 */
 		_debug("send Rx idle ACK");
-		__rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, sp->hdr.serial,
-				    false);
+		__rxrpc_propose_ACK(call, RXRPC_ACK_IDLE,
+				    skb->priority, sp->hdr.serial, false);
 	}
 
 	spin_unlock_bh(&call->lock);
@@ -125,7 +125,7 @@ void rxrpc_kernel_data_consumed(struct rxrpc_call *call, struct sk_buff *skb)
 	ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten);
 
 	call->rx_data_recv = sp->hdr.seq;
-	rxrpc_hard_ACK_data(call, sp);
+	rxrpc_hard_ACK_data(call, skb);
 }
 EXPORT_SYMBOL(rxrpc_kernel_data_consumed);
 

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH net-next 3/3] rxrpc: Perform terminal call ACK/ABORT retransmission from conn processor
  2016-08-23 15:28 [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Howells
  2016-08-23 15:29 ` [PATCH net-next 1/3] rxrpc: Set connection expiry on idle, not put David Howells
  2016-08-23 15:29 ` [PATCH net-next 2/3] rxrpc: Calculate serial skew on packet reception David Howells
@ 2016-08-23 15:29 ` David Howells
  2016-08-24  0:21 ` [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: David Howells @ 2016-08-23 15:29 UTC (permalink / raw)
  To: netdev; +Cc: dhowells, linux-afs, linux-kernel

Perform terminal call ACK/ABORT retransmission in the connection processor
rather than in the call processor.  With this change, once last_call is
set, no more incoming packets will be routed to the corresponding call or
any earlier calls on that channel (call IDs must only increase on a channel
on a connection).

Further, if a packet's callNumber is before the last_call ID or a packet is
aimed at a successfully completed service call then that packet is discarded
and ignored.

Signed-off-by: David Howells <dhowells@redhat.com>
---

 net/rxrpc/ar-internal.h |    7 ++-
 net/rxrpc/conn_event.c  |  113 +++++++++++++++++++++++++++++++++++++++++++++++
 net/rxrpc/conn_object.c |   10 ++++
 net/rxrpc/input.c       |   31 ++++++++++++-
 4 files changed, 157 insertions(+), 4 deletions(-)

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index c779b50135f6..7296039c537a 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -295,7 +295,12 @@ struct rxrpc_connection {
 		u32			call_id;	/* ID of current call */
 		u32			call_counter;	/* Call ID counter */
 		u32			last_call;	/* ID of last call */
-		u32			last_result;	/* Result of last call (0/abort) */
+		u8			last_type;	/* Type of last packet */
+		u16			last_service_id;
+		union {
+			u32		last_seq;
+			u32		last_abort;
+		};
 	} channels[RXRPC_MAXCALLS];
 	wait_queue_head_t	channel_wq;	/* queue to wait for channel to become available */
 
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index c631d926f4db..c1c6b7f305d1 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -25,6 +25,113 @@
 #include "ar-internal.h"
 
 /*
+ * Retransmit terminal ACK or ABORT of the previous call.
+ */
+static void rxrpc_conn_retransmit(struct rxrpc_connection *conn,
+				  struct sk_buff *skb)
+{
+	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_channel *chan;
+	struct msghdr msg;
+	struct kvec iov;
+	struct {
+		struct rxrpc_wire_header whdr;
+		union {
+			struct {
+				__be32 code;
+			} abort;
+			struct {
+				struct rxrpc_ackpacket ack;
+				struct rxrpc_ackinfo info;
+			};
+		};
+	} __attribute__((packed)) pkt;
+	size_t len;
+	u32 serial, mtu, call_id;
+
+	_enter("%d", conn->debug_id);
+
+	chan = &conn->channels[sp->hdr.cid & RXRPC_CHANNELMASK];
+
+	/* If the last call got moved on whilst we were waiting to run, just
+	 * ignore this packet.
+	 */
+	call_id = READ_ONCE(chan->last_call);
+	/* Sync with __rxrpc_disconnect_call() */
+	smp_rmb();
+	if (call_id != sp->hdr.callNumber)
+		return;
+
+	msg.msg_name	= &conn->params.peer->srx.transport;
+	msg.msg_namelen	= conn->params.peer->srx.transport_len;
+	msg.msg_control	= NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags	= 0;
+
+	pkt.whdr.epoch		= htonl(sp->hdr.epoch);
+	pkt.whdr.cid		= htonl(sp->hdr.cid);
+	pkt.whdr.callNumber	= htonl(sp->hdr.callNumber);
+	pkt.whdr.seq		= 0;
+	pkt.whdr.type		= chan->last_type;
+	pkt.whdr.flags		= conn->out_clientflag;
+	pkt.whdr.userStatus	= 0;
+	pkt.whdr.securityIndex	= conn->security_ix;
+	pkt.whdr._rsvd		= 0;
+	pkt.whdr.serviceId	= htons(chan->last_service_id);
+
+	len = sizeof(pkt.whdr);
+	switch (chan->last_type) {
+	case RXRPC_PACKET_TYPE_ABORT:
+		pkt.abort.code	= htonl(chan->last_abort);
+		len += sizeof(pkt.abort);
+		break;
+
+	case RXRPC_PACKET_TYPE_ACK:
+		mtu = conn->params.peer->if_mtu;
+		mtu -= conn->params.peer->hdrsize;
+		pkt.ack.bufferSpace	= 0;
+		pkt.ack.maxSkew		= htons(skb->priority);
+		pkt.ack.firstPacket	= htonl(chan->last_seq);
+		pkt.ack.previousPacket	= htonl(chan->last_seq - 1);
+		pkt.ack.serial		= htonl(sp->hdr.serial);
+		pkt.ack.reason		= RXRPC_ACK_DUPLICATE;
+		pkt.ack.nAcks		= 0;
+		pkt.info.rxMTU		= htonl(rxrpc_rx_mtu);
+		pkt.info.maxMTU		= htonl(mtu);
+		pkt.info.rwind		= htonl(rxrpc_rx_window_size);
+		pkt.info.jumbo_max	= htonl(rxrpc_rx_jumbo_max);
+		len += sizeof(pkt.ack) + sizeof(pkt.info);
+		break;
+	}
+
+	/* Resync with __rxrpc_disconnect_call() and check that the last call
+	 * didn't get advanced whilst we were filling out the packets.
+	 */
+	smp_rmb();
+	if (READ_ONCE(chan->last_call) != call_id)
+		return;
+
+	iov.iov_base	= &pkt;
+	iov.iov_len	= len;
+
+	serial = atomic_inc_return(&conn->serial);
+	pkt.whdr.serial = htonl(serial);
+
+	switch (chan->last_type) {
+	case RXRPC_PACKET_TYPE_ABORT:
+		_proto("Tx ABORT %%%u { %d } [re]", serial, conn->local_abort);
+		break;
+	case RXRPC_PACKET_TYPE_ACK:
+		_proto("Tx ACK %%%u [re]", serial);
+		break;
+	}
+
+	kernel_sendmsg(conn->params.local->socket, &msg, &iov, 1, len);
+	_leave("");
+	return;
+}
+
+/*
  * pass a connection-level abort onto all calls on that connection
  */
 static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
@@ -166,6 +273,12 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
 	_enter("{%d},{%u,%%%u},", conn->debug_id, sp->hdr.type, sp->hdr.serial);
 
 	switch (sp->hdr.type) {
+	case RXRPC_PACKET_TYPE_DATA:
+	case RXRPC_PACKET_TYPE_ACK:
+		rxrpc_conn_retransmit(conn, skb);
+		rxrpc_free_skb(skb);
+		return 0;
+
 	case RXRPC_PACKET_TYPE_ABORT:
 		if (skb_copy_bits(skb, 0, &wtmp, sizeof(wtmp)) < 0)
 			return -EPROTO;
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 743f0bb4aaa8..b4af37ebb112 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -166,7 +166,15 @@ void __rxrpc_disconnect_call(struct rxrpc_call *call)
 		/* Save the result of the call so that we can repeat it if necessary
 		 * through the channel, whilst disposing of the actual call record.
 		 */
-		chan->last_result = call->local_abort;
+		chan->last_service_id = call->service_id;
+		if (call->local_abort) {
+			chan->last_abort = call->local_abort;
+			chan->last_type = RXRPC_PACKET_TYPE_ABORT;
+		} else {
+			chan->last_seq = call->rx_data_eaten;
+			chan->last_type = RXRPC_PACKET_TYPE_ACK;
+		}
+		/* Sync with rxrpc_conn_retransmit(). */
 		smp_wmb();
 		chan->last_call = chan->call_id;
 		chan->call_id = chan->call_counter;
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 34f7431bf494..66cdeb56f44f 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -566,7 +566,8 @@ done:
 
 /*
  * post connection-level events to the connection
- * - this includes challenges, responses and some aborts
+ * - this includes challenges, responses, some aborts and call terminal packet
+ *   retransmission.
  */
 static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
 				      struct sk_buff *skb)
@@ -716,18 +717,44 @@ void rxrpc_data_ready(struct sock *sk)
 		/* Connection-level packet */
 		_debug("CONN %p {%d}", conn, conn->debug_id);
 		rxrpc_post_packet_to_conn(conn, skb);
+		goto out_unlock;
 	} else {
 		/* Call-bound packets are routed by connection channel. */
 		unsigned int channel = sp->hdr.cid & RXRPC_CHANNELMASK;
 		struct rxrpc_channel *chan = &conn->channels[channel];
-		struct rxrpc_call *call = rcu_dereference(chan->call);
+		struct rxrpc_call *call;
+
+		/* Ignore really old calls */
+		if (sp->hdr.callNumber < chan->last_call)
+			goto discard_unlock;
+
+		if (sp->hdr.callNumber == chan->last_call) {
+			/* For the previous service call, if completed
+			 * successfully, we discard all further packets.
+			 */
+			if (rxrpc_conn_is_service(call->conn) &&
+			    (chan->last_type == RXRPC_PACKET_TYPE_ACK ||
+			     sp->hdr.type == RXRPC_PACKET_TYPE_ABORT))
+				goto discard_unlock;
+
+			/* But otherwise we need to retransmit the final packet
+			 * from data cached in the connection record.
+			 */
+			rxrpc_post_packet_to_conn(conn, skb);
+			goto out_unlock;
+		}
 
+		call = rcu_dereference(chan->call);
 		if (!call || atomic_read(&call->usage) == 0)
 			goto cant_route_call;
 
 		rxrpc_post_packet_to_call(call, skb);
+		goto out_unlock;
 	}
 
+discard_unlock:
+	rxrpc_free_skb(skb);
+out_unlock:
 	rcu_read_unlock();
 out:
 	return;

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* Re: [PATCH net-next 0/3] rxrpc: Miscellaneous improvements
  2016-08-23 15:28 [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Howells
                   ` (2 preceding siblings ...)
  2016-08-23 15:29 ` [PATCH net-next 3/3] rxrpc: Perform terminal call ACK/ABORT retransmission from conn processor David Howells
@ 2016-08-24  0:21 ` David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: David Miller @ 2016-08-24  0:21 UTC (permalink / raw)
  To: dhowells; +Cc: netdev, linux-afs, linux-kernel

From: David Howells <dhowells@redhat.com>
Date: Tue, 23 Aug 2016 16:28:54 +0100

> Here are some improvements that are part of the AF_RXRPC rewrite.  They
> need to be applied on top of the just posted cleanups.
 ...
> 	git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs.git
> 	rxrpc-rewrite-20160823-2

Also pulled, thanks David.

^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2016-08-24  0:30 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-08-23 15:28 [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Howells
2016-08-23 15:29 ` [PATCH net-next 1/3] rxrpc: Set connection expiry on idle, not put David Howells
2016-08-23 15:29 ` [PATCH net-next 2/3] rxrpc: Calculate serial skew on packet reception David Howells
2016-08-23 15:29 ` [PATCH net-next 3/3] rxrpc: Perform terminal call ACK/ABORT retransmission from conn processor David Howells
2016-08-24  0:21 ` [PATCH net-next 0/3] rxrpc: Miscellaneous improvements David Miller

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).