* [PATCH 0/6] virtiofsd: Support for remote blocking posix locks
@ 2021-06-16 19:39 ` Ioannis Angelakopoulos
  0 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

Hi,

Here is the implementation of remote blocking posix locks in
virtiofsd.

This patch series also addresses the issues that were not fixed in:
[Virtio-fs] [PATCH v2 0/5] [RFC] virtiofsd, vhost-user-fs: Add support
for notification queue
https://patchwork.kernel.org/project/qemu-devel/cover/20191204190836.31324-1-vgoyal@redhat.com/

The main issue was that the guest could not reboot while a thread of the
virtiofsd process was blocked waiting to acquire a lock. Introducing a
custom threadpool dedicated exclusively to servicing locking requests
addresses this issue.

Description:

With the addition of a notification queue, a guest can now block while
waiting for a remote lock held by another guest.

When the lock becomes available, virtiofsd sends a notification to the
guest, which unblocks the waiting process/thread so that it can acquire
the lock.

This implementation also handles the case where a guest that is waiting
for a lock hard-reboots through SYSRQ (echo "b" > /proc/sysrq-trigger).
Specifically, a custom threadpool (POSIX threads) in virtiofsd is used
to service the locking requests. If one of these threads is blocked
waiting for a lock and the guest hard-reboots, virtiofsd sends a signal
to the blocked thread to wake it up so that it exits; virtiofsd then
cleans up its state and re-initializes.
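
The following is a minimal, self-contained sketch of that wake-up
mechanism (it is not code from this series; the file name, signal choice
and demo structure are illustrative only): a worker thread blocks in
fcntl(F_OFD_SETLKW) and a signal delivered with pthread_kill() interrupts
the fcntl() with EINTR so the thread can exit cleanly.

/* build: gcc -pthread -o lockdemo lockdemo.c (error checking omitted) */
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void wakeup_handler(int sig)
{
    (void)sig;   /* empty on purpose; its only job is to interrupt fcntl() */
}

static void *lock_worker(void *arg)
{
    int fd = *(int *)arg;
    struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET };

    /* Blocks: a conflicting lock is held on another open file description */
    if (fcntl(fd, F_OFD_SETLKW, &fl) == -1 && errno == EINTR) {
        printf("worker: woken by signal, exiting without the lock\n");
    }
    return NULL;
}

int main(void)
{
    struct sigaction sa;
    struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET };
    pthread_t tid;
    int fd1, fd2;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = wakeup_handler;   /* no SA_RESTART, so fcntl() sees EINTR */
    sigemptyset(&sa.sa_mask);
    sigaction(SIGUSR1, &sa, NULL);

    fd1 = open("/tmp/lockdemo", O_RDWR | O_CREAT, 0600);
    fd2 = open("/tmp/lockdemo", O_RDWR);
    fcntl(fd1, F_OFD_SETLK, &fl);     /* hold the lock on the first description */

    pthread_create(&tid, NULL, lock_worker, &fd2);
    sleep(1);                         /* give the worker time to block */
    pthread_kill(tid, SIGUSR1);       /* what virtiofsd does on a hard reboot */
    pthread_join(tid, NULL);
    close(fd2);
    close(fd1);
    return 0;
}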

Finally, the custom threadpool allows virtiofsd to concurrently service
requests that are non-blocking, or that block only for a limited time,
while the locking requests that might block are handled separately. This
is especially useful when the GThreadPool is not utilized.
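
As a rough illustration of that split (the names dispatch_request,
custom_pool_push, blocking_pool and request_pool are hypothetical and
not the series' actual API), the dispatch decision amounts to:

#include <glib.h>
#include <stdbool.h>

struct fuse_req;
struct lock_pool;

extern GThreadPool *request_pool;          /* existing GLib worker pool */
extern struct lock_pool *blocking_pool;    /* hypothetical custom pool */
void custom_pool_push(struct lock_pool *pool, struct fuse_req *req);

static void dispatch_request(struct fuse_req *req, bool is_blocking_lock)
{
    if (is_blocking_lock) {
        /* may sit in fcntl(F_OFD_SETLKW) for a long time */
        custom_pool_push(blocking_pool, req);
    } else {
        /* everything else keeps flowing through the normal pool */
        g_thread_pool_push(request_pool, req, NULL);
    }
}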

Thanks,
Ioannis

Ioannis Angelakopoulos (2):
  virtiofsd: Thread state cleanup when blocking posix locks are used
  virtiofsd: Custom threadpool for remote blocking posix locks requests

Vivek Goyal (4):
  virtiofsd: Release file locks using F_UNLCK
  virtiofsd: Create a notification queue
  virtiofsd: Specify size of notification buffer using config space
  virtiofsd: Implement blocking posix locks

 hw/virtio/vhost-user-fs.c                  |  57 +-
 include/hw/virtio/vhost-user-fs.h          |   4 +-
 include/standard-headers/linux/fuse.h      |   8 +
 include/standard-headers/linux/virtio_fs.h |   5 +
 tools/virtiofsd/fuse_i.h                   |   2 +
 tools/virtiofsd/fuse_lowlevel.c            |  40 +-
 tools/virtiofsd/fuse_lowlevel.h            |  26 +
 tools/virtiofsd/fuse_virtio.c              | 626 +++++++++++++++++++--
 tools/virtiofsd/passthrough_ll.c           |  90 ++-
 9 files changed, 804 insertions(+), 54 deletions(-)

-- 
2.27.0




* [PATCH 1/6] virtiofsd: Release file locks using F_UNLCK
  2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  -1 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

From: Vivek Goyal <vgoyal@redhat.com>

We emulate posix locks for the guest using open file description (OFD)
locks in virtiofsd. When any of the fds is closed in the guest, we find
the associated OFD lock fd (if there is one) and close it to release all
the locks.

The assumption here is that no other thread is using the lo_inode_plock
structure or plock->fd, hence it is safe to do so.

But now we are about to introduce a blocking variant of locks (SETLKW),
which means we might be waiting for a lock to become available while
using plock->fd. And that means there are still users of the plock
structure.

So release the locks using fcntl(SETLK, F_UNLCK) instead; plock will be
freed later.
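
For reference, the idiom the patch switches to looks like this as a
standalone sketch (error handling left to the caller): it drops every
lock held through the fd while leaving the fd itself open.

#define _GNU_SOURCE
#include <fcntl.h>
#include <string.h>

/* Release all OFD locks held through "fd" without closing it. */
static int release_all_locks(int fd)
{
    struct flock fl;

    memset(&fl, 0, sizeof(fl));
    fl.l_type = F_UNLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = 0;
    fl.l_len = 0;              /* length 0 means "to the end of the file" */

    return fcntl(fd, F_OFD_SETLK, &fl);
}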

Signed-off-by: Vivek Goyal <vgoyal@redhat.com>
Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 tools/virtiofsd/passthrough_ll.c | 32 ++++++++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/tools/virtiofsd/passthrough_ll.c b/tools/virtiofsd/passthrough_ll.c
index 49c21fd855..f2fa9d95bb 100644
--- a/tools/virtiofsd/passthrough_ll.c
+++ b/tools/virtiofsd/passthrough_ll.c
@@ -968,6 +968,14 @@ static int do_statx(struct lo_data *lo, int dirfd, const char *pathname,
     return 0;
 }
 
+static void posix_locks_value_destroy(gpointer data)
+{
+    struct lo_inode_plock *plock = data;
+
+    close(plock->fd);
+    free(plock);
+}
+
 /*
  * Increments nlookup on the inode on success. unref_inode_lolocked() must be
  * called eventually to decrement nlookup again. If inodep is non-NULL, the
@@ -1473,9 +1481,6 @@ static void unref_inode(struct lo_data *lo, struct lo_inode *inode, uint64_t n)
         lo_map_remove(&lo->ino_map, inode->fuse_ino);
         g_hash_table_remove(lo->inodes, &inode->key);
         if (lo->posix_lock) {
-            if (g_hash_table_size(inode->posix_locks)) {
-                fuse_log(FUSE_LOG_WARNING, "Hash table is not empty\n");
-            }
             g_hash_table_destroy(inode->posix_locks);
             pthread_mutex_destroy(&inode->plock_mutex);
         }
@@ -1974,6 +1979,9 @@ static struct lo_inode_plock *lookup_create_plock_ctx(struct lo_data *lo,
     plock =
         g_hash_table_lookup(inode->posix_locks, GUINT_TO_POINTER(lock_owner));
 
+    fuse_log(FUSE_LOG_DEBUG, "lookup_create_plock_ctx():"
+             " Inserted element in posix_locks hash table"
+             " with value pointer %p\n", plock);
     if (plock) {
         return plock;
     }
@@ -2182,6 +2190,8 @@ static void lo_flush(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
     (void)ino;
     struct lo_inode *inode;
     struct lo_data *lo = lo_data(req);
+    struct lo_inode_plock *plock;
+    struct flock flock;
 
     inode = lo_inode(req, ino);
     if (!inode) {
@@ -2198,8 +2208,22 @@ static void lo_flush(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi)
     /* An fd is going away. Cleanup associated posix locks */
     if (lo->posix_lock) {
         pthread_mutex_lock(&inode->plock_mutex);
-        g_hash_table_remove(inode->posix_locks,
+        plock = g_hash_table_lookup(inode->posix_locks,
             GUINT_TO_POINTER(fi->lock_owner));
+
+        if (plock) {
+            /*
+             * An fd is being closed. For posix locks, this means
+             * drop all the associated locks.
+             */
+            memset(&flock, 0, sizeof(struct flock));
+            flock.l_type = F_UNLCK;
+            flock.l_whence = SEEK_SET;
+            /* Unlock whole file */
+            flock.l_start = flock.l_len = 0;
+            fcntl(plock->fd, F_OFD_SETLK, &flock);
+        }
+
         pthread_mutex_unlock(&inode->plock_mutex);
     }
     res = close(dup(lo_fi_fd(req, fi)));
-- 
2.27.0




* [PATCH 2/6] virtiofsd: Create a notification queue
  2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  -1 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

From: Vivek Goyal <vgoyal@redhat.com>

Add a notification queue which will be used to send async notifications
for file lock availability.
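
With this change the device's virtqueue layout becomes the following (a
sketch derived from the patch; the enum names are illustrative, not
identifiers used in the code):

enum {
    VQ_HIPRIO  = 0,  /* high priority queue, unchanged */
    VQ_NOTIFY  = 1,  /* notification queue, used only if the guest
                      * negotiates VIRTIO_FS_F_NOTIFICATION */
    VQ_REQUEST = 2,  /* first of fs->conf.num_request_queues request queues */
};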

Signed-off-by: Vivek Goyal <vgoyal@redhat.com>
Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 hw/virtio/vhost-user-fs.c                  | 30 ++++++--
 include/hw/virtio/vhost-user-fs.h          |  2 +-
 include/standard-headers/linux/virtio_fs.h |  3 +
 tools/virtiofsd/fuse_i.h                   |  1 +
 tools/virtiofsd/fuse_virtio.c              | 79 +++++++++++++++-------
 5 files changed, 85 insertions(+), 30 deletions(-)

diff --git a/hw/virtio/vhost-user-fs.c b/hw/virtio/vhost-user-fs.c
index 6f7f91533d..c7fd5f3123 100644
--- a/hw/virtio/vhost-user-fs.c
+++ b/hw/virtio/vhost-user-fs.c
@@ -31,6 +31,7 @@ static const int user_feature_bits[] = {
     VIRTIO_F_NOTIFY_ON_EMPTY,
     VIRTIO_F_RING_PACKED,
     VIRTIO_F_IOMMU_PLATFORM,
+    VIRTIO_FS_F_NOTIFICATION,
 
     VHOST_INVALID_FEATURE_BIT
 };
@@ -145,9 +146,20 @@ static uint64_t vuf_get_features(VirtIODevice *vdev,
 {
     VHostUserFS *fs = VHOST_USER_FS(vdev);
 
+    virtio_add_feature(&features, VIRTIO_FS_F_NOTIFICATION);
+
     return vhost_get_features(&fs->vhost_dev, user_feature_bits, features);
 }
 
+static void vuf_set_features(VirtIODevice *vdev, uint64_t features)
+{
+    VHostUserFS *fs = VHOST_USER_FS(vdev);
+
+    if (virtio_has_feature(features, VIRTIO_FS_F_NOTIFICATION)) {
+        fs->notify_enabled = true;
+    }
+}
+
 static void vuf_handle_output(VirtIODevice *vdev, VirtQueue *vq)
 {
     /*
@@ -223,16 +235,25 @@ static void vuf_device_realize(DeviceState *dev, Error **errp)
                 sizeof(struct virtio_fs_config));
 
     /* Hiprio queue */
-    fs->hiprio_vq = virtio_add_queue(vdev, fs->conf.queue_size, vuf_handle_output);
+    fs->hiprio_vq = virtio_add_queue(vdev, fs->conf.queue_size,
+                                     vuf_handle_output);
+
+    /*
+     * Notification queue. Feature negotiation happens later. So at this
+     * point of time we don't know if driver will use notification queue
+     * or not.
+     */
+    virtio_add_queue(vdev, fs->conf.queue_size, vuf_handle_output);
 
     /* Request queues */
     fs->req_vqs = g_new(VirtQueue *, fs->conf.num_request_queues);
     for (i = 0; i < fs->conf.num_request_queues; i++) {
-        fs->req_vqs[i] = virtio_add_queue(vdev, fs->conf.queue_size, vuf_handle_output);
+        fs->req_vqs[i] = virtio_add_queue(vdev, fs->conf.queue_size,
+                                          vuf_handle_output);
     }
 
-    /* 1 high prio queue, plus the number configured */
-    fs->vhost_dev.nvqs = 1 + fs->conf.num_request_queues;
+    /* 1 high prio queue, 1 notification queue plus the number configured */
+    fs->vhost_dev.nvqs = 2 + fs->conf.num_request_queues;
     fs->vhost_dev.vqs = g_new0(struct vhost_virtqueue, fs->vhost_dev.nvqs);
     ret = vhost_dev_init(&fs->vhost_dev, &fs->vhost_user,
                          VHOST_BACKEND_TYPE_USER, 0);
@@ -311,6 +332,7 @@ static void vuf_class_init(ObjectClass *klass, void *data)
     vdc->realize = vuf_device_realize;
     vdc->unrealize = vuf_device_unrealize;
     vdc->get_features = vuf_get_features;
+    vdc->set_features = vuf_set_features;
     vdc->get_config = vuf_get_config;
     vdc->set_status = vuf_set_status;
     vdc->guest_notifier_mask = vuf_guest_notifier_mask;
diff --git a/include/hw/virtio/vhost-user-fs.h b/include/hw/virtio/vhost-user-fs.h
index 0d62834c25..13e2cbc48e 100644
--- a/include/hw/virtio/vhost-user-fs.h
+++ b/include/hw/virtio/vhost-user-fs.h
@@ -40,7 +40,7 @@ struct VHostUserFS {
     VirtQueue **req_vqs;
     VirtQueue *hiprio_vq;
     int32_t bootindex;
-
+    bool notify_enabled;
     /*< public >*/
 };
 
diff --git a/include/standard-headers/linux/virtio_fs.h b/include/standard-headers/linux/virtio_fs.h
index a32fe8a64c..6383d723a3 100644
--- a/include/standard-headers/linux/virtio_fs.h
+++ b/include/standard-headers/linux/virtio_fs.h
@@ -8,6 +8,9 @@
 #include "standard-headers/linux/virtio_config.h"
 #include "standard-headers/linux/virtio_types.h"
 
+/* Feature bits */
+#define VIRTIO_FS_F_NOTIFICATION 0   /* Notification queue supported */
+
 struct virtio_fs_config {
 	/* Filesystem name (UTF-8, not NUL-terminated, padded with NULs) */
 	uint8_t tag[36];
diff --git a/tools/virtiofsd/fuse_i.h b/tools/virtiofsd/fuse_i.h
index 492e002181..4942d080da 100644
--- a/tools/virtiofsd/fuse_i.h
+++ b/tools/virtiofsd/fuse_i.h
@@ -73,6 +73,7 @@ struct fuse_session {
     int   vu_socketfd;
     struct fv_VuDev *virtio_dev;
     int thread_pool_size;
+    bool notify_enabled;
 };
 
 struct fuse_chan {
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index fa4aff9b0e..3ff4cc1430 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -14,6 +14,7 @@
 #include "qemu/osdep.h"
 #include "qemu/iov.h"
 #include "qapi/error.h"
+#include "standard-headers/linux/virtio_fs.h"
 #include "fuse_i.h"
 #include "standard-headers/linux/fuse.h"
 #include "fuse_misc.h"
@@ -80,23 +81,31 @@ struct fv_VuDev {
      */
     size_t nqueues;
     struct fv_QueueInfo **qi;
-};
-
-/* From spec */
-struct virtio_fs_config {
-    char tag[36];
-    uint32_t num_queues;
+    /* True if notification queue is being used */
+    bool notify_enabled;
 };
 
 /* Callback from libvhost-user */
 static uint64_t fv_get_features(VuDev *dev)
 {
-    return 1ULL << VIRTIO_F_VERSION_1;
+    uint64_t features;
+
+    features = 1ull << VIRTIO_F_VERSION_1 |
+               1ull << VIRTIO_FS_F_NOTIFICATION;
+
+    return features;
 }
 
 /* Callback from libvhost-user */
 static void fv_set_features(VuDev *dev, uint64_t features)
 {
+    struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
+    struct fuse_session *se = vud->se;
+
+    if ((1ull << VIRTIO_FS_F_NOTIFICATION) & features) {
+        vud->notify_enabled = true;
+        se->notify_enabled = true;
+    }
 }
 
 /*
@@ -736,19 +745,20 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
 
     assert(qidx < vud->nqueues);
     ourqi = vud->qi[qidx];
-
-    /* Kill the thread */
-    if (eventfd_write(ourqi->kill_fd, 1)) {
-        fuse_log(FUSE_LOG_ERR, "Eventfd_write for queue %d: %s\n",
-                 qidx, strerror(errno));
-    }
-    ret = pthread_join(ourqi->thread, NULL);
-    if (ret) {
-        fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
-                 __func__, qidx, ret);
+    /* qidx == 1 is the notification queue  */
+    if (qidx != 1) {
+        /* Kill the thread */
+        if (eventfd_write(ourqi->kill_fd, 1)) {
+            fuse_log(FUSE_LOG_ERR, "eventfd_write for queue: %m\n");
+        }
+        ret = pthread_join(ourqi->thread, NULL);
+        if (ret) {
+            fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err"
+                     " %d\n", __func__, qidx, ret);
+        }
+        close(ourqi->kill_fd);
     }
     pthread_mutex_destroy(&ourqi->vq_lock);
-    close(ourqi->kill_fd);
     ourqi->kick_fd = -1;
     g_free(vud->qi[qidx]);
     vud->qi[qidx] = NULL;
@@ -759,6 +769,9 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
 {
     struct fv_VuDev *vud = container_of(dev, struct fv_VuDev, dev);
     struct fv_QueueInfo *ourqi;
+    void * (*thread_func) (void *) = fv_queue_thread;
+    int valid_queues = 2; /* One hiprio queue and one request queue */
+    bool notification_q = false;
 
     fuse_log(FUSE_LOG_INFO, "%s: qidx=%d started=%d\n", __func__, qidx,
              started);
@@ -770,10 +783,19 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
      * well-behaved client in mind and may not protect against all types of
      * races yet.
      */
-    if (qidx > 1) {
-        fuse_log(FUSE_LOG_ERR,
-                 "%s: multiple request queues not yet implemented, please only "
-                 "configure 1 request queue\n",
+    if (vud->notify_enabled) {
+        valid_queues++;
+        /*
+         * If notification queue is enabled, then qidx 1 is notification queue.
+         */
+        if (qidx == 1) {
+            notification_q = true;
+        }
+    }
+
+    if (qidx >= valid_queues) {
+        fuse_log(FUSE_LOG_ERR, "%s: multiple request queues not yet "
+                 "implemented, please only configure 1 request queue\n",
                  __func__);
         exit(EXIT_FAILURE);
     }
@@ -795,13 +817,20 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
             assert(vud->qi[qidx]->kick_fd == -1);
         }
         ourqi = vud->qi[qidx];
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
+        /*
+         * For notification queue, we don't have to start a thread yet.
+         */
+        if (notification_q) {
+            return;
+        }
+
         ourqi->kick_fd = dev->vq[qidx].kick_fd;
 
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
         assert(ourqi->kill_fd != -1);
-        pthread_mutex_init(&ourqi->vq_lock, NULL);
 
-        if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
+        if (pthread_create(&ourqi->thread, NULL, thread_func, ourqi)) {
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
                      __func__, qidx);
             assert(0);
@@ -1058,7 +1087,7 @@ int virtio_session_mount(struct fuse_session *se)
     se->vu_socketfd = data_sock;
     se->virtio_dev->se = se;
     pthread_rwlock_init(&se->virtio_dev->vu_dispatch_rwlock, NULL);
-    if (!vu_init(&se->virtio_dev->dev, 2, se->vu_socketfd, fv_panic, NULL,
+    if (!vu_init(&se->virtio_dev->dev, 3, se->vu_socketfd, fv_panic, NULL,
                  fv_set_watch, fv_remove_watch, &fv_iface)) {
         fuse_log(FUSE_LOG_ERR, "%s: vu_init failed\n", __func__);
         return -1;
-- 
2.27.0




* [PATCH 3/6] virtiofsd: Specify size of notification buffer using config space
  2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  -1 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

From: Vivek Goyal <vgoyal@redhat.com>

The daemon specifies the size of the notification buffer it needs, and
that should be communicated through the config space.

Only the ->notify_buf_size value of the config space comes from the
daemon. The rest of it is filled in by the QEMU device emulation code.
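
The advertised value is derived roughly as below (a sketch mirroring
fv_get_config() in the hunk that follows: FUSE out header plus the
largest notification payload, padded to a multiple of 64 bytes):

#include <stddef.h>
#include <linux/fuse.h>    /* struct fuse_out_header */

static size_t notify_buf_size(size_t largest_notify_payload)
{
    const size_t roundto = 64;
    size_t sz = sizeof(struct fuse_out_header) + largest_notify_payload;

    /* same rounding expression as the patch: result is a 64-byte multiple */
    return ((sz + roundto) / roundto) * roundto;
}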

Signed-off-by: Vivek Goyal <vgoyal@redhat.com>
Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 hw/virtio/vhost-user-fs.c                  | 27 +++++++++++++++++++
 include/hw/virtio/vhost-user-fs.h          |  4 ++-
 include/standard-headers/linux/virtio_fs.h |  2 ++
 tools/virtiofsd/fuse_virtio.c              | 31 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/hw/virtio/vhost-user-fs.c b/hw/virtio/vhost-user-fs.c
index c7fd5f3123..f510bd8029 100644
--- a/hw/virtio/vhost-user-fs.c
+++ b/hw/virtio/vhost-user-fs.c
@@ -36,15 +36,40 @@ static const int user_feature_bits[] = {
     VHOST_INVALID_FEATURE_BIT
 };
 
+static int vhost_user_fs_handle_config_change(struct vhost_dev *dev)
+{
+    return 0;
+}
+
+const VhostDevConfigOps fs_ops = {
+    .vhost_dev_config_notifier = vhost_user_fs_handle_config_change,
+};
+
 static void vuf_get_config(VirtIODevice *vdev, uint8_t *config)
 {
     VHostUserFS *fs = VHOST_USER_FS(vdev);
     struct virtio_fs_config fscfg = {};
+    int ret;
+
+    /*
+     * As of now we only get notification buffer size from device. And that's
+     * needed only if notification queue is enabled.
+     */
+    if (fs->notify_enabled) {
+        ret = vhost_dev_get_config(&fs->vhost_dev, (uint8_t *)&fs->fscfg,
+                                   sizeof(struct virtio_fs_config));
+        if (ret < 0) {
+            error_report("vhost-user-fs: get device config space failed."
+                         " ret=%d", ret);
+            return;
+        }
+    }
 
     memcpy((char *)fscfg.tag, fs->conf.tag,
            MIN(strlen(fs->conf.tag) + 1, sizeof(fscfg.tag)));
 
     virtio_stl_p(vdev, &fscfg.num_request_queues, fs->conf.num_request_queues);
+    virtio_stl_p(vdev, &fscfg.notify_buf_size, fs->fscfg.notify_buf_size);
 
     memcpy(config, &fscfg, sizeof(fscfg));
 }
@@ -255,6 +280,8 @@ static void vuf_device_realize(DeviceState *dev, Error **errp)
     /* 1 high prio queue, 1 notification queue plus the number configured */
     fs->vhost_dev.nvqs = 2 + fs->conf.num_request_queues;
     fs->vhost_dev.vqs = g_new0(struct vhost_virtqueue, fs->vhost_dev.nvqs);
+
+    vhost_dev_set_config_notifier(&fs->vhost_dev, &fs_ops);
     ret = vhost_dev_init(&fs->vhost_dev, &fs->vhost_user,
                          VHOST_BACKEND_TYPE_USER, 0);
     if (ret < 0) {
diff --git a/include/hw/virtio/vhost-user-fs.h b/include/hw/virtio/vhost-user-fs.h
index 13e2cbc48e..03780322ee 100644
--- a/include/hw/virtio/vhost-user-fs.h
+++ b/include/hw/virtio/vhost-user-fs.h
@@ -14,6 +14,7 @@
 #ifndef _QEMU_VHOST_USER_FS_H
 #define _QEMU_VHOST_USER_FS_H
 
+#include "standard-headers/linux/virtio_fs.h"
 #include "hw/virtio/virtio.h"
 #include "hw/virtio/vhost.h"
 #include "hw/virtio/vhost-user.h"
@@ -37,11 +38,12 @@ struct VHostUserFS {
     struct vhost_virtqueue *vhost_vqs;
     struct vhost_dev vhost_dev;
     VhostUserState vhost_user;
+    struct virtio_fs_config fscfg;
     VirtQueue **req_vqs;
     VirtQueue *hiprio_vq;
     int32_t bootindex;
-    bool notify_enabled;
     /*< public >*/
+    bool notify_enabled;
 };
 
 #endif /* _QEMU_VHOST_USER_FS_H */
diff --git a/include/standard-headers/linux/virtio_fs.h b/include/standard-headers/linux/virtio_fs.h
index 6383d723a3..8f0075269a 100644
--- a/include/standard-headers/linux/virtio_fs.h
+++ b/include/standard-headers/linux/virtio_fs.h
@@ -17,6 +17,8 @@ struct virtio_fs_config {
 
 	/* Number of request queues */
 	uint32_t num_request_queues;
+	/* Size of notification buffer */
+	uint32_t notify_buf_size;
 } QEMU_PACKED;
 
 /* For the id field in virtio_pci_shm_cap */
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index 3ff4cc1430..f16801bbee 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -851,6 +851,35 @@ static bool fv_queue_order(VuDev *dev, int qidx)
     return false;
 }
 
+static uint64_t fv_get_protocol_features(VuDev *dev)
+{
+    return 1ull << VHOST_USER_PROTOCOL_F_CONFIG;
+}
+
+static int fv_get_config(VuDev *dev, uint8_t *config, uint32_t len)
+{
+    struct virtio_fs_config fscfg = {};
+    unsigned notify_size, roundto = 64;
+    union fuse_notify_union {
+        struct fuse_notify_poll_wakeup_out  wakeup_out;
+        struct fuse_notify_inval_inode_out  inode_out;
+        struct fuse_notify_inval_entry_out  entry_out;
+        struct fuse_notify_delete_out       delete_out;
+        struct fuse_notify_store_out        store_out;
+        struct fuse_notify_retrieve_out     retrieve_out;
+    };
+
+    notify_size = sizeof(struct fuse_out_header) +
+              sizeof(union fuse_notify_union);
+    notify_size = ((notify_size + roundto) / roundto) * roundto;
+
+    fscfg.notify_buf_size = notify_size;
+    memcpy(config, &fscfg, len);
+    fuse_log(FUSE_LOG_DEBUG, "%s:Setting notify_buf_size=%d\n", __func__,
+             fscfg.notify_buf_size);
+    return 0;
+}
+
 static const VuDevIface fv_iface = {
     .get_features = fv_get_features,
     .set_features = fv_set_features,
@@ -859,6 +888,8 @@ static const VuDevIface fv_iface = {
     .queue_set_started = fv_queue_set_started,
 
     .queue_is_processed_in_order = fv_queue_order,
+    .get_protocol_features = fv_get_protocol_features,
+    .get_config = fv_get_config,
 };
 
 /*
-- 
2.27.0




* [PATCH 4/6] virtiofsd: Implement blocking posix locks
  2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  -1 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

From: Vivek Goyal <vgoyal@redhat.com>

As of now we don't support fcntl(F_SETLKW), and if we see one we return
-EOPNOTSUPP.

Change that by accepting these requests and immediately returning a reply
that asks the caller to wait. Once the lock becomes available, send a
notification to the waiter indicating that the lock is available.
Signed-off-by: Vivek Goyal <vgoyal@redhat.com>
Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 include/standard-headers/linux/fuse.h |   8 ++
 tools/virtiofsd/fuse_lowlevel.c       |  38 +++++++++-
 tools/virtiofsd/fuse_lowlevel.h       |  26 +++++++
 tools/virtiofsd/fuse_virtio.c         | 101 ++++++++++++++++++++++++--
 tools/virtiofsd/passthrough_ll.c      |  59 ++++++++++-----
 5 files changed, 207 insertions(+), 25 deletions(-)

diff --git a/include/standard-headers/linux/fuse.h b/include/standard-headers/linux/fuse.h
index 950d7edb7e..4680efc531 100644
--- a/include/standard-headers/linux/fuse.h
+++ b/include/standard-headers/linux/fuse.h
@@ -511,6 +511,7 @@ enum fuse_notify_code {
 	FUSE_NOTIFY_STORE = 4,
 	FUSE_NOTIFY_RETRIEVE = 5,
 	FUSE_NOTIFY_DELETE = 6,
+        FUSE_NOTIFY_LOCK = 7,
 	FUSE_NOTIFY_CODE_MAX,
 };
 
@@ -898,6 +899,13 @@ struct fuse_notify_retrieve_in {
 	uint64_t	dummy4;
 };
 
+struct fuse_notify_lock_out {
+	uint64_t	unique;
+	int32_t		error;
+	int32_t		padding;
+};
+
+
 /* Device ioctls: */
 #define FUSE_DEV_IOC_CLONE	_IOR(229, 0, uint32_t)
 
diff --git a/tools/virtiofsd/fuse_lowlevel.c b/tools/virtiofsd/fuse_lowlevel.c
index 7fe2cef1eb..4b03ec2f9f 100644
--- a/tools/virtiofsd/fuse_lowlevel.c
+++ b/tools/virtiofsd/fuse_lowlevel.c
@@ -179,8 +179,8 @@ int fuse_send_reply_iov_nofree(fuse_req_t req, int error, struct iovec *iov,
         .unique = req->unique,
         .error = error,
     };
-
-    if (error <= -1000 || error > 0) {
+    /* error = 1 has been used to signal the client to wait for notification */
+    if (error <= -1000 || error > 1) {
         fuse_log(FUSE_LOG_ERR, "fuse: bad error value: %i\n", error);
         out.error = -ERANGE;
     }
@@ -290,6 +290,12 @@ int fuse_reply_err(fuse_req_t req, int err)
     return send_reply(req, -err, NULL, 0);
 }
 
+int fuse_reply_wait(fuse_req_t req)
+{
+    /* TODO: This is a hack. Fix it */
+    return send_reply(req, 1, NULL, 0);
+}
+
 void fuse_reply_none(fuse_req_t req)
 {
     fuse_free_req(req);
@@ -2145,6 +2151,34 @@ static void do_destroy(fuse_req_t req, fuse_ino_t nodeid,
     send_reply_ok(req, NULL, 0);
 }
 
+static int send_notify_iov(struct fuse_session *se, int notify_code,
+                           struct iovec *iov, int count)
+{
+    struct fuse_out_header out;
+    if (!se->got_init) {
+        return -ENOTCONN;
+    }
+    out.unique = 0;
+    out.error = notify_code;
+    iov[0].iov_base = &out;
+    iov[0].iov_len = sizeof(struct fuse_out_header);
+    return fuse_send_msg(se, NULL, iov, count);
+}
+
+int fuse_lowlevel_notify_lock(struct fuse_session *se, uint64_t unique,
+                  int32_t error)
+{
+    struct fuse_notify_lock_out outarg = {0};
+    struct iovec iov[2];
+
+    outarg.unique = unique;
+    outarg.error = -error;
+
+    iov[1].iov_base = &outarg;
+    iov[1].iov_len = sizeof(outarg);
+    return send_notify_iov(se, FUSE_NOTIFY_LOCK, iov, 2);
+}
+
 int fuse_lowlevel_notify_store(struct fuse_session *se, fuse_ino_t ino,
                                off_t offset, struct fuse_bufvec *bufv)
 {
diff --git a/tools/virtiofsd/fuse_lowlevel.h b/tools/virtiofsd/fuse_lowlevel.h
index 3bf786b034..1e8b3d2c35 100644
--- a/tools/virtiofsd/fuse_lowlevel.h
+++ b/tools/virtiofsd/fuse_lowlevel.h
@@ -1250,6 +1250,22 @@ struct fuse_lowlevel_ops {
  */
 int fuse_reply_err(fuse_req_t req, int err);
 
+/**
+ * Ask caller to wait for lock.
+ *
+ * Possible requests:
+ *   setlkw
+ *
+ * If the caller sends a blocking lock request (setlkw), reply to the caller
+ * asking it to wait for the lock to become available. Once the lock is
+ * available, the caller will receive a notification carrying the request's
+ * unique id and whether the lock was successfully obtained or not.
+ *
+ * @param req request handle
+ * @return zero for success, -errno for failure to send reply
+ */
+int fuse_reply_wait(fuse_req_t req);
+
 /**
  * Don't send reply
  *
@@ -1684,6 +1700,16 @@ int fuse_lowlevel_notify_delete(struct fuse_session *se, fuse_ino_t parent,
 int fuse_lowlevel_notify_store(struct fuse_session *se, fuse_ino_t ino,
                                off_t offset, struct fuse_bufvec *bufv);
 
+/**
+ * Notify event related to previous lock request
+ *
+ * @param se the session object
+ * @param unique the unique id of the request which requested setlkw
+ * @param error zero for success, -errno for the failure
+ */
+int fuse_lowlevel_notify_lock(struct fuse_session *se, uint64_t unique,
+                              int32_t error);
+
 /*
  * Utility functions
  */
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index f16801bbee..cb4dbafd91 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -233,6 +233,86 @@ static void copy_iov(struct iovec *src_iov, int src_count,
     }
 }
 
+static int virtio_send_notify_msg(struct fuse_session *se, struct iovec *iov,
+                                  int count)
+{
+    struct fv_QueueInfo *qi;
+    VuDev *dev = &se->virtio_dev->dev;
+    VuVirtq *q;
+    FVRequest *req;
+    VuVirtqElement *elem;
+    unsigned int in_num;
+    struct fuse_out_header *out = iov[0].iov_base;
+    size_t in_len, tosend_len = iov_size(iov, count);
+    struct iovec *in_sg;
+    int ret = 0;
+
+    /* Notifications have unique == 0 */
+    assert(!out->unique);
+
+    if (!se->notify_enabled) {
+        return -EOPNOTSUPP;
+    }
+
+    /* If notifications are enabled, queue index 1 is notification queue */
+    qi = se->virtio_dev->qi[1];
+    q = vu_get_queue(dev, qi->qidx);
+
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    /* Pop an element from queue */
+    req = vu_queue_pop(dev, q, sizeof(FVRequest));
+    if (!req) {
+        /*
+         * TODO: Implement some sort of ring buffer and queue notifications
+         * on that and send these later when notification queue has space
+         * available.
+         */
+        ret = -ENOSPC;
+    }
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+
+    if (ret) {
+        return ret;
+    }
+
+    out->len = tosend_len;
+    elem = &req->elem;
+    in_num = elem->in_num;
+    in_sg = elem->in_sg;
+    in_len = iov_size(in_sg, in_num);
+    fuse_log(FUSE_LOG_DEBUG, "%s: elem %d: with %d in desc of length %zd\n",
+             __func__, elem->index, in_num,  in_len);
+
+    if (in_len < sizeof(struct fuse_out_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for out_header\n",
+                 __func__, elem->index);
+        ret = -E2BIG;
+        goto out;
+    }
+
+    if (in_len < tosend_len) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too small for data len"
+                 " %zd\n", __func__, elem->index, tosend_len);
+        ret = -E2BIG;
+        goto out;
+    }
+
+    /* First copy the header data from iov->in_sg */
+    copy_iov(iov, count, in_sg, in_num, tosend_len);
+
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
+    pthread_mutex_lock(&qi->vq_lock);
+    vu_queue_push(dev, q, elem, tosend_len);
+    vu_queue_notify(dev, q);
+    pthread_mutex_unlock(&qi->vq_lock);
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
+out:
+    free(req);
+    return ret;
+}
+
 /*
  * pthread_rwlock_rdlock() and pthread_rwlock_wrlock can fail if
  * a deadlock condition is detected or the current thread already
@@ -266,11 +346,11 @@ static void vu_dispatch_unlock(struct fv_VuDev *vud)
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
                     struct iovec *iov, int count)
 {
-    FVRequest *req = container_of(ch, FVRequest, ch);
-    struct fv_QueueInfo *qi = ch->qi;
+    FVRequest *req;
+    struct fv_QueueInfo *qi;
     VuDev *dev = &se->virtio_dev->dev;
-    VuVirtq *q = vu_get_queue(dev, qi->qidx);
-    VuVirtqElement *elem = &req->elem;
+    VuVirtq *q;
+    VuVirtqElement *elem;
     int ret = 0;
 
     assert(count >= 1);
@@ -281,8 +361,16 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
 
     size_t tosend_len = iov_size(iov, count);
 
-    /* unique == 0 is notification, which we don't support */
-    assert(out->unique);
+    /* unique == 0 is notification */
+    if (!out->unique) {
+        return virtio_send_notify_msg(se, iov, count);
+    }
+
+    assert(ch);
+    req = container_of(ch, FVRequest, ch);
+    elem = &req->elem;
+    qi = ch->qi;
+    q = vu_get_queue(dev, qi->qidx);
     assert(!req->reply_sent);
 
     /* The 'in' part of the elem is to qemu */
@@ -867,6 +955,7 @@ static int fv_get_config(VuDev *dev, uint8_t *config, uint32_t len)
         struct fuse_notify_delete_out       delete_out;
         struct fuse_notify_store_out        store_out;
         struct fuse_notify_retrieve_out     retrieve_out;
+        struct fuse_notify_lock_out         lock_out;
     };
 
     notify_size = sizeof(struct fuse_out_header) +
diff --git a/tools/virtiofsd/passthrough_ll.c b/tools/virtiofsd/passthrough_ll.c
index f2fa9d95bb..8f24954a00 100644
--- a/tools/virtiofsd/passthrough_ll.c
+++ b/tools/virtiofsd/passthrough_ll.c
@@ -968,14 +968,6 @@ static int do_statx(struct lo_data *lo, int dirfd, const char *pathname,
     return 0;
 }
 
-static void posix_locks_value_destroy(gpointer data)
-{
-    struct lo_inode_plock *plock = data;
-
-    close(plock->fd);
-    free(plock);
-}
-
 /*
  * Increments nlookup on the inode on success. unref_inode_lolocked() must be
  * called eventually to decrement nlookup again. If inodep is non-NULL, the
@@ -2064,7 +2056,10 @@ static void lo_setlk(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi,
     struct lo_data *lo = lo_data(req);
     struct lo_inode *inode;
     struct lo_inode_plock *plock;
-    int ret, saverr = 0;
+    int ret, saverr = 0, ofd;
+    uint64_t unique;
+    struct fuse_session *se = req->se;
+    bool async_lock = false;
 
     fuse_log(FUSE_LOG_DEBUG,
              "lo_setlk(ino=%" PRIu64 ", flags=%d)"
@@ -2078,11 +2073,6 @@ static void lo_setlk(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi,
         return;
     }
 
-    if (sleep) {
-        fuse_reply_err(req, EOPNOTSUPP);
-        return;
-    }
-
     inode = lo_inode(req, ino);
     if (!inode) {
         fuse_reply_err(req, EBADF);
@@ -2095,21 +2085,56 @@ static void lo_setlk(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info *fi,
 
     if (!plock) {
         saverr = ret;
+        pthread_mutex_unlock(&inode->plock_mutex);
         goto out;
     }
 
+    /*
+     * plock is now released when inode is going away. We already have
+     * a reference on inode, so it is guaranteed that plock->fd is
+     * still around even after dropping inode->plock_mutex lock
+     */
+    ofd = plock->fd;
+    pthread_mutex_unlock(&inode->plock_mutex);
+
+    /*
+     * If this lock request can block, request caller to wait for
+     * notification. Do not access req after this. Once lock is
+     * available, send a notification instead.
+     */
+    if (sleep && lock->l_type != F_UNLCK) {
+        /*
+         * If notification queue is not enabled, can't support async
+         * locks.
+         */
+        if (!se->notify_enabled) {
+            saverr = EOPNOTSUPP;
+            goto out;
+        }
+        async_lock = true;
+        unique = req->unique;
+        fuse_reply_wait(req);
+    }
+
     /* TODO: Is it alright to modify flock? */
     lock->l_pid = 0;
-    ret = fcntl(plock->fd, F_OFD_SETLK, lock);
+    if (async_lock) {
+        ret = fcntl(ofd, F_OFD_SETLKW, lock);
+    } else {
+        ret = fcntl(ofd, F_OFD_SETLK, lock);
+    }
     if (ret == -1) {
         saverr = errno;
     }
 
 out:
-    pthread_mutex_unlock(&inode->plock_mutex);
     lo_inode_put(lo, &inode);
 
-    fuse_reply_err(req, saverr);
+    if (!async_lock) {
+        fuse_reply_err(req, saverr);
+    } else {
+        fuse_lowlevel_notify_lock(se, unique, saverr);
+    }
 }
 
 static void lo_fsyncdir(fuse_req_t req, fuse_ino_t ino, int datasync,
-- 
2.27.0
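
For reference, the new fuse_reply_wait()/fuse_lowlevel_notify_lock()
pair is meant to be used along the lines of the following condensed
sketch of the lo_setlk() blocking path above (the plock lookup and most
error handling are omitted):

    static void setlk_blocking_path(fuse_req_t req, int ofd,
                                    struct flock *lock)
    {
        struct fuse_session *se = req->se;
        uint64_t unique = req->unique;
        int saverr = 0;

        /* Tell the client to wait; req must not be touched after this */
        fuse_reply_wait(req);

        lock->l_pid = 0;
        if (fcntl(ofd, F_OFD_SETLKW, lock) == -1) { /* may block */
            saverr = errno;
        }

        /* Wake the waiter up through the notification queue */
        fuse_lowlevel_notify_lock(se, unique, saverr);
    }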



^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 5/6] virtiofsd: Thread state cleanup when blocking posix locks are used
  2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  -1 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

Stop the virtiofsd thread from sending any notifications/messages
through the virtqueue while the guest hard-reboots.

If a guest attempts to hard reboot while a virtiofsd thread blocks
waiting for a lock held by another guest's virtiofsd process, then
QEMU will block the guest from rebooting until the lock is released.

When the virtiofsd thread finally acquires the lock, it will not
attempt to send a message on the notification virtqueue, since that
queue has already been destroyed by the guest's hard-reboot attempt.
Instead, the thread only releases the lock, without sending any
notification through the virtqueue.

Then the cleanup process can proceed normally.
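
In code terms, the handshake added by this patch boils down to the
following condensed sketch of the fuse_virtio.c and passthrough_ll.c
hunks below (names as introduced by this series):

    /* queue teardown path, fv_queue_set_started(..., started = false) */
    vud->se->in_cleanup = 1;            /* set before joining the thread */
    fv_queue_cleanup_thread(vud, qidx);

    /* lock handler, after the blocking fcntl(ofd, F_OFD_SETLKW, lock) */
    if (!se->in_cleanup) {
        fuse_lowlevel_notify_lock(se, unique, saverr); /* normal case */
    } else {
        lock->l_type = F_UNLCK;          /* reboot in progress: drop the */
        lock->l_whence = SEEK_SET;       /* lock and notify nobody       */
        lock->l_start = lock->l_len = 0; /* whole file                   */
        fcntl(ofd, F_OFD_SETLKW, lock);
    }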

Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 tools/virtiofsd/fuse_i.h         |  1 +
 tools/virtiofsd/fuse_lowlevel.c  |  2 ++
 tools/virtiofsd/fuse_virtio.c    | 10 ++++++++++
 tools/virtiofsd/passthrough_ll.c | 23 +++++++++++++++++++----
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/tools/virtiofsd/fuse_i.h b/tools/virtiofsd/fuse_i.h
index 4942d080da..269fd5e77b 100644
--- a/tools/virtiofsd/fuse_i.h
+++ b/tools/virtiofsd/fuse_i.h
@@ -62,6 +62,7 @@ struct fuse_session {
     pthread_mutex_t lock;
     pthread_rwlock_t init_rwlock;
     int got_destroy;
+    int in_cleanup;
     int broken_splice_nonblock;
     uint64_t notify_ctr;
     struct fuse_notify_req notify_list;
diff --git a/tools/virtiofsd/fuse_lowlevel.c b/tools/virtiofsd/fuse_lowlevel.c
index 4b03ec2f9f..a9f6ea61dc 100644
--- a/tools/virtiofsd/fuse_lowlevel.c
+++ b/tools/virtiofsd/fuse_lowlevel.c
@@ -1905,6 +1905,7 @@ static void do_init(fuse_req_t req, fuse_ino_t nodeid,
     se->conn.proto_minor = arg->minor;
     se->conn.capable = 0;
     se->conn.want = 0;
+    se->in_cleanup = 0;
 
     memset(&outarg, 0, sizeof(outarg));
     outarg.major = FUSE_KERNEL_VERSION;
@@ -2397,6 +2398,7 @@ void fuse_session_process_buf_int(struct fuse_session *se,
             fuse_log(FUSE_LOG_DEBUG, "%s: reinit\n", __func__);
             se->got_destroy = 1;
             se->got_init = 0;
+            se->in_cleanup = 0;
             if (se->op.destroy) {
                 se->op.destroy(se->userdata);
             }
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index cb4dbafd91..7efaf9ae68 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -839,11 +839,14 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
         if (eventfd_write(ourqi->kill_fd, 1)) {
             fuse_log(FUSE_LOG_ERR, "Eventfd_read for queue: %m\n");
         }
+
         ret = pthread_join(ourqi->thread, NULL);
+
         if (ret) {
             fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err"
                      " %d\n", __func__, qidx, ret);
         }
+
         close(ourqi->kill_fd);
     }
     pthread_mutex_destroy(&ourqi->vq_lock);
@@ -929,6 +932,13 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
          * the queue thread doesn't block in virtio_send_msg().
          */
         vu_dispatch_unlock(vud);
+        /*
+         * Indicate to any thread that was blocked and is now waking up
+         * that we are in the thread cleanup process
+         */
+        if (!vud->se->in_cleanup) {
+            vud->se->in_cleanup = 1;
+        }
         fv_queue_cleanup_thread(vud, qidx);
         vu_dispatch_wrlock(vud);
     }
diff --git a/tools/virtiofsd/passthrough_ll.c b/tools/virtiofsd/passthrough_ll.c
index 8f24954a00..8a2aa10b9c 100644
--- a/tools/virtiofsd/passthrough_ll.c
+++ b/tools/virtiofsd/passthrough_ll.c
@@ -1971,9 +1971,6 @@ static struct lo_inode_plock *lookup_create_plock_ctx(struct lo_data *lo,
     plock =
         g_hash_table_lookup(inode->posix_locks, GUINT_TO_POINTER(lock_owner));
 
-    fuse_log(FUSE_LOG_DEBUG, "lookup_create_plock_ctx():"
-             " Inserted element in posix_locks hash table"
-             " with value pointer %p\n", plock);
     if (plock) {
         return plock;
     }
@@ -1997,6 +1994,10 @@ static struct lo_inode_plock *lookup_create_plock_ctx(struct lo_data *lo,
     plock->fd = fd;
     g_hash_table_insert(inode->posix_locks, GUINT_TO_POINTER(plock->lock_owner),
                         plock);
+    fuse_log(FUSE_LOG_DEBUG, "lookup_create_plock_ctx():"
+             " Inserted element in posix_locks hash table"
+             " with value pointer %p\n", plock);
+
     return plock;
 }
 
@@ -2133,7 +2134,21 @@ out:
     if (!async_lock) {
         fuse_reply_err(req, saverr);
     } else {
-        fuse_lowlevel_notify_lock(se, unique, saverr);
+        /*
+         * Before attempting to send any message through the
+         * virtqueue, the thread should check whether the queue
+         * still exists
+         */
+        if (!se->in_cleanup) {
+            fuse_lowlevel_notify_lock(se, unique, saverr);
+        } else {
+            /* Release the locks */
+            lock->l_type = F_UNLCK;
+            lock->l_whence = SEEK_SET;
+            /* Unlock whole file */
+            lock->l_start = lock->l_len = 0;
+            fcntl(ofd, F_OFD_SETLKW, lock);
+        }
     }
 }
 
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 6/6] virtiofsd: Custom threadpool for remote blocking posix locks requests
  2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  -1 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: iangelak, stefanha, dgilbert, vgoyal

Add a new custom threadpool, built on POSIX threads, that specifically
services locking requests.

In the case of an fcntl(F_SETLKW) request, if the guest is waiting for
one or more locks and issues a hard-reboot through SYSRQ, then
virtiofsd unblocks the blocked threads by sending them a signal to
wake them up.
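
For context, the request that ends up blocking here is produced by an
ordinary fcntl() call in the guest. A minimal reproducer could look
like the sketch below; the mount path is only an example, and it
assumes virtiofsd is run with POSIX lock support enabled (e.g.
-o posix_lock):

    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
        struct flock fl = {
            .l_type = F_WRLCK,
            .l_whence = SEEK_SET,
            .l_start = 0,
            .l_len = 0,                 /* 0 means "lock the whole file" */
        };
        /* example path; adjust to wherever the virtiofs share is mounted */
        int fd = open("/mnt/virtiofs/shared.lock", O_RDWR | O_CREAT, 0644);

        if (fd < 0 || fcntl(fd, F_SETLKW, &fl) == -1) { /* -> FUSE_SETLKW */
            perror("lock");
            return 1;
        }
        printf("got the lock\n");
        pause();               /* hold the lock so the other guest blocks */
        return 0;
    }

Running this in two guests that share the same export makes the second
fcntl() block, which is exactly the FUSE_SETLKW request that gets
queued on the custom pool.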

The current threadpool (GThreadPool) is not adequate for servicing
locking requests that cause a thread to block. That is because GLib
does not provide an API to cancel a request while it is being serviced
by a thread. In addition, a user might be running virtiofsd without a
threadpool (--thread-pool-size=0), in which case a locking request
that blocks would prevent the main virtqueue thread from servicing any
other requests.
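
Concretely, the request dispatch in fv_queue_thread() becomes the
following (condensed from the fuse_virtio.c hunk below):

    request_opcode = fv_get_request_opcode(req, qi);
    if (request_opcode == FUSE_GETLK ||
        request_opcode == FUSE_SETLK ||
        request_opcode == FUSE_SETLKW) {
        /* may block: hand the request to the cancellable lock pool */
        fv_lock_tpool_push(lk_tpool, fv_queue_worker, req, qi);
    } else if (se->thread_pool_size) {
        g_thread_pool_push(pool, req, NULL);      /* normal GThreadPool */
    } else {
        req_list = g_list_prepend(req_list, req); /* serviced inline */
    }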

Once the blocked threads are woken up, virtiofsd proceeds to clean up
their state, release them back to the system and re-initialize.

Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 tools/virtiofsd/fuse_virtio.c | 407 +++++++++++++++++++++++++++++++++-
 1 file changed, 403 insertions(+), 4 deletions(-)

diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index 7efaf9ae68..b23aff5a50 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -29,6 +29,45 @@
 #include "libvhost-user.h"
 
 struct fv_VuDev;
+
+/*
+ * Create a separate thread pool for handling locking requests. This way we
+ * can safely monitor, wake up and clean up the threads during a hard-reboot
+ */
+
+struct fv_LockReq {
+    struct fv_LockReq *next;                        /* pointer to next task */
+    void (*worker_func)(void *arg1, void *arg2);    /* worker function */
+    void *arg1;                                     /* 1st arg: Request */
+    void *arg2;                                     /* 2nd arg: Virtqueue */
+} fv_LockReq;
+
+struct fv_LockReqQueue {
+    pthread_mutex_t lock;
+    struct fv_LockReq *head;                        /* Front of the queue */
+    struct fv_LockReq *tail;                        /* Back of the queue */
+    pthread_cond_t notify;                          /* Condition variable */
+    int size;                                       /* Size of the queue */
+
+} fv_LockTaskQueue;
+
+struct fv_LockThread {
+    pthread_t pthread;
+    int alive;
+    int id;
+    struct fv_LockThreadPool *lock_t_pool;
+};
+
+struct fv_LockThreadPool {
+    struct fv_LockThread **threads;
+    struct fv_LockReqQueue *lreq_queue;              /* Locking Request Queue*/
+    pthread_mutex_t tp_lock;
+
+    int num_threads;                                 /* Total threads */
+    int created;                                     /* Threads created */
+    int destroy_pool;                                /* Destroy pool flag */
+};
+
 struct fv_QueueInfo {
     pthread_t thread;
     /*
@@ -710,6 +749,325 @@ out:
     free(req);
 }
 
+/* Reuse of code in fv_queue_worker(). Need to clean up */
+static int fv_get_request_opcode(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    struct fuse_in_header inh;
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+
+    /*
+     * An element contains one request and the space to send our response
+     * They're spread over multiple descriptors in a scatter/gather set
+     * and we can't trust the guest to keep them still; so copy in/out.
+     */
+    fbuf.mem = g_malloc(se->bufsize);
+
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    fuse_log(FUSE_LOG_DEBUG,
+             "%s: elem %d: with %d out desc of length %zd\n",
+             __func__, elem->index, out_num, out_len);
+
+    /*
+     * The elem should contain a 'fuse_in_header' (in to fuse)
+     * plus the data based on the len in the header.
+     */
+    if (out_len < sizeof(struct fuse_in_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+                 __func__, elem->index);
+        assert(0); /* TODO */
+    }
+    if (out_len > se->bufsize) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+                 elem->index);
+        assert(0); /* TODO */
+    }
+    /* Copy just the fuse_in_header and get the request opcode */
+    copy_from_iov(&fbuf, out_num, out_sg,
+                  sizeof(struct fuse_in_header));
+    memcpy(&inh, fbuf.mem, sizeof(struct fuse_in_header));
+
+    g_free(fbuf.mem);
+    /* Return the request opcode */
+    return inh.opcode;
+}
+
+/* Initialize the Locking Request Queue */
+static struct fv_LockReqQueue *fv_lock_request_queue_init(void)
+{
+    struct fv_LockReqQueue *lock_req_queue;
+
+    lock_req_queue = g_new(struct fv_LockReqQueue, 1);
+    lock_req_queue->size = 0;
+    lock_req_queue->head = NULL;
+    lock_req_queue->tail = NULL;
+
+    pthread_mutex_init(&(lock_req_queue->lock), NULL);
+    pthread_cond_init(&(lock_req_queue->notify), NULL);
+
+    return lock_req_queue;
+}
+
+/* Push a new locking request to the queue*/
+static void fv_lock_tpool_push(struct fv_LockThreadPool *tpool,
+                        void (*worker_func)(void *, void *),
+                        void *arg1, void *arg2)
+{
+    struct fv_LockReq *newreq;
+
+    newreq = g_new(struct fv_LockReq, 1);
+    newreq->worker_func = worker_func;
+    newreq->arg1 = arg1;
+    newreq->arg2 = arg2;
+    newreq->next = NULL;
+
+    /* Now add the request to the queue */
+    pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+    if (tpool->lreq_queue->size == 0) {
+        tpool->lreq_queue->head = newreq;
+        tpool->lreq_queue->tail = newreq;
+    } else {
+        tpool->lreq_queue->tail->next = newreq;
+        tpool->lreq_queue->tail = tpool->lreq_queue->tail->next;
+    }
+
+    tpool->lreq_queue->size++;
+
+    /* Notify the threads that a request is available */
+    pthread_cond_signal(&tpool->lreq_queue->notify);
+
+    pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+}
+
+/* Pop a locking request from the queue*/
+static struct fv_LockReq *fv_lock_tpool_pop(struct fv_LockThreadPool *tpool)
+{
+    struct fv_LockReq *lock_req;
+
+    pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+    lock_req = tpool->lreq_queue->head;
+
+    /* Must remove the element from the queue */
+    if (!tpool->lreq_queue->size) {
+        ;
+    } else if (tpool->lreq_queue->size == 1) {
+        tpool->lreq_queue->head = NULL;
+        tpool->lreq_queue->tail = NULL;
+        tpool->lreq_queue->size--;
+    } else {
+        tpool->lreq_queue->head = tpool->lreq_queue->head->next;
+        tpool->lreq_queue->size--;
+        /*
+         * Notify the rest of the threads
+         * that a request is available
+         */
+        pthread_cond_signal(&tpool->lreq_queue->notify);
+    }
+
+    pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+    return lock_req;
+
+}
+
+static void fv_lock_request_queue_destroy(struct fv_LockThreadPool *tpool)
+{
+    while (tpool->lreq_queue->size) {
+        g_free(fv_lock_tpool_pop(tpool));
+    }
+
+    /* Now free the actual queue itself */
+    g_free(tpool->lreq_queue);
+}
+
+/*
+ * Signal handler for blocking threads that wait on a remote lock to be released
+ * Called when virtiofsd does cleanup and wants to wake up these threads
+ */
+static void fv_thread_unblock_handler(int signal)
+{
+    fuse_log(FUSE_LOG_INFO, "Thread received a wake up signal...unblocking\n");
+    return;
+}
+
+static void *fv_lock_thread_do_work(void *thread)
+{
+    struct fv_LockThread *lk_thread = (struct fv_LockThread *)thread;
+    struct fv_LockThreadPool *tpool = lk_thread->lock_t_pool;
+    struct fv_LockReq *lock_request;
+    /* Actual worker function and arguments. Same as non-locking requests */
+    void (*worker_func)(void*, void*);
+    void *arg1;
+    void *arg2;
+
+    /*
+     * Register a signal handler to wake up the thread when it is blocked
+     * waiting for a lock
+     */
+    struct sigaction sa;
+    sigemptyset(&sa.sa_mask);
+    sa.sa_flags = 0;
+    sa.sa_handler = fv_thread_unblock_handler;
+    if (sigaction(SIGUSR1, &sa, NULL) == -1) {
+        fuse_log(FUSE_LOG_ERR, "Cannot register the signal handler for"
+                 " thread %d\n", lk_thread->id);
+    }
+
+    while (!tpool->destroy_pool) {
+        /*
+         * Get the queue lock first so that we can wait on the condition
+         * variable afterwards
+         */
+        pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+        /* Wait on the condition variable until a request is available */
+        while (tpool->lreq_queue->size == 0 && !tpool->destroy_pool) {
+            pthread_cond_wait(&tpool->lreq_queue->notify,
+                            &tpool->lreq_queue->lock);
+        }
+
+        /* Unlock the queue for other threads */
+        pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+        if (tpool->destroy_pool) {
+            break;
+        }
+
+        /* Now the request must be serviced */
+        lock_request = fv_lock_tpool_pop(tpool);
+
+        if (lock_request && !tpool->destroy_pool) {
+            fuse_log(FUSE_LOG_DEBUG, "%s: Locking Thread:%d handling"
+                    " a request\n", __func__, lk_thread->id);
+            worker_func = lock_request->worker_func;
+            arg1 = lock_request->arg1;
+            arg2 = lock_request->arg2;
+            worker_func(arg1, arg2);
+            g_free(lock_request);
+        }
+    }
+
+    /* Mark the thread as inactive */
+    pthread_mutex_lock(&tpool->tp_lock);
+    tpool->threads[lk_thread->id]->alive = 0;
+    tpool->created--;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    return NULL;
+}
+
+/* Create a single thread that handles locking requests */
+static void fv_lock_thread_init(struct fv_LockThreadPool *tpool,
+                                struct fv_LockThread **l_thread, int id)
+{
+    *l_thread = g_new(struct fv_LockThread, 1);
+    (*l_thread)->lock_t_pool = tpool;
+    (*l_thread)->id = id;
+    (*l_thread)->alive = 1;
+
+    pthread_create(&(*l_thread)->pthread, NULL,
+                fv_lock_thread_do_work, (*l_thread));
+    pthread_detach((*l_thread)->pthread);
+}
+
+/* Initialize the thread pool for the locking posix threads */
+static struct fv_LockThreadPool *fv_lock_thread_pool_init(int thread_num)
+{
+    struct fv_LockThreadPool *tpool = NULL;
+    int i;
+
+    if (thread_num < 0) {
+        thread_num = 0;
+    }
+
+    tpool = g_new(struct fv_LockThreadPool, 1);
+    tpool->num_threads = 0;
+    tpool->destroy_pool = 0;
+    tpool->created = 0;
+    pthread_mutex_init(&(tpool->tp_lock), NULL);
+
+    /* Initialize the Lock Request Queue */
+    tpool->lreq_queue = fv_lock_request_queue_init();
+
+    /* Create the threads in the pool */
+    tpool->threads = g_new(struct fv_LockThread *, thread_num);
+
+    for (i = 0; i < thread_num; i++) {
+        fv_lock_thread_init(tpool, &tpool->threads[i], i);
+        tpool->num_threads++;
+        tpool->created++;
+    }
+
+    return tpool;
+}
+
+static void fv_lock_thread_pool_destroy(struct fv_LockThreadPool *tpool)
+{
+    int i, tmp;
+
+    if (!tpool) {
+        return;
+    }
+
+    /* Get the queue lock */
+    pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+    /* We want to destroy the pool */
+    pthread_mutex_lock(&tpool->tp_lock);
+    tpool->destroy_pool = 1;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    /* Wake up threads waiting for requests */
+    pthread_cond_broadcast(&tpool->lreq_queue->notify);
+    pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+    for (i = 0; i < tpool->num_threads; i++) {
+        /*
+         * Even though the threads have been notified via the condition
+         * variable, some might still be blocked waiting for a lock. Signal
+         * them to wake up
+         */
+        if (tpool->threads[i]->alive) {
+            pthread_kill(tpool->threads[i]->pthread, SIGUSR1);
+        }
+    }
+
+    /*
+     * Now wait for the threads to exit before releasing the pool resources
+     * back to the system
+     */
+    while (1) {
+        pthread_mutex_lock(&tpool->tp_lock);
+        tmp = tpool->created;
+        pthread_mutex_unlock(&tpool->tp_lock);
+        if (tmp == 0) {
+            break;
+        }
+    }
+
+    /* Destroy the locking request queue */
+    fv_lock_request_queue_destroy(tpool);
+    for (i = 0; i < tpool->num_threads; i++) {
+        g_free(tpool->threads[i]);
+    }
+
+    /* Now free the threadpool */
+    g_free(tpool->threads);
+    g_free(tpool);
+
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
@@ -717,18 +1075,36 @@ static void *fv_queue_thread(void *opaque)
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
     struct fuse_session *se = qi->virtio_dev->se;
+    struct fv_LockThreadPool *lk_tpool = NULL;
+    int request_opcode;
     GThreadPool *pool = NULL;
     GList *req_list = NULL;
 
     if (se->thread_pool_size) {
+        /* Create the GThreadPool to handle normal requests */
         fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
-                 __func__, qi->qidx);
+             __func__, qi->qidx);
         pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
-                                 FALSE, NULL);
+                     FALSE, NULL);
         if (!pool) {
             fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
             return NULL;
         }
+
+    }
+
+    fuse_log(FUSE_LOG_DEBUG, "%s: Creating a locking thread pool for"
+            " Queue %d with size %d\n", __func__, qi->qidx, 4);
+    /*
+     * Create the custom thread pool to handle locking requests
+     * TODO: Add or remove threads dynamically from the pool depending on
+     * the number of locking requests that are pending
+     */
+    lk_tpool = fv_lock_thread_pool_init(4);
+    if (!lk_tpool) {
+        fuse_log(FUSE_LOG_ERR, "%s: fv_lock_thread_pool"
+                " failed\n", __func__);
+        return NULL;
     }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
@@ -801,10 +1177,28 @@ static void *fv_queue_thread(void *opaque)
 
             req->reply_sent = false;
 
+            /*
+             * In every case we get the opcode of the request and check if it
+             * is a locking request. If yes, we assign the request to the
+             * custom thread pool
+             */
+            request_opcode = fv_get_request_opcode(req, qi);
             if (!se->thread_pool_size) {
-                req_list = g_list_prepend(req_list, req);
+                if (request_opcode == FUSE_GETLK ||
+                        request_opcode == FUSE_SETLK ||
+                        request_opcode == FUSE_SETLKW) {
+                    fv_lock_tpool_push(lk_tpool, fv_queue_worker, req, qi);
+                } else {
+                    req_list = g_list_prepend(req_list, req);
+                }
             } else {
-                g_thread_pool_push(pool, req, NULL);
+                if (request_opcode == FUSE_GETLK ||
+                        request_opcode == FUSE_SETLK ||
+                        request_opcode == FUSE_SETLKW) {
+                    fv_lock_tpool_push(lk_tpool, fv_queue_worker, req, qi);
+                } else {
+                    g_thread_pool_push(pool, req, NULL);
+                }
             }
         }
 
@@ -819,10 +1213,15 @@ static void *fv_queue_thread(void *opaque)
         }
     }
 
+    /* Free the pools */
     if (pool) {
         g_thread_pool_free(pool, FALSE, TRUE);
     }
 
+    if (lk_tpool) {
+        fv_lock_thread_pool_destroy(lk_tpool);
+    }
+
     return NULL;
 }
 
-- 
2.27.0



^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [Virtio-fs] [PATCH 6/6] virtiofsd: Custom threadpool for remote blocking posix locks requests
@ 2021-06-16 19:39   ` Ioannis Angelakopoulos
  0 siblings, 0 replies; 14+ messages in thread
From: Ioannis Angelakopoulos @ 2021-06-16 19:39 UTC (permalink / raw)
  To: qemu-devel, virtio-fs; +Cc: vgoyal

Add a new custom threadpool using posix threads that specifically
service locking requests.

In the case of a fcntl(SETLKW) request, if the guest is waiting
for a lock or locks and issues a hard-reboot through SYSRQ then virtiofsd
unblocks the blocked threads by sending a signal to them and waking
them up.

The current threadpool (GThreadPool) is not adequate to service the
locking requests that result in a thread blocking. That is because
GLib does not provide an API to cancel the request while it is
serviced by a thread. In addition, a user might be running virtiofsd
without a threadpool (--thread-pool-size=0), thus a locking request
that blocks, will block the main virtqueue thread that services requests
from servicing any other requests.

Then virtiofsd proceeds to cleanup the state of the threads, release
them back to the system and re-initialize.

Signed-off-by: Ioannis Angelakopoulos <iangelak@redhat.com>
---
 tools/virtiofsd/fuse_virtio.c | 407 +++++++++++++++++++++++++++++++++-
 1 file changed, 403 insertions(+), 4 deletions(-)

diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
index 7efaf9ae68..b23aff5a50 100644
--- a/tools/virtiofsd/fuse_virtio.c
+++ b/tools/virtiofsd/fuse_virtio.c
@@ -29,6 +29,45 @@
 #include "libvhost-user.h"
 
 struct fv_VuDev;
+
+/*
+ * Create a separate thread pool for handling locking requests. This way we
+ * can safely monitor, wake up and clean the threads during a hard-reboot
+ */
+
+struct fv_LockReq {
+    struct fv_LockReq *next;                        /* pointer to next task */
+    void (*worker_func)(void *arg1, void *arg2);    /* worker function */
+    void *arg1;                                     /* 1st arg: Request */
+    void *arg2;                                     /* 2nd arg: Virtqueue */
+} fv_LockReq;
+
+struct fv_LockReqQueue {
+    pthread_mutex_t lock;
+    struct fv_LockReq *head;                        /* Front of the queue */
+    struct fv_LockReq *tail;                        /* Back of the queue */
+    pthread_cond_t notify;                         /* Conditional variable */
+    int size;                                       /* Size of the queue */
+
+} fv_LockTaskQueue;
+
+struct fv_LockThread {
+    pthread_t pthread;
+    int alive;
+    int id;
+    struct fv_LockThreadPool *lock_t_pool;
+};
+
+struct fv_LockThreadPool {
+    struct fv_LockThread **threads;
+    struct fv_LockReqQueue *lreq_queue;              /* Locking Request Queue*/
+    pthread_mutex_t tp_lock;
+
+    int num_threads;                                 /* Total threads */
+    int created;                                     /* Threads created */
+    int destroy_pool;                                /* Destroy pool flag */
+};
+
 struct fv_QueueInfo {
     pthread_t thread;
     /*
@@ -710,6 +749,325 @@ out:
     free(req);
 }
 
+/* Reuse of code in fv fv_queue_worker. Need to clean up */
+static int fv_get_request_opcode(gpointer data, gpointer user_data)
+{
+    struct fv_QueueInfo *qi = user_data;
+    struct fuse_session *se = qi->virtio_dev->se;
+    FVRequest *req = data;
+    VuVirtqElement *elem = &req->elem;
+    struct fuse_buf fbuf = {};
+    struct fuse_in_header inh;
+
+    assert(se->bufsize > sizeof(struct fuse_in_header));
+
+    /*
+     * An element contains one request and the space to send our response
+     * They're spread over multiple descriptors in a scatter/gather set
+     * and we can't trust the guest to keep them still; so copy in/out.
+     */
+    fbuf.mem = g_malloc(se->bufsize);
+
+    /* The 'out' part of the elem is from qemu */
+    unsigned int out_num = elem->out_num;
+    struct iovec *out_sg = elem->out_sg;
+    size_t out_len = iov_size(out_sg, out_num);
+    fuse_log(FUSE_LOG_DEBUG,
+             "%s: elem %d: with %d out desc of length %zd\n",
+             __func__, elem->index, out_num, out_len);
+
+    /*
+     * The elem should contain a 'fuse_in_header' (in to fuse)
+     * plus the data based on the len in the header.
+     */
+    if (out_len < sizeof(struct fuse_in_header)) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
+                 __func__, elem->index);
+        assert(0); /* TODO */
+    }
+    if (out_len > se->bufsize) {
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
+                 elem->index);
+        assert(0); /* TODO */
+    }
+    /* Copy just the fuse_in_header and get the request opcode */
+    copy_from_iov(&fbuf, out_num, out_sg,
+                  sizeof(struct fuse_in_header));
+    memcpy(&inh, fbuf.mem, sizeof(struct fuse_in_header));
+
+    g_free(fbuf.mem);
+    /* Return the request opcode */
+    return inh.opcode;
+}
+
+/* Initialize the Locking Request Queue */
+static struct fv_LockReqQueue *fv_lock_request_queue_init(void)
+{
+    struct fv_LockReqQueue *lock_req_queue;
+
+    lock_req_queue = g_new(struct fv_LockReqQueue, 1);
+    lock_req_queue->size = 0;
+    lock_req_queue->head = NULL;
+    lock_req_queue->tail = NULL;
+
+    pthread_mutex_init(&(lock_req_queue->lock), NULL);
+    pthread_cond_init(&(lock_req_queue->notify), NULL);
+
+    return lock_req_queue;
+}
+
+/* Push a new locking request to the queue*/
+static void fv_lock_tpool_push(struct fv_LockThreadPool *tpool,
+                        void (*worker_func)(void *, void *),
+                        void *arg1, void *arg2)
+{
+    struct fv_LockReq *newreq;
+
+    newreq = g_new(struct fv_LockReq, 1);
+    newreq->worker_func = worker_func;
+    newreq->arg1 = arg1;
+    newreq->arg2 = arg2;
+    newreq->next = NULL;
+
+    /* Now add the request to the queue */
+    pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+    if (tpool->lreq_queue->size == 0) {
+        tpool->lreq_queue->head = newreq;
+        tpool->lreq_queue->tail = newreq;
+    } else {
+        tpool->lreq_queue->tail->next = newreq;
+        tpool->lreq_queue->tail = tpool->lreq_queue->tail->next;
+    }
+
+    tpool->lreq_queue->size++;
+
+    /* Notify the threads that a request is available */
+    pthread_cond_signal(&tpool->lreq_queue->notify);
+
+    pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+}
+
+/* Pop a locking request from the queue*/
+static struct fv_LockReq *fv_lock_tpool_pop(struct fv_LockThreadPool *tpool)
+{
+    struct fv_LockReq *lock_req;
+
+    pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+    lock_req = tpool->lreq_queue->head;
+
+    /* Must remove the element from the queue */
+    if (!tpool->lreq_queue->size) {
+        ;
+    } else if (tpool->lreq_queue->size == 1) {
+        tpool->lreq_queue->head = NULL;
+        tpool->lreq_queue->tail = NULL;
+        tpool->lreq_queue->size--;
+    } else {
+        tpool->lreq_queue->head = tpool->lreq_queue->head->next;
+        tpool->lreq_queue->size--;
+        /*
+         * Notify the rest of the threads
+         * that a request is available
+         */
+        pthread_cond_signal(&tpool->lreq_queue->notify);
+    }
+
+    pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+    return lock_req;
+
+}
+
+static void fv_lock_request_queue_destroy(struct fv_LockThreadPool *tpool)
+{
+    while (tpool->lreq_queue->size) {
+        g_free(fv_lock_tpool_pop(tpool));
+    }
+
+    /* Now free the actual queue itself */
+    g_free(tpool->lreq_queue);
+}
+
+/*
+ * Signal handler for blcking threads that wait on a remote lock to be released
+ * Called when virtiofsd does cleanup and wants to wake up these threads
+ */
+static void fv_thread_unblock_handler(int signal)
+{
+    fuse_log(FUSE_LOG_INFO, "Thread received a wake up signal...unblocking\n");
+    return;
+}
+
+static void *fv_lock_thread_do_work(void *thread)
+{
+    struct fv_LockThread *lk_thread = (struct fv_LockThread *)thread;
+    struct fv_LockThreadPool *tpool = lk_thread->lock_t_pool;
+    struct fv_LockReq *lock_request;
+    /* Actual worker function and arguments. Same as non locking requests */
+    void (*worker_func)(void*, void*);
+    void *arg1;
+    void *arg2;
+
+    /*
+     * Register a signal handler to wake up the thread when it is blocking on
+     * waiting for a lock
+     */
+    struct sigaction sa;
+    sigemptyset(&sa.sa_mask);
+    sa.sa_flags = 0;
+    sa.sa_handler = fv_thread_unblock_handler;
+    if (sigaction(SIGUSR1, &sa, NULL) == -1) {
+        fuse_log(FUSE_LOG_ERR, "Cannot register the signal handler for"
+                 " thread %d\n", lk_thread->id);
+    }
+
+    while (!tpool->destroy_pool) {
+        /*
+         * Get the queue lock first so that we can wait on the conditional
+         * variable afterwards
+         */
+        pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+        /* Wait on the condition variable until it is available */
+        while (tpool->lreq_queue->size == 0 && !tpool->destroy_pool) {
+            pthread_cond_wait(&tpool->lreq_queue->notify,
+                            &tpool->lreq_queue->lock);
+        }
+
+        /* Unlock the queue for other threads */
+        pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+        if (tpool->destroy_pool) {
+            break;
+        }
+
+        /* Now the request must be serviced */
+        lock_request = fv_lock_tpool_pop(tpool);
+
+        if (lock_request && !tpool->destroy_pool) {
+            fuse_log(FUSE_LOG_DEBUG, "%s: Locking Thread:%d handling"
+                    " a request\n", __func__, lk_thread->id);
+            worker_func = lock_request->worker_func;
+            arg1 = lock_request->arg1;
+            arg2 = lock_request->arg2;
+            worker_func(arg1, arg2);
+            g_free(lock_request);
+        }
+    }
+
+    /* Mark the thread as inactive */
+    pthread_mutex_lock(&tpool->tp_lock);
+    tpool->threads[lk_thread->id]->alive = 0;
+    tpool->created--;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    return NULL;
+}
+
+/* Create a single thread that handles locking requests */
+static void fv_lock_thread_init(struct fv_LockThreadPool *tpool,
+                                struct fv_LockThread **l_thread, int id)
+{
+    *l_thread = g_new(struct fv_LockThread, 1);
+    (*l_thread)->lock_t_pool = tpool;
+    (*l_thread)->id = id;
+    (*l_thread)->alive = 1;
+
+    pthread_create(&(*l_thread)->pthread, NULL,
+                fv_lock_thread_do_work, (*l_thread));
+    pthread_detach((*l_thread)->pthread);
+}
+
+/* Initialize the thread pool for the locking posix threads */
+static struct fv_LockThreadPool *fv_lock_thread_pool_init(int thread_num)
+{
+    struct fv_LockThreadPool *tpool = NULL;
+    int i;
+
+    if (thread_num < 0) {
+        thread_num = 0;
+    }
+
+    tpool = g_new(struct fv_LockThreadPool, 1);
+    tpool->num_threads = 0;
+    tpool->destroy_pool = 0;
+    tpool->created = 0;
+    pthread_mutex_init(&(tpool->tp_lock), NULL);
+
+    /* Initialize the Lock Request Queue */
+    tpool->lreq_queue = fv_lock_request_queue_init();
+
+    /* Create the threads in the pool */
+    tpool->threads = g_new(struct fv_LockThread *, thread_num);
+
+    for (i = 0; i < thread_num; i++) {
+        fv_lock_thread_init(tpool, &tpool->threads[i], i);
+        tpool->num_threads++;
+        tpool->created++;
+    }
+
+    return tpool;
+}
+
+static void fv_lock_thread_pool_destroy(struct fv_LockThreadPool *tpool)
+{
+    int i, tmp;
+
+    if (!tpool) {
+        return;
+    }
+
+     /*Get the lock to the queue */
+    pthread_mutex_lock(&tpool->lreq_queue->lock);
+
+    /* We want to destroy the pool */
+    pthread_mutex_lock(&tpool->tp_lock);
+    tpool->destroy_pool = 1;
+    pthread_mutex_unlock(&tpool->tp_lock);
+
+    /* Wake up threads waiting for requests */
+    pthread_cond_broadcast(&tpool->lreq_queue->notify);
+    pthread_mutex_unlock(&tpool->lreq_queue->lock);
+
+    for (i = 0; i < tpool->num_threads; i++) {
+        /*
+         * Even though the threads are notified about the conditional variable
+         * there still might be blocking threads on a request. Signal them to
+         * wake up
+         */
+        if (tpool->threads[i]->alive) {
+            pthread_kill(tpool->threads[i]->pthread, SIGUSR1);
+        }
+    }
+
+    /*
+     * Now wait for the threads to exit before releasing the pool resources
+     * back to the system
+     */
+    while (1) {
+        pthread_mutex_lock(&tpool->tp_lock);
+        tmp = tpool->created;
+        pthread_mutex_unlock(&tpool->tp_lock);
+        if (tmp == 0) {
+            break;
+        }
+    }
+
+    /* Destroy the locking request queue */
+    fv_lock_request_queue_destroy(tpool);
+    for (i = 0; i < tpool->num_threads; i++) {
+        g_free(tpool->threads[i]);
+    }
+
+    /* Now free the threadpool */
+    g_free(tpool->threads);
+    g_free(tpool);
+
+}
+
 /* Thread function for individual queues, created when a queue is 'started' */
 static void *fv_queue_thread(void *opaque)
 {
@@ -717,18 +1075,36 @@ static void *fv_queue_thread(void *opaque)
     struct VuDev *dev = &qi->virtio_dev->dev;
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
     struct fuse_session *se = qi->virtio_dev->se;
+    struct fv_LockThreadPool *lk_tpool = NULL;
+    int request_opcode;
     GThreadPool *pool = NULL;
     GList *req_list = NULL;
 
     if (se->thread_pool_size) {
+        /* Create the GThreadPool to handle normal requests */
         fuse_log(FUSE_LOG_DEBUG, "%s: Creating thread pool for Queue %d\n",
-                 __func__, qi->qidx);
+             __func__, qi->qidx);
         pool = g_thread_pool_new(fv_queue_worker, qi, se->thread_pool_size,
-                                 FALSE, NULL);
+                     FALSE, NULL);
         if (!pool) {
             fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
             return NULL;
         }
+
+    }
+
+    fuse_log(FUSE_LOG_DEBUG, "%s: Creating a locking thread pool for"
+            " Queue %d with size %d\n", __func__, qi->qidx, 4);
+    /*
+     * Create the custom thread pool (currently a fixed size of 4 threads)
+     * to handle locking requests.
+     * TODO: Add or remove threads dynamically in the pool depending on the
+     * number of pending locking requests.
+     */
+    lk_tpool = fv_lock_thread_pool_init(4);
+    if (!lk_tpool) {
+        fuse_log(FUSE_LOG_ERR, "%s: fv_lock_thread_pool_init"
+                " failed\n", __func__);
+        return NULL;
     }
 
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
@@ -801,10 +1177,28 @@ static void *fv_queue_thread(void *opaque)
 
             req->reply_sent = false;
 
+            /*
+             * Regardless of the thread pool configuration, fetch the opcode
+             * of each request and check whether it is a locking request. If
+             * it is, dispatch it to the custom thread pool; otherwise it
+             * follows the normal path below.
+             */
+            request_opcode = fv_get_request_opcode(req, qi);
             if (!se->thread_pool_size) {
-                req_list = g_list_prepend(req_list, req);
+                if (request_opcode == FUSE_GETLK ||
+                        request_opcode == FUSE_SETLK ||
+                        request_opcode == FUSE_SETLKW) {
+                    fv_lock_tpool_push(lk_tpool, fv_queue_worker, req, qi);
+                } else {
+                    req_list = g_list_prepend(req_list, req);
+                }
             } else {
-                g_thread_pool_push(pool, req, NULL);
+                if (request_opcode == FUSE_GETLK ||
+                        request_opcode == FUSE_SETLK ||
+                        request_opcode == FUSE_SETLKW) {
+                    fv_lock_tpool_push(lk_tpool, fv_queue_worker, req, qi);
+                } else {
+                    g_thread_pool_push(pool, req, NULL);
+                }
             }
         }
 
@@ -819,10 +1213,15 @@ static void *fv_queue_thread(void *opaque)
         }
     }
 
+    /* Free the pools */
     if (pool) {
         g_thread_pool_free(pool, FALSE, TRUE);
     }
 
+    if (lk_tpool) {
+        fv_lock_thread_pool_destroy(lk_tpool);
+    }
+
     return NULL;
 }
 
-- 
2.27.0


^ permalink raw reply related	[flat|nested] 14+ messages in thread

Thread overview: 14+ messages
2021-06-16 19:39 [PATCH 0/6] virtiofsd: Support for remote blocking posix locks Ioannis Angelakopoulos
2021-06-16 19:39 ` [Virtio-fs] " Ioannis Angelakopoulos
2021-06-16 19:39 ` [PATCH 1/6] virtiofsd: Release file locks using F_UNLCK Ioannis Angelakopoulos
2021-06-16 19:39   ` [Virtio-fs] " Ioannis Angelakopoulos
2021-06-16 19:39 ` [PATCH 2/6] virtiofsd: Create a notification queue Ioannis Angelakopoulos
2021-06-16 19:39   ` [Virtio-fs] " Ioannis Angelakopoulos
2021-06-16 19:39 ` [PATCH 3/6] virtiofsd: Specify size of notification buffer using config space Ioannis Angelakopoulos
2021-06-16 19:39   ` [Virtio-fs] " Ioannis Angelakopoulos
2021-06-16 19:39 ` [PATCH 4/6] virtiofsd: Implement blocking posix locks Ioannis Angelakopoulos
2021-06-16 19:39   ` [Virtio-fs] " Ioannis Angelakopoulos
2021-06-16 19:39 ` [PATCH 5/6] virtiofsd: Thread state cleanup when blocking posix locks are used Ioannis Angelakopoulos
2021-06-16 19:39   ` [Virtio-fs] " Ioannis Angelakopoulos
2021-06-16 19:39 ` [PATCH 6/6] virtiofsd: Custom threadpool for remote blocking posix locks requests Ioannis Angelakopoulos
2021-06-16 19:39   ` [Virtio-fs] " Ioannis Angelakopoulos
