All of lore.kernel.org
 help / color / mirror / Atom feed
From: Bjorn Andersson <bjorn.andersson@linaro.org>
To: "David S. Miller" <davem@davemloft.net>
Cc: Arun Kumar Neelakantam <aneela@codeaurora.org>,
	Chris Lew <clew@codeaurora.org>,
	netdev@vger.kernel.org, linux-kernel@vger.kernel.org,
	linux-arm-msm@vger.kernel.org
Subject: [PATCH v4 2/5] net: qrtr: Implement outgoing flow control
Date: Mon, 13 Jan 2020 23:57:00 -0800	[thread overview]
Message-ID: <20200114075703.2145718-3-bjorn.andersson@linaro.org> (raw)
In-Reply-To: <20200114075703.2145718-1-bjorn.andersson@linaro.org>

In order to prevent overconsumption of resources on the remote side QRTR
implements a flow control mechanism.

The mechanism works by the sender keeping track of the number of
outstanding unconfirmed messages that have been transmitted to a
particular node/port pair.

When the count reaches a low watermark (L) the confirm_rx bit is set in
the outgoing message, and when the count reaches a high watermark (H)
transmission is blocked until the reception of a resume_tx message from
the remote, which resets the counter to 0.

This guarantees that there will be at most 2H - L messages in flight.
Values chosen for L and H are 5 and 10 respectively.

Signed-off-by: Bjorn Andersson <bjorn.andersson@linaro.org>
---

Changes since v3:
- Fix the xmas tree
- Wrap radix_tree_lookup() in rcu_read_lock()
- Initialize qrtr_tx_lock

 net/qrtr/qrtr.c | 194 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 187 insertions(+), 7 deletions(-)

diff --git a/net/qrtr/qrtr.c b/net/qrtr/qrtr.c
index 6c56a8ce83ef..304d536c7353 100644
--- a/net/qrtr/qrtr.c
+++ b/net/qrtr/qrtr.c
@@ -8,6 +8,7 @@
 #include <linux/qrtr.h>
 #include <linux/termios.h>	/* For TIOCINQ/OUTQ */
 #include <linux/numa.h>
+#include <linux/wait.h>
 
 #include <net/sock.h>
 
@@ -113,6 +114,8 @@ static DEFINE_MUTEX(qrtr_port_lock);
  * @ep: endpoint
  * @ref: reference count for node
  * @nid: node id
+ * @qrtr_tx_flow: tree of qrtr_tx_flow, keyed by node << 32 | port
+ * @qrtr_tx_lock: lock for qrtr_tx_flow inserts
  * @rx_queue: receive queue
  * @work: scheduled work struct for recv work
  * @item: list item for broadcast list
@@ -123,11 +126,29 @@ struct qrtr_node {
 	struct kref ref;
 	unsigned int nid;
 
+	struct radix_tree_root qrtr_tx_flow;
+	struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */
+
 	struct sk_buff_head rx_queue;
 	struct work_struct work;
 	struct list_head item;
 };
 
+/**
+ * struct qrtr_tx_flow - tx flow control state for one remote node/port pair
+ * @resume_tx: waiters for a resume tx from the remote
+ * @pending: number of data messages sent since the last resume tx
+ * @tx_failed: set when sending a message carrying confirm_rx failed
+ */
+struct qrtr_tx_flow {
+	struct wait_queue_head resume_tx;
+	int pending;
+	int tx_failed;
+};
+
+#define QRTR_TX_FLOW_HIGH	10
+#define QRTR_TX_FLOW_LOW	5
+
 static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 			      int type, struct sockaddr_qrtr *from,
 			      struct sockaddr_qrtr *to);
@@ -143,6 +164,8 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 static void __qrtr_node_release(struct kref *kref)
 {
 	struct qrtr_node *node = container_of(kref, struct qrtr_node, ref);
+	struct radix_tree_iter iter;
+	void __rcu **slot;
 
 	if (node->nid != QRTR_EP_NID_AUTO)
 		radix_tree_delete(&qrtr_nodes, node->nid);
@@ -152,6 +175,12 @@ static void __qrtr_node_release(struct kref *kref)
 
 	cancel_work_sync(&node->work);
 	skb_queue_purge(&node->rx_queue);
+
+	/* Free tx flow counters */
+	radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
+		radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot);
+		kfree(*slot);
+	}
 	kfree(node);
 }
 
@@ -171,6 +200,126 @@ static void qrtr_node_release(struct qrtr_node *node)
 	kref_put_mutex(&node->ref, __qrtr_node_release, &qrtr_node_lock);
 }
 
