linux-kernel.vger.kernel.org archive mirror
* [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support
@ 2021-09-14  5:54 Jiang Wang
  2021-09-14  5:54 ` [RFC v2 1/5] virtio/vsock: add VIRTIO_VSOCK_F_DGRAM feature bit Jiang Wang
                   ` (4 more replies)
  0 siblings, 5 replies; 8+ messages in thread
From: Jiang Wang @ 2021-09-14  5:54 UTC (permalink / raw)
  To: jiangleetcode
  Cc: virtualization, stefanha, sgarzare, mst, arseny.krasnov, jhansen,
	cong.wang, duanxiongchun, xieyongji, chaiwen.cc, Jason Wang,
	David S. Miller, Jakub Kicinski, Steven Rostedt, Ingo Molnar,
	kvm, netdev, linux-kernel

This patchset implements SOCK_DGRAM support for the virtio
transport.

Datagram sockets are connectionless and unreliable. To avoid unfair
contention with stream and other sockets, add two more virtqueues and
a new feature bit to indicate whether those two new queues exist.

Dgram does not use the existing credit update mechanism of stream
sockets. When sending from the guest/driver, packets are sent
synchronously, so the sender gets an error when the virtqueue is full.
When sending from the host/device, packets are sent asynchronously
because the descriptor memory belongs to the corresponding QEMU
process.
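
For context, here is the userspace usage this series enables. This is a
minimal, untested sketch; the CID and port values are arbitrary examples:

	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>
	#include <sys/socket.h>
	#include <linux/vm_sockets.h>

	int main(void)
	{
		struct sockaddr_vm addr = {
			.svm_family = AF_VSOCK,
			.svm_cid = VMADDR_CID_HOST, /* adjust for your setup */
			.svm_port = 1234,
		};
		int fd = socket(AF_VSOCK, SOCK_DGRAM, 0);

		if (fd < 0) {
			perror("socket");
			return 1;
		}
		/* Datagrams are unreliable; a send that cannot be queued
		 * fails here instead of blocking.
		 */
		if (sendto(fd, "hello", strlen("hello"), 0,
			   (struct sockaddr *)&addr, sizeof(addr)) < 0)
			perror("sendto");
		close(fd);
		return 0;
	}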

The virtio spec patch is here:
https://www.mail-archive.com/virtualization@lists.linux-foundation.org/msg47457.html

For those who prefer a git repo, here is the link for the Linux kernel:
https://github.com/Jiang1155/linux/tree/vsock-dgram-v2

QEMU patch link:
https://lists.gnu.org/archive/html/qemu-devel/2021-09/msg03462.html


To do:
1. use skb when receiving packets
2. support multiple transports
3. support mergeable rx buffers
4. support disabling the F_STREAM feature bit


v1 -> v2 :
  - fix migration bug in vhost-vsock
  - rename some variables
  - clean up some code in virtio-vsock
  - use le16_to_cpu in virtio-vsock



Jiang Wang (5):
  virtio/vsock: add VIRTIO_VSOCK_F_DGRAM feature bit
  virtio/vsock: add support for virtio datagram
  vhost/vsock: add support for vhost dgram.
  vsock_test: add tests for vsock dgram
  virtio/vsock: add sysfs for rx buf len for dgram

 drivers/vhost/vsock.c                         | 220 +++++++--
 include/linux/virtio_vsock.h                  |   9 +
 include/net/af_vsock.h                        |   1 +
 .../events/vsock_virtio_transport_common.h    |   2 +
 include/uapi/linux/virtio_vsock.h             |   3 +
 net/vmw_vsock/af_vsock.c                      |  12 +
 net/vmw_vsock/virtio_transport.c              | 463 +++++++++++++++---
 net/vmw_vsock/virtio_transport_common.c       | 181 ++++++-
 tools/testing/vsock/util.c                    | 105 ++++
 tools/testing/vsock/util.h                    |   4 +
 tools/testing/vsock/vsock_test.c              | 195 ++++++++
 11 files changed, 1083 insertions(+), 112 deletions(-)

-- 
2.20.1



* [RFC v2 1/5] virtio/vsock: add VIRTIO_VSOCK_F_DGRAM feature bit
  2021-09-14  5:54 [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support Jiang Wang
@ 2021-09-14  5:54 ` Jiang Wang
  2021-09-14  5:54 ` [RFC v2 2/5] virtio/vsock: add support for virtio datagram Jiang Wang
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 8+ messages in thread
From: Jiang Wang @ 2021-09-14  5:54 UTC (permalink / raw)
  To: jiangleetcode
  Cc: virtualization, stefanha, sgarzare, mst, arseny.krasnov, jhansen,
	cong.wang, duanxiongchun, xieyongji, chaiwen.cc, Jason Wang,
	David S. Miller, Jakub Kicinski, Steven Rostedt, Ingo Molnar,
	kvm, netdev, linux-kernel

When this feature bit is negotiated, allocate 5 virtqueues;
otherwise, allocate 3 queues to stay compatible with
old QEMU versions.

Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
---
 drivers/vhost/vsock.c             |  3 +-
 include/linux/virtio_vsock.h      |  9 ++++
 include/uapi/linux/virtio_vsock.h |  2 +
 net/vmw_vsock/virtio_transport.c  | 79 +++++++++++++++++++++++++++----
 4 files changed, 82 insertions(+), 11 deletions(-)

diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index f249622ef11b..c79789af0365 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -32,7 +32,8 @@
 enum {
 	VHOST_VSOCK_FEATURES = VHOST_FEATURES |
 			       (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
-			       (1ULL << VIRTIO_VSOCK_F_SEQPACKET)
+			       (1ULL << VIRTIO_VSOCK_F_SEQPACKET) |
+			       (1ULL << VIRTIO_VSOCK_F_DGRAM)
 };
 
 enum {
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index 35d7eedb5e8e..87d849aeb3ec 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -18,6 +18,15 @@ enum {
 	VSOCK_VQ_MAX    = 3,
 };
 
+enum {
+	VSOCK_VQ_STREAM_RX     = 0, /* for host to guest data */
+	VSOCK_VQ_STREAM_TX     = 1, /* for guest to host data */
+	VSOCK_VQ_DGRAM_RX      = 2,
+	VSOCK_VQ_DGRAM_TX      = 3,
+	VSOCK_VQ_EX_EVENT      = 4,
+	VSOCK_VQ_EX_MAX        = 5,
+};
+
 /* Per-socket state (accessed via vsk->trans) */
 struct virtio_vsock_sock {
 	struct vsock_sock *vsk;
diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index 3dd3555b2740..cff54ba9b924 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -40,6 +40,8 @@
 
 /* The feature bitmap for virtio vsock */
 #define VIRTIO_VSOCK_F_SEQPACKET	1	/* SOCK_SEQPACKET supported */
+#define VIRTIO_VSOCK_F_DGRAM	0	/* Host supports dgram vsock */
 
 struct virtio_vsock_config {
 	__le64 guest_cid;
diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 4f7c99dfd16c..bb89f538f5f3 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -27,7 +27,8 @@ static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
 
 struct virtio_vsock {
 	struct virtio_device *vdev;
-	struct virtqueue *vqs[VSOCK_VQ_MAX];
+	struct virtqueue **vqs;
+	bool has_dgram;
 
 	/* Virtqueue processing is deferred to a workqueue */
 	struct work_struct tx_work;
@@ -334,7 +335,10 @@ static int virtio_vsock_event_fill_one(struct virtio_vsock *vsock,
 	struct scatterlist sg;
 	struct virtqueue *vq;
 
-	vq = vsock->vqs[VSOCK_VQ_EVENT];
+	if (vsock->has_dgram)
+		vq = vsock->vqs[VSOCK_VQ_EX_EVENT];
+	else
+		vq = vsock->vqs[VSOCK_VQ_EVENT];
 
 	sg_init_one(&sg, event, sizeof(*event));
 
@@ -352,7 +356,10 @@ static void virtio_vsock_event_fill(struct virtio_vsock *vsock)
 		virtio_vsock_event_fill_one(vsock, event);
 	}
 
-	virtqueue_kick(vsock->vqs[VSOCK_VQ_EVENT]);
+	if (vsock->has_dgram)
+		virtqueue_kick(vsock->vqs[VSOCK_VQ_EX_EVENT]);
+	else
+		virtqueue_kick(vsock->vqs[VSOCK_VQ_EVENT]);
 }
 
 static void virtio_vsock_reset_sock(struct sock *sk)
@@ -395,7 +402,10 @@ static void virtio_transport_event_work(struct work_struct *work)
 		container_of(work, struct virtio_vsock, event_work);
 	struct virtqueue *vq;
 
-	vq = vsock->vqs[VSOCK_VQ_EVENT];
+	if (vsock->has_dgram)
+		vq = vsock->vqs[VSOCK_VQ_EX_EVENT];
+	else
+		vq = vsock->vqs[VSOCK_VQ_EVENT];
 
 	mutex_lock(&vsock->event_lock);
 
@@ -415,7 +425,10 @@ static void virtio_transport_event_work(struct work_struct *work)
 		}
 	} while (!virtqueue_enable_cb(vq));
 
-	virtqueue_kick(vsock->vqs[VSOCK_VQ_EVENT]);
+	if (vsock->has_dgram)
+		virtqueue_kick(vsock->vqs[VSOCK_VQ_EX_EVENT]);
+	else
+		virtqueue_kick(vsock->vqs[VSOCK_VQ_EVENT]);
 out:
 	mutex_unlock(&vsock->event_lock);
 }
@@ -438,6 +451,10 @@ static void virtio_vsock_tx_done(struct virtqueue *vq)
 	queue_work(virtio_vsock_workqueue, &vsock->tx_work);
 }
 
+static void virtio_vsock_dgram_tx_done(struct virtqueue *vq)
+{
+}
+
 static void virtio_vsock_rx_done(struct virtqueue *vq)
 {
 	struct virtio_vsock *vsock = vq->vdev->priv;
@@ -449,6 +466,10 @@ static void virtio_vsock_rx_done(struct virtqueue *vq)
 
 static bool virtio_transport_seqpacket_allow(u32 remote_cid);
 
+static void virtio_vsock_dgram_rx_done(struct virtqueue *vq)
+{
+}
+
 static struct virtio_transport virtio_transport = {
 	.transport = {
 		.module                   = THIS_MODULE,
@@ -571,13 +592,29 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 		virtio_vsock_tx_done,
 		virtio_vsock_event_done,
 	};
+	vq_callback_t *ex_callbacks[] = {
+		virtio_vsock_rx_done,
+		virtio_vsock_tx_done,
+		virtio_vsock_dgram_rx_done,
+		virtio_vsock_dgram_tx_done,
+		virtio_vsock_event_done,
+	};
+
 	static const char * const names[] = {
 		"rx",
 		"tx",
 		"event",
 	};
+	static const char * const ex_names[] = {
+		"rx",
+		"tx",
+		"dgram_rx",
+		"dgram_tx",
+		"event",
+	};
+
 	struct virtio_vsock *vsock = NULL;
-	int ret;
+	int ret, max_vq;
 
 	ret = mutex_lock_interruptible(&the_virtio_vsock_mutex);
 	if (ret)
@@ -598,9 +635,30 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 
 	vsock->vdev = vdev;
 
-	ret = virtio_find_vqs(vsock->vdev, VSOCK_VQ_MAX,
-			      vsock->vqs, callbacks, names,
-			      NULL);
+	if (virtio_has_feature(vdev, VIRTIO_VSOCK_F_DGRAM))
+		vsock->has_dgram = true;
+
+	if (vsock->has_dgram)
+		max_vq = VSOCK_VQ_EX_MAX;
+	else
+		max_vq = VSOCK_VQ_MAX;
+
+	vsock->vqs = kmalloc_array(max_vq, sizeof(struct virtqueue *), GFP_KERNEL);
+	if (!vsock->vqs) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	if (vsock->has_dgram) {
+		ret = virtio_find_vqs(vsock->vdev, max_vq,
+				      vsock->vqs, ex_callbacks, ex_names,
+				      NULL);
+	} else {
+		ret = virtio_find_vqs(vsock->vdev, max_vq,
+				      vsock->vqs, callbacks, names,
+				      NULL);
+	}
+
 	if (ret < 0)
 		goto out;
 
@@ -725,7 +783,8 @@ static struct virtio_device_id id_table[] = {
 };
 
 static unsigned int features[] = {
-	VIRTIO_VSOCK_F_SEQPACKET
+	VIRTIO_VSOCK_F_SEQPACKET,
+	VIRTIO_VSOCK_F_DGRAM
 };
 
 static struct virtio_driver virtio_vsock_driver = {
-- 
2.20.1



* [RFC v2 2/5] virtio/vsock: add support for virtio datagram
  2021-09-14  5:54 [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support Jiang Wang
  2021-09-14  5:54 ` [RFC v2 1/5] virtio/vsock: add VIRTIO_VSOCK_F_DGRAM feature bit Jiang Wang
@ 2021-09-14  5:54 ` Jiang Wang
  2021-09-14  6:56   ` Michael S. Tsirkin
  2021-09-14  5:54 ` [RFC v2 3/5] vhost/vsock: add support for vhost dgram Jiang Wang
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 8+ messages in thread
From: Jiang Wang @ 2021-09-14  5:54 UTC (permalink / raw)
  To: jiangleetcode
  Cc: virtualization, stefanha, sgarzare, mst, arseny.krasnov, jhansen,
	cong.wang, duanxiongchun, xieyongji, chaiwen.cc, Jason Wang,
	David S. Miller, Jakub Kicinski, Steven Rostedt, Ingo Molnar,
	kvm, netdev, linux-kernel

This patch adds virtio dgram support on the driver side.
It implements the related functions for tx and rx, enqueue
and dequeue. Packets are sent synchronously to give the
sender an indication when the virtqueue is full.
virtio_transport_send_pkt_work() is refactored a little,
with no functional changes.

Support for the host/device side is in another
patch.
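
To illustrate the synchronous-send behavior from userspace, a hedged,
untested sketch (fd, buf, buf_len and addr are assumed to be set up as
in the cover letter example; the ENOMEM comes from
virtio_transport_do_send_dgram_pkt() below when the virtqueue is full):

	for (;;) {
		ssize_t n = sendto(fd, buf, buf_len, 0,
				   (struct sockaddr *)&addr, sizeof(addr));
		if (n >= 0 || errno != ENOMEM)
			break;
		usleep(1000); /* dgram TX virtqueue full: back off, retry */
	}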

Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
---
 include/net/af_vsock.h                        |   1 +
 .../events/vsock_virtio_transport_common.h    |   2 +
 include/uapi/linux/virtio_vsock.h             |   1 +
 net/vmw_vsock/af_vsock.c                      |  12 +
 net/vmw_vsock/virtio_transport.c              | 344 +++++++++++++++---
 net/vmw_vsock/virtio_transport_common.c       | 181 ++++++++-
 6 files changed, 467 insertions(+), 74 deletions(-)

diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
index ab207677e0a8..58c46c694670 100644
--- a/include/net/af_vsock.h
+++ b/include/net/af_vsock.h
@@ -208,6 +208,7 @@ void vsock_remove_sock(struct vsock_sock *vsk);
 void vsock_for_each_connected_socket(void (*fn)(struct sock *sk));
 int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk);
 bool vsock_find_cid(unsigned int cid);
+int vsock_bind_stream(struct vsock_sock *vsk, struct sockaddr_vm *addr);
 
 /**** TAP ****/
 
diff --git a/include/trace/events/vsock_virtio_transport_common.h b/include/trace/events/vsock_virtio_transport_common.h
index d0b3f0ea9ba1..1d8647a6b476 100644
--- a/include/trace/events/vsock_virtio_transport_common.h
+++ b/include/trace/events/vsock_virtio_transport_common.h
@@ -10,10 +10,12 @@
 
 TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_STREAM);
 TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_SEQPACKET);
+TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_DGRAM);
 
 #define show_type(val) \
 	__print_symbolic(val, \
 			 { VIRTIO_VSOCK_TYPE_STREAM, "STREAM" }, \
+			 { VIRTIO_VSOCK_TYPE_DGRAM, "DGRAM" }, \
 			 { VIRTIO_VSOCK_TYPE_SEQPACKET, "SEQPACKET" })
 
 TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_INVALID);
diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
index cff54ba9b924..3e93b75f2707 100644
--- a/include/uapi/linux/virtio_vsock.h
+++ b/include/uapi/linux/virtio_vsock.h
@@ -71,6 +71,7 @@ struct virtio_vsock_hdr {
 enum virtio_vsock_type {
 	VIRTIO_VSOCK_TYPE_STREAM = 1,
 	VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
+	VIRTIO_VSOCK_TYPE_DGRAM = 3,
 };
 
 enum virtio_vsock_op {
diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index 3e02cc3b24f8..adf11db32506 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -669,6 +669,18 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,
 	return 0;
 }
 
+int vsock_bind_stream(struct vsock_sock *vsk,
+		      struct sockaddr_vm *addr)
+{
+	int retval;
+
+	spin_lock_bh(&vsock_table_lock);
+	retval = __vsock_bind_connectible(vsk, addr);
+	spin_unlock_bh(&vsock_table_lock);
+	return retval;
+}
+EXPORT_SYMBOL(vsock_bind_stream);
+
 static int __vsock_bind_dgram(struct vsock_sock *vsk,
 			      struct sockaddr_vm *addr)
 {
diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index bb89f538f5f3..8d5bfcd79555 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -20,21 +20,29 @@
 #include <net/sock.h>
 #include <linux/mutex.h>
 #include <net/af_vsock.h>
+#include <linux/kobject.h>
+#include <linux/sysfs.h>
+#include <linux/refcount.h>
 
 static struct workqueue_struct *virtio_vsock_workqueue;
 static struct virtio_vsock __rcu *the_virtio_vsock;
+static struct virtio_vsock *the_virtio_vsock_dgram;
 static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
 
 struct virtio_vsock {
 	struct virtio_device *vdev;
 	struct virtqueue **vqs;
 	bool has_dgram;
+	refcount_t active;
 
 	/* Virtqueue processing is deferred to a workqueue */
 	struct work_struct tx_work;
 	struct work_struct rx_work;
 	struct work_struct event_work;
 
+	struct work_struct dgram_tx_work;
+	struct work_struct dgram_rx_work;
+
 	/* The following fields are protected by tx_lock.  vqs[VSOCK_VQ_TX]
 	 * must be accessed with tx_lock held.
 	 */
@@ -55,6 +63,22 @@ struct virtio_vsock {
 	int rx_buf_nr;
 	int rx_buf_max_nr;
 
+	/* The following fields are protected by dgram_tx_lock.  vqs[VSOCK_VQ_DGRAM_TX]
+	 * must be accessed with dgram_tx_lock held.
+	 */
+	struct mutex dgram_tx_lock;
+	bool dgram_tx_run;
+
+	atomic_t dgram_queued_replies;
+
+	/* The following fields are protected by dgram_rx_lock.  vqs[VSOCK_VQ_DGRAM_RX]
+	 * must be accessed with dgram_rx_lock held.
+	 */
+	struct mutex dgram_rx_lock;
+	bool dgram_rx_run;
+	int dgram_rx_buf_nr;
+	int dgram_rx_buf_max_nr;
+
 	/* The following fields are protected by event_lock.
 	 * vqs[VSOCK_VQ_EVENT] must be accessed with event_lock held.
 	 */
@@ -84,21 +108,12 @@ static u32 virtio_transport_get_local_cid(void)
 	return ret;
 }
 
-static void
-virtio_transport_send_pkt_work(struct work_struct *work)
+static void virtio_transport_do_send_pkt(struct virtio_vsock *vsock,
+					 struct virtqueue *vq,  spinlock_t *lock,
+					 struct list_head *send_pkt_list,
+					 bool *restart_rx)
 {
-	struct virtio_vsock *vsock =
-		container_of(work, struct virtio_vsock, send_pkt_work);
-	struct virtqueue *vq;
 	bool added = false;
-	bool restart_rx = false;
-
-	mutex_lock(&vsock->tx_lock);
-
-	if (!vsock->tx_run)
-		goto out;
-
-	vq = vsock->vqs[VSOCK_VQ_TX];
 
 	for (;;) {
 		struct virtio_vsock_pkt *pkt;
@@ -106,16 +121,16 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 		int ret, in_sg = 0, out_sg = 0;
 		bool reply;
 
-		spin_lock_bh(&vsock->send_pkt_list_lock);
-		if (list_empty(&vsock->send_pkt_list)) {
-			spin_unlock_bh(&vsock->send_pkt_list_lock);
+		spin_lock_bh(lock);
+		if (list_empty(send_pkt_list)) {
+			spin_unlock_bh(lock);
 			break;
 		}
 
-		pkt = list_first_entry(&vsock->send_pkt_list,
+		pkt = list_first_entry(send_pkt_list,
 				       struct virtio_vsock_pkt, list);
 		list_del_init(&pkt->list);
-		spin_unlock_bh(&vsock->send_pkt_list_lock);
+		spin_unlock_bh(lock);
 
 		virtio_transport_deliver_tap_pkt(pkt);
 
@@ -133,9 +148,9 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 		 * the vq
 		 */
 		if (ret < 0) {
-			spin_lock_bh(&vsock->send_pkt_list_lock);
-			list_add(&pkt->list, &vsock->send_pkt_list);
-			spin_unlock_bh(&vsock->send_pkt_list_lock);
+			spin_lock_bh(lock);
+			list_add(&pkt->list, send_pkt_list);
+			spin_unlock_bh(lock);
 			break;
 		}
 
@@ -147,7 +162,7 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 
 			/* Do we now have resources to resume rx processing? */
 			if (val + 1 == virtqueue_get_vring_size(rx_vq))
-				restart_rx = true;
+				*restart_rx = true;
 		}
 
 		added = true;
@@ -155,7 +170,55 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 
 	if (added)
 		virtqueue_kick(vq);
+}
+
+static int virtio_transport_do_send_dgram_pkt(struct virtio_vsock *vsock,
+					      struct virtqueue *vq,
+					      struct virtio_vsock_pkt *pkt)
+{
+	struct scatterlist hdr, buf, *sgs[2];
+	int ret, in_sg = 0, out_sg = 0;
+
+	virtio_transport_deliver_tap_pkt(pkt);
+
+	sg_init_one(&hdr, &pkt->hdr, sizeof(pkt->hdr));
+	sgs[out_sg++] = &hdr;
+	if (pkt->buf) {
+		sg_init_one(&buf, pkt->buf, pkt->len);
+		sgs[out_sg++] = &buf;
+	}
+
+	ret = virtqueue_add_sgs(vq, sgs, out_sg, in_sg, pkt, GFP_KERNEL);
+	/* Usually this means that there is no more space available in
+	 * the vq
+	 */
+	if (ret < 0) {
+		virtio_transport_free_pkt(pkt);
+		return -ENOMEM;
+	}
+
+	virtqueue_kick(vq);
+
+	return pkt->len;
+}
+
+static void
+virtio_transport_send_pkt_work(struct work_struct *work)
+{
+	struct virtio_vsock *vsock =
+		container_of(work, struct virtio_vsock, send_pkt_work);
+	struct virtqueue *vq;
+	bool restart_rx = false;
 
+	mutex_lock(&vsock->tx_lock);
+
+	if (!vsock->tx_run)
+		goto out;
+
+	vq = vsock->vqs[VSOCK_VQ_TX];
+
+	virtio_transport_do_send_pkt(vsock, vq, &vsock->send_pkt_list_lock,
+				     &vsock->send_pkt_list, &restart_rx);
 out:
 	mutex_unlock(&vsock->tx_lock);
 
@@ -163,12 +226,65 @@ virtio_transport_send_pkt_work(struct work_struct *work)
 		queue_work(virtio_vsock_workqueue, &vsock->rx_work);
 }
 
+static int
+virtio_transport_send_dgram_pkt(struct virtio_vsock_pkt *pkt)
+{
+	struct virtio_vsock *vsock;
+	int len = pkt->len;
+	struct virtqueue *vq;
+
+	vsock = the_virtio_vsock_dgram;
+
+	if (!vsock) {
+		virtio_transport_free_pkt(pkt);
+		return -ENODEV;
+	}
+
+	if (!vsock->dgram_tx_run) {
+		virtio_transport_free_pkt(pkt);
+		return -ENODEV;
+	}
+
+	if (!refcount_inc_not_zero(&vsock->active)) {
+		virtio_transport_free_pkt(pkt);
+		return -ENODEV;
+	}
+
+	if (le64_to_cpu(pkt->hdr.dst_cid) == vsock->guest_cid) {
+		virtio_transport_free_pkt(pkt);
+		len = -ENODEV;
+		goto out_ref;
+	}
+
+	/* send the pkt */
+	mutex_lock(&vsock->dgram_tx_lock);
+
+	if (!vsock->dgram_tx_run)
+		goto out_mutex;
+
+	vq = vsock->vqs[VSOCK_VQ_DGRAM_TX];
+
+	len = virtio_transport_do_send_dgram_pkt(vsock, vq, pkt);
+
+out_mutex:
+	mutex_unlock(&vsock->dgram_tx_lock);
+
+out_ref:
+	if (!refcount_dec_not_one(&vsock->active))
+		return -EFAULT;
+
+	return len;
+}
+
 static int
 virtio_transport_send_pkt(struct virtio_vsock_pkt *pkt)
 {
 	struct virtio_vsock *vsock;
 	int len = pkt->len;
 
+	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
+		return virtio_transport_send_dgram_pkt(pkt);
+
 	rcu_read_lock();
 	vsock = rcu_dereference(the_virtio_vsock);
 	if (!vsock) {
@@ -244,7 +360,7 @@ virtio_transport_cancel_pkt(struct vsock_sock *vsk)
 	return ret;
 }
 
-static void virtio_vsock_rx_fill(struct virtio_vsock *vsock)
+static void virtio_vsock_rx_fill(struct virtio_vsock *vsock, bool is_dgram)
 {
 	int buf_len = VIRTIO_VSOCK_DEFAULT_RX_BUF_SIZE;
 	struct virtio_vsock_pkt *pkt;
@@ -252,7 +368,10 @@ static void virtio_vsock_rx_fill(struct virtio_vsock *vsock)
 	struct virtqueue *vq;
 	int ret;
 
-	vq = vsock->vqs[VSOCK_VQ_RX];
+	if (is_dgram)
+		vq = vsock->vqs[VSOCK_VQ_DGRAM_RX];
+	else
+		vq = vsock->vqs[VSOCK_VQ_RX];
 
 	do {
 		pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
@@ -278,26 +397,26 @@ static void virtio_vsock_rx_fill(struct virtio_vsock *vsock)
 			virtio_transport_free_pkt(pkt);
 			break;
 		}
-		vsock->rx_buf_nr++;
+		if (is_dgram)
+			vsock->dgram_rx_buf_nr++;
+		else
+			vsock->rx_buf_nr++;
 	} while (vq->num_free);
-	if (vsock->rx_buf_nr > vsock->rx_buf_max_nr)
-		vsock->rx_buf_max_nr = vsock->rx_buf_nr;
+	if (is_dgram) {
+		if (vsock->dgram_rx_buf_nr > vsock->dgram_rx_buf_max_nr)
+			vsock->dgram_rx_buf_max_nr = vsock->dgram_rx_buf_nr;
+	} else {
+		if (vsock->rx_buf_nr > vsock->rx_buf_max_nr)
+			vsock->rx_buf_max_nr = vsock->rx_buf_nr;
+	}
+
 	virtqueue_kick(vq);
 }
 
-static void virtio_transport_tx_work(struct work_struct *work)
+static bool virtio_transport_free_pkt_batch(struct virtqueue *vq)
 {
-	struct virtio_vsock *vsock =
-		container_of(work, struct virtio_vsock, tx_work);
-	struct virtqueue *vq;
 	bool added = false;
 
-	vq = vsock->vqs[VSOCK_VQ_TX];
-	mutex_lock(&vsock->tx_lock);
-
-	if (!vsock->tx_run)
-		goto out;
-
 	do {
 		struct virtio_vsock_pkt *pkt;
 		unsigned int len;
@@ -309,13 +428,43 @@ static void virtio_transport_tx_work(struct work_struct *work)
 		}
 	} while (!virtqueue_enable_cb(vq));
 
-out:
+	return added;
+}
+
+static void virtio_transport_tx_work(struct work_struct *work)
+{
+	struct virtio_vsock *vsock =
+		container_of(work, struct virtio_vsock, tx_work);
+	struct virtqueue *vq;
+	bool added = false;
+
+	vq = vsock->vqs[VSOCK_VQ_TX];
+	mutex_lock(&vsock->tx_lock);
+
+	if (vsock->tx_run)
+		added = virtio_transport_free_pkt_batch(vq);
+
 	mutex_unlock(&vsock->tx_lock);
 
 	if (added)
 		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
 }
 
+static void virtio_transport_dgram_tx_work(struct work_struct *work)
+{
+	struct virtio_vsock *vsock =
+		container_of(work, struct virtio_vsock, dgram_tx_work);
+	struct virtqueue *vq;
+
+	vq = vsock->vqs[VSOCK_VQ_DGRAM_TX];
+	mutex_lock(&vsock->dgram_tx_lock);
+
+	if (vsock->dgram_tx_run)
+		virtio_transport_free_pkt_batch(vq);
+
+	mutex_unlock(&vsock->dgram_tx_lock);
+}
+
 /* Is there space left for replies to rx packets? */
 static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
 {
@@ -453,6 +602,11 @@ static void virtio_vsock_tx_done(struct virtqueue *vq)
 
 static void virtio_vsock_dgram_tx_done(struct virtqueue *vq)
 {
+	struct virtio_vsock *vsock = vq->vdev->priv;
+
+	if (!vsock)
+		return;
+	queue_work(virtio_vsock_workqueue, &vsock->dgram_tx_work);
 }
 
 static void virtio_vsock_rx_done(struct virtqueue *vq)
@@ -468,8 +622,12 @@ static bool virtio_transport_seqpacket_allow(u32 remote_cid);
 
 static void virtio_vsock_dgram_rx_done(struct virtqueue *vq)
 {
-}
+	struct virtio_vsock *vsock = vq->vdev->priv;
 
+	if (!vsock)
+		return;
+	queue_work(virtio_vsock_workqueue, &vsock->dgram_rx_work);
+}
+
 static struct virtio_transport virtio_transport = {
 	.transport = {
 		.module                   = THIS_MODULE,
@@ -532,19 +690,9 @@ static bool virtio_transport_seqpacket_allow(u32 remote_cid)
 	return seqpacket_allow;
 }
 
-static void virtio_transport_rx_work(struct work_struct *work)
+static void virtio_transport_do_rx_work(struct virtio_vsock *vsock,
+					struct virtqueue *vq, bool is_dgram)
 {
-	struct virtio_vsock *vsock =
-		container_of(work, struct virtio_vsock, rx_work);
-	struct virtqueue *vq;
-
-	vq = vsock->vqs[VSOCK_VQ_RX];
-
-	mutex_lock(&vsock->rx_lock);
-
-	if (!vsock->rx_run)
-		goto out;
-
 	do {
 		virtqueue_disable_cb(vq);
 		for (;;) {
@@ -564,7 +712,10 @@ static void virtio_transport_rx_work(struct work_struct *work)
 				break;
 			}
 
-			vsock->rx_buf_nr--;
+			if (is_dgram)
+				vsock->dgram_rx_buf_nr--;
+			else
+				vsock->rx_buf_nr--;
 
 			/* Drop short/long packets */
 			if (unlikely(len < sizeof(pkt->hdr) ||
@@ -580,11 +731,45 @@ static void virtio_transport_rx_work(struct work_struct *work)
 	} while (!virtqueue_enable_cb(vq));
 
 out:
+	return;
+}
+
+static void virtio_transport_rx_work(struct work_struct *work)
+{
+	struct virtio_vsock *vsock =
+		container_of(work, struct virtio_vsock, rx_work);
+	struct virtqueue *vq;
+
+	vq = vsock->vqs[VSOCK_VQ_RX];
+
+	mutex_lock(&vsock->rx_lock);
+
+	if (vsock->rx_run)
+		virtio_transport_do_rx_work(vsock, vq, false);
+
 	if (vsock->rx_buf_nr < vsock->rx_buf_max_nr / 2)
-		virtio_vsock_rx_fill(vsock);
+		virtio_vsock_rx_fill(vsock, false);
 	mutex_unlock(&vsock->rx_lock);
 }
 
+static void virtio_transport_dgram_rx_work(struct work_struct *work)
+{
+	struct virtio_vsock *vsock =
+		container_of(work, struct virtio_vsock, dgram_rx_work);
+	struct virtqueue *vq;
+
+	vq = vsock->vqs[VSOCK_VQ_DGRAM_RX];
+
+	mutex_lock(&vsock->dgram_rx_lock);
+
+	if (vsock->dgram_rx_run)
+		virtio_transport_do_rx_work(vsock, vq, true);
+
+	if (vsock->dgram_rx_buf_nr < vsock->dgram_rx_buf_max_nr / 2)
+		virtio_vsock_rx_fill(vsock, true);
+	mutex_unlock(&vsock->dgram_rx_lock);
+}
+
 static int virtio_vsock_probe(struct virtio_device *vdev)
 {
 	vq_callback_t *callbacks[] = {
@@ -592,7 +777,7 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 		virtio_vsock_tx_done,
 		virtio_vsock_event_done,
 	};
-	vq_callback_t *ex_callbacks[] = {
+	vq_callback_t *dgram_callbacks[] = {
 		virtio_vsock_rx_done,
 		virtio_vsock_tx_done,
 		virtio_vsock_dgram_rx_done,
@@ -651,7 +836,7 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 
 	if (vsock->has_dgram) {
 		ret = virtio_find_vqs(vsock->vdev, max_vq,
-				      vsock->vqs, ex_callbacks, ex_names,
+				      vsock->vqs, dgram_callbacks, ex_names,
 				      NULL);
 	} else {
 		ret = virtio_find_vqs(vsock->vdev, max_vq,
@@ -668,8 +853,14 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 	vsock->rx_buf_max_nr = 0;
 	atomic_set(&vsock->queued_replies, 0);
 
+	vsock->dgram_rx_buf_nr = 0;
+	vsock->dgram_rx_buf_max_nr = 0;
+	atomic_set(&vsock->dgram_queued_replies, 0);
+
 	mutex_init(&vsock->tx_lock);
 	mutex_init(&vsock->rx_lock);
+	mutex_init(&vsock->dgram_tx_lock);
+	mutex_init(&vsock->dgram_rx_lock);
 	mutex_init(&vsock->event_lock);
 	spin_lock_init(&vsock->send_pkt_list_lock);
 	INIT_LIST_HEAD(&vsock->send_pkt_list);
@@ -677,16 +868,27 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 	INIT_WORK(&vsock->tx_work, virtio_transport_tx_work);
 	INIT_WORK(&vsock->event_work, virtio_transport_event_work);
 	INIT_WORK(&vsock->send_pkt_work, virtio_transport_send_pkt_work);
+	INIT_WORK(&vsock->dgram_rx_work, virtio_transport_dgram_rx_work);
+	INIT_WORK(&vsock->dgram_tx_work, virtio_transport_dgram_tx_work);
 
 	mutex_lock(&vsock->tx_lock);
 	vsock->tx_run = true;
 	mutex_unlock(&vsock->tx_lock);
 
+	mutex_lock(&vsock->dgram_tx_lock);
+	vsock->dgram_tx_run = true;
+	mutex_unlock(&vsock->dgram_tx_lock);
+
 	mutex_lock(&vsock->rx_lock);
-	virtio_vsock_rx_fill(vsock);
+	virtio_vsock_rx_fill(vsock, false);
 	vsock->rx_run = true;
 	mutex_unlock(&vsock->rx_lock);
 
+	mutex_lock(&vsock->dgram_rx_lock);
+	virtio_vsock_rx_fill(vsock, true);
+	vsock->dgram_rx_run = true;
+	mutex_unlock(&vsock->dgram_rx_lock);
+
 	mutex_lock(&vsock->event_lock);
 	virtio_vsock_event_fill(vsock);
 	vsock->event_run = true;
@@ -698,6 +900,9 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
 	vdev->priv = vsock;
 	rcu_assign_pointer(the_virtio_vsock, vsock);
 
+	the_virtio_vsock_dgram = vsock;
+	refcount_set(&the_virtio_vsock_dgram->active, 1);
+
 	mutex_unlock(&the_virtio_vsock_mutex);
 
 	return 0;
@@ -729,14 +934,27 @@ static void virtio_vsock_remove(struct virtio_device *vdev)
 	vsock->rx_run = false;
 	mutex_unlock(&vsock->rx_lock);
 
+	mutex_lock(&vsock->dgram_rx_lock);
+	vsock->dgram_rx_run = false;
+	mutex_unlock(&vsock->dgram_rx_lock);
+
 	mutex_lock(&vsock->tx_lock);
 	vsock->tx_run = false;
 	mutex_unlock(&vsock->tx_lock);
 
+	mutex_lock(&vsock->dgram_tx_lock);
+	vsock->dgram_tx_run = false;
+	mutex_unlock(&vsock->dgram_tx_lock);
+
 	mutex_lock(&vsock->event_lock);
 	vsock->event_run = false;
 	mutex_unlock(&vsock->event_lock);
 
+	while (!refcount_dec_if_one(&the_virtio_vsock_dgram->active)) {
+		if (signal_pending(current))
+			break;
+	}
+
 	/* Flush all device writes and interrupts, device will not use any
 	 * more buffers.
 	 */
@@ -747,11 +965,21 @@ static void virtio_vsock_remove(struct virtio_device *vdev)
 		virtio_transport_free_pkt(pkt);
 	mutex_unlock(&vsock->rx_lock);
 
+	mutex_lock(&vsock->dgram_rx_lock);
+	while ((pkt = virtqueue_detach_unused_buf(vsock->vqs[VSOCK_VQ_DGRAM_RX])))
+		virtio_transport_free_pkt(pkt);
+	mutex_unlock(&vsock->dgram_rx_lock);
+
 	mutex_lock(&vsock->tx_lock);
 	while ((pkt = virtqueue_detach_unused_buf(vsock->vqs[VSOCK_VQ_TX])))
 		virtio_transport_free_pkt(pkt);
 	mutex_unlock(&vsock->tx_lock);
 
+	mutex_lock(&vsock->dgram_tx_lock);
+	while ((pkt = virtqueue_detach_unused_buf(vsock->vqs[VSOCK_VQ_DGRAM_TX])))
+		virtio_transport_free_pkt(pkt);
+	mutex_unlock(&vsock->dgram_tx_lock);
+
 	spin_lock_bh(&vsock->send_pkt_list_lock);
 	while (!list_empty(&vsock->send_pkt_list)) {
 		pkt = list_first_entry(&vsock->send_pkt_list,
@@ -769,6 +997,8 @@ static void virtio_vsock_remove(struct virtio_device *vdev)
 	 */
 	flush_work(&vsock->rx_work);
 	flush_work(&vsock->tx_work);
+	flush_work(&vsock->dgram_rx_work);
+	flush_work(&vsock->dgram_tx_work);
 	flush_work(&vsock->event_work);
 	flush_work(&vsock->send_pkt_work);
 
@@ -806,7 +1036,7 @@ static int __init virtio_vsock_init(void)
 		return -ENOMEM;
 
 	ret = vsock_core_register(&virtio_transport.transport,
-				  VSOCK_TRANSPORT_F_G2H);
+				  VSOCK_TRANSPORT_F_G2H | VSOCK_TRANSPORT_F_DGRAM);
 	if (ret)
 		goto out_wq;
 
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 081e7ae93cb1..034de35fe7c8 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -26,6 +26,8 @@
 /* Threshold for detecting small packets to copy */
 #define GOOD_COPY_LEN  128
 
+static s64 virtio_transport_dgram_has_data(struct vsock_sock *vsk);
+
 static const struct virtio_transport *
 virtio_transport_get_ops(struct vsock_sock *vsk)
 {
@@ -210,21 +212,28 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
 	vvs = vsk->trans;
 
 	/* we can send less than pkt_len bytes */
-	if (pkt_len > VIRTIO_VSOCK_MAX_PKT_BUF_SIZE)
-		pkt_len = VIRTIO_VSOCK_MAX_PKT_BUF_SIZE;
+	if (pkt_len > VIRTIO_VSOCK_MAX_PKT_BUF_SIZE) {
+		if (info->type == VIRTIO_VSOCK_TYPE_STREAM)
+			pkt_len = VIRTIO_VSOCK_MAX_PKT_BUF_SIZE;
+		else
+			/* non-stream packets cannot be truncated */
+			return -EMSGSIZE;
+	}
 
-	/* virtio_transport_get_credit might return less than pkt_len credit */
-	pkt_len = virtio_transport_get_credit(vvs, pkt_len);
+	if (info->type == VIRTIO_VSOCK_TYPE_STREAM) {
+		/* virtio_transport_get_credit might return less than pkt_len credit */
+		pkt_len = virtio_transport_get_credit(vvs, pkt_len);
 
-	/* Do not send zero length OP_RW pkt */
-	if (pkt_len == 0 && info->op == VIRTIO_VSOCK_OP_RW)
-		return pkt_len;
+		/* Do not send zero length OP_RW pkt */
+		if (pkt_len == 0 && info->op == VIRTIO_VSOCK_OP_RW)
+			return pkt_len;
+	}
 
 	pkt = virtio_transport_alloc_pkt(info, pkt_len,
 					 src_cid, src_port,
 					 dst_cid, dst_port);
 	if (!pkt) {
-		virtio_transport_put_credit(vvs, pkt_len);
+		if (info->type == VIRTIO_VSOCK_TYPE_STREAM)
+			virtio_transport_put_credit(vvs, pkt_len);
 		return -ENOMEM;
 	}
 
@@ -474,6 +483,55 @@ static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
 	return dequeued_len;
 }
 
+static ssize_t
+virtio_transport_dgram_do_dequeue(struct vsock_sock *vsk,
+				  struct msghdr *msg, size_t len)
+{
+	struct virtio_vsock_sock *vvs = vsk->trans;
+	struct virtio_vsock_pkt *pkt;
+	size_t total = 0;
+	u32 src_cid = 0, src_port = 0;
+	int err = -EFAULT;
+
+	spin_lock_bh(&vvs->rx_lock);
+	if (total < len && !list_empty(&vvs->rx_queue)) {
+		pkt = list_first_entry(&vvs->rx_queue,
+				       struct virtio_vsock_pkt, list);
+
+		total = len;
+		if (total > pkt->len - pkt->off)
+			total = pkt->len - pkt->off;
+		else if (total < pkt->len - pkt->off)
+			msg->msg_flags |= MSG_TRUNC;
+
+		/* sk_lock is held by caller so no one else can dequeue.
+		 * Unlock rx_lock since memcpy_to_msg() may sleep.
+		 */
+		spin_unlock_bh(&vvs->rx_lock);
+
+		err = memcpy_to_msg(msg, pkt->buf + pkt->off, total);
+		if (err)
+			return err;
+
+		spin_lock_bh(&vvs->rx_lock);
+
+		/* Save the sender's address: pkt must not be dereferenced
+		 * once it has been freed.
+		 */
+		src_cid = le64_to_cpu(pkt->hdr.src_cid);
+		src_port = le32_to_cpu(pkt->hdr.src_port);
+
+		virtio_transport_dec_rx_pkt(vvs, pkt);
+		list_del(&pkt->list);
+		virtio_transport_free_pkt(pkt);
+	}
+
+	spin_unlock_bh(&vvs->rx_lock);
+
+	if (total > 0 && msg->msg_name) {
+		/* Provide the address of the sender. */
+		DECLARE_SOCKADDR(struct sockaddr_vm *, vm_addr, msg->msg_name);
+
+		vsock_addr_init(vm_addr, src_cid, src_port);
+		msg->msg_namelen = sizeof(*vm_addr);
+	}
+	return total;
+}
+
 ssize_t
 virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 				struct msghdr *msg,
@@ -523,7 +581,66 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,
 			       size_t len, int flags)
 {
-	return -EOPNOTSUPP;
+	struct sock *sk;
+	int err = 0;
+	long timeout;
+
+	DEFINE_WAIT(wait);
+
+	sk = &vsk->sk;
+
+	if (flags & MSG_OOB || flags & MSG_ERRQUEUE || flags & MSG_PEEK)
+		return -EOPNOTSUPP;
+
+	lock_sock(sk);
+
+	if (!len)
+		goto out;
+
+	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+
+	while (1) {
+		s64 ready;
+
+		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
+		ready = virtio_transport_dgram_has_data(vsk);
+
+		if (ready == 0) {
+			if (timeout == 0) {
+				err = -EAGAIN;
+				finish_wait(sk_sleep(sk), &wait);
+				break;
+			}
+
+			release_sock(sk);
+			timeout = schedule_timeout(timeout);
+			lock_sock(sk);
+
+			if (signal_pending(current)) {
+				err = sock_intr_errno(timeout);
+				finish_wait(sk_sleep(sk), &wait);
+				break;
+			} else if (timeout == 0) {
+				err = -EAGAIN;
+				finish_wait(sk_sleep(sk), &wait);
+				break;
+			}
+		} else {
+			finish_wait(sk_sleep(sk), &wait);
+
+			if (ready < 0) {
+				err = -ENOMEM;
+				goto out;
+			}
+
+			err = virtio_transport_dgram_do_dequeue(vsk, msg, len);
+			break;
+		}
+	}
+out:
+	release_sock(sk);
+	return err;
 }
 EXPORT_SYMBOL_GPL(virtio_transport_dgram_dequeue);
 
@@ -553,6 +670,11 @@ u32 virtio_transport_seqpacket_has_data(struct vsock_sock *vsk)
 }
 EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_has_data);
 
+static s64 virtio_transport_dgram_has_data(struct vsock_sock *vsk)
+{
+	return virtio_transport_stream_has_data(vsk);
+}
+
 static s64 virtio_transport_has_space(struct vsock_sock *vsk)
 {
 	struct virtio_vsock_sock *vvs = vsk->trans;
@@ -731,13 +853,15 @@ EXPORT_SYMBOL_GPL(virtio_transport_stream_allow);
 int virtio_transport_dgram_bind(struct vsock_sock *vsk,
 				struct sockaddr_vm *addr)
 {
-	return -EOPNOTSUPP;
+	/* Reuse the stream bind table for dgram sockets */
+	return vsock_bind_stream(vsk, addr);
 }
 EXPORT_SYMBOL_GPL(virtio_transport_dgram_bind);
 
 bool virtio_transport_dgram_allow(u32 cid, u32 port)
 {
-	return false;
+	return true;
 }
 EXPORT_SYMBOL_GPL(virtio_transport_dgram_allow);
 
@@ -773,7 +897,17 @@ virtio_transport_dgram_enqueue(struct vsock_sock *vsk,
 			       struct msghdr *msg,
 			       size_t dgram_len)
 {
-	return -EOPNOTSUPP;
+	struct virtio_vsock_pkt_info info = {
+		.op = VIRTIO_VSOCK_OP_RW,
+		.type = VIRTIO_VSOCK_TYPE_DGRAM,
+		.msg = msg,
+		.pkt_len = dgram_len,
+		.vsk = vsk,
+		.remote_cid = remote_addr->svm_cid,
+		.remote_port = remote_addr->svm_port,
+	};
+
+	return virtio_transport_send_pkt_info(vsk, &info);
 }
 EXPORT_SYMBOL_GPL(virtio_transport_dgram_enqueue);
 
@@ -846,7 +980,6 @@ static int virtio_transport_reset_no_sock(const struct virtio_transport *t,
 		virtio_transport_free_pkt(reply);
 		return -ENOTCONN;
 	}
-
 	return t->send_pkt(reply);
 }
 
@@ -1049,7 +1182,8 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
 		 * of a new record.
 		 */
 		if ((pkt->len <= last_pkt->buf_len - last_pkt->len) &&
-		    !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)) {
+		    !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) &&
+		    (le32_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_DGRAM)) {
 			memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
 			       pkt->len);
 			last_pkt->len += pkt->len;
@@ -1074,6 +1208,12 @@ virtio_transport_recv_connected(struct sock *sk,
 	struct vsock_sock *vsk = vsock_sk(sk);
 	int err = 0;
 
+	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
+		virtio_transport_recv_enqueue(vsk, pkt);
+		sk->sk_data_ready(sk);
+		return err;
+	}
+
 	switch (le16_to_cpu(pkt->hdr.op)) {
 	case VIRTIO_VSOCK_OP_RW:
 		virtio_transport_recv_enqueue(vsk, pkt);
@@ -1226,7 +1366,8 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
 static bool virtio_transport_valid_type(u16 type)
 {
 	return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
-	       (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
+	       (type == VIRTIO_VSOCK_TYPE_SEQPACKET) ||
+	       (type == VIRTIO_VSOCK_TYPE_DGRAM);
 }
 
 /* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
@@ -1289,11 +1430,16 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
 		goto free_pkt;
 	}
 
-	space_available = virtio_transport_space_update(sk, pkt);
-
 	/* Update CID in case it has changed after a transport reset event */
 	vsk->local_addr.svm_cid = dst.svm_cid;
 
+	if (sk->sk_type == SOCK_DGRAM) {
+		virtio_transport_recv_connected(sk, pkt);
+		goto out;
+	}
+
+	space_available = virtio_transport_space_update(sk, pkt);
+
 	if (space_available)
 		sk->sk_write_space(sk);
 
@@ -1319,6 +1465,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
 		break;
 	}
 
+out:
 	release_sock(sk);
 
 	/* Release refcnt obtained when we fetched this socket out of the
-- 
2.20.1



* [RFC v2 3/5] vhost/vsock: add support for vhost dgram.
  2021-09-14  5:54 [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support Jiang Wang
  2021-09-14  5:54 ` [RFC v2 1/5] virtio/vsock: add VIRTIO_VSOCK_F_DGRAM feature bit Jiang Wang
  2021-09-14  5:54 ` [RFC v2 2/5] virtio/vsock: add support for virtio datagram Jiang Wang
@ 2021-09-14  5:54 ` Jiang Wang
  2021-09-14  6:52   ` Michael S. Tsirkin
  2021-09-14  5:54 ` [RFC v2 4/5] vsock_test: add tests for vsock dgram Jiang Wang
  2021-09-14  5:54 ` [RFC v2 5/5] virtio/vsock: add sysfs for rx buf len for dgram Jiang Wang
  4 siblings, 1 reply; 8+ messages in thread
From: Jiang Wang @ 2021-09-14  5:54 UTC (permalink / raw)
  To: jiangleetcode
  Cc: virtualization, stefanha, sgarzare, mst, arseny.krasnov, jhansen,
	cong.wang, duanxiongchun, xieyongji, chaiwen.cc, Jason Wang,
	David S. Miller, Jakub Kicinski, Steven Rostedt, Ingo Molnar,
	kvm, netdev, linux-kernel

This patch adds dgram support on the vhost side, including
tx and rx. The vhost side sends packets asynchronously.

Also, ignore vq errors when the vq number is larger than 2,
so it stays compatible with old versions.
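
For reference, a summary of the host-to-guest dgram path this patch
implements (function and field names match the code below):

	vhost_transport_send_pkt()                  /* caller context */
	  -> list_add_tail(&pkt->list, &vsock->dgram_send_pkt_list);
	     /* bounded by VHOST_VSOCK_DGRAM_MAX_PENDING_PKT */
	  -> vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);

	vhost_transport_dgram_send_pkt_work()       /* vhost worker */
	  -> vhost_transport_do_send_pkt(vsock, &vsock->vqs[VSOCK_VQ_DGRAM_RX]);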

Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
---
 drivers/vhost/vsock.c | 217 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 189 insertions(+), 28 deletions(-)

diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
index c79789af0365..a8755cbebd40 100644
--- a/drivers/vhost/vsock.c
+++ b/drivers/vhost/vsock.c
@@ -28,7 +28,10 @@
  * small pkts.
  */
 #define VHOST_VSOCK_PKT_WEIGHT 256
+#define VHOST_VSOCK_DGRAM_MAX_PENDING_PKT 128
 
+/* Max wait time in busy poll in microseconds */
+#define VHOST_VSOCK_BUSY_POLL_TIMEOUT 20
+
 enum {
 	VHOST_VSOCK_FEATURES = VHOST_FEATURES |
 			       (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
@@ -46,7 +49,7 @@ static DEFINE_READ_MOSTLY_HASHTABLE(vhost_vsock_hash, 8);
 
 struct vhost_vsock {
 	struct vhost_dev dev;
-	struct vhost_virtqueue vqs[2];
+	struct vhost_virtqueue vqs[4];
 
 	/* Link to global vhost_vsock_hash, writes use vhost_vsock_mutex */
 	struct hlist_node hash;
@@ -55,6 +58,11 @@ struct vhost_vsock {
 	spinlock_t send_pkt_list_lock;
 	struct list_head send_pkt_list;	/* host->guest pending packets */
 
+	spinlock_t dgram_send_pkt_list_lock;
+	struct list_head dgram_send_pkt_list;	/* host->guest pending packets */
+	struct vhost_work dgram_send_pkt_work;
+	int dgram_used; /* pending packets to be sent */
+
 	atomic_t queued_replies;
 
 	u32 guest_cid;
@@ -92,10 +100,22 @@ static void
 vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 			    struct vhost_virtqueue *vq)
 {
-	struct vhost_virtqueue *tx_vq = &vsock->vqs[VSOCK_VQ_TX];
+	struct vhost_virtqueue *tx_vq;
 	int pkts = 0, total_len = 0;
 	bool added = false;
 	bool restart_tx = false;
+	spinlock_t *lock;
+	struct list_head *send_pkt_list;
+
+	if (vq == &vsock->vqs[VSOCK_VQ_RX]) {
+		tx_vq = &vsock->vqs[VSOCK_VQ_TX];
+		lock = &vsock->send_pkt_list_lock;
+		send_pkt_list = &vsock->send_pkt_list;
+	} else {
+		tx_vq = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
+		lock = &vsock->dgram_send_pkt_list_lock;
+		send_pkt_list = &vsock->dgram_send_pkt_list;
+	}
 
 	mutex_lock(&vq->mutex);
 
@@ -116,36 +136,48 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 		size_t iov_len, payload_len;
 		int head;
 		bool restore_flag = false;
+		bool is_dgram = false;
 
-		spin_lock_bh(&vsock->send_pkt_list_lock);
-		if (list_empty(&vsock->send_pkt_list)) {
-			spin_unlock_bh(&vsock->send_pkt_list_lock);
+		spin_lock_bh(lock);
+		if (list_empty(send_pkt_list)) {
+			spin_unlock_bh(lock);
 			vhost_enable_notify(&vsock->dev, vq);
 			break;
 		}
 
-		pkt = list_first_entry(&vsock->send_pkt_list,
+		pkt = list_first_entry(send_pkt_list,
 				       struct virtio_vsock_pkt, list);
 		list_del_init(&pkt->list);
-		spin_unlock_bh(&vsock->send_pkt_list_lock);
+		spin_unlock_bh(lock);
+
+		if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
+			is_dgram = true;
 
 		head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
 					 &out, &in, NULL, NULL);
 		if (head < 0) {
-			spin_lock_bh(&vsock->send_pkt_list_lock);
-			list_add(&pkt->list, &vsock->send_pkt_list);
-			spin_unlock_bh(&vsock->send_pkt_list_lock);
+			spin_lock_bh(lock);
+			list_add(&pkt->list, send_pkt_list);
+			spin_unlock_bh(lock);
 			break;
 		}
 
 		if (head == vq->num) {
-			spin_lock_bh(&vsock->send_pkt_list_lock);
-			list_add(&pkt->list, &vsock->send_pkt_list);
-			spin_unlock_bh(&vsock->send_pkt_list_lock);
+			if (is_dgram) {
+				virtio_transport_free_pkt(pkt);
+				vq_err(vq, "Dgram virtqueue is full!");
+				spin_lock_bh(lock);
+				vsock->dgram_used--;
+				spin_unlock_bh(lock);
+				break;
+			}
+			spin_lock_bh(lock);
+			list_add(&pkt->list, send_pkt_list);
+			spin_unlock_bh(lock);
 
 			/* We cannot finish yet if more buffers snuck in while
 			 * re-enabling notify.
 			 */
 			if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
 				vhost_disable_notify(&vsock->dev, vq);
 				continue;
@@ -156,6 +188,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 		if (out) {
 			virtio_transport_free_pkt(pkt);
 			vq_err(vq, "Expected 0 output buffers, got %u\n", out);
+			if (is_dgram) {
+				spin_lock_bh(lock);
+				vsock->dgram_used--;
+				spin_unlock_bh(lock);
+			}
+
 			break;
 		}
 
@@ -163,6 +201,18 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 		if (iov_len < sizeof(pkt->hdr)) {
 			virtio_transport_free_pkt(pkt);
 			vq_err(vq, "Buffer len [%zu] too small\n", iov_len);
+			if (is_dgram) {
+				spin_lock_bh(lock);
+				vsock->dgram_used--;
+				spin_unlock_bh(lock);
+			}
+			break;
+		}
+
+		if (iov_len < pkt->len - pkt->off &&
+		    vq == &vsock->vqs[VSOCK_VQ_DGRAM_RX]) {
+			virtio_transport_free_pkt(pkt);
+			vq_err(vq, "Buffer len [%zu] too small for dgram\n", iov_len);
 			break;
 		}
 
@@ -199,6 +249,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 		if (nbytes != sizeof(pkt->hdr)) {
 			virtio_transport_free_pkt(pkt);
 			vq_err(vq, "Faulted on copying pkt hdr\n");
+			if (is_dgram) {
+				spin_lock_bh(lock);
+				vsock->dgram_used--;
+				spin_unlock_bh(lock);
+			}
 			break;
 		}
 
@@ -224,19 +279,19 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 		/* If we didn't send all the payload we can requeue the packet
 		 * to send it with the next available buffer.
 		 */
-		if (pkt->off < pkt->len) {
+		if ((pkt->off < pkt->len) &&
+		    (vq == &vsock->vqs[VSOCK_VQ_RX])) {
 			if (restore_flag)
 				pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
-
 			/* We are queueing the same virtio_vsock_pkt to handle
 			 * the remaining bytes, and we want to deliver it
 			 * to monitoring devices in the next iteration.
 			 */
 			pkt->tap_delivered = false;
 
-			spin_lock_bh(&vsock->send_pkt_list_lock);
-			list_add(&pkt->list, &vsock->send_pkt_list);
-			spin_unlock_bh(&vsock->send_pkt_list_lock);
+			spin_lock_bh(lock);
+			list_add(&pkt->list, send_pkt_list);
+			spin_unlock_bh(lock);
 		} else {
 			if (pkt->reply) {
 				int val;
@@ -251,6 +306,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
 			}
 
 			virtio_transport_free_pkt(pkt);
+			if (is_dgram) {
+				spin_lock_bh(lock);
+				vsock->dgram_used--;
+				spin_unlock_bh(lock);
+			}
 		}
 	} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
 	if (added)
@@ -274,11 +334,25 @@ static void vhost_transport_send_pkt_work(struct vhost_work *work)
 	vhost_transport_do_send_pkt(vsock, vq);
 }
 
+static void vhost_transport_dgram_send_pkt_work(struct vhost_work *work)
+{
+	struct vhost_virtqueue *vq;
+	struct vhost_vsock *vsock;
+
+	vsock = container_of(work, struct vhost_vsock, dgram_send_pkt_work);
+	vq = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
+
+	vhost_transport_do_send_pkt(vsock, vq);
+}
+
 static int
 vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
 {
 	struct vhost_vsock *vsock;
 	int len = pkt->len;
+	spinlock_t *lock;
+	struct list_head *send_pkt_list;
+	struct vhost_work *work;
 
 	rcu_read_lock();
 
@@ -290,14 +364,39 @@ vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
 		return -ENODEV;
 	}
 
+	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM ||
+	     le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
+		lock = &vsock->send_pkt_list_lock;
+		send_pkt_list = &vsock->send_pkt_list;
+		work = &vsock->send_pkt_work;
+	} else if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
+		lock = &vsock->dgram_send_pkt_list_lock;
+		send_pkt_list = &vsock->dgram_send_pkt_list;
+		work = &vsock->dgram_send_pkt_work;
+	} else {
+		rcu_read_unlock();
+		virtio_transport_free_pkt(pkt);
+		return -EINVAL;
+	}
+
 	if (pkt->reply)
 		atomic_inc(&vsock->queued_replies);
 
-	spin_lock_bh(&vsock->send_pkt_list_lock);
-	list_add_tail(&pkt->list, &vsock->send_pkt_list);
-	spin_unlock_bh(&vsock->send_pkt_list_lock);
+	spin_lock_bh(lock);
+	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
+		if (vsock->dgram_used == VHOST_VSOCK_DGRAM_MAX_PENDING_PKT) {
+			len = -ENOMEM;
+		} else {
+			vsock->dgram_used++;
+			list_add_tail(&pkt->list, send_pkt_list);
+		}
+	} else {
+		list_add_tail(&pkt->list, send_pkt_list);
+	}
 
-	vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
+	spin_unlock_bh(lock);
+
+	vhost_work_queue(&vsock->dev, work);
 
 	rcu_read_unlock();
 	return len;
@@ -487,6 +586,18 @@ static bool vhost_transport_seqpacket_allow(u32 remote_cid)
 	return seqpacket_allow;
 }
 
+static inline unsigned long busy_clock(void)
+{
+	return local_clock() >> 10;
+}
+
+static bool vhost_can_busy_poll(unsigned long endtime)
+{
+	return likely(!need_resched() && !time_after(busy_clock(), endtime) &&
+		      !signal_pending(current));
+}
+
 static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
 {
 	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
@@ -497,6 +608,8 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
 	int head, pkts = 0, total_len = 0;
 	unsigned int out, in;
 	bool added = false;
+	unsigned long busyloop_timeout = VHOST_VSOCK_BUSY_POLL_TIMEOUT;
+	unsigned long endtime;
 
 	mutex_lock(&vq->mutex);
 
@@ -506,11 +619,14 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
 	if (!vq_meta_prefetch(vq))
 		goto out;
 
+	endtime = busy_clock() + busyloop_timeout;
 	vhost_disable_notify(&vsock->dev, vq);
+	preempt_disable();
 	do {
 		u32 len;
 
-		if (!vhost_vsock_more_replies(vsock)) {
+		if (vq == &vsock->vqs[VSOCK_VQ_TX]
+			&& !vhost_vsock_more_replies(vsock)) {
 			/* Stop tx until the device processes already
 			 * pending replies.  Leave tx virtqueue
 			 * callbacks disabled.
@@ -524,6 +640,11 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
 			break;
 
 		if (head == vq->num) {
+			if (vhost_can_busy_poll(endtime)) {
+				cpu_relax();
+				continue;
+			}
+
 			if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
 				vhost_disable_notify(&vsock->dev, vq);
 				continue;
@@ -555,6 +676,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
 		total_len += len;
 		added = true;
 	} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
+	preempt_enable();
 
 no_more_replies:
 	if (added)
@@ -593,14 +715,30 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
 
 		if (!vhost_vq_access_ok(vq)) {
 			ret = -EFAULT;
+			/* When running with an old guest and QEMU, vq 2 may
+			 * not exist, so return 0 in this case.
+			 */
+			if (i == 2) {
+				mutex_unlock(&vq->mutex);
+				break;
+			}
 			goto err_vq;
 		}
 
 		if (!vhost_vq_get_backend(vq)) {
 			vhost_vq_set_backend(vq, vsock);
 			ret = vhost_vq_init_access(vq);
-			if (ret)
-				goto err_vq;
+			if (ret) {
+				/* When running with an old guest and QEMU,
+				 * vq 2 may not exist, so return 0 in this
+				 * case.
+				 */
+				if (i == 2) {
+					mutex_unlock(&vq->mutex);
+					break;
+				}
+				goto err_vq;
+			}
 		}
 
 		mutex_unlock(&vq->mutex);
@@ -610,6 +748,7 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
 	 * let's kick the send worker to send them.
 	 */
 	vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
+	vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);
 
 	mutex_unlock(&vsock->dev.mutex);
 	return 0;
@@ -684,8 +823,14 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
 
 	vqs[VSOCK_VQ_TX] = &vsock->vqs[VSOCK_VQ_TX];
 	vqs[VSOCK_VQ_RX] = &vsock->vqs[VSOCK_VQ_RX];
+	vqs[VSOCK_VQ_DGRAM_TX] = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
+	vqs[VSOCK_VQ_DGRAM_RX] = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
 	vsock->vqs[VSOCK_VQ_TX].handle_kick = vhost_vsock_handle_tx_kick;
 	vsock->vqs[VSOCK_VQ_RX].handle_kick = vhost_vsock_handle_rx_kick;
+	vsock->vqs[VSOCK_VQ_DGRAM_TX].handle_kick =
+						vhost_vsock_handle_tx_kick;
+	vsock->vqs[VSOCK_VQ_DGRAM_RX].handle_kick =
+						vhost_vsock_handle_rx_kick;
 
 	vhost_dev_init(&vsock->dev, vqs, ARRAY_SIZE(vsock->vqs),
 		       UIO_MAXIOV, VHOST_VSOCK_PKT_WEIGHT,
@@ -695,6 +840,11 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
 	spin_lock_init(&vsock->send_pkt_list_lock);
 	INIT_LIST_HEAD(&vsock->send_pkt_list);
 	vhost_work_init(&vsock->send_pkt_work, vhost_transport_send_pkt_work);
+	spin_lock_init(&vsock->dgram_send_pkt_list_lock);
+	INIT_LIST_HEAD(&vsock->dgram_send_pkt_list);
+	vhost_work_init(&vsock->dgram_send_pkt_work,
+			vhost_transport_dgram_send_pkt_work);
+
 	return 0;
 
 out:
@@ -769,6 +919,17 @@ static int vhost_vsock_dev_release(struct inode *inode, struct file *file)
 	}
 	spin_unlock_bh(&vsock->send_pkt_list_lock);
 
+	spin_lock_bh(&vsock->dgram_send_pkt_list_lock);
+	while (!list_empty(&vsock->dgram_send_pkt_list)) {
+		struct virtio_vsock_pkt *pkt;
+
+		pkt = list_first_entry(&vsock->dgram_send_pkt_list,
+				struct virtio_vsock_pkt, list);
+		list_del_init(&pkt->list);
+		virtio_transport_free_pkt(pkt);
+	}
+	spin_unlock_bh(&vsock->dgram_send_pkt_list_lock);
+
 	vhost_dev_cleanup(&vsock->dev);
 	kfree(vsock->dev.vqs);
 	vhost_vsock_free(vsock);
@@ -954,7 +1115,7 @@ static int __init vhost_vsock_init(void)
 	int ret;
 
 	ret = vsock_core_register(&vhost_transport.transport,
-				  VSOCK_TRANSPORT_F_H2G);
+				  VSOCK_TRANSPORT_F_H2G | VSOCK_TRANSPORT_F_DGRAM);
 	if (ret < 0)
 		return ret;
 	return misc_register(&vhost_vsock_misc);
-- 
2.20.1



* [RFC v2 4/5] vsock_test: add tests for vsock dgram
  2021-09-14  5:54 [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support Jiang Wang
                   ` (2 preceding siblings ...)
  2021-09-14  5:54 ` [RFC v2 3/5] vhost/vsock: add support for vhost dgram Jiang Wang
@ 2021-09-14  5:54 ` Jiang Wang
  2021-09-14  5:54 ` [RFC v2 5/5] virtio/vsock: add sysfs for rx buf len for dgram Jiang Wang
  4 siblings, 0 replies; 8+ messages in thread
From: Jiang Wang @ 2021-09-14  5:54 UTC (permalink / raw)
  To: jiangleetcode
  Cc: virtualization, stefanha, sgarzare, mst, arseny.krasnov, jhansen,
	cong.wang, duanxiongchun, xieyongji, chaiwen.cc, Jason Wang,
	David S. Miller, Jakub Kicinski, Steven Rostedt, Ingo Molnar,
	kvm, netdev, linux-kernel

Add test cases for the vsock dgram socket type.
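
For reference, the dgram tests run like the existing ones over the
control connection. A hedged example (see tools/testing/vsock/README
for the exact flags):

	host#  ./vsock_test --mode=server --control-port=1234 --peer-cid=3
	guest# ./vsock_test --mode=client --control-host=<host-ip> \
	                    --control-port=1234 --peer-cid=2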

Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
---
 tools/testing/vsock/util.c       | 105 +++++++++++++++++
 tools/testing/vsock/util.h       |   4 +
 tools/testing/vsock/vsock_test.c | 195 +++++++++++++++++++++++++++++++
 3 files changed, 304 insertions(+)

diff --git a/tools/testing/vsock/util.c b/tools/testing/vsock/util.c
index 2acbb7703c6a..d2f5b223bf85 100644
--- a/tools/testing/vsock/util.c
+++ b/tools/testing/vsock/util.c
@@ -260,6 +260,57 @@ void send_byte(int fd, int expected_ret, int flags)
 	}
 }
 
+/* Transmit one byte and check the return value.
+ *
+ * expected_ret:
+ *  <0 Negative errno (for testing errors)
+ *   0 End-of-file
+ *   1 Success
+ */
+void sendto_byte(int fd, const struct sockaddr *dest_addr, int len, int expected_ret,
+				int flags)
+{
+	const uint8_t byte = 'A';
+	ssize_t nwritten;
+
+	timeout_begin(TIMEOUT);
+	do {
+		nwritten = sendto(fd, &byte, sizeof(byte), flags, dest_addr,
+						len);
+		timeout_check("write");
+	} while (nwritten < 0 && errno == EINTR);
+	timeout_end();
+
+	if (expected_ret < 0) {
+		if (nwritten != -1) {
+			fprintf(stderr, "bogus sendto(2) return value %zd\n",
+				nwritten);
+			exit(EXIT_FAILURE);
+		}
+		if (errno != -expected_ret) {
+			perror("write");
+			exit(EXIT_FAILURE);
+		}
+		return;
+	}
+
+	if (nwritten < 0) {
+		perror("write");
+		exit(EXIT_FAILURE);
+	}
+	if (nwritten == 0) {
+		if (expected_ret == 0)
+			return;
+
+		fprintf(stderr, "unexpected EOF while sending byte\n");
+		exit(EXIT_FAILURE);
+	}
+	if (nwritten != sizeof(byte)) {
+		fprintf(stderr, "bogus sendto(2) return value %zd\n", nwritten);
+		exit(EXIT_FAILURE);
+	}
+}
+
 /* Receive one byte and check the return value.
  *
  * expected_ret:
@@ -313,6 +364,60 @@ void recv_byte(int fd, int expected_ret, int flags)
 	}
 }
 
+/* Receive one byte and check the return value.
+ *
+ * expected_ret:
+ *  <0 Negative errno (for testing errors)
+ *   0 End-of-file
+ *   1 Success
+ */
+void recvfrom_byte(int fd, struct sockaddr *src_addr, socklen_t *addrlen,
+				int expected_ret, int flags)
+{
+	uint8_t byte;
+	ssize_t nread;
+
+	timeout_begin(TIMEOUT);
+	do {
+		nread = recvfrom(fd, &byte, sizeof(byte), flags, src_addr, addrlen);
+		timeout_check("read");
+	} while (nread < 0 && errno == EINTR);
+	timeout_end();
+
+	if (expected_ret < 0) {
+		if (nread != -1) {
+			fprintf(stderr, "bogus recvfrom(2) return value %zd\n",
+				nread);
+			exit(EXIT_FAILURE);
+		}
+		if (errno != -expected_ret) {
+			perror("read");
+			exit(EXIT_FAILURE);
+		}
+		return;
+	}
+
+	if (nread < 0) {
+		perror("read");
+		exit(EXIT_FAILURE);
+	}
+	if (nread == 0) {
+		if (expected_ret == 0)
+			return;
+
+		fprintf(stderr, "unexpected EOF while receiving byte\n");
+		exit(EXIT_FAILURE);
+	}
+	if (nread != sizeof(byte)) {
+		fprintf(stderr, "bogus recvfrom(2) return value %zd\n", nread);
+		exit(EXIT_FAILURE);
+	}
+	if (byte != 'A') {
+		fprintf(stderr, "unexpected byte read %c\n", byte);
+		exit(EXIT_FAILURE);
+	}
+}
+
 /* Run test cases.  The program terminates if a failure occurs. */
 void run_tests(const struct test_case *test_cases,
 	       const struct test_opts *opts)
diff --git a/tools/testing/vsock/util.h b/tools/testing/vsock/util.h
index a3375ad2fb7f..7213f2a51c1e 100644
--- a/tools/testing/vsock/util.h
+++ b/tools/testing/vsock/util.h
@@ -43,7 +43,11 @@ int vsock_seqpacket_accept(unsigned int cid, unsigned int port,
 			   struct sockaddr_vm *clientaddrp);
 void vsock_wait_remote_close(int fd);
 void send_byte(int fd, int expected_ret, int flags);
+void sendto_byte(int fd, const struct sockaddr *dest_addr, socklen_t len,
+				int expected_ret, int flags);
 void recv_byte(int fd, int expected_ret, int flags);
+void recvfrom_byte(int fd, struct sockaddr *src_addr, socklen_t *addrlen,
+				int expected_ret, int flags);
 void run_tests(const struct test_case *test_cases,
 	       const struct test_opts *opts);
 void list_tests(const struct test_case *test_cases);
diff --git a/tools/testing/vsock/vsock_test.c b/tools/testing/vsock/vsock_test.c
index 67766bfe176f..fa2605c9d58c 100644
--- a/tools/testing/vsock/vsock_test.c
+++ b/tools/testing/vsock/vsock_test.c
@@ -199,6 +199,115 @@ static void test_stream_server_close_server(const struct test_opts *opts)
 	close(fd);
 }
 
+static void test_dgram_sendto_client(const struct test_opts *opts)
+{
+	union {
+		struct sockaddr sa;
+		struct sockaddr_vm svm;
+	} addr = {
+		.svm = {
+			.svm_family = AF_VSOCK,
+			.svm_port = 1234,
+			.svm_cid = opts->peer_cid,
+		},
+	};
+	int fd;
+
+	/* Wait for the server to be ready */
+	control_expectln("BIND");
+
+	fd = socket(AF_VSOCK, SOCK_DGRAM, 0);
+	if (fd < 0) {
+		perror("socket");
+		exit(EXIT_FAILURE);
+	}
+
+	sendto_byte(fd, &addr.sa, sizeof(addr.svm), 1, 0);
+
+	/* Notify the server that the client has finished */
+	control_writeln("DONE");
+
+	close(fd);
+}
+
+static void test_dgram_sendto_server(const struct test_opts *opts)
+{
+	union {
+		struct sockaddr sa;
+		struct sockaddr_vm svm;
+	} addr = {
+		.svm = {
+			.svm_family = AF_VSOCK,
+			.svm_port = 1234,
+			.svm_cid = VMADDR_CID_ANY,
+		},
+	};
+	int fd;
+	socklen_t len = sizeof(addr.svm);
+
+	fd = socket(AF_VSOCK, SOCK_DGRAM, 0);
+
+	if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
+		perror("bind");
+		exit(EXIT_FAILURE);
+	}
+
+	/* Notify the client that the server is ready */
+	control_writeln("BIND");
+
+	recvfrom_byte(fd, &addr.sa, &len, 1, 0);
+	printf("got message from cid:%u, port %u\n", addr.svm.svm_cid,
+			addr.svm.svm_port);
+
+	/* Wait for the client to finish */
+	control_expectln("DONE");
+
+	close(fd);
+}
+
+static void test_dgram_connect_client(const struct test_opts *opts)
+{
+	union {
+		struct sockaddr sa;
+		struct sockaddr_vm svm;
+	} addr = {
+		.svm = {
+			.svm_family = AF_VSOCK,
+			.svm_port = 1234,
+			.svm_cid = opts->peer_cid,
+		},
+	};
+	int fd;
+	int ret;
+
+	/* Wait for the server to be ready */
+	control_expectln("BIND");
+
+	fd = socket(AF_VSOCK, SOCK_DGRAM, 0);
+	if (fd < 0) {
+		perror("bind");
+		exit(EXIT_FAILURE);
+	}
+
+	ret = connect(fd, &addr.sa, sizeof(addr.svm));
+	if (ret < 0) {
+		perror("connect");
+		exit(EXIT_FAILURE);
+	}
+
+	send_byte(fd, 1, 0);
+
+	/* Notify the server that the client has finished */
+	control_writeln("DONE");
+
+	close(fd);
+}
+
+static void test_dgram_connect_server(const struct test_opts *opts)
+{
+	test_dgram_sendto_server(opts);
+}
+
 /* With the standard socket sizes, VMCI is able to support about 100
  * concurrent stream connections.
  */
@@ -252,6 +361,77 @@ static void test_stream_multiconn_server(const struct test_opts *opts)
 		close(fds[i]);
 }
 
+static void test_dgram_multiconn_client(const struct test_opts *opts)
+{
+	int fds[MULTICONN_NFDS];
+	int i;
+	union {
+		struct sockaddr sa;
+		struct sockaddr_vm svm;
+	} addr = {
+		.svm = {
+			.svm_family = AF_VSOCK,
+			.svm_port = 1234,
+			.svm_cid = opts->peer_cid,
+		},
+	};
+
+	/* Wait for the server to be ready */
+	control_expectln("BIND");
+
+	for (i = 0; i < MULTICONN_NFDS; i++) {
+		fds[i] = socket(AF_VSOCK, SOCK_DGRAM, 0);
+		if (fds[i] < 0) {
+			perror("socket");
+			exit(EXIT_FAILURE);
+		}
+	}
+
+	for (i = 0; i < MULTICONN_NFDS; i++)
+		sendto_byte(fds[i], &addr.sa, sizeof(addr.svm), 1, 0);
+
+	/* Notify the server that the client has finished */
+	control_writeln("DONE");
+
+	for (i = 0; i < MULTICONN_NFDS; i++)
+		close(fds[i]);
+}
+
+static void test_dgram_multiconn_server(const struct test_opts *opts)
+{
+	union {
+		struct sockaddr sa;
+		struct sockaddr_vm svm;
+	} addr = {
+		.svm = {
+			.svm_family = AF_VSOCK,
+			.svm_port = 1234,
+			.svm_cid = VMADDR_CID_ANY,
+		},
+	};
+	int fd;
+	socklen_t len = sizeof(addr.svm);
+	int i;
+
+	fd = socket(AF_VSOCK, SOCK_DGRAM, 0);
+
+	if (bind(fd, &addr.sa, sizeof(addr.svm)) < 0) {
+		perror("bind");
+		exit(EXIT_FAILURE);
+	}
+
+	/* Notify the client that the server is ready */
+	control_writeln("BIND");
+
+	for (i = 0; i < MULTICONN_NFDS; i++)
+		recvfrom_byte(fd, &addr.sa, &len, 1, 0);
+
+	/* Wait for the client to finish */
+	control_expectln("DONE");
+
+	close(fd);
+}
+
 static void test_stream_msg_peek_client(const struct test_opts *opts)
 {
 	int fd;
@@ -425,6 +605,21 @@ static struct test_case test_cases[] = {
 		.run_client = test_seqpacket_msg_trunc_client,
 		.run_server = test_seqpacket_msg_trunc_server,
 	},
+	{
+		.name = "SOCK_DGRAM client close",
+		.run_client = test_dgram_sendto_client,
+		.run_server = test_dgram_sendto_server,
+	},
+	{
+		.name = "SOCK_DGRAM client connect",
+		.run_client = test_dgram_connect_client,
+		.run_server = test_dgram_connect_server,
+	},
+	{
+		.name = "SOCK_DGRAM multiple connections",
+		.run_client = test_dgram_multiconn_client,
+		.run_server = test_dgram_multiconn_server,
+	},
 	{},
 };
 
-- 
2.20.1
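
For readers who want to poke at the datagram path by hand, outside the
vsock_test harness: below is a minimal standalone sender in the spirit
of sendto_byte() above. It is a sketch only, assuming a kernel with
this series applied (an unpatched virtio transport still rejects
SOCK_DGRAM); it reuses the hard-coded port 1234 from the tests and
takes the peer CID from argv.

	#include <stdio.h>
	#include <stdlib.h>
	#include <unistd.h>
	#include <sys/socket.h>
	#include <linux/vm_sockets.h>

	int main(int argc, char **argv)
	{
		struct sockaddr_vm addr = {
			.svm_family = AF_VSOCK,
			.svm_port = 1234,	/* same port the tests bind */
			.svm_cid = argc > 1 ? atoi(argv[1]) : VMADDR_CID_HOST,
		};
		char byte = 'A';
		int fd = socket(AF_VSOCK, SOCK_DGRAM, 0);

		if (fd < 0) {
			perror("socket");
			return EXIT_FAILURE;
		}
		if (sendto(fd, &byte, sizeof(byte), 0,
			   (struct sockaddr *)&addr, sizeof(addr)) < 0) {
			perror("sendto");
			return EXIT_FAILURE;
		}
		close(fd);
		return 0;
	}

The tests themselves run through the usual vsock_test control channel,
e.g. --mode=server --control-port=<port> --peer-cid=<cid> on one side
and the matching --mode=client --control-host=<addr> run on the other.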


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [RFC v2 5/5] virtio/vsock: add sysfs for rx buf len for dgram
  2021-09-14  5:54 [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support Jiang Wang
                   ` (3 preceding siblings ...)
  2021-09-14  5:54 ` [RFC v2 4/5] vsock_test: add tests for vsock dgram Jiang Wang
@ 2021-09-14  5:54 ` Jiang Wang
  4 siblings, 0 replies; 8+ messages in thread
From: Jiang Wang @ 2021-09-14  5:54 UTC (permalink / raw)
  To: jiangleetcode
  Cc: virtualization, stefanha, sgarzare, mst, arseny.krasnov, jhansen,
	cong.wang, duanxiongchun, xieyongji, chaiwen.cc, Jason Wang,
	David S. Miller, Jakub Kicinski, Steven Rostedt, Ingo Molnar,
	kvm, netdev, linux-kernel

Make the dgram rx buffer length configurable via sysfs.

Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
---
 net/vmw_vsock/virtio_transport.c | 46 ++++++++++++++++++++++++++++++--
 1 file changed, 44 insertions(+), 2 deletions(-)

diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index 8d5bfcd79555..55216d979080 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -29,6 +29,16 @@ static struct virtio_vsock __rcu *the_virtio_vsock;
 static struct virtio_vsock *the_virtio_vsock_dgram;
 static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
 
+static int rx_buf_len = VIRTIO_VSOCK_DEFAULT_RX_BUF_SIZE;
+static struct kobject *kobj_ref, *kobj_ref2;
+static ssize_t  dgram_sysfs_show(struct kobject *kobj,
+				 struct kobj_attribute *attr, char *buf);
+static ssize_t  dgram_sysfs_store(struct kobject *kobj,
+				  struct kobj_attribute *attr, const char *buf,
+				  size_t count);
+static struct kobj_attribute rxbuf_attr = __ATTR(dgram_rx_buf_size, 0660, dgram_sysfs_show,
+						 dgram_sysfs_store);
+
 struct virtio_vsock {
 	struct virtio_device *vdev;
 	struct virtqueue **vqs;
@@ -362,7 +372,7 @@ virtio_transport_cancel_pkt(struct vsock_sock *vsk)
 
 static void virtio_vsock_rx_fill(struct virtio_vsock *vsock, bool is_dgram)
 {
-	int buf_len = VIRTIO_VSOCK_DEFAULT_RX_BUF_SIZE;
+	int buf_len = rx_buf_len;
 	struct virtio_vsock_pkt *pkt;
 	struct scatterlist hdr, buf, *sgs[2];
 	struct virtqueue *vq;
@@ -1027,6 +1037,23 @@ static struct virtio_driver virtio_vsock_driver = {
 	.remove = virtio_vsock_remove,
 };
 
+static ssize_t dgram_sysfs_show(struct kobject *kobj,
+				struct kobj_attribute *attr, char *buf)
+{
+	return sprintf(buf, "%d\n", rx_buf_len);
+}
+
+static ssize_t dgram_sysfs_store(struct kobject *kobj,
+				 struct kobj_attribute *attr, const char *buf,
+				 size_t count)
+{
+	if (kstrtoint(buf, 0, &rx_buf_len) < 0)
+		return -EINVAL;
+	if (rx_buf_len < 1024)
+		rx_buf_len = 1024;
+	return count;
+}
+
 static int __init virtio_vsock_init(void)
 {
 	int ret;
@@ -1044,8 +1071,19 @@ static int __init virtio_vsock_init(void)
 	if (ret)
 		goto out_vci;
 
-	return 0;
+	kobj_ref = kobject_create_and_add("vsock", kernel_kobj);
+	kobj_ref2 = kobject_create_and_add("virtio", kobj_ref);
+
+	/* Create the sysfs file for dgram_rx_buf_size */
+	ret = sysfs_create_file(kobj_ref2, &rxbuf_attr.attr);
+	if (ret)
+		goto out_sysfs;
 
+	return 0;
+out_sysfs:
+	sysfs_remove_file(kobj_ref2, &rxbuf_attr.attr);
+	kobject_put(kobj_ref2);
+	kobject_put(kobj_ref);
 out_vci:
 	vsock_core_unregister(&virtio_transport.transport);
 out_wq:
@@ -1058,6 +1096,10 @@ static void __exit virtio_vsock_exit(void)
 	unregister_virtio_driver(&virtio_vsock_driver);
 	vsock_core_unregister(&virtio_transport.transport);
 	destroy_workqueue(virtio_vsock_workqueue);
+	sysfs_remove_file(kobj_ref2, &rxbuf_attr.attr);
+	kobject_put(kobj_ref2);
+	kobject_put(kobj_ref);
+
 }
 
 module_init(virtio_vsock_init);
-- 
2.20.1
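
With this patch the knob appears as a regular sysfs attribute; the path
follows from the kobject_create_and_add() calls above, i.e.
/sys/kernel/vsock/virtio/dgram_rx_buf_size. A small userspace sketch
exercising store and show (illustration only; 8192 is an arbitrary
example value):

	#include <fcntl.h>
	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>

	int main(void)
	{
		const char *path = "/sys/kernel/vsock/virtio/dgram_rx_buf_size";
		char buf[32] = "8192";
		int fd = open(path, O_RDWR);	/* attribute mode is 0660 */

		if (fd < 0) {
			perror(path);
			return 1;
		}
		/* store: dgram_sysfs_store() clamps values below 1024 */
		if (write(fd, buf, strlen(buf)) < 0)
			perror("write");
		memset(buf, 0, sizeof(buf));
		/* show: reads back through dgram_sysfs_show() */
		if (pread(fd, buf, sizeof(buf) - 1, 0) > 0)
			printf("dgram_rx_buf_size = %s\n", buf);
		close(fd);
		return 0;
	}

Note the new length only takes effect for buffers queued by
virtio_vsock_rx_fill() afterwards; buffers already posted to the dgram
RX virtqueue keep their old size.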


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* Re: [RFC v2 3/5] vhost/vsock: add support for vhost dgram.
  2021-09-14  5:54 ` [RFC v2 3/5] vhost/vsock: add support for vhost dgram Jiang Wang
@ 2021-09-14  6:52   ` Michael S. Tsirkin
  0 siblings, 0 replies; 8+ messages in thread
From: Michael S. Tsirkin @ 2021-09-14  6:52 UTC (permalink / raw)
  To: Jiang Wang
  Cc: jiangleetcode, virtualization, stefanha, sgarzare,
	arseny.krasnov, jhansen, cong.wang, duanxiongchun, xieyongji,
	chaiwen.cc, Jason Wang, David S. Miller, Jakub Kicinski,
	Steven Rostedt, Ingo Molnar, kvm, netdev, linux-kernel

On Tue, Sep 14, 2021 at 05:54:36AM +0000, Jiang Wang wrote:
> This patch supports dgram on the vhost side, including
> tx and rx. The vhost side sends packets asynchronously.

how exactly is fairness ensured? what prevents one socket
from monopolizing the device? spec proposal seems to
leave it to the implementation.

> Also, ignore vq errors when the vq number is larger than 2,
> so it will be compatible with old versions.

this needs more explanation.

> Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>



> ---
>  drivers/vhost/vsock.c | 217 ++++++++++++++++++++++++++++++++++++------
>  1 file changed, 189 insertions(+), 28 deletions(-)
> 
> diff --git a/drivers/vhost/vsock.c b/drivers/vhost/vsock.c
> index c79789af0365..a8755cbebd40 100644
> --- a/drivers/vhost/vsock.c
> +++ b/drivers/vhost/vsock.c
> @@ -28,7 +28,10 @@
>   * small pkts.
>   */
>  #define VHOST_VSOCK_PKT_WEIGHT 256
> +#define VHOST_VSOCK_DGRM_MAX_PENDING_PKT 128
>  
> +/* Max wait time in busy poll in microseconds */
> +#define VHOST_VSOCK_BUSY_POLL_TIMEOUT 20

While adding busy polling isn't out of the question, I would think
it should be kept separate from initial dgram support.


>  enum {
>  	VHOST_VSOCK_FEATURES = VHOST_FEATURES |
>  			       (1ULL << VIRTIO_F_ACCESS_PLATFORM) |
> @@ -46,7 +49,7 @@ static DEFINE_READ_MOSTLY_HASHTABLE(vhost_vsock_hash, 8);
>  
>  struct vhost_vsock {
>  	struct vhost_dev dev;
> -	struct vhost_virtqueue vqs[2];
> +	struct vhost_virtqueue vqs[4];
>  
>  	/* Link to global vhost_vsock_hash, writes use vhost_vsock_mutex */
>  	struct hlist_node hash;
> @@ -55,6 +58,11 @@ struct vhost_vsock {
>  	spinlock_t send_pkt_list_lock;
>  	struct list_head send_pkt_list;	/* host->guest pending packets */
>  
> +	spinlock_t dgram_send_pkt_list_lock;
> +	struct list_head dgram_send_pkt_list;	/* host->guest pending packets */
> +	struct vhost_work dgram_send_pkt_work;
> +	int  dgram_used; /*pending packets to be send */

to be sent?

> +
>  	atomic_t queued_replies;
>  
>  	u32 guest_cid;
> @@ -92,10 +100,22 @@ static void
>  vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  			    struct vhost_virtqueue *vq)
>  {
> -	struct vhost_virtqueue *tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> +	struct vhost_virtqueue *tx_vq;
>  	int pkts = 0, total_len = 0;
>  	bool added = false;
>  	bool restart_tx = false;
> +	spinlock_t *lock;
> +	struct list_head *send_pkt_list;
> +
> +	if (vq == &vsock->vqs[VSOCK_VQ_RX]) {
> +		tx_vq = &vsock->vqs[VSOCK_VQ_TX];
> +		lock = &vsock->send_pkt_list_lock;
> +		send_pkt_list = &vsock->send_pkt_list;
> +	} else {
> +		tx_vq = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +		lock = &vsock->dgram_send_pkt_list_lock;
> +		send_pkt_list = &vsock->dgram_send_pkt_list;
> +	}
>  
>  	mutex_lock(&vq->mutex);
>  
> @@ -116,36 +136,48 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		size_t iov_len, payload_len;
>  		int head;
>  		bool restore_flag = false;
> +		bool is_dgram = false;
>  
> -		spin_lock_bh(&vsock->send_pkt_list_lock);
> -		if (list_empty(&vsock->send_pkt_list)) {
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +		spin_lock_bh(lock);
> +		if (list_empty(send_pkt_list)) {
> +			spin_unlock_bh(lock);
>  			vhost_enable_notify(&vsock->dev, vq);
>  			break;
>  		}
>  
> -		pkt = list_first_entry(&vsock->send_pkt_list,
> +		pkt = list_first_entry(send_pkt_list,
>  				       struct virtio_vsock_pkt, list);
>  		list_del_init(&pkt->list);
> -		spin_unlock_bh(&vsock->send_pkt_list_lock);
> +		spin_unlock_bh(lock);
> +
> +		if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM)
> +			is_dgram = true;
>  
>  		head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
>  					 &out, &in, NULL, NULL);
>  		if (head < 0) {
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  			break;
>  		}
>  
>  		if (head == vq->num) {
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			if (is_dgram) {
> +				virtio_transport_free_pkt(pkt);
> +				vq_err(vq, "Dgram virtqueue is full!");
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +				break;
> +			}
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  
>  			/* We cannot finish yet if more buffers snuck in while
> -			 * re-enabling notify.
> -			 */
> +			* re-enabling notify.
> +			*/

looks like you are breaking alignment of comments...

>  			if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
>  				vhost_disable_notify(&vsock->dev, vq);
>  				continue;
> @@ -156,6 +188,12 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		if (out) {
>  			virtio_transport_free_pkt(pkt);
>  			vq_err(vq, "Expected 0 output buffers, got %u\n", out);
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
> +
>  			break;
>  		}
>  
> @@ -163,6 +201,18 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		if (iov_len < sizeof(pkt->hdr)) {
>  			virtio_transport_free_pkt(pkt);
>  			vq_err(vq, "Buffer len [%zu] too small\n", iov_len);
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
> +			break;
> +		}
> +
> +		if (iov_len < pkt->len - pkt->off &&
> +			vq == &vsock->vqs[VSOCK_VQ_DGRAM_RX]) {
> +			virtio_transport_free_pkt(pkt);
> +			vq_err(vq, "Buffer len [%zu] too small for dgram\n", iov_len);
>  			break;
>  		}
>  
> @@ -199,6 +249,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		if (nbytes != sizeof(pkt->hdr)) {
>  			virtio_transport_free_pkt(pkt);
>  			vq_err(vq, "Faulted on copying pkt hdr\n");
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
>  			break;
>  		}
>  
> @@ -224,19 +279,19 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  		/* If we didn't send all the payload we can requeue the packet
>  		 * to send it with the next available buffer.
>  		 */
> -		if (pkt->off < pkt->len) {
> +		if ((pkt->off < pkt->len)
> +			&& (vq == &vsock->vqs[VSOCK_VQ_RX])) {

&& on the previous line and no need for extra ().

>  			if (restore_flag)
>  				pkt->hdr.flags |= cpu_to_le32(VIRTIO_VSOCK_SEQ_EOR);
> -

no need to drop this imho.

>  			/* We are queueing the same virtio_vsock_pkt to handle
>  			 * the remaining bytes, and we want to deliver it
>  			 * to monitoring devices in the next iteration.
>  			 */
>  			pkt->tap_delivered = false;
>  
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  		} else {
>  			if (pkt->reply) {
>  				int val;
> @@ -251,6 +306,11 @@ vhost_transport_do_send_pkt(struct vhost_vsock *vsock,
>  			}
>  
>  			virtio_transport_free_pkt(pkt);
> +			if (is_dgram) {
> +				spin_lock_bh(lock);
> +				vsock->dgram_used--;
> +				spin_unlock_bh(lock);
> +			}
>  		}
>  	} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
>  	if (added)
> @@ -274,11 +334,25 @@ static void vhost_transport_send_pkt_work(struct vhost_work *work)
>  	vhost_transport_do_send_pkt(vsock, vq);
>  }
>  
> +static void vhost_transport_dgram_send_pkt_work(struct vhost_work *work)
> +{
> +	struct vhost_virtqueue *vq;
> +	struct vhost_vsock *vsock;
> +
> +	vsock = container_of(work, struct vhost_vsock, dgram_send_pkt_work);
> +	vq = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
> +
> +	vhost_transport_do_send_pkt(vsock, vq);
> +}
> +
>  static int
>  vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>  {
>  	struct vhost_vsock *vsock;
>  	int len = pkt->len;
> +	spinlock_t *lock;
> +	struct list_head *send_pkt_list;
> +	struct vhost_work *work;
>  
>  	rcu_read_lock();
>  
> @@ -290,14 +364,39 @@ vhost_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>  		return -ENODEV;
>  	}
>  
> +	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_STREAM ||
> +	     le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_SEQPACKET) {
> +		lock = &vsock->send_pkt_list_lock;
> +		send_pkt_list = &vsock->send_pkt_list;
> +		work = &vsock->send_pkt_work;
> +	} else if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +		lock = &vsock->dgram_send_pkt_list_lock;
> +		send_pkt_list = &vsock->dgram_send_pkt_list;
> +		work = &vsock->dgram_send_pkt_work;
> +	} else {
> +		rcu_read_unlock();
> +		virtio_transport_free_pkt(pkt);
> +		return -EINVAL;
> +	}
> +
> +
>  	if (pkt->reply)
>  		atomic_inc(&vsock->queued_replies);
>  
> -	spin_lock_bh(&vsock->send_pkt_list_lock);
> -	list_add_tail(&pkt->list, &vsock->send_pkt_list);
> -	spin_unlock_bh(&vsock->send_pkt_list_lock);
> +	spin_lock_bh(lock);
> +	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +		if (vsock->dgram_used  == VHOST_VSOCK_DGRM_MAX_PENDING_PKT)
> +			len = -ENOMEM;
> +		else {
> +			vsock->dgram_used++;
> +			list_add_tail(&pkt->list, send_pkt_list);
> +		}
> +	} else
> +		list_add_tail(&pkt->list, send_pkt_list);
>  
> -	vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> +	spin_unlock_bh(lock);
> +
> +	vhost_work_queue(&vsock->dev, work);
>  
>  	rcu_read_unlock();
>  	return len;
> @@ -487,6 +586,18 @@ static bool vhost_transport_seqpacket_allow(u32 remote_cid)
>  	return seqpacket_allow;
>  }
>  
> +static inline unsigned long busy_clock(void)
> +{
> +	return local_clock() >> 10;
> +}
> +
> +static bool vhost_can_busy_poll(unsigned long endtime)
> +{
> +	return likely(!need_resched() && !time_after(busy_clock(), endtime) &&
> +		      !signal_pending(current));
> +}
> +
> +
>  static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  {
>  	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
> @@ -497,6 +608,8 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  	int head, pkts = 0, total_len = 0;
>  	unsigned int out, in;
>  	bool added = false;
> +	unsigned long busyloop_timeout = VHOST_VSOCK_BUSY_POLL_TIMEOUT;
> +	unsigned long endtime;
>  
>  	mutex_lock(&vq->mutex);
>  
> @@ -506,11 +619,14 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  	if (!vq_meta_prefetch(vq))
>  		goto out;
>  
> +	endtime = busy_clock() + busyloop_timeout;
>  	vhost_disable_notify(&vsock->dev, vq);
> +	preempt_disable();

That's quite a bit of work done with preempt disabled. Why?
What's going on here?

>  	do {
>  		u32 len;
>  
> -		if (!vhost_vsock_more_replies(vsock)) {
> +		if (vq == &vsock->vqs[VSOCK_VQ_TX]
> +			&& !vhost_vsock_more_replies(vsock)) {
>  			/* Stop tx until the device processes already
>  			 * pending replies.  Leave tx virtqueue
>  			 * callbacks disabled.
> @@ -524,6 +640,11 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  			break;
>  
>  		if (head == vq->num) {
> +			if (vhost_can_busy_poll(endtime)) {
> +				cpu_relax();
> +				continue;
> +			}
> +
>  			if (unlikely(vhost_enable_notify(&vsock->dev, vq))) {
>  				vhost_disable_notify(&vsock->dev, vq);
>  				continue;
> @@ -555,6 +676,7 @@ static void vhost_vsock_handle_tx_kick(struct vhost_work *work)
>  		total_len += len;
>  		added = true;
>  	} while(likely(!vhost_exceeds_weight(vq, ++pkts, total_len)));
> +	preempt_enable();
>  
>  no_more_replies:
>  	if (added)
> @@ -593,14 +715,30 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>  
>  		if (!vhost_vq_access_ok(vq)) {
>  			ret = -EFAULT;
> +			/* when running with old guest and qemu, vq 2 may
> +			 * not exist, so return 0 in this case.

Can't the new stuff be gated by a feature bit as usual?

> +			 */
> +			if (i == 2) {
> +				mutex_unlock(&vq->mutex);
> +				break;
> +			}
>  			goto err_vq;
>  		}
>  
>  		if (!vhost_vq_get_backend(vq)) {
>  			vhost_vq_set_backend(vq, vsock);
>  			ret = vhost_vq_init_access(vq);
> -			if (ret)
> -				goto err_vq;
> +			if (ret) {
> +				mutex_unlock(&vq->mutex);
> +				/* when running with old guest and qemu, vq 2 may
> +				 * not exist, so return 0 in this case.
> +				 */
> +				if (i == 2) {
> +					mutex_unlock(&vq->mutex);
> +					break;
> +				}
> +				goto err;
> +			}
>  		}
>  
>  		mutex_unlock(&vq->mutex);
> @@ -610,6 +748,7 @@ static int vhost_vsock_start(struct vhost_vsock *vsock)
>  	 * let's kick the send worker to send them.
>  	 */
>  	vhost_work_queue(&vsock->dev, &vsock->send_pkt_work);
> +	vhost_work_queue(&vsock->dev, &vsock->dgram_send_pkt_work);
>  
>  	mutex_unlock(&vsock->dev.mutex);
>  	return 0;
> @@ -684,8 +823,14 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
>  
>  	vqs[VSOCK_VQ_TX] = &vsock->vqs[VSOCK_VQ_TX];
>  	vqs[VSOCK_VQ_RX] = &vsock->vqs[VSOCK_VQ_RX];
> +	vqs[VSOCK_VQ_DGRAM_TX] = &vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +	vqs[VSOCK_VQ_DGRAM_RX] = &vsock->vqs[VSOCK_VQ_DGRAM_RX];
>  	vsock->vqs[VSOCK_VQ_TX].handle_kick = vhost_vsock_handle_tx_kick;
>  	vsock->vqs[VSOCK_VQ_RX].handle_kick = vhost_vsock_handle_rx_kick;
> +	vsock->vqs[VSOCK_VQ_DGRAM_TX].handle_kick =
> +						vhost_vsock_handle_tx_kick;
> +	vsock->vqs[VSOCK_VQ_DGRAM_RX].handle_kick =
> +						vhost_vsock_handle_rx_kick;
>  
>  	vhost_dev_init(&vsock->dev, vqs, ARRAY_SIZE(vsock->vqs),
>  		       UIO_MAXIOV, VHOST_VSOCK_PKT_WEIGHT,
> @@ -695,6 +840,11 @@ static int vhost_vsock_dev_open(struct inode *inode, struct file *file)
>  	spin_lock_init(&vsock->send_pkt_list_lock);
>  	INIT_LIST_HEAD(&vsock->send_pkt_list);
>  	vhost_work_init(&vsock->send_pkt_work, vhost_transport_send_pkt_work);
> +	spin_lock_init(&vsock->dgram_send_pkt_list_lock);
> +	INIT_LIST_HEAD(&vsock->dgram_send_pkt_list);
> +	vhost_work_init(&vsock->dgram_send_pkt_work,
> +			vhost_transport_dgram_send_pkt_work);
> +
>  	return 0;
>  
>  out:
> @@ -769,6 +919,17 @@ static int vhost_vsock_dev_release(struct inode *inode, struct file *file)
>  	}
>  	spin_unlock_bh(&vsock->send_pkt_list_lock);
>  
> +	spin_lock_bh(&vsock->dgram_send_pkt_list_lock);
> +	while (!list_empty(&vsock->dgram_send_pkt_list)) {
> +		struct virtio_vsock_pkt *pkt;
> +
> +		pkt = list_first_entry(&vsock->dgram_send_pkt_list,
> +				struct virtio_vsock_pkt, list);
> +		list_del_init(&pkt->list);
> +		virtio_transport_free_pkt(pkt);
> +	}
> +	spin_unlock_bh(&vsock->dgram_send_pkt_list_lock);
> +
>  	vhost_dev_cleanup(&vsock->dev);
>  	kfree(vsock->dev.vqs);
>  	vhost_vsock_free(vsock);
> @@ -954,7 +1115,7 @@ static int __init vhost_vsock_init(void)
>  	int ret;
>  
>  	ret = vsock_core_register(&vhost_transport.transport,
> -				  VSOCK_TRANSPORT_F_H2G);
> +				  VSOCK_TRANSPORT_F_H2G | VSOCK_TRANSPORT_F_DGRAM);
>  	if (ret < 0)
>  		return ret;
>  	return misc_register(&vhost_vsock_misc);
> -- 
> 2.20.1


^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [RFC v2 2/5] virtio/vsock: add support for virtio datagram
  2021-09-14  5:54 ` [RFC v2 2/5] virtio/vsock: add support for virtio datagram Jiang Wang
@ 2021-09-14  6:56   ` Michael S. Tsirkin
  0 siblings, 0 replies; 8+ messages in thread
From: Michael S. Tsirkin @ 2021-09-14  6:56 UTC (permalink / raw)
  To: Jiang Wang
  Cc: jiangleetcode, virtualization, stefanha, sgarzare,
	arseny.krasnov, jhansen, cong.wang, duanxiongchun, xieyongji,
	chaiwen.cc, Jason Wang, David S. Miller, Jakub Kicinski,
	Steven Rostedt, Ingo Molnar, kvm, netdev, linux-kernel

On Tue, Sep 14, 2021 at 05:54:35AM +0000, Jiang Wang wrote:
> This patch adds support for virtio dgram to the driver.
> Implemented related functions for tx and rx, enqueue
> and dequeue. Send packets synchronously to give the sender
> an indication when the virtqueue is full.


Hmm I don't see this in code.
virtio_transport_do_send_dgram_pkt just does add buf and returns.

In any case, how exactly is fairness handled?
what prevents one socket from monopolizing the device?

> Refactored virtio_transport_send_pkt_work() a little bit but
> no functional changes for it.

split the refactoring to a separate patch then pls.

> 
> Support for the host/device side is in another
> patch.
> 
> Signed-off-by: Jiang Wang <jiang.wang@bytedance.com>
> ---
>  include/net/af_vsock.h                        |   1 +
>  .../events/vsock_virtio_transport_common.h    |   2 +
>  include/uapi/linux/virtio_vsock.h             |   1 +
>  net/vmw_vsock/af_vsock.c                      |  12 +
>  net/vmw_vsock/virtio_transport.c              | 344 +++++++++++++++---
>  net/vmw_vsock/virtio_transport_common.c       | 181 ++++++++-
>  6 files changed, 467 insertions(+), 74 deletions(-)
> 
> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
> index ab207677e0a8..58c46c694670 100644
> --- a/include/net/af_vsock.h
> +++ b/include/net/af_vsock.h
> @@ -208,6 +208,7 @@ void vsock_remove_sock(struct vsock_sock *vsk);
>  void vsock_for_each_connected_socket(void (*fn)(struct sock *sk));
>  int vsock_assign_transport(struct vsock_sock *vsk, struct vsock_sock *psk);
>  bool vsock_find_cid(unsigned int cid);
> +int vsock_bind_stream(struct vsock_sock *vsk, struct sockaddr_vm *addr);
>  
>  /**** TAP ****/
>  
> diff --git a/include/trace/events/vsock_virtio_transport_common.h b/include/trace/events/vsock_virtio_transport_common.h
> index d0b3f0ea9ba1..1d8647a6b476 100644
> --- a/include/trace/events/vsock_virtio_transport_common.h
> +++ b/include/trace/events/vsock_virtio_transport_common.h
> @@ -10,10 +10,12 @@
>  
>  TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_STREAM);
>  TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_SEQPACKET);
> +TRACE_DEFINE_ENUM(VIRTIO_VSOCK_TYPE_DGRAM);
>  
>  #define show_type(val) \
>  	__print_symbolic(val, \
>  			 { VIRTIO_VSOCK_TYPE_STREAM, "STREAM" }, \
> +			 { VIRTIO_VSOCK_TYPE_DGRAM, "DGRAM" }, \
>  			 { VIRTIO_VSOCK_TYPE_SEQPACKET, "SEQPACKET" })
>  
>  TRACE_DEFINE_ENUM(VIRTIO_VSOCK_OP_INVALID);
> diff --git a/include/uapi/linux/virtio_vsock.h b/include/uapi/linux/virtio_vsock.h
> index cff54ba9b924..3e93b75f2707 100644
> --- a/include/uapi/linux/virtio_vsock.h
> +++ b/include/uapi/linux/virtio_vsock.h
> @@ -71,6 +71,7 @@ struct virtio_vsock_hdr {
>  enum virtio_vsock_type {
>  	VIRTIO_VSOCK_TYPE_STREAM = 1,
>  	VIRTIO_VSOCK_TYPE_SEQPACKET = 2,
> +	VIRTIO_VSOCK_TYPE_DGRAM = 3,
>  };
>  
>  enum virtio_vsock_op {
> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
> index 3e02cc3b24f8..adf11db32506 100644
> --- a/net/vmw_vsock/af_vsock.c
> +++ b/net/vmw_vsock/af_vsock.c
> @@ -669,6 +669,18 @@ static int __vsock_bind_connectible(struct vsock_sock *vsk,
>  	return 0;
>  }
>  
> +int vsock_bind_stream(struct vsock_sock *vsk,
> +		      struct sockaddr_vm *addr)
> +{
> +	int retval;
> +
> +	spin_lock_bh(&vsock_table_lock);
> +	retval = __vsock_bind_connectible(vsk, addr);
> +	spin_unlock_bh(&vsock_table_lock);
> +	return retval;
> +}
> +EXPORT_SYMBOL(vsock_bind_stream);
> +
>  static int __vsock_bind_dgram(struct vsock_sock *vsk,
>  			      struct sockaddr_vm *addr)
>  {
> diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
> index bb89f538f5f3..8d5bfcd79555 100644
> --- a/net/vmw_vsock/virtio_transport.c
> +++ b/net/vmw_vsock/virtio_transport.c
> @@ -20,21 +20,29 @@
>  #include <net/sock.h>
>  #include <linux/mutex.h>
>  #include <net/af_vsock.h>
> +#include<linux/kobject.h>
> +#include<linux/sysfs.h>
> +#include <linux/refcount.h>
>  
>  static struct workqueue_struct *virtio_vsock_workqueue;
>  static struct virtio_vsock __rcu *the_virtio_vsock;
> +static struct virtio_vsock *the_virtio_vsock_dgram;
>  static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
>  
>  struct virtio_vsock {
>  	struct virtio_device *vdev;
>  	struct virtqueue **vqs;
>  	bool has_dgram;
> +	refcount_t active;
>  
>  	/* Virtqueue processing is deferred to a workqueue */
>  	struct work_struct tx_work;
>  	struct work_struct rx_work;
>  	struct work_struct event_work;
>  
> +	struct work_struct dgram_tx_work;
> +	struct work_struct dgram_rx_work;
> +
>  	/* The following fields are protected by tx_lock.  vqs[VSOCK_VQ_TX]
>  	 * must be accessed with tx_lock held.
>  	 */
> @@ -55,6 +63,22 @@ struct virtio_vsock {
>  	int rx_buf_nr;
>  	int rx_buf_max_nr;
>  
> +	/* The following fields are protected by dgram_tx_lock.  vqs[VSOCK_VQ_DGRAM_TX]
> +	 * must be accessed with dgram_tx_lock held.
> +	 */
> +	struct mutex dgram_tx_lock;
> +	bool dgram_tx_run;
> +
> +	atomic_t dgram_queued_replies;
> +
> +	/* The following fields are protected by dgram_rx_lock.  vqs[VSOCK_VQ_DGRAM_RX]
> +	 * must be accessed with dgram_rx_lock held.
> +	 */
> +	struct mutex dgram_rx_lock;
> +	bool dgram_rx_run;
> +	int dgram_rx_buf_nr;
> +	int dgram_rx_buf_max_nr;
> +
>  	/* The following fields are protected by event_lock.
>  	 * vqs[VSOCK_VQ_EVENT] must be accessed with event_lock held.
>  	 */
> @@ -84,21 +108,12 @@ static u32 virtio_transport_get_local_cid(void)
>  	return ret;
>  }
>  
> -static void
> -virtio_transport_send_pkt_work(struct work_struct *work)
> +static void virtio_transport_do_send_pkt(struct virtio_vsock *vsock,
> +					 struct virtqueue *vq,  spinlock_t *lock,
> +					 struct list_head *send_pkt_list,
> +					 bool *restart_rx)
>  {
> -	struct virtio_vsock *vsock =
> -		container_of(work, struct virtio_vsock, send_pkt_work);
> -	struct virtqueue *vq;
>  	bool added = false;
> -	bool restart_rx = false;
> -
> -	mutex_lock(&vsock->tx_lock);
> -
> -	if (!vsock->tx_run)
> -		goto out;
> -
> -	vq = vsock->vqs[VSOCK_VQ_TX];
>  
>  	for (;;) {
>  		struct virtio_vsock_pkt *pkt;
> @@ -106,16 +121,16 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  		int ret, in_sg = 0, out_sg = 0;
>  		bool reply;
>  
> -		spin_lock_bh(&vsock->send_pkt_list_lock);
> -		if (list_empty(&vsock->send_pkt_list)) {
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +		spin_lock_bh(lock);
> +		if (list_empty(send_pkt_list)) {
> +			spin_unlock_bh(lock);
>  			break;
>  		}
>  
> -		pkt = list_first_entry(&vsock->send_pkt_list,
> +		pkt = list_first_entry(send_pkt_list,
>  				       struct virtio_vsock_pkt, list);
>  		list_del_init(&pkt->list);
> -		spin_unlock_bh(&vsock->send_pkt_list_lock);
> +		spin_unlock_bh(lock);
>  
>  		virtio_transport_deliver_tap_pkt(pkt);
>  
> @@ -133,9 +148,9 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  		 * the vq
>  		 */
>  		if (ret < 0) {
> -			spin_lock_bh(&vsock->send_pkt_list_lock);
> -			list_add(&pkt->list, &vsock->send_pkt_list);
> -			spin_unlock_bh(&vsock->send_pkt_list_lock);
> +			spin_lock_bh(lock);
> +			list_add(&pkt->list, send_pkt_list);
> +			spin_unlock_bh(lock);
>  			break;
>  		}
>  
> @@ -147,7 +162,7 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  
>  			/* Do we now have resources to resume rx processing? */
>  			if (val + 1 == virtqueue_get_vring_size(rx_vq))
> -				restart_rx = true;
> +				*restart_rx = true;
>  		}
>  
>  		added = true;
> @@ -155,7 +170,55 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  
>  	if (added)
>  		virtqueue_kick(vq);
> +}
> +
> +static int virtio_transport_do_send_dgram_pkt(struct virtio_vsock *vsock,
> +					      struct virtqueue *vq,
> +					      struct virtio_vsock_pkt *pkt)
> +{
> +	struct scatterlist hdr, buf, *sgs[2];
> +	int ret, in_sg = 0, out_sg = 0;
> +
> +	virtio_transport_deliver_tap_pkt(pkt);
> +
> +	sg_init_one(&hdr, &pkt->hdr, sizeof(pkt->hdr));
> +	sgs[out_sg++] = &hdr;
> +	if (pkt->buf) {
> +		sg_init_one(&buf, pkt->buf, pkt->len);
> +		sgs[out_sg++] = &buf;
> +	}
> +
> +	ret = virtqueue_add_sgs(vq, sgs, out_sg, in_sg, pkt, GFP_KERNEL);
> +	/* Usually this means that there is no more space available in
> +	 * the vq
> +	 */
> +	if (ret < 0) {
> +		virtio_transport_free_pkt(pkt);
> +		return -ENOMEM;
> +	}
> +
> +	virtqueue_kick(vq);
> +
> +	return pkt->len;
> +}
> +
> +static void
> +virtio_transport_send_pkt_work(struct work_struct *work)
> +{
> +	struct virtio_vsock *vsock =
> +		container_of(work, struct virtio_vsock, send_pkt_work);
> +	struct virtqueue *vq;
> +	bool restart_rx = false;
>  
> +	mutex_lock(&vsock->tx_lock);
> +
> +	if (!vsock->tx_run)
> +		goto out;
> +
> +	vq = vsock->vqs[VSOCK_VQ_TX];
> +
> +	virtio_transport_do_send_pkt(vsock, vq, &vsock->send_pkt_list_lock,
> +				     &vsock->send_pkt_list, &restart_rx);
>  out:
>  	mutex_unlock(&vsock->tx_lock);
>  
> @@ -163,12 +226,65 @@ virtio_transport_send_pkt_work(struct work_struct *work)
>  		queue_work(virtio_vsock_workqueue, &vsock->rx_work);
>  }
>  
> +static int
> +virtio_transport_send_dgram_pkt(struct virtio_vsock_pkt *pkt)
> +{
> +	struct virtio_vsock *vsock;
> +	int len = pkt->len;
> +	struct virtqueue *vq;
> +
> +	vsock = the_virtio_vsock_dgram;
> +
> +	if (!vsock) {
> +		virtio_transport_free_pkt(pkt);
> +		return -ENODEV;
> +	}
> +
> +	if (!vsock->dgram_tx_run) {
> +		virtio_transport_free_pkt(pkt);
> +		return -ENODEV;
> +	}
> +
> +	if (!refcount_inc_not_zero(&vsock->active)) {
> +		virtio_transport_free_pkt(pkt);
> +		return -ENODEV;
> +	}
> +
> +	if (le64_to_cpu(pkt->hdr.dst_cid) == vsock->guest_cid) {
> +		virtio_transport_free_pkt(pkt);
> +		len = -ENODEV;
> +		goto out_ref;
> +	}
> +
> +	/* send the pkt */
> +	mutex_lock(&vsock->dgram_tx_lock);
> +
> +	if (!vsock->dgram_tx_run)
> +		goto out_mutex;
> +
> +	vq = vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +
> +	len = virtio_transport_do_send_dgram_pkt(vsock, vq, pkt);
> +
> +out_mutex:
> +	mutex_unlock(&vsock->dgram_tx_lock);
> +
> +out_ref:
> +	if (!refcount_dec_not_one(&vsock->active))
> +		return -EFAULT;
> +
> +	return len;
> +}
> +
>  static int
>  virtio_transport_send_pkt(struct virtio_vsock_pkt *pkt)
>  {
>  	struct virtio_vsock *vsock;
>  	int len = pkt->len;
>  
> +	if (pkt->hdr.type == VIRTIO_VSOCK_TYPE_DGRAM)
> +		return virtio_transport_send_dgram_pkt(pkt);
> +
>  	rcu_read_lock();
>  	vsock = rcu_dereference(the_virtio_vsock);
>  	if (!vsock) {
> @@ -244,7 +360,7 @@ virtio_transport_cancel_pkt(struct vsock_sock *vsk)
>  	return ret;
>  }
>  
> -static void virtio_vsock_rx_fill(struct virtio_vsock *vsock)
> +static void virtio_vsock_rx_fill(struct virtio_vsock *vsock, bool is_dgram)
>  {
>  	int buf_len = VIRTIO_VSOCK_DEFAULT_RX_BUF_SIZE;
>  	struct virtio_vsock_pkt *pkt;
> @@ -252,7 +368,10 @@ static void virtio_vsock_rx_fill(struct virtio_vsock *vsock)
>  	struct virtqueue *vq;
>  	int ret;
>  
> -	vq = vsock->vqs[VSOCK_VQ_RX];
> +	if (is_dgram)
> +		vq = vsock->vqs[VSOCK_VQ_DGRAM_RX];
> +	else
> +		vq = vsock->vqs[VSOCK_VQ_RX];
>  
>  	do {
>  		pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
> @@ -278,26 +397,26 @@ static void virtio_vsock_rx_fill(struct virtio_vsock *vsock)
>  			virtio_transport_free_pkt(pkt);
>  			break;
>  		}
> -		vsock->rx_buf_nr++;
> +		if (is_dgram)
> +			vsock->dgram_rx_buf_nr++;
> +		else
> +			vsock->rx_buf_nr++;
>  	} while (vq->num_free);
> -	if (vsock->rx_buf_nr > vsock->rx_buf_max_nr)
> -		vsock->rx_buf_max_nr = vsock->rx_buf_nr;
> +	if (is_dgram) {
> +		if (vsock->dgram_rx_buf_nr > vsock->dgram_rx_buf_max_nr)
> +			vsock->dgram_rx_buf_max_nr = vsock->dgram_rx_buf_nr;
> +	} else {
> +		if (vsock->rx_buf_nr > vsock->rx_buf_max_nr)
> +			vsock->rx_buf_max_nr = vsock->rx_buf_nr;
> +	}
> +
>  	virtqueue_kick(vq);
>  }
>  
> -static void virtio_transport_tx_work(struct work_struct *work)
> +static bool virtio_transport_free_pkt_batch(struct virtqueue *vq)
>  {
> -	struct virtio_vsock *vsock =
> -		container_of(work, struct virtio_vsock, tx_work);
> -	struct virtqueue *vq;
>  	bool added = false;
>  
> -	vq = vsock->vqs[VSOCK_VQ_TX];
> -	mutex_lock(&vsock->tx_lock);
> -
> -	if (!vsock->tx_run)
> -		goto out;
> -
>  	do {
>  		struct virtio_vsock_pkt *pkt;
>  		unsigned int len;
> @@ -309,13 +428,43 @@ static void virtio_transport_tx_work(struct work_struct *work)
>  		}
>  	} while (!virtqueue_enable_cb(vq));
>  
> -out:
> +	return added;
> +}
> +
> +static void virtio_transport_tx_work(struct work_struct *work)
> +{
> +	struct virtio_vsock *vsock =
> +		container_of(work, struct virtio_vsock, tx_work);
> +	struct virtqueue *vq;
> +	bool added = false;
> +
> +	vq = vsock->vqs[VSOCK_VQ_TX];
> +	mutex_lock(&vsock->tx_lock);
> +
> +	if (vsock->tx_run)
> +		added = virtio_transport_free_pkt_batch(vq);
> +
>  	mutex_unlock(&vsock->tx_lock);
>  
>  	if (added)
>  		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
>  }
>  
> +static void virtio_transport_dgram_tx_work(struct work_struct *work)
> +{
> +	struct virtio_vsock *vsock =
> +		container_of(work, struct virtio_vsock, dgram_tx_work);
> +	struct virtqueue *vq;
> +
> +	vq = vsock->vqs[VSOCK_VQ_DGRAM_TX];
> +	mutex_lock(&vsock->dgram_tx_lock);
> +
> +	if (vsock->dgram_tx_run)
> +		virtio_transport_free_pkt_batch(vq);
> +
> +	mutex_unlock(&vsock->dgram_tx_lock);
> +}
> +
>  /* Is there space left for replies to rx packets? */
>  static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
>  {
> @@ -453,6 +602,11 @@ static void virtio_vsock_tx_done(struct virtqueue *vq)
>  
>  static void virtio_vsock_dgram_tx_done(struct virtqueue *vq)
>  {
> +	struct virtio_vsock *vsock = vq->vdev->priv;
> +
> +	if (!vsock)
> +		return;
> +	queue_work(virtio_vsock_workqueue, &vsock->dgram_tx_work);
>  }
>  
>  static void virtio_vsock_rx_done(struct virtqueue *vq)
> @@ -468,8 +622,12 @@ static bool virtio_transport_seqpacket_allow(u32 remote_cid);
>  
>  static void virtio_vsock_dgram_rx_done(struct virtqueue *vq)
>  {
> -}
> +	struct virtio_vsock *vsock = vq->vdev->priv;
>  
> +	if (!vsock)
> +		return;
> +	queue_work(virtio_vsock_workqueue, &vsock->dgram_rx_work);
> +}
>  static struct virtio_transport virtio_transport = {
>  	.transport = {
>  		.module                   = THIS_MODULE,
> @@ -532,19 +690,9 @@ static bool virtio_transport_seqpacket_allow(u32 remote_cid)
>  	return seqpacket_allow;
>  }
>  
> -static void virtio_transport_rx_work(struct work_struct *work)
> +static void virtio_transport_do_rx_work(struct virtio_vsock *vsock,
> +					struct virtqueue *vq, bool is_dgram)
>  {
> -	struct virtio_vsock *vsock =
> -		container_of(work, struct virtio_vsock, rx_work);
> -	struct virtqueue *vq;
> -
> -	vq = vsock->vqs[VSOCK_VQ_RX];
> -
> -	mutex_lock(&vsock->rx_lock);
> -
> -	if (!vsock->rx_run)
> -		goto out;
> -
>  	do {
>  		virtqueue_disable_cb(vq);
>  		for (;;) {
> @@ -564,7 +712,10 @@ static void virtio_transport_rx_work(struct work_struct *work)
>  				break;
>  			}
>  
> -			vsock->rx_buf_nr--;
> +			if (is_dgram)
> +				vsock->dgram_rx_buf_nr--;
> +			else
> +				vsock->rx_buf_nr--;
>  
>  			/* Drop short/long packets */
>  			if (unlikely(len < sizeof(pkt->hdr) ||
> @@ -580,11 +731,45 @@ static void virtio_transport_rx_work(struct work_struct *work)
>  	} while (!virtqueue_enable_cb(vq));
>  
>  out:
> +	return;
> +}
> +
> +static void virtio_transport_rx_work(struct work_struct *work)
> +{
> +	struct virtio_vsock *vsock =
> +		container_of(work, struct virtio_vsock, rx_work);
> +	struct virtqueue *vq;
> +
> +	vq = vsock->vqs[VSOCK_VQ_RX];
> +
> +	mutex_lock(&vsock->rx_lock);
> +
> +	if (vsock->rx_run)
> +		virtio_transport_do_rx_work(vsock, vq, false);
> +
>  	if (vsock->rx_buf_nr < vsock->rx_buf_max_nr / 2)
> -		virtio_vsock_rx_fill(vsock);
> +		virtio_vsock_rx_fill(vsock, false);
>  	mutex_unlock(&vsock->rx_lock);
>  }
>  
> +static void virtio_transport_dgram_rx_work(struct work_struct *work)
> +{
> +	struct virtio_vsock *vsock =
> +		container_of(work, struct virtio_vsock, dgram_rx_work);
> +	struct virtqueue *vq;
> +
> +	vq = vsock->vqs[VSOCK_VQ_DGRAM_RX];
> +
> +	mutex_lock(&vsock->dgram_rx_lock);
> +
> +	if (vsock->dgram_rx_run)
> +		virtio_transport_do_rx_work(vsock, vq, true);
> +
> +	if (vsock->dgram_rx_buf_nr < vsock->dgram_rx_buf_max_nr / 2)
> +		virtio_vsock_rx_fill(vsock, true);
> +	mutex_unlock(&vsock->dgram_rx_lock);
> +}
> +
>  static int virtio_vsock_probe(struct virtio_device *vdev)
>  {
>  	vq_callback_t *callbacks[] = {
> @@ -592,7 +777,7 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
>  		virtio_vsock_tx_done,
>  		virtio_vsock_event_done,
>  	};
> -	vq_callback_t *ex_callbacks[] = {
> +	vq_callback_t *dgram_callbacks[] = {
>  		virtio_vsock_rx_done,
>  		virtio_vsock_tx_done,
>  		virtio_vsock_dgram_rx_done,
> @@ -651,7 +836,7 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
>  
>  	if (vsock->has_dgram) {
>  		ret = virtio_find_vqs(vsock->vdev, max_vq,
> -				      vsock->vqs, ex_callbacks, ex_names,
> +				      vsock->vqs, dgram_callbacks, ex_names,
>  				      NULL);
>  	} else {
>  		ret = virtio_find_vqs(vsock->vdev, max_vq,
> @@ -668,8 +853,14 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
>  	vsock->rx_buf_max_nr = 0;
>  	atomic_set(&vsock->queued_replies, 0);
>  
> +	vsock->dgram_rx_buf_nr = 0;
> +	vsock->dgram_rx_buf_max_nr = 0;
> +	atomic_set(&vsock->dgram_queued_replies, 0);


Seems to be the only use of dgram_queued_replies. why do we bother then?

> +
>  	mutex_init(&vsock->tx_lock);
>  	mutex_init(&vsock->rx_lock);
> +	mutex_init(&vsock->dgram_tx_lock);
> +	mutex_init(&vsock->dgram_rx_lock);
>  	mutex_init(&vsock->event_lock);
>  	spin_lock_init(&vsock->send_pkt_list_lock);
>  	INIT_LIST_HEAD(&vsock->send_pkt_list);
> @@ -677,16 +868,27 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
>  	INIT_WORK(&vsock->tx_work, virtio_transport_tx_work);
>  	INIT_WORK(&vsock->event_work, virtio_transport_event_work);
>  	INIT_WORK(&vsock->send_pkt_work, virtio_transport_send_pkt_work);
> +	INIT_WORK(&vsock->dgram_rx_work, virtio_transport_dgram_rx_work);
> +	INIT_WORK(&vsock->dgram_tx_work, virtio_transport_dgram_tx_work);
>  
>  	mutex_lock(&vsock->tx_lock);
>  	vsock->tx_run = true;
>  	mutex_unlock(&vsock->tx_lock);
>  
> +	mutex_lock(&vsock->dgram_tx_lock);
> +	vsock->dgram_tx_run = true;
> +	mutex_unlock(&vsock->dgram_tx_lock);
> +
>  	mutex_lock(&vsock->rx_lock);
> -	virtio_vsock_rx_fill(vsock);
> +	virtio_vsock_rx_fill(vsock, false);
>  	vsock->rx_run = true;
>  	mutex_unlock(&vsock->rx_lock);
>  
> +	mutex_lock(&vsock->dgram_rx_lock);
> +	virtio_vsock_rx_fill(vsock, true);
> +	vsock->dgram_rx_run = true;
> +	mutex_unlock(&vsock->dgram_rx_lock);
> +
>  	mutex_lock(&vsock->event_lock);
>  	virtio_vsock_event_fill(vsock);
>  	vsock->event_run = true;
> @@ -698,6 +900,9 @@ static int virtio_vsock_probe(struct virtio_device *vdev)
>  	vdev->priv = vsock;
>  	rcu_assign_pointer(the_virtio_vsock, vsock);
>  
> +	the_virtio_vsock_dgram = vsock;
> +	refcount_set(&the_virtio_vsock_dgram->active, 1);
> +
>  	mutex_unlock(&the_virtio_vsock_mutex);
>  
>  	return 0;
> @@ -729,14 +934,27 @@ static void virtio_vsock_remove(struct virtio_device *vdev)
>  	vsock->rx_run = false;
>  	mutex_unlock(&vsock->rx_lock);
>  
> +	mutex_lock(&vsock->dgram_rx_lock);
> +	vsock->dgram_rx_run = false;
> +	mutex_unlock(&vsock->dgram_rx_lock);
> +
>  	mutex_lock(&vsock->tx_lock);
>  	vsock->tx_run = false;
>  	mutex_unlock(&vsock->tx_lock);
>  
> +	mutex_lock(&vsock->dgram_tx_lock);
> +	vsock->dgram_tx_run = false;
> +	mutex_unlock(&vsock->dgram_tx_lock);
> +
>  	mutex_lock(&vsock->event_lock);
>  	vsock->event_run = false;
>  	mutex_unlock(&vsock->event_lock);
>  
> +	while (!refcount_dec_if_one(&the_virtio_vsock_dgram->active)) {
> +		if (signal_pending(current))
> +			break;
> +	}
> +
>  	/* Flush all device writes and interrupts, device will not use any
>  	 * more buffers.
>  	 */
> @@ -747,11 +965,21 @@ static void virtio_vsock_remove(struct virtio_device *vdev)
>  		virtio_transport_free_pkt(pkt);
>  	mutex_unlock(&vsock->rx_lock);
>  
> +	mutex_lock(&vsock->dgram_rx_lock);
> +	while ((pkt = virtqueue_detach_unused_buf(vsock->vqs[VSOCK_VQ_DGRAM_RX])))
> +		virtio_transport_free_pkt(pkt);
> +	mutex_unlock(&vsock->dgram_rx_lock);
> +
>  	mutex_lock(&vsock->tx_lock);
>  	while ((pkt = virtqueue_detach_unused_buf(vsock->vqs[VSOCK_VQ_TX])))
>  		virtio_transport_free_pkt(pkt);
>  	mutex_unlock(&vsock->tx_lock);
>  
> +	mutex_lock(&vsock->dgram_tx_lock);
> +	while ((pkt = virtqueue_detach_unused_buf(vsock->vqs[VSOCK_VQ_DGRAM_TX])))
> +		virtio_transport_free_pkt(pkt);
> +	mutex_unlock(&vsock->dgram_tx_lock);
> +
>  	spin_lock_bh(&vsock->send_pkt_list_lock);
>  	while (!list_empty(&vsock->send_pkt_list)) {
>  		pkt = list_first_entry(&vsock->send_pkt_list,
> @@ -769,6 +997,8 @@ static void virtio_vsock_remove(struct virtio_device *vdev)
>  	 */
>  	flush_work(&vsock->rx_work);
>  	flush_work(&vsock->tx_work);
> +	flush_work(&vsock->dgram_rx_work);
> +	flush_work(&vsock->dgram_tx_work);
>  	flush_work(&vsock->event_work);
>  	flush_work(&vsock->send_pkt_work);
>  
> @@ -806,7 +1036,7 @@ static int __init virtio_vsock_init(void)
>  		return -ENOMEM;
>  
>  	ret = vsock_core_register(&virtio_transport.transport,
> -				  VSOCK_TRANSPORT_F_G2H);
> +				  VSOCK_TRANSPORT_F_G2H | VSOCK_TRANSPORT_F_DGRAM);
>  	if (ret)
>  		goto out_wq;
>  
> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
> index 081e7ae93cb1..034de35fe7c8 100644
> --- a/net/vmw_vsock/virtio_transport_common.c
> +++ b/net/vmw_vsock/virtio_transport_common.c
> @@ -26,6 +26,8 @@
>  /* Threshold for detecting small packets to copy */
>  #define GOOD_COPY_LEN  128
>  
> +static s64 virtio_transport_dgram_has_data(struct vsock_sock *vsk);
> +
>  static const struct virtio_transport *
>  virtio_transport_get_ops(struct vsock_sock *vsk)
>  {
> @@ -210,21 +212,28 @@ static int virtio_transport_send_pkt_info(struct vsock_sock *vsk,
>  	vvs = vsk->trans;
>  
>  	/* we can send less than pkt_len bytes */
> -	if (pkt_len > VIRTIO_VSOCK_MAX_PKT_BUF_SIZE)
> -		pkt_len = VIRTIO_VSOCK_MAX_PKT_BUF_SIZE;
> +	if (pkt_len > VIRTIO_VSOCK_MAX_PKT_BUF_SIZE) {
> +		if (info->type == VIRTIO_VSOCK_TYPE_STREAM)
> +			pkt_len = VIRTIO_VSOCK_MAX_PKT_BUF_SIZE;
> +		else
> +			return 0;
> +	}
>  
> -	/* virtio_transport_get_credit might return less than pkt_len credit */
> -	pkt_len = virtio_transport_get_credit(vvs, pkt_len);
> +	if (info->type == VIRTIO_VSOCK_TYPE_STREAM) {
> +		/* virtio_transport_get_credit might return less than pkt_len credit */
> +		pkt_len = virtio_transport_get_credit(vvs, pkt_len);
>  
> -	/* Do not send zero length OP_RW pkt */
> -	if (pkt_len == 0 && info->op == VIRTIO_VSOCK_OP_RW)
> -		return pkt_len;
> +		/* Do not send zero length OP_RW pkt */
> +		if (pkt_len == 0 && info->op == VIRTIO_VSOCK_OP_RW)
> +			return pkt_len;
> +	}
>  
>  	pkt = virtio_transport_alloc_pkt(info, pkt_len,
>  					 src_cid, src_port,
>  					 dst_cid, dst_port);
>  	if (!pkt) {
> -		virtio_transport_put_credit(vvs, pkt_len);
> +		if (info->type == VIRTIO_VSOCK_TYPE_STREAM)
> +			virtio_transport_put_credit(vvs, pkt_len);
>  		return -ENOMEM;
>  	}
>  
> @@ -474,6 +483,55 @@ static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>  	return dequeued_len;
>  }
>  
> +static ssize_t
> +virtio_transport_dgram_do_dequeue(struct vsock_sock *vsk,
> +				  struct msghdr *msg, size_t len)
> +{
> +	struct virtio_vsock_sock *vvs = vsk->trans;
> +	struct virtio_vsock_pkt *pkt;
> +	size_t total = 0;
> +	int err = -EFAULT;
> +
> +	spin_lock_bh(&vvs->rx_lock);
> +	if (total < len && !list_empty(&vvs->rx_queue)) {
> +		pkt = list_first_entry(&vvs->rx_queue,
> +				       struct virtio_vsock_pkt, list);
> +
> +		total = len;
> +		if (total > pkt->len - pkt->off)
> +			total = pkt->len - pkt->off;
> +		else if (total < pkt->len - pkt->off)
> +			msg->msg_flags |= MSG_TRUNC;
> +
> +		/* sk_lock is held by caller so no one else can dequeue.
> +		 * Unlock rx_lock since memcpy_to_msg() may sleep.
> +		 */
> +		spin_unlock_bh(&vvs->rx_lock);
> +
> +		err = memcpy_to_msg(msg, pkt->buf + pkt->off, total);
> +		if (err)
> +			return err;
> +
> +		spin_lock_bh(&vvs->rx_lock);
> +
> +		virtio_transport_dec_rx_pkt(vvs, pkt);
> +		list_del(&pkt->list);
> +		virtio_transport_free_pkt(pkt);
> +	}
> +
> +	spin_unlock_bh(&vvs->rx_lock);
> +
> +	if (total > 0 && msg->msg_name) {
> +		/* Provide the address of the sender. */
> +		DECLARE_SOCKADDR(struct sockaddr_vm *, vm_addr, msg->msg_name);
> +
> +		vsock_addr_init(vm_addr, le64_to_cpu(pkt->hdr.src_cid),
> +				le32_to_cpu(pkt->hdr.src_port));
> +		msg->msg_namelen = sizeof(*vm_addr);
> +	}
> +	return total;
> +}
> +
>  ssize_t
>  virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>  				struct msghdr *msg,
> @@ -523,7 +581,66 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>  			       struct msghdr *msg,
>  			       size_t len, int flags)
>  {
> -	return -EOPNOTSUPP;
> +	struct sock *sk;
> +	size_t err = 0;
> +	long timeout;
> +
> +	DEFINE_WAIT(wait);
> +
> +	sk = &vsk->sk;
> +	err = 0;
> +
> +	lock_sock(sk);
> +
> +	if (flags & MSG_OOB || flags & MSG_ERRQUEUE || flags & MSG_PEEK)
> +		return -EOPNOTSUPP;
> +
> +	if (!len)
> +		goto out;
> +
> +	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
> +
> +	while (1) {
> +		s64 ready;
> +
> +		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
> +		ready = virtio_transport_dgram_has_data(vsk);
> +
> +		if (ready == 0) {
> +			if (timeout == 0) {
> +				err = -EAGAIN;
> +				finish_wait(sk_sleep(sk), &wait);
> +				break;
> +			}
> +
> +			release_sock(sk);
> +			timeout = schedule_timeout(timeout);
> +			lock_sock(sk);
> +
> +			if (signal_pending(current)) {
> +				err = sock_intr_errno(timeout);
> +				finish_wait(sk_sleep(sk), &wait);
> +				break;
> +			} else if (timeout == 0) {
> +				err = -EAGAIN;
> +				finish_wait(sk_sleep(sk), &wait);
> +				break;
> +			}
> +		} else {
> +			finish_wait(sk_sleep(sk), &wait);
> +
> +			if (ready < 0) {
> +				err = -ENOMEM;
> +				goto out;
> +			}
> +
> +			err = virtio_transport_dgram_do_dequeue(vsk, msg, len);
> +			break;
> +		}
> +	}
> +out:
> +	release_sock(sk);
> +	return err;
>  }
>  EXPORT_SYMBOL_GPL(virtio_transport_dgram_dequeue);
>  
> @@ -553,6 +670,11 @@ u32 virtio_transport_seqpacket_has_data(struct vsock_sock *vsk)
>  }
>  EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_has_data);
>  
> +static s64 virtio_transport_dgram_has_data(struct vsock_sock *vsk)
> +{
> +	return virtio_transport_stream_has_data(vsk);
> +}
> +
>  static s64 virtio_transport_has_space(struct vsock_sock *vsk)
>  {
>  	struct virtio_vsock_sock *vvs = vsk->trans;
> @@ -731,13 +853,15 @@ EXPORT_SYMBOL_GPL(virtio_transport_stream_allow);
>  int virtio_transport_dgram_bind(struct vsock_sock *vsk,
>  				struct sockaddr_vm *addr)
>  {
> -	return -EOPNOTSUPP;
> +	//use same stream bind for dgram
> +	int ret = vsock_bind_stream(vsk, addr);
> +	return ret;
>  }
>  EXPORT_SYMBOL_GPL(virtio_transport_dgram_bind);
>  
>  bool virtio_transport_dgram_allow(u32 cid, u32 port)
>  {
> -	return false;
> +	return true;
>  }
>  EXPORT_SYMBOL_GPL(virtio_transport_dgram_allow);
>  
> @@ -773,7 +897,17 @@ virtio_transport_dgram_enqueue(struct vsock_sock *vsk,
>  			       struct msghdr *msg,
>  			       size_t dgram_len)
>  {
> -	return -EOPNOTSUPP;
> +	struct virtio_vsock_pkt_info info = {
> +		.op = VIRTIO_VSOCK_OP_RW,
> +		.type = VIRTIO_VSOCK_TYPE_DGRAM,
> +		.msg = msg,
> +		.pkt_len = dgram_len,
> +		.vsk = vsk,
> +		.remote_cid = remote_addr->svm_cid,
> +		.remote_port = remote_addr->svm_port,
> +	};
> +
> +	return virtio_transport_send_pkt_info(vsk, &info);
>  }
>  EXPORT_SYMBOL_GPL(virtio_transport_dgram_enqueue);
>  
> @@ -846,7 +980,6 @@ static int virtio_transport_reset_no_sock(const struct virtio_transport *t,
>  		virtio_transport_free_pkt(reply);
>  		return -ENOTCONN;
>  	}
> -
>  	return t->send_pkt(reply);
>  }
>  
> @@ -1049,7 +1182,8 @@ virtio_transport_recv_enqueue(struct vsock_sock *vsk,
>  		 * of a new record.
>  		 */
>  		if ((pkt->len <= last_pkt->buf_len - last_pkt->len) &&
> -		    !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR)) {
> +		    !(le32_to_cpu(last_pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) &&
> +		    (le32_to_cpu(pkt->hdr.type) != VIRTIO_VSOCK_TYPE_DGRAM)) {
>  			memcpy(last_pkt->buf + last_pkt->len, pkt->buf,
>  			       pkt->len);
>  			last_pkt->len += pkt->len;
> @@ -1074,6 +1208,12 @@ virtio_transport_recv_connected(struct sock *sk,
>  	struct vsock_sock *vsk = vsock_sk(sk);
>  	int err = 0;
>  
> +	if (le16_to_cpu(pkt->hdr.type) == VIRTIO_VSOCK_TYPE_DGRAM) {
> +		virtio_transport_recv_enqueue(vsk, pkt);
> +		sk->sk_data_ready(sk);
> +		return err;
> +	}
> +
>  	switch (le16_to_cpu(pkt->hdr.op)) {
>  	case VIRTIO_VSOCK_OP_RW:
>  		virtio_transport_recv_enqueue(vsk, pkt);
> @@ -1226,7 +1366,8 @@ virtio_transport_recv_listen(struct sock *sk, struct virtio_vsock_pkt *pkt,
>  static bool virtio_transport_valid_type(u16 type)
>  {
>  	return (type == VIRTIO_VSOCK_TYPE_STREAM) ||
> -	       (type == VIRTIO_VSOCK_TYPE_SEQPACKET);
> +	       (type == VIRTIO_VSOCK_TYPE_SEQPACKET) ||
> +	       (type == VIRTIO_VSOCK_TYPE_DGRAM);
>  }
>  
>  /* We are under the virtio-vsock's vsock->rx_lock or vhost-vsock's vq->mutex
> @@ -1289,11 +1430,16 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
>  		goto free_pkt;
>  	}
>  
> -	space_available = virtio_transport_space_update(sk, pkt);
> -
>  	/* Update CID in case it has changed after a transport reset event */
>  	vsk->local_addr.svm_cid = dst.svm_cid;
>  
> +	if (sk->sk_type == SOCK_DGRAM) {
> +		virtio_transport_recv_connected(sk, pkt);
> +		goto out;
> +	}
> +
> +	space_available = virtio_transport_space_update(sk, pkt);
> +
>  	if (space_available)
>  		sk->sk_write_space(sk);
>  
> @@ -1319,6 +1465,7 @@ void virtio_transport_recv_pkt(struct virtio_transport *t,
>  		break;
>  	}
>  
> +out:
>  	release_sock(sk);
>  
>  	/* Release refcnt obtained when we fetched this socket out of the
> -- 
> 2.20.1
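
A side note on the "synchronously" wording questioned above: as posted,
the indication the sender gets is virtio_transport_do_send_dgram_pkt()
failing with -ENOMEM when virtqueue_add_sgs() finds no space, which
should surface as the sendto(2) error. A hypothetical userspace
fragment sketching how a sender might handle that (the back-off policy
and helper name are illustrative, not part of this series):

	#include <errno.h>
	#include <unistd.h>
	#include <sys/socket.h>
	#include <linux/vm_sockets.h>

	/* Retry a vsock datagram send while the TX virtqueue is full.
	 * With this series, a full queue surfaces as errno == ENOMEM.
	 */
	static int dgram_send_with_backoff(int fd, const struct sockaddr_vm *dst,
					   const void *buf, size_t len)
	{
		for (;;) {
			if (sendto(fd, buf, len, 0,
				   (const struct sockaddr *)dst,
				   sizeof(*dst)) >= 0)
				return 0;
			if (errno != ENOMEM)
				return -1;	/* hard failure */
			usleep(1000);		/* queue full, back off */
		}
	}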


^ permalink raw reply	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2021-09-14  6:56 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-09-14  5:54 [RFC v2 0/5] virtio/vsock: introduce SOCK_DGRAM support Jiang Wang
2021-09-14  5:54 ` [RFC v2 1/5] virtio/vsock: add VIRTIO_VSOCK_F_DGRAM feature bit Jiang Wang
2021-09-14  5:54 ` [RFC v2 2/5] virtio/vsock: add support for virtio datagram Jiang Wang
2021-09-14  6:56   ` Michael S. Tsirkin
2021-09-14  5:54 ` [RFC v2 3/5] vhost/vsock: add support for vhost dgram Jiang Wang
2021-09-14  6:52   ` Michael S. Tsirkin
2021-09-14  5:54 ` [RFC v2 4/5] vsock_test: add tests for vsock dgram Jiang Wang
2021-09-14  5:54 ` [RFC v2 5/5] virtio/vsock: add sysfs for rx buf len for dgram Jiang Wang

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).