All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH v4] xenbus: support large messages
@ 2021-10-04  9:40 Juergen Gross
  2021-11-24  7:00 ` Juergen Gross
  0 siblings, 1 reply; 3+ messages in thread
From: Juergen Gross @ 2021-10-04  9:40 UTC (permalink / raw)
  To: minios-devel, xen-devel; +Cc: samuel.thibault, wl, Juergen Gross

Today the implementation of the xenbus protocol in Mini-OS will only
allow to transfer the complete message to or from the ring page buffer.
This is limiting the maximum message size to lower values as the xenbus
protocol normally would allow.

Change that by allowing to transfer the xenbus message in chunks as
soon as they are available.

Avoid crashing Mini-OS in case of illegal data read from the ring
buffer.

Signed-off-by: Juergen Gross <jgross@suse.com>
---
V2:
- drop redundant if (Samuel Thibault)
- move rmb() (Samuel Thibault)
V3:
- correct notification test (Samuel Thibault)
V4:
- more memory barriers (Samuel Thibault)
---
 xenbus/xenbus.c | 210 ++++++++++++++++++++++++++++--------------------
 1 file changed, 122 insertions(+), 88 deletions(-)

diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
index 23de61e..b687678 100644
--- a/xenbus/xenbus.c
+++ b/xenbus/xenbus.c
@@ -29,6 +29,7 @@
 #include <xen/hvm/params.h>
 #include <mini-os/spinlock.h>
 #include <mini-os/xmalloc.h>
+#include <mini-os/semaphore.h>
 
 #define min(x,y) ({                       \
         typeof(x) tmpx = (x);                 \
@@ -46,6 +47,7 @@
 static struct xenstore_domain_interface *xenstore_buf;
 static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
 DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
+static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
 
 xenbus_event_queue xenbus_events;
 static struct watch {
@@ -231,75 +233,103 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
 }
 
 
+static void xenbus_read_data(char *buf, unsigned int len)
+{
+    unsigned int off = 0;
+    unsigned int prod, cons;
+    unsigned int size;
+
+    while (off != len)
+    {
+        wait_event(xb_waitq, xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
+
+        prod = xenstore_buf->rsp_prod;
+        cons = xenstore_buf->rsp_cons;
+        DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
+        size = min(len - off, prod - cons);
+
+        rmb();   /* Make sure data read from ring is ordered with rsp_prod. */
+        memcpy_from_ring(xenstore_buf->rsp, buf + off,
+                         MASK_XENSTORE_IDX(cons), size);
+        off += size;
+        mb();    /* memcpy() and rsp_cons update must not be reordered. */
+        xenstore_buf->rsp_cons += size;
+        mb();    /* rsp_cons must be visible before we look at rsp_prod. */
+        if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
+            notify_remote_via_evtchn(xenbus_evtchn);
+    }
+}
+
 static void xenbus_thread_func(void *ign)
 {
     struct xsd_sockmsg msg;
-    unsigned prod = xenstore_buf->rsp_prod;
+    char *data;
 
     for (;;) {
-        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
-        while (1) {
-            prod = xenstore_buf->rsp_prod;
-            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
-                  xenstore_buf->rsp_prod);
-            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
-                break;
-            rmb();
-            memcpy_from_ring(xenstore_buf->rsp, &msg,
-                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
-                             sizeof(msg));
-            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
-                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
-
-            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
-                sizeof(msg) + msg.len)
-                break;
-
-            DEBUG("Message is good.\n");
-
-            if (msg.type == XS_WATCH_EVENT) {
-                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
-                xenbus_event_queue *events = NULL;
-                char *data = (char*)event + sizeof(*event);
-                struct watch *watch;
-
-                memcpy_from_ring(xenstore_buf->rsp, data,
-                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
-                    msg.len);
-
-                event->path = data;
-                event->token = event->path + strlen(event->path) + 1;
-
-                mb();
-                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-
-                for (watch = watches; watch; watch = watch->next)
-                    if (!strcmp(watch->token, event->token)) {
-                        events = watch->events;
-                        break;
-                    }
-
-                if (events) {
-                    event->next = *events;
-                    *events = event;
-                    wake_up(&xenbus_watch_queue);
-                } else {
-                    printk("unexpected watch token %s\n", event->token);
-                    free(event);
+        xenbus_read_data((char *)&msg, sizeof(msg));
+        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
+              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
+
+        if (msg.len > XENSTORE_PAYLOAD_MAX) {
+            printk("Xenstore violates protocol, message longer than allowed.\n");
+            return;
+        }
+
+        if (msg.type == XS_WATCH_EVENT) {
+            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
+            xenbus_event_queue *events = NULL;
+            struct watch *watch;
+            char *c;
+            int zeroes = 0;
+
+            data = (char*)event + sizeof(*event);
+            xenbus_read_data(data, msg.len);
+
+            for (c = data; c < data + msg.len; c++)
+                if (!*c)
+                    zeroes++;
+            if (zeroes != 2) {
+                printk("Xenstore: illegal watch event data\n");
+                free(event);
+                continue;
+            }
+
+            event->path = data;
+            event->token = event->path + strlen(event->path) + 1;
+
+            for (watch = watches; watch; watch = watch->next)
+                if (!strcmp(watch->token, event->token)) {
+                    events = watch->events;
+                    break;
                 }
+
+            if (events) {
+                event->next = *events;
+                *events = event;
+                wake_up(&xenbus_watch_queue);
             } else {
-                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
-                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
-                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
-                                 msg.len + sizeof(msg));
-                mb();
-                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-                wake_up(&req_info[msg.req_id].waitq);
+                printk("Xenstore: unexpected watch token %s\n", event->token);
+                free(event);
             }
 
-            wmb();
-            notify_remote_via_evtchn(xenbus_evtchn);
+            continue;
         }
+
+        data = malloc(sizeof(msg) + msg.len);
+        memcpy(data, &msg, sizeof(msg));
+        xenbus_read_data(data + sizeof(msg), msg.len);
+
+        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
+            printk("Xenstore: illegal request id %d\n", msg.req_id);
+            free(data);
+            continue;
+        }
+
+        DEBUG("Message is good.\n");
+
+        req_info[msg.req_id].reply = data;
+
+        wake_up(&req_info[msg.req_id].waitq);
     }
 }
 
@@ -451,36 +481,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
 
     cur_req = &header_req;
 
-    BUG_ON(len > XENSTORE_RING_SIZE);
-    /* Wait for the ring to drain to the point where we can send the
-       message. */
-    prod = xenstore_buf->req_prod;
-    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE) 
-    {
-        /* Wait for there to be space on the ring */
-        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
-                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
-        wait_event(xb_waitq,
-                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
-                XENSTORE_RING_SIZE);
-        DEBUG("Back from wait.\n");
-        prod = xenstore_buf->req_prod;
-    }
+    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
+
+    /* Make sure we are the only thread trying to write. */
+    down(&xb_write_sem);
 
-    /* We're now guaranteed to be able to send the message without
-       overflowing the ring.  Do so. */
+    /* Send the message in chunks using free ring space when available. */
     total_off = 0;
     req_off = 0;
-    while (total_off < len) 
+    while (total_off < len)
     {
+        prod = xenstore_buf->req_prod;
+        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
+        {
+            /* Send evtchn to notify remote */
+            notify_remote_via_evtchn(xenbus_evtchn);
+
+            /* Wait for there to be space on the ring */
+            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
+                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
+            wait_event(xb_waitq,
+                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
+            DEBUG("Back from wait.\n");
+        }
+
         this_chunk = min(cur_req->len - req_off,
-                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+        this_chunk = min(this_chunk,
+                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
         memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
-                (char *)cur_req->data + req_off, this_chunk);
+               (char *)cur_req->data + req_off, this_chunk);
         prod += this_chunk;
         req_off += this_chunk;
         total_off += this_chunk;
-        if (req_off == cur_req->len) 
+        if (req_off == cur_req->len)
         {
             req_off = 0;
             if (cur_req == &header_req)
@@ -488,20 +522,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
             else
                 cur_req++;
         }
+
+        /* Remote must see entire message before updating indexes */
+        wmb();
+        xenstore_buf->req_prod = prod;
     }
 
+    /* Send evtchn to notify remote */
+    notify_remote_via_evtchn(xenbus_evtchn);
+
     DEBUG("Complete main loop of xb_write.\n");
     BUG_ON(req_off != 0);
     BUG_ON(total_off != len);
-    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
 
-    /* Remote must see entire message before updating indexes */
-    wmb();
-
-    xenstore_buf->req_prod += len;
-
-    /* Send evtchn to notify remote */
-    notify_remote_via_evtchn(xenbus_evtchn);
+    up(&xb_write_sem);
 }
 
 /* Send a mesasge to xenbus, in the same fashion as xb_write, and
-- 
2.26.2



^ permalink raw reply related	[flat|nested] 3+ messages in thread

* Re: [PATCH v4] xenbus: support large messages
  2021-10-04  9:40 [PATCH v4] xenbus: support large messages Juergen Gross
@ 2021-11-24  7:00 ` Juergen Gross
  2021-11-28 19:36   ` Samuel Thibault
  0 siblings, 1 reply; 3+ messages in thread
From: Juergen Gross @ 2021-11-24  7:00 UTC (permalink / raw)
  To: minios-devel, xen-devel; +Cc: samuel.thibault, wl


[-- Attachment #1.1.1: Type: text/plain, Size: 11784 bytes --]

Ping?

On 04.10.21 11:40, Juergen Gross wrote:
> Today the implementation of the xenbus protocol in Mini-OS will only
> allow to transfer the complete message to or from the ring page buffer.
> This is limiting the maximum message size to lower values as the xenbus
> protocol normally would allow.
> 
> Change that by allowing to transfer the xenbus message in chunks as
> soon as they are available.
> 
> Avoid crashing Mini-OS in case of illegal data read from the ring
> buffer.
> 
> Signed-off-by: Juergen Gross <jgross@suse.com>
> ---
> V2:
> - drop redundant if (Samuel Thibault)
> - move rmb() (Samuel Thibault)
> V3:
> - correct notification test (Samuel Thibault)
> V4:
> - more memory barriers (Samuel Thibault)
> ---
>   xenbus/xenbus.c | 210 ++++++++++++++++++++++++++++--------------------
>   1 file changed, 122 insertions(+), 88 deletions(-)
> 
> diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
> index 23de61e..b687678 100644
> --- a/xenbus/xenbus.c
> +++ b/xenbus/xenbus.c
> @@ -29,6 +29,7 @@
>   #include <xen/hvm/params.h>
>   #include <mini-os/spinlock.h>
>   #include <mini-os/xmalloc.h>
> +#include <mini-os/semaphore.h>
>   
>   #define min(x,y) ({                       \
>           typeof(x) tmpx = (x);                 \
> @@ -46,6 +47,7 @@
>   static struct xenstore_domain_interface *xenstore_buf;
>   static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>   DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
> +static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
>   
>   xenbus_event_queue xenbus_events;
>   static struct watch {
> @@ -231,75 +233,103 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
>   }
>   
>   
> +static void xenbus_read_data(char *buf, unsigned int len)
> +{
> +    unsigned int off = 0;
> +    unsigned int prod, cons;
> +    unsigned int size;
> +
> +    while (off != len)
> +    {
> +        wait_event(xb_waitq, xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> +
> +        prod = xenstore_buf->rsp_prod;
> +        cons = xenstore_buf->rsp_cons;
> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
> +        size = min(len - off, prod - cons);
> +
> +        rmb();   /* Make sure data read from ring is ordered with rsp_prod. */
> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> +                         MASK_XENSTORE_IDX(cons), size);
> +        off += size;
> +        mb();    /* memcpy() and rsp_cons update must not be reordered. */
> +        xenstore_buf->rsp_cons += size;
> +        mb();    /* rsp_cons must be visible before we look at rsp_prod. */
> +        if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +    }
> +}
> +
>   static void xenbus_thread_func(void *ign)
>   {
>       struct xsd_sockmsg msg;
> -    unsigned prod = xenstore_buf->rsp_prod;
> +    char *data;
>   
>       for (;;) {
> -        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
> -        while (1) {
> -            prod = xenstore_buf->rsp_prod;
> -            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
> -                  xenstore_buf->rsp_prod);
> -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
> -                break;
> -            rmb();
> -            memcpy_from_ring(xenstore_buf->rsp, &msg,
> -                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> -                             sizeof(msg));
> -            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> -                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> -
> -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> -                sizeof(msg) + msg.len)
> -                break;
> -
> -            DEBUG("Message is good.\n");
> -
> -            if (msg.type == XS_WATCH_EVENT) {
> -                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> -                xenbus_event_queue *events = NULL;
> -                char *data = (char*)event + sizeof(*event);
> -                struct watch *watch;
> -
> -                memcpy_from_ring(xenstore_buf->rsp, data,
> -                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
> -                    msg.len);
> -
> -                event->path = data;
> -                event->token = event->path + strlen(event->path) + 1;
> -
> -                mb();
> -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -
> -                for (watch = watches; watch; watch = watch->next)
> -                    if (!strcmp(watch->token, event->token)) {
> -                        events = watch->events;
> -                        break;
> -                    }
> -
> -                if (events) {
> -                    event->next = *events;
> -                    *events = event;
> -                    wake_up(&xenbus_watch_queue);
> -                } else {
> -                    printk("unexpected watch token %s\n", event->token);
> -                    free(event);
> +        xenbus_read_data((char *)&msg, sizeof(msg));
> +        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> +              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> +
> +        if (msg.len > XENSTORE_PAYLOAD_MAX) {
> +            printk("Xenstore violates protocol, message longer than allowed.\n");
> +            return;
> +        }
> +
> +        if (msg.type == XS_WATCH_EVENT) {
> +            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> +            xenbus_event_queue *events = NULL;
> +            struct watch *watch;
> +            char *c;
> +            int zeroes = 0;
> +
> +            data = (char*)event + sizeof(*event);
> +            xenbus_read_data(data, msg.len);
> +
> +            for (c = data; c < data + msg.len; c++)
> +                if (!*c)
> +                    zeroes++;
> +            if (zeroes != 2) {
> +                printk("Xenstore: illegal watch event data\n");
> +                free(event);
> +                continue;
> +            }
> +
> +            event->path = data;
> +            event->token = event->path + strlen(event->path) + 1;
> +
> +            for (watch = watches; watch; watch = watch->next)
> +                if (!strcmp(watch->token, event->token)) {
> +                    events = watch->events;
> +                    break;
>                   }
> +
> +            if (events) {
> +                event->next = *events;
> +                *events = event;
> +                wake_up(&xenbus_watch_queue);
>               } else {
> -                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
> -                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
> -                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> -                                 msg.len + sizeof(msg));
> -                mb();
> -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -                wake_up(&req_info[msg.req_id].waitq);
> +                printk("Xenstore: unexpected watch token %s\n", event->token);
> +                free(event);
>               }
>   
> -            wmb();
> -            notify_remote_via_evtchn(xenbus_evtchn);
> +            continue;
>           }
> +
> +        data = malloc(sizeof(msg) + msg.len);
> +        memcpy(data, &msg, sizeof(msg));
> +        xenbus_read_data(data + sizeof(msg), msg.len);
> +
> +        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
> +            printk("Xenstore: illegal request id %d\n", msg.req_id);
> +            free(data);
> +            continue;
> +        }
> +
> +        DEBUG("Message is good.\n");
> +
> +        req_info[msg.req_id].reply = data;
> +
> +        wake_up(&req_info[msg.req_id].waitq);
>       }
>   }
>   
> @@ -451,36 +481,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
>   
>       cur_req = &header_req;
>   
> -    BUG_ON(len > XENSTORE_RING_SIZE);
> -    /* Wait for the ring to drain to the point where we can send the
> -       message. */
> -    prod = xenstore_buf->req_prod;
> -    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
> -    {
> -        /* Wait for there to be space on the ring */
> -        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
> -                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> -        wait_event(xb_waitq,
> -                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
> -                XENSTORE_RING_SIZE);
> -        DEBUG("Back from wait.\n");
> -        prod = xenstore_buf->req_prod;
> -    }
> +    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
> +
> +    /* Make sure we are the only thread trying to write. */
> +    down(&xb_write_sem);
>   
> -    /* We're now guaranteed to be able to send the message without
> -       overflowing the ring.  Do so. */
> +    /* Send the message in chunks using free ring space when available. */
>       total_off = 0;
>       req_off = 0;
> -    while (total_off < len)
> +    while (total_off < len)
>       {
> +        prod = xenstore_buf->req_prod;
> +        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
> +        {
> +            /* Send evtchn to notify remote */
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +
> +            /* Wait for there to be space on the ring */
> +            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
> +                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> +            wait_event(xb_waitq,
> +                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
> +            DEBUG("Back from wait.\n");
> +        }
> +
>           this_chunk = min(cur_req->len - req_off,
> -                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> +                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> +        this_chunk = min(this_chunk,
> +                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
>           memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
> -                (char *)cur_req->data + req_off, this_chunk);
> +               (char *)cur_req->data + req_off, this_chunk);
>           prod += this_chunk;
>           req_off += this_chunk;
>           total_off += this_chunk;
> -        if (req_off == cur_req->len)
> +        if (req_off == cur_req->len)
>           {
>               req_off = 0;
>               if (cur_req == &header_req)
> @@ -488,20 +522,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
>               else
>                   cur_req++;
>           }
> +
> +        /* Remote must see entire message before updating indexes */
> +        wmb();
> +        xenstore_buf->req_prod = prod;
>       }
>   
> +    /* Send evtchn to notify remote */
> +    notify_remote_via_evtchn(xenbus_evtchn);
> +
>       DEBUG("Complete main loop of xb_write.\n");
>       BUG_ON(req_off != 0);
>       BUG_ON(total_off != len);
> -    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
>   
> -    /* Remote must see entire message before updating indexes */
> -    wmb();
> -
> -    xenstore_buf->req_prod += len;
> -
> -    /* Send evtchn to notify remote */
> -    notify_remote_via_evtchn(xenbus_evtchn);
> +    up(&xb_write_sem);
>   }
>   
>   /* Send a mesasge to xenbus, in the same fashion as xb_write, and
> 


[-- Attachment #1.1.2: OpenPGP public key --]
[-- Type: application/pgp-keys, Size: 3135 bytes --]

[-- Attachment #2: OpenPGP digital signature --]
[-- Type: application/pgp-signature, Size: 495 bytes --]

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [PATCH v4] xenbus: support large messages
  2021-11-24  7:00 ` Juergen Gross