+/**
+ * qrtr_tx_resume() - reset flow control counter and wake blocked senders
+ * @node:	qrtr_node that the QRTR_TYPE_RESUME_TX packet arrived on
+ * @skb:	resume_tx packet, consumed by this function
+ */
+static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
+{
+	struct qrtr_ctrl_pkt *pkt = (struct qrtr_ctrl_pkt *)skb->data;
+	u64 remote_node = le32_to_cpu(pkt->client.node);
+	u32 remote_port = le32_to_cpu(pkt->client.port);
+	struct qrtr_tx_flow *flow;
+	unsigned long key;
+
+	key = remote_node << 32 | remote_port;
+
+	rcu_read_lock();
+	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
+	rcu_read_unlock();
+	if (flow) {
+		spin_lock(&flow->resume_tx.lock);
+		flow->pending = 0;
+		spin_unlock(&flow->resume_tx.lock);
+		wake_up_interruptible_all(&flow->resume_tx);
+	}
+
+	consume_skb(skb);
+}
+
+/**
+ * qrtr_tx_wait() - flow control for outgoing packets
+ * @node:	qrtr_node that the packet is to be sent to
+ * @dest_node:	node id of the destination
+ * @dest_port:	port number of the destination
+ * @type:	type of message
+ *
+ * The flow control scheme is based around the low and high "watermarks". When
+ * the low watermark is reached the confirm_rx flag is set on the outgoing
+ * message, which will trigger the remote to send a control message of the type
+ * QRTR_TYPE_RESUME_TX to reset the counter. If the high watermark is hit
+ * further transmission is blocked until the counter is reset.
+ *
+ * Return: 1 if confirm_rx should be set, 0 if not, negative errno on failure
+ */
+static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
+			int type)
+{
+	unsigned long key = (u64)dest_node << 32 | dest_port;
+	struct qrtr_tx_flow *flow;
+	int confirm_rx = 0;
+	int ret;
+
+	/* Only data packets are flow controlled; never set confirm_rx on others */
+	if (type != QRTR_TYPE_DATA)
+		return 0;
+
+	mutex_lock(&node->qrtr_tx_lock);
+	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
+	if (!flow) {
+		flow = kzalloc(sizeof(*flow), GFP_KERNEL);
+		if (flow) {
+			init_waitqueue_head(&flow->resume_tx);
+			radix_tree_insert(&node->qrtr_tx_flow, key, flow);
+		}
+	}
+	mutex_unlock(&node->qrtr_tx_lock);
+
+	/* Set confirm_rx if we were unable to find or allocate a flow */
+	if (!flow)
+		return 1;
+
+	spin_lock_irq(&flow->resume_tx.lock);
+	ret = wait_event_interruptible_locked_irq(flow->resume_tx,
+						  flow->pending < QRTR_TX_FLOW_HIGH ||
+						  flow->tx_failed ||
+						  !node->ep);
+	if (ret < 0) {
+		confirm_rx = ret;
+	} else if (!node->ep) {
+		confirm_rx = -EPIPE;
+	} else if (flow->tx_failed) {
+		flow->tx_failed = 0;
+		confirm_rx = 1;
+	} else {
+		flow->pending++;
+		confirm_rx = flow->pending == QRTR_TX_FLOW_LOW;
+	}
+	spin_unlock_irq(&flow->resume_tx.lock);
+
+	return confirm_rx;
+}
+
+/**
+ * qrtr_tx_flow_failed() - flag that tx of confirm_rx flagged messages failed
+ * @node:	qrtr_node that the packet is to be sent to
+ * @dest_node:	node id of the destination
+ * @dest_port:	port number of the destination
+ *
+ * Signal that the transmission of a message with the confirm_rx flag failed.
+ * The flow's "pending" counter would keep incrementing towards
+ * QRTR_TX_FLOW_HIGH, at which point transmission would stall forever waiting
+ * for the resume TX message associated with the dropped confirm_rx message.
+ * Work around this by marking the flow as having a failed transmission, which
+ * causes the next transmission attempt to be sent with confirm_rx set.
+ */
+static void qrtr_tx_flow_failed(struct qrtr_node *node, int dest_node,
+				int dest_port)
+{
+	unsigned long key = (u64)dest_node << 32 | dest_port;
+	struct qrtr_tx_flow *flow;
+
+	rcu_read_lock();
+	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
+	rcu_read_unlock();
+	if (flow) {
+		spin_lock_irq(&flow->resume_tx.lock);
+		flow->tx_failed = 1;
+		spin_unlock_irq(&flow->resume_tx.lock);
+	}
+}
+
 /* Pass an outgoing packet socket buffer to the endpoint driver. */
 static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 			     int type, struct sockaddr_qrtr *from,
@@ -179,6 +328,13 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 	struct qrtr_hdr_v1 *hdr;
 	size_t len = skb->len;
 	int rc = -ENODEV;
