All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH] mini-os: xenbus: support large messages
@ 2021-08-18 15:26 Juergen Gross
  2021-09-10  6:40 ` Juergen Gross
  2021-09-14 23:17 ` Samuel Thibault
  0 siblings, 2 replies; 7+ messages in thread
From: Juergen Gross @ 2021-08-18 15:26 UTC (permalink / raw)
  To: minios-devel, xen-devel; +Cc: samuel.thibault, wl, Juergen Gross

Today the implementation of the xenbus protocol in Mini-OS will only
allow to transfer the complete message to or from the ring page buffer.
This is limiting the maximum message size to lower values as the xenbus
protocol normally would allow.

Change that by allowing to transfer the xenbus message in chunks as
soon as they are available.

Avoid crashing Mini-OS in case of illegal data read from the ring
buffer.

Signed-off-by: Juergen Gross <jgross@suse.com>
---
 xenbus/xenbus.c | 212 ++++++++++++++++++++++++++++--------------------
 1 file changed, 124 insertions(+), 88 deletions(-)

diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
index 23de61e..3fbb122 100644
--- a/xenbus/xenbus.c
+++ b/xenbus/xenbus.c
@@ -29,6 +29,7 @@
 #include <xen/hvm/params.h>
 #include <mini-os/spinlock.h>
 #include <mini-os/xmalloc.h>
+#include <mini-os/semaphore.h>
 
 #define min(x,y) ({                       \
         typeof(x) tmpx = (x);                 \
@@ -46,6 +47,7 @@
 static struct xenstore_domain_interface *xenstore_buf;
 static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
 DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
+static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
 
 xenbus_event_queue xenbus_events;
 static struct watch {
@@ -231,75 +233,105 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
 }
 
 
+static void xenbus_read_data(char *buf, unsigned int len)
+{
+    unsigned int off = 0;
+    unsigned int prod;
+    unsigned int size;
+    int notify;
+
+    while (off != len)
+    {
+        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
+            wait_event(xb_waitq,
+                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
+
+        prod = xenstore_buf->rsp_prod;
+        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
+        size = min(len - off, prod - xenstore_buf->rsp_cons);
+        memcpy_from_ring(xenstore_buf->rsp, buf + off,
+                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
+        off += size;
+        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
+                  xenstore_buf->rsp_prod);
+        rmb();
+        xenstore_buf->rsp_cons += size;
+        wmb();
+        if (notify)
+            notify_remote_via_evtchn(xenbus_evtchn);
+    }
+}
+
 static void xenbus_thread_func(void *ign)
 {
     struct xsd_sockmsg msg;
-    unsigned prod = xenstore_buf->rsp_prod;
+    char *data;
 
     for (;;) {
-        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
-        while (1) {
-            prod = xenstore_buf->rsp_prod;
-            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
-                  xenstore_buf->rsp_prod);
-            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
-                break;
-            rmb();
-            memcpy_from_ring(xenstore_buf->rsp, &msg,
-                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
-                             sizeof(msg));
-            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
-                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
-
-            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
-                sizeof(msg) + msg.len)
-                break;
-
-            DEBUG("Message is good.\n");
-
-            if (msg.type == XS_WATCH_EVENT) {
-                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
-                xenbus_event_queue *events = NULL;
-                char *data = (char*)event + sizeof(*event);
-                struct watch *watch;
-
-                memcpy_from_ring(xenstore_buf->rsp, data,
-                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
-                    msg.len);
-
-                event->path = data;
-                event->token = event->path + strlen(event->path) + 1;
-
-                mb();
-                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-
-                for (watch = watches; watch; watch = watch->next)
-                    if (!strcmp(watch->token, event->token)) {
-                        events = watch->events;
-                        break;
-                    }
-
-                if (events) {
-                    event->next = *events;
-                    *events = event;
-                    wake_up(&xenbus_watch_queue);
-                } else {
-                    printk("unexpected watch token %s\n", event->token);
-                    free(event);
+        xenbus_read_data((char *)&msg, sizeof(msg));
+        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
+              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
+
+        if (msg.len > XENSTORE_PAYLOAD_MAX) {
+            printk("Xenstore violates protocol, message longer than allowed.\n");
+            return;
+        }
+
+        if (msg.type == XS_WATCH_EVENT) {
+            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
+            xenbus_event_queue *events = NULL;
+            struct watch *watch;
+            char *c;
+            int zeroes = 0;
+
+            data = (char*)event + sizeof(*event);
+            xenbus_read_data(data, msg.len);
+
+            for (c = data; c < data + msg.len; c++)
+                if (!*c)
+                    zeroes++;
+            if (zeroes != 2) {
+                printk("Xenstore: illegal watch event data\n");
+                free(event);
+                continue;
+            }
+
+            event->path = data;
+            event->token = event->path + strlen(event->path) + 1;
+
+            for (watch = watches; watch; watch = watch->next)
+                if (!strcmp(watch->token, event->token)) {
+                    events = watch->events;
+                    break;
                 }
+
+            if (events) {
+                event->next = *events;
+                *events = event;
+                wake_up(&xenbus_watch_queue);
             } else {
-                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
-                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
-                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
-                                 msg.len + sizeof(msg));
-                mb();
-                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-                wake_up(&req_info[msg.req_id].waitq);
+                printk("Xenstore: unexpected watch token %s\n", event->token);
+                free(event);
             }
 
-            wmb();
-            notify_remote_via_evtchn(xenbus_evtchn);
+            continue;
         }
+
+        data = malloc(sizeof(msg) + msg.len);
+        memcpy(data, &msg, sizeof(msg));
+        xenbus_read_data(data + sizeof(msg), msg.len);
+
+        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
+            printk("Xenstore: illegal request id %d\n", msg.req_id);
+            free(data);
+            continue;
+        }
+
+        DEBUG("Message is good.\n");
+
+        req_info[msg.req_id].reply = data;
+
+        wake_up(&req_info[msg.req_id].waitq);
     }
 }
 