@ 2021-11-28 19:36   ` Samuel Thibault
  0 siblings, 0 replies; 3+ messages in thread
From: Samuel Thibault @ 2021-11-28 19:36 UTC (permalink / raw)
  To: Juergen Gross; +Cc: minios-devel, xen-devel, wl

Hello,

Sorry, it seems I missed that mail :/

Added my tag below.

BTW, I didn't see the mb() fix between rsp_cons+= and reading rsp_prod
on the Linux side?

Samuel

Juergen Gross, le mer. 24 nov. 2021 08:00:55 +0100, a ecrit:
> Ping?
> 
> On 04.10.21 11:40, Juergen Gross wrote:
> > Today the implementation of the xenbus protocol in Mini-OS will only
> > allow to transfer the complete message to or from the ring page buffer.
> > This is limiting the maximum message size to lower values as the xenbus
> > protocol normally would allow.
> > 
> > Change that by allowing to transfer the xenbus message in chunks as
> > soon as they are available.
> > 
> > Avoid crashing Mini-OS in case of illegal data read from the ring
> > buffer.
> > 
> > Signed-off-by: Juergen Gross <jgross@suse.com>

Reviewed-by: Samuel Thibault <samuel.thibault@ens-lyon.org>

> > ---
> > V2:
> > - drop redundant if (Samuel Thibault)
> > - move rmb() (Samuel Thibault)
> > V3:
> > - correct notification test (Samuel Thibault)
> > V4:
> > - more memory barriers (Samuel Thibault)
> > ---
> >   xenbus/xenbus.c | 210 ++++++++++++++++++++++++++++--------------------
> >   1 file changed, 122 insertions(+), 88 deletions(-)
> > 
> > diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
> > index 23de61e..b687678 100644
> > --- a/xenbus/xenbus.c
> > +++ b/xenbus/xenbus.c
> > @@ -29,6 +29,7 @@
> >   #include <xen/hvm/params.h>
> >   #include <mini-os/spinlock.h>
> >   #include <mini-os/xmalloc.h>
> > +#include <mini-os/semaphore.h>
> >   #define min(x,y) ({                       \
> >           typeof(x) tmpx = (x);                 \
> > @@ -46,6 +47,7 @@
> >   static struct xenstore_domain_interface *xenstore_buf;
> >   static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
> >   DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
> > +static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
> >   xenbus_event_queue xenbus_events;
> >   static struct watch {
> > @@ -231,75 +233,103 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
> >   }
> > +static void xenbus_read_data(char *buf, unsigned int len)
> > +{
> > +    unsigned int off = 0;
> > +    unsigned int prod, cons;
> > +    unsigned int size;
> > +
> > +    while (off != len)
> > +    {
> > +        wait_event(xb_waitq, xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> > +
> > +        prod = xenstore_buf->rsp_prod;
> > +        cons = xenstore_buf->rsp_cons;
> > +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
> > +        size = min(len - off, prod - cons);
> > +
> > +        rmb();   /* Make sure data read from ring is ordered with rsp_prod. */
> > +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> > +                         MASK_XENSTORE_IDX(cons), size);
> > +        off += size;
> > +        mb();    /* memcpy() and rsp_cons update must not be reordered. */
> > +        xenstore_buf->rsp_cons += size;
> > +        mb();    /* rsp_cons must be visible before we look at rsp_prod. */
> > +        if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> > +            notify_remote_via_evtchn(xenbus_evtchn);
> > +    }
> > +}
> > +
> >   static void xenbus_thread_func(void *ign)
> >   {
> >       struct xsd_sockmsg msg;
> > -    unsigned prod = xenstore_buf->rsp_prod;
> > +    char *data;
> >       for (;;) {
> > -        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
> > -        while (1) {
> > -            prod = xenstore_buf->rsp_prod;
> > -            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
> > -                  xenstore_buf->rsp_prod);
> > -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
> > -                break;
> > -            rmb();
> > -            memcpy_from_ring(xenstore_buf->rsp, &msg,
> > -                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> > -                             sizeof(msg));
> > -            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> > -                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> > -
> > -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> > -                sizeof(msg) + msg.len)
> > -                break;
> > -
> > -            DEBUG("Message is good.\n");
> > -
> > -            if (msg.type == XS_WATCH_EVENT) {
> > -                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> > -                xenbus_event_queue *events = NULL;
> > -                char *data = (char*)event + sizeof(*event);
> > -                struct watch *watch;
> > -
> > -                memcpy_from_ring(xenstore_buf->rsp, data,
> > -                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
> > -                    msg.len);
> > -
> > -                event->path = data;
> > -                event->token = event->path + strlen(event->path) + 1;
> > -
> > -                mb();
> > -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> > -
> > -                for (watch = watches; watch; watch = watch->next)
> > -                    if (!strcmp(watch->token, event->token)) {
> > -                        events = watch->events;
> > -                        break;
> > -                    }
> > -
> > -                if (events) {
> > -                    event->next = *events;
> > -                    *events = event;
> > -                    wake_up(&xenbus_watch_queue);
> > -                } else {
> > -                    printk("unexpected watch token %s\n", event->token);
> > -                    free(event);
> > +        xenbus_read_data((char *)&msg, sizeof(msg));
> > +        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> > +              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> > +
> > +        if (msg.len > XENSTORE_PAYLOAD_MAX) {
> > +            printk("Xenstore violates protocol, message longer than allowed.\n");
> > +            return;
> > +        }
> > +
> > +        if (msg.type == XS_WATCH_EVENT) {
> > +            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> > +            xenbus_event_queue *events = NULL;
> > +            struct watch *watch;
> > +            char *c;
> > +            int zeroes = 0;
> > +
> > +            data = (char*)event + sizeof(*event);
> > +            xenbus_read_data(data, msg.len);
> > +
> > +            for (c = data; c < data + msg.len; c++)
> > +                if (!*c)
> > +                    zeroes++;
> > +            if (zeroes != 2) {
> > +                printk("Xenstore: illegal watch event data\n");
> > +                free(event);
> > +                continue;
> > +            }
> > +
> > +            event->path = data;
> > +            event->token = event->path + strlen(event->path) + 1;
> > +
> > +            for (watch = watches; watch; watch = watch->next)
> > +                if (!strcmp(watch->token, event->token)) {
> > +                    events = watch->events;
> > +                    break;
> >                   }
> > +
> > +            if (events) {
> > +                event->next = *events;
> > +                *events = event;
> > +                wake_up(&xenbus_watch_queue);
> >               } else {
> > -                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
> > -                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
> > -                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> > -                                 msg.len + sizeof(msg));
> > -                mb();
> > -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> > -                wake_up(&req_info[msg.req_id].waitq);
> > +                printk("Xenstore: unexpected watch token %s\n", event->token);
> > +                free(event);
> >               }
> > -            wmb();
> > -            notify_remote_via_evtchn(xenbus_evtchn);
> > +            continue;
> >           }
> > +
> > +        data = malloc(sizeof(msg) + msg.len);
> > +        memcpy(data, &msg, sizeof(msg));
> > +        xenbus_read_data(data + sizeof(msg), msg.len);
> > +
> > +        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
> > +            printk("Xenstore: illegal request id %d\n", msg.req_id);
> > +            free(data);
> > +            continue;
> > +        }
> > +
> > +        DEBUG("Message is good.\n");
> > +
> > +        req_info[msg.req_id].reply = data;
> > +
> > +        wake_up(&req_info[msg.req_id].waitq);
> >       }
> >   }
> > @@ -451,36 +481,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
> >       cur_req = &header_req;
> > -    BUG_ON(len > XENSTORE_RING_SIZE);
> > -    /* Wait for the ring to drain to the point where we can send the
> > -       message. */
> > -    prod = xenstore_buf->req_prod;
> > -    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
> > -    {
> > -        /* Wait for there to be space on the ring */
> > -        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
> > -                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> > -        wait_event(xb_waitq,
> > -                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
> > -                XENSTORE_RING_SIZE);
> > -        DEBUG("Back from wait.\n");
> > -        prod = xenstore_buf->req_prod;
> > -    }
> > +    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
> > +
> > +    /* Make sure we are the only thread trying to write. */
> > +    down(&xb_write_sem);
> > -    /* We're now guaranteed to be able to send the message without
> > -       overflowing the ring.  Do so. */
> > +    /* Send the message in chunks using free ring space when available. */
> >       total_off = 0;
> >       req_off = 0;
> > -    while (total_off < len)
> > +    while (total_off < len)
> >       {
> > +        prod = xenstore_buf->req_prod;
> > +        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
> > +        {
> > +            /* Send evtchn to notify remote */
> > +            notify_remote_via_evtchn(xenbus_evtchn);
> > +
> > +            /* Wait for there to be space on the ring */
> > +            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
> > +                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> > +            wait_event(xb_waitq,
> > +                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
> > +            DEBUG("Back from wait.\n");
> > +        }
> > +
> >           this_chunk = min(cur_req->len - req_off,
> > -                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> > +                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> > +        this_chunk = min(this_chunk,
> > +                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
> >           memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
> > -                (char *)cur_req->data + req_off, this_chunk);
> > +               (char *)cur_req->data + req_off, this_chunk);
> >           prod += this_chunk;
> >           req_off += this_chunk;
> >           total_off += this_chunk;
> > -        if (req_off == cur_req->len)
> > +        if (req_off == cur_req->len)
> >           {
> >               req_off = 0;
> >               if (cur_req == &header_req)
> > @@ -488,20 +522,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
> >               else
> >                   cur_req++;
> >           }
> > +
> > +        /* Remote must see entire message before updating indexes */
> > +        wmb();
> > +        xenstore_buf->req_prod = prod;
> >       }
> > +    /* Send evtchn to notify remote */
> > +    notify_remote_via_evtchn(xenbus_evtchn);
> > +
> >       DEBUG("Complete main loop of xb_write.\n");
> >       BUG_ON(req_off != 0);
> >       BUG_ON(total_off != len);
> > -    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
> > -    /* Remote must see entire message before updating indexes */
> > -    wmb();
> > -
> > -    xenstore_buf->req_prod += len;
> > -
> > -    /* Send evtchn to notify remote */
> > -    notify_remote_via_evtchn(xenbus_evtchn);
> > +    up(&xb_write_sem);
> >   }
> >   /* Send a mesasge to xenbus, in the same fashion as xb_write, and
> > 
> 


^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2021-11-28 19:36 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-10-04  9:40 [PATCH v4] xenbus: support large messages Juergen Gross
2021-11-24  7:00 ` Juergen Gross
2021-11-28 19:36   ` Samuel Thibault

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.