+	int confirm_rx;
+
+	confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type);
+	if (confirm_rx < 0) {
+		kfree_skb(skb);
+		return confirm_rx;
+	}
 
 	hdr = skb_push(skb, sizeof(*hdr));
 	hdr->version = cpu_to_le32(QRTR_PROTO_VER_1);
@@ -194,7 +350,7 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 	}
 
 	hdr->size = cpu_to_le32(len);
-	hdr->confirm_rx = 0;
+	hdr->confirm_rx = !!confirm_rx;
 
 	skb_put_padto(skb, ALIGN(len, 4));
 
@@ -205,6 +361,11 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 		kfree_skb(skb);
 	mutex_unlock(&node->ep_lock);
 
+	/* Need to ensure that a subsequent message carries the otherwise lost
+	 * confirm_rx flag if we dropped this one */
+	if (rc && confirm_rx)
+		qrtr_tx_flow_failed(node, to->sq_node, to->sq_port);
+
 	return rc;
 }
 
@@ -311,7 +472,8 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
 	if (len != ALIGN(size, 4) + hdrlen)
 		goto err;
 
-	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA)
+	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA &&
+	    cb->type != QRTR_TYPE_RESUME_TX)
 		goto err;
 
 	skb_put_data(skb, data + hdrlen, size);
@@ -370,14 +532,18 @@ static void qrtr_node_rx_work(struct work_struct *work)
 
 		qrtr_node_assign(node, cb->src_node);
 
-		ipc = qrtr_port_lookup(cb->dst_port);
-		if (!ipc) {
-			kfree_skb(skb);
+		if (cb->type == QRTR_TYPE_RESUME_TX) {
+			qrtr_tx_resume(node, skb);
 		} else {
-			if (sock_queue_rcv_skb(&ipc->sk, skb))
+			ipc = qrtr_port_lookup(cb->dst_port);
+			if (!ipc) {
 				kfree_skb(skb);
+			} else {
+				if (sock_queue_rcv_skb(&ipc->sk, skb))
+					kfree_skb(skb);
 
-			qrtr_port_put(ipc);
+				qrtr_port_put(ipc);
+			}
 		}
 	}
 }
@@ -408,6 +574,9 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
 	node->nid = QRTR_EP_NID_AUTO;
 	node->ep = ep;
 
+	INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
+	mutex_init(&node->qrtr_tx_lock);
+
 	qrtr_node_assign(node, nid);
 
 	mutex_lock(&qrtr_node_lock);
@@ -428,8 +597,11 @@ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep)
 	struct qrtr_node *node = ep->node;
 	struct sockaddr_qrtr src = {AF_QIPCRTR, node->nid, QRTR_PORT_CTRL};
 	struct sockaddr_qrtr dst = {AF_QIPCRTR, qrtr_local_nid, QRTR_PORT_CTRL};
+	struct radix_tree_iter iter;
 	struct qrtr_ctrl_pkt *pkt;
+	struct qrtr_tx_flow *flow;
 	struct sk_buff *skb;
+	void __rcu **slot;
 
 	mutex_lock(&node->ep_lock);
 	node->ep = NULL;
@@ -442,6 +614,14 @@ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep)
 		qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst);
 	}
 
+	/* Wake up any transmitters waiting for resume-tx from the node */
+	mutex_lock(&node->qrtr_tx_lock);
+	radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
+		flow = *slot;
+		wake_up_interruptible_all(&flow->resume_tx);
+	}
+	mutex_unlock(&node->qrtr_tx_lock);
+
 	qrtr_node_release(node);
 	ep->node = NULL;
 }
-- 
2.24.0


  parent reply	other threads:[~2020-01-14  7:58 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-01-14  7:56 [PATCH v4 0/5] QRTR flow control improvements Bjorn Andersson
2020-01-14  7:56 ` [PATCH v4 1/5] net: qrtr: Move resume-tx transmission to recvmsg Bjorn Andersson
2020-01-14  7:57 ` Bjorn Andersson [this message]
2020-01-14  7:57 ` [PATCH v4 3/5] net: qrtr: Migrate node lookup tree to spinlock Bjorn Andersson
2020-01-14  7:57 ` [PATCH v4 4/5] net: qrtr: Make qrtr_port_lookup() use RCU Bjorn Andersson
2020-01-14  7:57 ` [PATCH v4 5/5] net: qrtr: Remove receive worker Bjorn Andersson
2020-01-15  2:37 ` [PATCH v4 0/5] QRTR flow control improvements David Miller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200114075703.2145718-3-bjorn.andersson@linaro.org \
    --to=bjorn.andersson@linaro.org \
    --cc=aneela@codeaurora.org \
    --cc=clew@codeaurora.org \
    --cc=davem@davemloft.net \
    --cc=linux-arm-msm@vger.kernel.org \
    --cc=linux-kernel@vger.kernel.org \
    --cc=netdev@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.