@@ -451,36 +483,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
 
     cur_req = &header_req;
 
-    BUG_ON(len > XENSTORE_RING_SIZE);
-    /* Wait for the ring to drain to the point where we can send the
-       message. */
-    prod = xenstore_buf->req_prod;
-    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE) 
-    {
-        /* Wait for there to be space on the ring */
-        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
-                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
-        wait_event(xb_waitq,
-                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
-                XENSTORE_RING_SIZE);
-        DEBUG("Back from wait.\n");
-        prod = xenstore_buf->req_prod;
-    }
+    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
 
-    /* We're now guaranteed to be able to send the message without
-       overflowing the ring.  Do so. */
+    /* Make sure we are the only thread trying to write. */
+    down(&xb_write_sem);
+
+    /* Send the message in chunks using free ring space when available. */
     total_off = 0;
     req_off = 0;
-    while (total_off < len) 
+    while (total_off < len)
     {
+        prod = xenstore_buf->req_prod;
+        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
+        {
+            /* Send evtchn to notify remote */
+            notify_remote_via_evtchn(xenbus_evtchn);
+
+            /* Wait for there to be space on the ring */
+            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
+                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
+            wait_event(xb_waitq,
+                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
+            DEBUG("Back from wait.\n");
+        }
+
         this_chunk = min(cur_req->len - req_off,
-                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+        this_chunk = min(this_chunk,
+                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
         memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
-                (char *)cur_req->data + req_off, this_chunk);
+               (char *)cur_req->data + req_off, this_chunk);
         prod += this_chunk;
         req_off += this_chunk;
         total_off += this_chunk;
-        if (req_off == cur_req->len) 
+        if (req_off == cur_req->len)
         {
             req_off = 0;
             if (cur_req == &header_req)
@@ -488,20 +524,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
             else
                 cur_req++;
         }
+
+        /* Remote must see entire message before updating indexes */
+        wmb();
+        xenstore_buf->req_prod = prod;
     }
 
+    /* Send evtchn to notify remote */
+    notify_remote_via_evtchn(xenbus_evtchn);
+
     DEBUG("Complete main loop of xb_write.\n");
     BUG_ON(req_off != 0);
     BUG_ON(total_off != len);
-    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
-
-    /* Remote must see entire message before updating indexes */
-    wmb();
-
-    xenstore_buf->req_prod += len;
 
-    /* Send evtchn to notify remote */
-    notify_remote_via_evtchn(xenbus_evtchn);
+    up(&xb_write_sem);
 }
 
 /* Send a mesasge to xenbus, in the same fashion as xb_write, and
-- 
2.26.2



^ permalink raw reply related	[flat|nested] 7+ messages in thread

* Re: [PATCH] mini-os: xenbus: support large messages
  2021-08-18 15:26 [PATCH] mini-os: xenbus: support large messages Juergen Gross
@ 2021-09-10  6:40 ` Juergen Gross
  2021-09-14 23:17 ` Samuel Thibault
  1 sibling, 0 replies; 7+ messages in thread
From: Juergen Gross @ 2021-09-10  6:40 UTC (permalink / raw)
  To: minios-devel, xen-devel; +Cc: samuel.thibault, wl


[-- Attachment #1.1.1: Type: text/plain, Size: 11587 bytes --]

On 18.08.21 17:26, Juergen Gross wrote:
> Today the implementation of the xenbus protocol in Mini-OS will only
> allow to transfer the complete message to or from the ring page buffer.
> This is limiting the maximum message size to lower values as the xenbus
> protocol normally would allow.
> 
> Change that by allowing to transfer the xenbus message in chunks as
> soon as they are available.
> 
> Avoid crashing Mini-OS in case of illegal data read from the ring
> buffer.
> 
> Signed-off-by: Juergen Gross <jgross@suse.com>

Ping?


Juergen

> ---
>   xenbus/xenbus.c | 212 ++++++++++++++++++++++++++++--------------------
>   1 file changed, 124 insertions(+), 88 deletions(-)
> 
> diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
> index 23de61e..3fbb122 100644
> --- a/xenbus/xenbus.c
> +++ b/xenbus/xenbus.c
> @@ -29,6 +29,7 @@
>   #include <xen/hvm/params.h>
>   #include <mini-os/spinlock.h>
>   #include <mini-os/xmalloc.h>
> +#include <mini-os/semaphore.h>
>   
>   #define min(x,y) ({                       \
>           typeof(x) tmpx = (x);                 \
> @@ -46,6 +47,7 @@
>   static struct xenstore_domain_interface *xenstore_buf;
>   static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>   DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
> +static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
>   
>   xenbus_event_queue xenbus_events;
>   static struct watch {
> @@ -231,75 +233,105 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
>   }
>   
>   
> +static void xenbus_read_data(char *buf, unsigned int len)
> +{
> +    unsigned int off = 0;
> +    unsigned int prod;
> +    unsigned int size;
> +    int notify;
> +
> +    while (off != len)
> +    {
> +        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
> +            wait_event(xb_waitq,
> +                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> +
> +        prod = xenstore_buf->rsp_prod;
> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
> +        off += size;
> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
> +                  xenstore_buf->rsp_prod);
> +        rmb();
> +        xenstore_buf->rsp_cons += size;
> +        wmb();
> +        if (notify)
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +    }
> +}
> +
>   static void xenbus_thread_func(void *ign)
>   {
>       struct xsd_sockmsg msg;
> -    unsigned prod = xenstore_buf->rsp_prod;
> +    char *data;
>   
>       for (;;) {
> -        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
> -        while (1) {
> -            prod = xenstore_buf->rsp_prod;
> -            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
> -                  xenstore_buf->rsp_prod);
> -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
> -                break;
> -            rmb();
> -            memcpy_from_ring(xenstore_buf->rsp, &msg,
> -                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> -                             sizeof(msg));
> -            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> -                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> -
> -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> -                sizeof(msg) + msg.len)
> -                break;
> -
> -            DEBUG("Message is good.\n");
> -
> -            if (msg.type == XS_WATCH_EVENT) {
> -                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> -                xenbus_event_queue *events = NULL;
> -                char *data = (char*)event + sizeof(*event);
> -                struct watch *watch;
> -
> -                memcpy_from_ring(xenstore_buf->rsp, data,
> -                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
> -                    msg.len);
> -
> -                event->path = data;
> -                event->token = event->path + strlen(event->path) + 1;
> -
> -                mb();
> -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -
> -                for (watch = watches; watch; watch = watch->next)
> -                    if (!strcmp(watch->token, event->token)) {
> -                        events = watch->events;
> -                        break;
> -                    }
> -
> -                if (events) {
> -                    event->next = *events;
> -                    *events = event;
> -                    wake_up(&xenbus_watch_queue);
> -                } else {
> -                    printk("unexpected watch token %s\n", event->token);
> -                    free(event);
> +        xenbus_read_data((char *)&msg, sizeof(msg));
> +        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> +              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> +
> +        if (msg.len > XENSTORE_PAYLOAD_MAX) {
> +            printk("Xenstore violates protocol, message longer than allowed.\n");
> +            return;
> +        }
> +
> +        if (msg.type == XS_WATCH_EVENT) {
> +            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> +            xenbus_event_queue *events = NULL;
> +            struct watch *watch;
> +            char *c;
> +            int zeroes = 0;
> +
> +            data = (char*)event + sizeof(*event);
> +            xenbus_read_data(data, msg.len);
> +
> +            for (c = data; c < data + msg.len; c++)
> +                if (!*c)
> +                    zeroes++;
> +            if (zeroes != 2) {
> +                printk("Xenstore: illegal watch event data\n");
> +                free(event);
> +                continue;
> +            }
> +
> +            event->path = data;
> +            event->token = event->path + strlen(event->path) + 1;
> +
> +            for (watch = watches; watch; watch = watch->next)
> +                if (!strcmp(watch->token, event->token)) {
> +                    events = watch->events;
> +                    break;
>                   }
> +
> +            if (events) {
> +                event->next = *events;
> +                *events = event;
> +                wake_up(&xenbus_watch_queue);
>               } else {
> -                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
> -                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
> -                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> -                                 msg.len + sizeof(msg));
> -                mb();
> -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -                wake_up(&req_info[msg.req_id].waitq);
> +                printk("Xenstore: unexpected watch token %s\n", event->token);
> +                free(event);
>               }
>   
> -            wmb();
> -            notify_remote_via_evtchn(xenbus_evtchn);
> +            continue;
>           }
> +
> +        data = malloc(sizeof(msg) + msg.len);
> +        memcpy(data, &msg, sizeof(msg));
> +        xenbus_read_data(data + sizeof(msg), msg.len);
> +
> +        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
> +            printk("Xenstore: illegal request id %d\n", msg.req_id);
> +            free(data);
> +            continue;
> +        }
> +
> +        DEBUG("Message is good.\n");
> +
> +        req_info[msg.req_id].reply = data;
> +
> +        wake_up(&req_info[msg.req_id].waitq);
>       }
>   }
>   
> @@ -451,36 +483,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
>   
>       cur_req = &header_req;
>   
> -    BUG_ON(len > XENSTORE_RING_SIZE);
> -    /* Wait for the ring to drain to the point where we can send the
> -       message. */
> -    prod = xenstore_buf->req_prod;
> -    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
> -    {
> -        /* Wait for there to be space on the ring */
> -        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
> -                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> -        wait_event(xb_waitq,
> -                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
> -                XENSTORE_RING_SIZE);
> -        DEBUG("Back from wait.\n");
> -        prod = xenstore_buf->req_prod;
> -    }
> +    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
>   
> -    /* We're now guaranteed to be able to send the message without
> -       overflowing the ring.  Do so. */
> +    /* Make sure we are the only thread trying to write. */
> +    down(&xb_write_sem);
> +
> +    /* Send the message in chunks using free ring space when available. */
>       total_off = 0;
>       req_off = 0;
> -    while (total_off < len)
> +    while (total_off < len)
>       {
> +        prod = xenstore_buf->req_prod;
> +        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
> +        {
> +            /* Send evtchn to notify remote */
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +
> +            /* Wait for there to be space on the ring */
> +            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
> +                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> +            wait_event(xb_waitq,
> +                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
> +            DEBUG("Back from wait.\n");
> +        }
> +
>           this_chunk = min(cur_req->len - req_off,
> -                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> +                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> +        this_chunk = min(this_chunk,
> +                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
>           memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
> -                (char *)cur_req->data + req_off, this_chunk);
> +               (char *)cur_req->data + req_off, this_chunk);
>           prod += this_chunk;
>           req_off += this_chunk;
>           total_off += this_chunk;
> -        if (req_off == cur_req->len)
> +        if (req_off == cur_req->len)
>           {
>               req_off = 0;
>               if (cur_req == &header_req)
> @@ -488,20 +524,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
>               else
>                   cur_req++;
>           }
> +
> +        /* Remote must see entire message before updating indexes */
> +        wmb();
> +        xenstore_buf->req_prod = prod;
>       }
>   
> +    /* Send evtchn to notify remote */
> +    notify_remote_via_evtchn(xenbus_evtchn);
> +
>       DEBUG("Complete main loop of xb_write.\n");
>       BUG_ON(req_off != 0);
>       BUG_ON(total_off != len);
> -    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
> -
> -    /* Remote must see entire message before updating indexes */
> -    wmb();
> -
> -    xenstore_buf->req_prod += len;
>   
> -    /* Send evtchn to notify remote */
> -    notify_remote_via_evtchn(xenbus_evtchn);
> +    up(&xb_write_sem);
>   }
>   
>   /* Send a mesasge to xenbus, in the same fashion as xb_write, and
> 


[-- Attachment #1.1.2: OpenPGP public key --]
[-- Type: application/pgp-keys, Size: 3135 bytes --]

[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 495 bytes --]

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] mini-os: xenbus: support large messages
  2021-08-18 15:26 [PATCH] mini-os: xenbus: support large messages Juergen Gross
  2021-09-10  6:40 ` Juergen Gross
@ 2021-09-14 23:17 ` Samuel Thibault
  2021-09-15 10:48   ` Juergen Gross
  1 sibling, 1 reply; 7+ messages in thread
From: Samuel Thibault @ 2021-09-14 23:17 UTC (permalink / raw)
  To: Juergen Gross; +Cc: minios-devel, xen-devel, wl

Hello,

Thanks for having worked on this!

Juergen Gross, le mer. 18 août 2021 17:26:10 +0200, a ecrit:
> +static void xenbus_read_data(char *buf, unsigned int len)
> +{
> +    unsigned int off = 0;
> +    unsigned int prod;
> +    unsigned int size;
> +    int notify;
> +
> +    while (off != len)
> +    {
> +        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
> +            wait_event(xb_waitq,
> +                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);

The if is redundant since wait_event already tests it.

> +        prod = xenstore_buf->rsp_prod;
> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
> +        off += size;
> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
> +                  xenstore_buf->rsp_prod);

This looks odd to me?  We want to notify as soon as the ring is empty,
which can happen at any place in the ring right?

Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
the rsp_cons increase.

> +        rmb();

rmb() must be placed before memcpy_from_ring, to make sure that the data
we read from the buffer is up-to-date according to the read we made from
rsp_prod.

The rest seems ok to me.

Samuel


^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] mini-os: xenbus: support large messages
  2021-09-14 23:17 ` Samuel Thibault
@ 2021-09-15 10:48   ` Juergen Gross
  2021-09-15 11:20     ` Samuel Thibault
  0 siblings, 1 reply; 7+ messages in thread
From: Juergen Gross @ 2021-09-15 10:48 UTC (permalink / raw)
  To: Samuel Thibault, minios-devel, xen-devel, wl


[-- Attachment #1.1.1: Type: text/plain, Size: 1859 bytes --]

On 15.09.21 01:17, Samuel Thibault wrote:
> Hello,
> 
> Thanks for having worked on this!
> 
> Juergen Gross, le mer. 18 août 2021 17:26:10 +0200, a ecrit:
>> +static void xenbus_read_data(char *buf, unsigned int len)
>> +{
>> +    unsigned int off = 0;
>> +    unsigned int prod;
>> +    unsigned int size;
>> +    int notify;
>> +
>> +    while (off != len)
>> +    {
>> +        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
>> +            wait_event(xb_waitq,
>> +                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> 
> The if is redundant since wait_event already tests it.

Ah, right.

> 
>> +        prod = xenstore_buf->rsp_prod;
>> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
>> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
>> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
>> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
>> +        off += size;
>> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
>> +                  xenstore_buf->rsp_prod);
> 
> This looks odd to me?  We want to notify as soon as the ring is empty,
> which can happen at any place in the ring right?

No, we want to notify if the ring was full and is about to gain some
space again, as the other side was probably not able to put all data
in and is now waiting for more space to become available.

> 
> Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
> the rsp_cons increase.
> 
>> +        rmb();
> 
> rmb() must be placed before memcpy_from_ring, to make sure that the data
> we read from the buffer is up-to-date according to the read we made from
> rsp_prod.

Ah, yes. Thanks for spotting that one!

> 
> The rest seems ok to me.

Thanks,


Juergen

[-- Attachment #1.1.2: OpenPGP public key --]
[-- Type: application/pgp-keys, Size: 3135 bytes --]

[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 495 bytes --]

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] mini-os: xenbus: support large messages
  2021-09-15 10:48   ` Juergen Gross
@ 2021-09-15 11:20     ` Samuel Thibault
  2021-09-15 12:03       ` Juergen Gross
  0 siblings, 1 reply; 7+ messages in thread
From: Samuel Thibault @ 2021-09-15 11:20 UTC (permalink / raw)
  To: Juergen Gross; +Cc: minios-devel, xen-devel, wl

Juergen Gross, le mer. 15 sept. 2021 12:48:44 +0200, a ecrit:
> On 15.09.21 01:17, Samuel Thibault wrote:
> > > +        prod = xenstore_buf->rsp_prod;
> > > +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
> > > +        size = min(len - off, prod - xenstore_buf->rsp_cons);
> > > +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> > > +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
> > > +        off += size;
> > > +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
> > > +                  xenstore_buf->rsp_prod);
> > 
> > This looks odd to me?  We want to notify as soon as the ring is empty,
> > which can happen at any place in the ring right?
> 
> No, we want to notify if the ring was full and is about to gain some
> space again, as the other side was probably not able to put all data
> in and is now waiting for more space to become available.

Ok, that said, the producer may fill the ring between this test and
the rsp_cons update, and thus the producer will go sleep and here the
consumer will not notice it and thus never notify it.

So we really need to make the test after the rsp_cons update, like Linux
does:

> > Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
> > the rsp_cons increase.

Samuel


^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] mini-os: xenbus: support large messages
  2021-09-15 11:20     ` Samuel Thibault
@ 2021-09-15 12:03       ` Juergen Gross
  2021-09-15 12:20         ` Samuel Thibault
  0 siblings, 1 reply; 7+ messages in thread
From: Juergen Gross @ 2021-09-15 12:03 UTC (permalink / raw)
  To: Samuel Thibault, minios-devel, xen-devel, wl


[-- Attachment #1.1.1: Type: text/plain, Size: 1480 bytes --]

On 15.09.21 13:20, Samuel Thibault wrote:
> Juergen Gross, le mer. 15 sept. 2021 12:48:44 +0200, a ecrit:
>> On 15.09.21 01:17, Samuel Thibault wrote:
>>>> +        prod = xenstore_buf->rsp_prod;
>>>> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
>>>> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
>>>> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
>>>> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
>>>> +        off += size;
>>>> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
>>>> +                  xenstore_buf->rsp_prod);
>>>
>>> This looks odd to me?  We want to notify as soon as the ring is empty,
>>> which can happen at any place in the ring right?
>>
>> No, we want to notify if the ring was full and is about to gain some
>> space again, as the other side was probably not able to put all data
>> in and is now waiting for more space to become available.
> 
> Ok, that said, the producer may fill the ring between this test and
> the rsp_cons update, and thus the producer will go sleep and here the
> consumer will not notice it and thus never notify it.
> 
> So we really need to make the test after the rsp_cons update, like Linux
> does:
> 
>>> Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
>>> the rsp_cons increase.

Oh, you are right, of course. How could I overlook this?

Thanks,


Juergen

[-- Attachment #1.1.2: OpenPGP public key --]
[-- Type: application/pgp-keys, Size: 3135 bytes --]

[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 495 bytes --]

^ permalink raw reply	[flat|nested] 7+ messages in thread

* Re: [PATCH] mini-os: xenbus: support large messages
  2021-09-15 12:03       ` Juergen Gross
@ 2021-09-15 12:20         ` Samuel Thibault
  0 siblings, 0 replies; 7+ messages in thread
From: Samuel Thibault @ 2021-09-15 12:20 UTC (permalink / raw)
  To: Juergen Gross; +Cc: minios-devel, xen-devel, wl

Juergen Gross, le mer. 15 sept. 2021 14:03:35 +0200, a ecrit:
> On 15.09.21 13:20, Samuel Thibault wrote:
> > So we really need to make the test after the rsp_cons update, like Linux
> > does:
> > 
> > > > Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
> > > > the rsp_cons increase.
> 
> Oh, you are right, of course. How could I overlook this?

One person alone will always overlook something, that's why we make
code review, to have several pairs of eyes check whether we may have
forgotten something :)

Samuel


^ permalink raw reply	[flat|nested] 7+ messages in thread

end of thread, other threads:[~2021-09-15 12:20 UTC | newest]

Thread overview: 7+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-08-18 15:26 [PATCH] mini-os: xenbus: support large messages Juergen Gross
2021-09-10  6:40 ` Juergen Gross
2021-09-14 23:17 ` Samuel Thibault
2021-09-15 10:48   ` Juergen Gross
2021-09-15 11:20     ` Samuel Thibault
2021-09-15 12:03       ` Juergen Gross
2021-09-15 12:20         ` Samuel Thibault

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.