linux-kernel.vger.kernel.org archive mirror
* [PATCH 0/3] xen: optimize xenbus performance
@ 2017-01-06 15:05 Juergen Gross
  2017-01-06 15:05 ` [PATCH 1/3] xen: clean up xenbus internal headers Juergen Gross
                   ` (2 more replies)
  0 siblings, 3 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-06 15:05 UTC
  To: linux-kernel, xen-devel
  Cc: boris.ostrovsky, Juergen Gross, konrad.wilk, roger.pau, wei.liu2,
	paul.durrant, netdev

The xenbus driver handles all communication with Xenstore: every kernel
access to Xenstore and, when Xenstore lives in another domain, every
access of the local domain to Xenstore. The driver is rather simple,
especially regarding concurrent accesses: they are simply serialized,
even though Xenstore is capable of handling multiple accesses in
parallel.

Clean up the external interface(s) of xenbus and optimize its
performance by allowing multiple concurrent accesses to Xenstore.
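
As a rough illustration of what "multiple concurrent accesses" requires,
here is a minimal userspace sketch. It assumes that replies are matched
to outstanding requests by a per-request id; that mechanism is an
assumption of this sketch and is not spelled out in this cover letter.
All names (pending_table, submit_request, deliver_reply, take_reply) are
hypothetical and do not appear in the patches.

```c
#include <stddef.h>
#include <stdint.h>

#define MAX_PENDING 4	/* hypothetical limit, for illustration only */

/* One outstanding request waiting for its reply. */
struct pending_req {
	int in_use;
	uint32_t req_id;
	const char *reply;	/* NULL until the reply has arrived */
};

struct pending_table {
	struct pending_req slot[MAX_PENDING];
	uint32_t next_id;
};

/* Register a new request and hand back its id; -1 if the table is full. */
static int submit_request(struct pending_table *t, uint32_t *id)
{
	size_t i;

	for (i = 0; i < MAX_PENDING; i++) {
		if (!t->slot[i].in_use) {
			t->slot[i].in_use = 1;
			t->slot[i].req_id = t->next_id++;
			t->slot[i].reply = NULL;
			*id = t->slot[i].req_id;
			return 0;
		}
	}
	return -1;
}

/* Attach a reply to the request with a matching id; -1 if none is pending. */
static int deliver_reply(struct pending_table *t, uint32_t id,
			 const char *reply)
{
	size_t i;

	for (i = 0; i < MAX_PENDING; i++) {
		if (t->slot[i].in_use && t->slot[i].req_id == id) {
			t->slot[i].reply = reply;
			return 0;
		}
	}
	return -1;
}

/* Fetch the reply for id and free its slot; NULL if no reply is there yet. */
static const char *take_reply(struct pending_table *t, uint32_t id)
{
	size_t i;

	for (i = 0; i < MAX_PENDING; i++) {
		if (t->slot[i].in_use && t->slot[i].req_id == id &&
		    t->slot[i].reply) {
			t->slot[i].in_use = 0;
			return t->slot[i].reply;
		}
	}
	return NULL;
}
```

With such a table, replies may arrive in any order without one caller
blocking another. A real driver would additionally need locking and a
way to put waiters to sleep, both of which are omitted here.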

Juergen Gross (3):
  xen: clean up xenbus internal headers
  xen: modify xenstore watch event interface
  xen: optimize xenbus driver for multiple concurrent xenstore accesses

 drivers/block/xen-blkback/xenbus.c         |   6 +-
 drivers/net/xen-netback/xenbus.c           |   8 +-
 drivers/xen/cpu_hotplug.c                  |   5 +-
 drivers/xen/manage.c                       |   6 +-
 drivers/xen/xen-balloon.c                  |   2 +-
 drivers/xen/xen-pciback/xenbus.c           |   2 +-
 drivers/xen/xenbus/xenbus.h                | 134 ++++++++
 drivers/xen/xenbus/xenbus_client.c         |   6 +-
 drivers/xen/xenbus/xenbus_comms.c          | 319 +++++++++++++++--
 drivers/xen/xenbus/xenbus_comms.h          |  51 ---
 drivers/xen/xenbus/xenbus_dev_backend.c    |   2 +-
 drivers/xen/xenbus/xenbus_dev_frontend.c   | 213 +++++++-----
 drivers/xen/xenbus/xenbus_probe.c          |  14 +-
 drivers/xen/xenbus/xenbus_probe.h          |  88 -----
 drivers/xen/xenbus/xenbus_probe_backend.c  |  11 +-
 drivers/xen/xenbus/xenbus_probe_frontend.c |  17 +-
 drivers/xen/xenbus/xenbus_xs.c             | 535 +++++++++++++----------------
 drivers/xen/xenfs/super.c                  |   2 +-
 drivers/xen/xenfs/xenstored.c              |   2 +-
 include/xen/xenbus.h                       |  18 +-
 20 files changed, 830 insertions(+), 611 deletions(-)
 create mode 100644 drivers/xen/xenbus/xenbus.h
 delete mode 100644 drivers/xen/xenbus/xenbus_comms.h
 delete mode 100644 drivers/xen/xenbus/xenbus_probe.h

Cc: konrad.wilk@oracle.com
Cc: roger.pau@citrix.com
Cc: wei.liu2@citrix.com
Cc: paul.durrant@citrix.com
Cc: netdev@vger.kernel.org

-- 
2.10.2


* [PATCH 1/3] xen: clean up xenbus internal headers
  2017-01-06 15:05 [PATCH 0/3] xen: optimize xenbus performance Juergen Gross
@ 2017-01-06 15:05 ` Juergen Gross
  2017-01-06 20:52   ` Boris Ostrovsky
  2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
  2017-01-06 15:05 ` [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses Juergen Gross
  2 siblings, 1 reply; 21+ messages in thread
From: Juergen Gross @ 2017-01-06 15:05 UTC
  To: linux-kernel, xen-devel; +Cc: boris.ostrovsky, Juergen Gross

The xenbus driver has an awful mixture of internal and globally visible
headers: some definitions that are only used internally live in the
global header include/xen/xenbus.h, while some definitions in internal
headers are used by other drivers, too.

Clean this up by moving the externally used symbols to
include/xen/xenbus.h and the symbols that are only used internally to
a new header, drivers/xen/xenbus/xenbus.h.

Signed-off-by: Juergen Gross <jgross@suse.com>
---
 drivers/xen/xenbus/{xenbus_probe.h => xenbus.h} | 64 ++++++++++++++-----------
 drivers/xen/xenbus/xenbus_client.c              |  2 +-
 drivers/xen/xenbus/xenbus_comms.c               |  2 +-
 drivers/xen/xenbus/xenbus_comms.h               | 51 --------------------
 drivers/xen/xenbus/xenbus_dev_backend.c         |  2 +-
 drivers/xen/xenbus/xenbus_dev_frontend.c        |  4 +-
 drivers/xen/xenbus/xenbus_probe.c               |  3 +-
 drivers/xen/xenbus/xenbus_probe_backend.c       |  3 +-
 drivers/xen/xenbus/xenbus_probe_frontend.c      |  3 +-
 drivers/xen/xenbus/xenbus_xs.c                  |  3 +-
 drivers/xen/xenfs/super.c                       |  2 +-
 drivers/xen/xenfs/xenstored.c                   |  2 +-
 include/xen/xenbus.h                            | 12 ++---
 13 files changed, 52 insertions(+), 101 deletions(-)
 rename drivers/xen/xenbus/{xenbus_probe.h => xenbus.h} (58%)
 delete mode 100644 drivers/xen/xenbus/xenbus_comms.h

diff --git a/drivers/xen/xenbus/xenbus_probe.h b/drivers/xen/xenbus/xenbus.h
similarity index 58%
rename from drivers/xen/xenbus/xenbus_probe.h
rename to drivers/xen/xenbus/xenbus.h
index c9ec7ca..6a80c1e 100644
--- a/drivers/xen/xenbus/xenbus_probe.h
+++ b/drivers/xen/xenbus/xenbus.h
@@ -1,10 +1,7 @@
-/******************************************************************************
- * xenbus_probe.h
- *
- * Talks to Xen Store to figure out what devices we have.
+/*
+ * Private include for xenbus communications.
  *
  * Copyright (C) 2005 Rusty Russell, IBM Corporation
- * Copyright (C) 2005 XenSource Ltd.
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License version 2
@@ -31,8 +28,8 @@
  * IN THE SOFTWARE.
  */
 
-#ifndef _XENBUS_PROBE_H
-#define _XENBUS_PROBE_H
+#ifndef _XENBUS_XENBUS_H
+#define _XENBUS_XENBUS_H
 
 #define XEN_BUS_ID_SIZE			20
 
@@ -54,35 +51,46 @@ enum xenstore_init {
 	XS_LOCAL,
 };
 
+extern enum xenstore_init xen_store_domain_type;
 extern const struct attribute_group *xenbus_dev_groups[];
 
-extern int xenbus_match(struct device *_dev, struct device_driver *_drv);
-extern int xenbus_dev_probe(struct device *_dev);
-extern int xenbus_dev_remove(struct device *_dev);
-extern int xenbus_register_driver_common(struct xenbus_driver *drv,
-					 struct xen_bus_type *bus,
-					 struct module *owner,
-					 const char *mod_name);
-extern int xenbus_probe_node(struct xen_bus_type *bus,
-			     const char *type,
-			     const char *nodename);
-extern int xenbus_probe_devices(struct xen_bus_type *bus);
+int xs_init(void);
+int xb_init_comms(void);
+void xb_deinit_comms(void);
+int xb_write(const void *data, unsigned int len);
+int xb_read(void *data, unsigned int len);
+int xb_data_to_read(void);
+int xb_wait_for_data_to_read(void);
 
-extern void xenbus_dev_changed(const char *node, struct xen_bus_type *bus);
+int xenbus_match(struct device *_dev, struct device_driver *_drv);
+int xenbus_dev_probe(struct device *_dev);
+int xenbus_dev_remove(struct device *_dev);
+int xenbus_register_driver_common(struct xenbus_driver *drv,
+				  struct xen_bus_type *bus,
+				  struct module *owner,
+				  const char *mod_name);
+int xenbus_probe_node(struct xen_bus_type *bus,
+		      const char *type,
+		      const char *nodename);
+int xenbus_probe_devices(struct xen_bus_type *bus);
 
-extern void xenbus_dev_shutdown(struct device *_dev);
+void xenbus_dev_changed(const char *node, struct xen_bus_type *bus);
 
-extern int xenbus_dev_suspend(struct device *dev);
-extern int xenbus_dev_resume(struct device *dev);
-extern int xenbus_dev_cancel(struct device *dev);
+void xenbus_dev_shutdown(struct device *_dev);
 
-extern void xenbus_otherend_changed(struct xenbus_watch *watch,
-				    const char **vec, unsigned int len,
-				    int ignore_on_shutdown);
+int xenbus_dev_suspend(struct device *dev);
+int xenbus_dev_resume(struct device *dev);
+int xenbus_dev_cancel(struct device *dev);
 
-extern int xenbus_read_otherend_details(struct xenbus_device *xendev,
-					char *id_node, char *path_node);
+void xenbus_otherend_changed(struct xenbus_watch *watch,
+			     const char **vec, unsigned int len,
+			     int ignore_on_shutdown);
+
+int xenbus_read_otherend_details(struct xenbus_device *xendev,
+				 char *id_node, char *path_node);
 
 void xenbus_ring_ops_init(void);
 
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
+
 #endif
diff --git a/drivers/xen/xenbus/xenbus_client.c b/drivers/xen/xenbus/xenbus_client.c
index 056da6e..23edf53 100644
--- a/drivers/xen/xenbus/xenbus_client.c
+++ b/drivers/xen/xenbus/xenbus_client.c
@@ -47,7 +47,7 @@
 #include <xen/xen.h>
 #include <xen/features.h>
 
-#include "xenbus_probe.h"
+#include "xenbus.h"
 
 #define XENBUS_PAGES(_grants)	(DIV_ROUND_UP(_grants, XEN_PFN_PER_PAGE))
 
diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
index ecdecce..c21ec02 100644
--- a/drivers/xen/xenbus/xenbus_comms.c
+++ b/drivers/xen/xenbus/xenbus_comms.c
@@ -40,7 +40,7 @@
 #include <asm/xen/hypervisor.h>
 #include <xen/events.h>
 #include <xen/page.h>
-#include "xenbus_comms.h"
+#include "xenbus.h"
 
 static int xenbus_irq;
 
diff --git a/drivers/xen/xenbus/xenbus_comms.h b/drivers/xen/xenbus/xenbus_comms.h
deleted file mode 100644
index 867a2e4..0000000
--- a/drivers/xen/xenbus/xenbus_comms.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Private include for xenbus communications.
- *
- * Copyright (C) 2005 Rusty Russell, IBM Corporation
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version 2
- * as published by the Free Software Foundation; or, when distributed
- * separately from the Linux kernel or incorporated into other
- * software packages, subject to the following license:
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this source file (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use, copy, modify,
- * merge, publish, distribute, sublicense, and/or sell copies of the Software,
- * and to permit persons to whom the Software is furnished to do so, subject to
- * the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
-
-#ifndef _XENBUS_COMMS_H
-#define _XENBUS_COMMS_H
-
-#include <linux/fs.h>
-
-int xs_init(void);
-int xb_init_comms(void);
-void xb_deinit_comms(void);
-
-/* Low level routines. */
-int xb_write(const void *data, unsigned len);
-int xb_read(void *data, unsigned len);
-int xb_data_to_read(void);
-int xb_wait_for_data_to_read(void);
-extern struct xenstore_domain_interface *xen_store_interface;
-extern int xen_store_evtchn;
-extern enum xenstore_init xen_store_domain_type;
-
-extern const struct file_operations xen_xenbus_fops;
-
-#endif /* _XENBUS_COMMS_H */
diff --git a/drivers/xen/xenbus/xenbus_dev_backend.c b/drivers/xen/xenbus/xenbus_dev_backend.c
index 4a41ac9..1126701 100644
--- a/drivers/xen/xenbus/xenbus_dev_backend.c
+++ b/drivers/xen/xenbus/xenbus_dev_backend.c
@@ -16,7 +16,7 @@
 #include <xen/events.h>
 #include <asm/xen/hypervisor.h>
 
-#include "xenbus_comms.h"
+#include "xenbus.h"
 
 static int xenbus_backend_open(struct inode *inode, struct file *filp)
 {
diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c
index 79130b3..e2bc9b3 100644
--- a/drivers/xen/xenbus/xenbus_dev_frontend.c
+++ b/drivers/xen/xenbus/xenbus_dev_frontend.c
@@ -57,12 +57,12 @@
 #include <linux/miscdevice.h>
 #include <linux/init.h>
 
-#include "xenbus_comms.h"
-
 #include <xen/xenbus.h>
 #include <xen/xen.h>
 #include <asm/xen/hypervisor.h>
 
+#include "xenbus.h"
+
 /*
  * An element of a list of outstanding transactions, for which we're
  * still waiting a reply.
diff --git a/drivers/xen/xenbus/xenbus_probe.c b/drivers/xen/xenbus/xenbus_probe.c
index 4bdf654..6baffbb 100644
--- a/drivers/xen/xenbus/xenbus_probe.c
+++ b/drivers/xen/xenbus/xenbus_probe.c
@@ -62,8 +62,7 @@
 
 #include <xen/hvm.h>
 
-#include "xenbus_comms.h"
-#include "xenbus_probe.h"
+#include "xenbus.h"
 
 
 int xen_store_evtchn;
diff --git a/drivers/xen/xenbus/xenbus_probe_backend.c b/drivers/xen/xenbus/xenbus_probe_backend.c
index 37929df..f46b4dc 100644
--- a/drivers/xen/xenbus/xenbus_probe_backend.c
+++ b/drivers/xen/xenbus/xenbus_probe_backend.c
@@ -53,8 +53,7 @@
 #include <xen/xenbus.h>
 #include <xen/features.h>
 
-#include "xenbus_comms.h"
-#include "xenbus_probe.h"
+#include "xenbus.h"
 
 /* backend/<type>/<fe-uuid>/<id> => <type>-<fe-domid>-<id> */
 static int backend_bus_id(char bus_id[XEN_BUS_ID_SIZE], const char *nodename)
diff --git a/drivers/xen/xenbus/xenbus_probe_frontend.c b/drivers/xen/xenbus/xenbus_probe_frontend.c
index 6d40a97..d7b77a6 100644
--- a/drivers/xen/xenbus/xenbus_probe_frontend.c
+++ b/drivers/xen/xenbus/xenbus_probe_frontend.c
@@ -27,8 +27,7 @@
 
 #include <xen/platform_pci.h>
 
-#include "xenbus_comms.h"
-#include "xenbus_probe.h"
+#include "xenbus.h"
 
 
 
diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
index 6afb993..4c49d87 100644
--- a/drivers/xen/xenbus/xenbus_xs.c
+++ b/drivers/xen/xenbus/xenbus_xs.c
@@ -48,8 +48,7 @@
 #include <asm/xen/hypervisor.h>
 #include <xen/xenbus.h>
 #include <xen/xen.h>
-#include "xenbus_comms.h"
-#include "xenbus_probe.h"
+#include "xenbus.h"
 
 struct xs_stored_msg {
 	struct list_head list;
diff --git a/drivers/xen/xenfs/super.c b/drivers/xen/xenfs/super.c
index 8559a71..328c398 100644
--- a/drivers/xen/xenfs/super.c
+++ b/drivers/xen/xenfs/super.c
@@ -16,10 +16,10 @@
 #include <linux/magic.h>
 
 #include <xen/xen.h>
+#include <xen/xenbus.h>
 
 #include "xenfs.h"
 #include "../privcmd.h"
-#include "../xenbus/xenbus_comms.h"
 
 #include <asm/xen/hypervisor.h>
 
diff --git a/drivers/xen/xenfs/xenstored.c b/drivers/xen/xenfs/xenstored.c
index fef20db..82fd2a3 100644
--- a/drivers/xen/xenfs/xenstored.c
+++ b/drivers/xen/xenfs/xenstored.c
@@ -4,9 +4,9 @@
 #include <linux/fs.h>
 
 #include <xen/page.h>
+#include <xen/xenbus.h>
 
 #include "xenfs.h"
-#include "../xenbus/xenbus_comms.h"
 
 static ssize_t xsd_read(struct file *file, char __user *buf,
 			    size_t size, loff_t *off)
diff --git a/include/xen/xenbus.h b/include/xen/xenbus.h
index 271ba62..98f73a2 100644
--- a/include/xen/xenbus.h
+++ b/include/xen/xenbus.h
@@ -38,6 +38,7 @@
 #include <linux/notifier.h>
 #include <linux/mutex.h>
 #include <linux/export.h>
+#include <linux/fs.h>
 #include <linux/completion.h>
 #include <linux/init.h>
 #include <linux/slab.h>
@@ -175,16 +176,9 @@ void xs_suspend(void);
 void xs_resume(void);
 void xs_suspend_cancel(void);
 
-/* Used by xenbus_dev to borrow kernel's store connection. */
-void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
-
 struct work_struct;
 
-/* Prepare for domain suspend: then resume or cancel the suspend. */
-void xenbus_suspend(void);
-void xenbus_resume(void);
 void xenbus_probe(struct work_struct *);
-void xenbus_suspend_cancel(void);
 
 #define XENBUS_IS_ERR_READ(str) ({			\
 	if (!IS_ERR(str) && strlen(str) == 0) {		\
@@ -235,4 +229,8 @@ const char *xenbus_strstate(enum xenbus_state state);
 int xenbus_dev_is_online(struct xenbus_device *dev);
 int xenbus_frontend_closed(struct xenbus_device *dev);
 
+extern const struct file_operations xen_xenbus_fops;
+extern struct xenstore_domain_interface *xen_store_interface;
+extern int xen_store_evtchn;
+
 #endif /* _XEN_XENBUS_H */
-- 
2.10.2


* [PATCH 2/3] xen: modify xenstore watch event interface
  2017-01-06 15:05 [PATCH 0/3] xen: optimize xenbus performance Juergen Gross
  2017-01-06 15:05 ` [PATCH 1/3] xen: clean up xenbus internal headers Juergen Gross
@ 2017-01-06 15:05 ` Juergen Gross
  2017-01-06 15:38   ` Paul Durrant
                     ` (3 more replies)
  2017-01-06 15:05 ` [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses Juergen Gross
  2 siblings, 4 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-06 15:05 UTC
  To: linux-kernel, xen-devel
  Cc: boris.ostrovsky, Juergen Gross, konrad.wilk, roger.pau, wei.liu2,
	paul.durrant, netdev

Today a Xenstore watch event is delivered via a callback function
declared as:

void (*callback)(struct xenbus_watch *,
                 const char **vec, unsigned int len);

As watch events only ever carry two parameters (path and token),
changing the prototype to:

void (*callback)(struct xenbus_watch *,
                 const char *path, const char *token);

is the natural thing to do.
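
To illustrate why two strings suffice, here is a minimal userspace
sketch of splitting a watch event body of the form "path\0token\0" into
its two parts and rejecting anything else. It is not the kernel code;
struct watch_event and parse_watch_event() are hypothetical names used
only for this illustration.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the path/token pair of a watch event. */
struct watch_event {
	const char *path;
	const char *token;
};

/*
 * Split body[0..len) into exactly two NUL-terminated strings.
 * Returns 0 on success, -1 if the body does not consist of exactly
 * two strings. The returned pointers alias the input buffer.
 */
static int parse_watch_event(const char *body, size_t len,
			     struct watch_event *ev)
{
	const char *end = body + len;
	const char *path_end, *token_end;

	/* The first string is the path; something must follow it. */
	path_end = memchr(body, '\0', len);
	if (!path_end || path_end + 1 >= end)
		return -1;

	/* The second string is the token and must end the buffer. */
	token_end = memchr(path_end + 1, '\0',
			   (size_t)(end - (path_end + 1)));
	if (!token_end || token_end + 1 != end)
		return -1;

	ev->path = body;
	ev->token = path_end + 1;
	return 0;
}
```

Because both pointers alias the same buffer, freeing the body frees
path and token together.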

Apply this change and adapt all users.
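
For a typical user the adaptation is mechanical: vec[XS_WATCH_PATH]
becomes path. A minimal userspace sketch of a callback written against
the new prototype, loosely modelled on the vcpu hotplug handler, might
look as follows. struct xenbus_watch_sketch and its last_cpu and fired
fields are hypothetical and exist only to make the example
self-contained.

```c
#include <stdio.h>
#include <string.h>

struct xenbus_watch_sketch;

/* The new callback shape: path and token instead of vec and len. */
typedef void (*watch_cb)(struct xenbus_watch_sketch *watch,
			 const char *path, const char *token);

/* Hypothetical stand-in for struct xenbus_watch. */
struct xenbus_watch_sketch {
	const char *node;
	watch_cb callback;
	unsigned int last_cpu;	/* illustration only */
	int fired;		/* illustration only */
};

/* Extract the cpu number from a path such as "cpu/3/availability". */
static void vcpu_event(struct xenbus_watch_sketch *watch,
		       const char *path, const char *token)
{
	const char *cpustr = strstr(path, "cpu/");
	unsigned int cpu;

	(void)token;	/* not needed by this handler */

	if (cpustr && sscanf(cpustr, "cpu/%u", &cpu) == 1) {
		watch->last_cpu = cpu;
		watch->fired = 1;
	}
}
```

Handlers that ignore the path, as several of the converted drivers do,
only need their parameter list updated.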

Cc: konrad.wilk@oracle.com
Cc: roger.pau@citrix.com
Cc: wei.liu2@citrix.com
Cc: paul.durrant@citrix.com
Cc: netdev@vger.kernel.org

Signed-off-by: Juergen Gross <jgross@suse.com>
---
 drivers/block/xen-blkback/xenbus.c         |  6 +++---
 drivers/net/xen-netback/xenbus.c           |  8 ++++----
 drivers/xen/cpu_hotplug.c                  |  5 ++---
 drivers/xen/manage.c                       |  6 +++---
 drivers/xen/xen-balloon.c                  |  2 +-
 drivers/xen/xen-pciback/xenbus.c           |  2 +-
 drivers/xen/xenbus/xenbus.h                |  6 +++---
 drivers/xen/xenbus/xenbus_client.c         |  4 ++--
 drivers/xen/xenbus/xenbus_dev_frontend.c   | 21 ++++++++-------------
 drivers/xen/xenbus/xenbus_probe.c          | 11 ++++-------
 drivers/xen/xenbus/xenbus_probe_backend.c  |  8 ++++----
 drivers/xen/xenbus/xenbus_probe_frontend.c | 14 +++++++-------
 drivers/xen/xenbus/xenbus_xs.c             | 29 ++++++++++++++---------------
 include/xen/xenbus.h                       |  6 +++---
 14 files changed, 59 insertions(+), 69 deletions(-)

diff --git a/drivers/block/xen-blkback/xenbus.c b/drivers/block/xen-blkback/xenbus.c
index 415e79b..8fe61b5 100644
--- a/drivers/block/xen-blkback/xenbus.c
+++ b/drivers/block/xen-blkback/xenbus.c
@@ -38,8 +38,8 @@ struct backend_info {
 static struct kmem_cache *xen_blkif_cachep;
 static void connect(struct backend_info *);
 static int connect_ring(struct backend_info *);
-static void backend_changed(struct xenbus_watch *, const char **,
-			    unsigned int);
+static void backend_changed(struct xenbus_watch *, const char *,
+			    const char *);
 static void xen_blkif_free(struct xen_blkif *blkif);
 static void xen_vbd_free(struct xen_vbd *vbd);
 
@@ -661,7 +661,7 @@ static int xen_blkbk_probe(struct xenbus_device *dev,
  * ready, connect.
  */
 static void backend_changed(struct xenbus_watch *watch,
-			    const char **vec, unsigned int len)
+			    const char *path, const char *token)
 {
 	int err;
 	unsigned major;
diff --git a/drivers/net/xen-netback/xenbus.c b/drivers/net/xen-netback/xenbus.c
index 3124eae..d8a40fa 100644
--- a/drivers/net/xen-netback/xenbus.c
+++ b/drivers/net/xen-netback/xenbus.c
@@ -723,7 +723,7 @@ static int xen_net_read_mac(struct xenbus_device *dev, u8 mac[])
 }
 
 static void xen_net_rate_changed(struct xenbus_watch *watch,
-				const char **vec, unsigned int len)
+				 const char *path, const char *token)
 {
 	struct xenvif *vif = container_of(watch, struct xenvif, credit_watch);
 	struct xenbus_device *dev = xenvif_to_xenbus_device(vif);
@@ -780,7 +780,7 @@ static void xen_unregister_credit_watch(struct xenvif *vif)
 }
 
 static void xen_mcast_ctrl_changed(struct xenbus_watch *watch,
-				   const char **vec, unsigned int len)
+				   const char *path, const char *token)
 {
 	struct xenvif *vif = container_of(watch, struct xenvif,
 					  mcast_ctrl_watch);
@@ -855,8 +855,8 @@ static void unregister_hotplug_status_watch(struct backend_info *be)
 }
 
 static void hotplug_status_changed(struct xenbus_watch *watch,
-				   const char **vec,
-				   unsigned int vec_size)
+				   const char *path,
+				   const char *token)
 {
 	struct backend_info *be = container_of(watch,
 					       struct backend_info,
diff --git a/drivers/xen/cpu_hotplug.c b/drivers/xen/cpu_hotplug.c
index 5676aef..7a4daa2 100644
--- a/drivers/xen/cpu_hotplug.c
+++ b/drivers/xen/cpu_hotplug.c
@@ -68,13 +68,12 @@ static void vcpu_hotplug(unsigned int cpu)
 }
 
 static void handle_vcpu_hotplug_event(struct xenbus_watch *watch,
-					const char **vec, unsigned int len)
+				      const char *path, const char *token)
 {
 	unsigned int cpu;
 	char *cpustr;
-	const char *node = vec[XS_WATCH_PATH];
 
-	cpustr = strstr(node, "cpu/");
+	cpustr = strstr(path, "cpu/");
 	if (cpustr != NULL) {
 		sscanf(cpustr, "cpu/%u", &cpu);
 		vcpu_hotplug(cpu);
diff --git a/drivers/xen/manage.c b/drivers/xen/manage.c
index 26e5e85..ca62c09 100644
--- a/drivers/xen/manage.c
+++ b/drivers/xen/manage.c
@@ -218,7 +218,7 @@ static struct shutdown_handler shutdown_handlers[] = {
 };
 
 static void shutdown_handler(struct xenbus_watch *watch,
-			     const char **vec, unsigned int len)
+			     const char *path, const char *token)
 {
 	char *str;
 	struct xenbus_transaction xbt;
@@ -266,8 +266,8 @@ static void shutdown_handler(struct xenbus_watch *watch,
 }
 
 #ifdef CONFIG_MAGIC_SYSRQ
-static void sysrq_handler(struct xenbus_watch *watch, const char **vec,
-			  unsigned int len)
+static void sysrq_handler(struct xenbus_watch *watch, const char *path,
+			  const char *token)
 {
 	char sysrq_key = '\0';
 	struct xenbus_transaction xbt;
diff --git a/drivers/xen/xen-balloon.c b/drivers/xen/xen-balloon.c
index 79865b8..e7715cb 100644
--- a/drivers/xen/xen-balloon.c
+++ b/drivers/xen/xen-balloon.c
@@ -55,7 +55,7 @@ static int register_balloon(struct device *dev);
 
 /* React to a change in the target key */
 static void watch_target(struct xenbus_watch *watch,
-			 const char **vec, unsigned int len)
+			 const char *path, const char *token)
 {
 	unsigned long long new_target;
 	int err;
diff --git a/drivers/xen/xen-pciback/xenbus.c b/drivers/xen/xen-pciback/xenbus.c
index 3f0aee0..3814b44 100644
--- a/drivers/xen/xen-pciback/xenbus.c
+++ b/drivers/xen/xen-pciback/xenbus.c
@@ -652,7 +652,7 @@ static int xen_pcibk_setup_backend(struct xen_pcibk_device *pdev)
 }
 
 static void xen_pcibk_be_watch(struct xenbus_watch *watch,
-			     const char **vec, unsigned int len)
+			       const char *path, const char *token)
 {
 	struct xen_pcibk_device *pdev =
 	    container_of(watch, struct xen_pcibk_device, be_watch);
diff --git a/drivers/xen/xenbus/xenbus.h b/drivers/xen/xenbus/xenbus.h
index 6a80c1e..bd95c21 100644
--- a/drivers/xen/xenbus/xenbus.h
+++ b/drivers/xen/xenbus/xenbus.h
@@ -39,8 +39,8 @@ struct xen_bus_type {
 	int (*get_bus_id)(char bus_id[XEN_BUS_ID_SIZE], const char *nodename);
 	int (*probe)(struct xen_bus_type *bus, const char *type,
 		     const char *dir);
-	void (*otherend_changed)(struct xenbus_watch *watch, const char **vec,
-				 unsigned int len);
+	void (*otherend_changed)(struct xenbus_watch *watch, const char *path,
+				 const char *token);
 	struct bus_type bus;
 };
 
@@ -83,7 +83,7 @@ int xenbus_dev_resume(struct device *dev);
 int xenbus_dev_cancel(struct device *dev);
 
 void xenbus_otherend_changed(struct xenbus_watch *watch,
-			     const char **vec, unsigned int len,
+			     const char *path, const char *token,
 			     int ignore_on_shutdown);
 
 int xenbus_read_otherend_details(struct xenbus_device *xendev,
diff --git a/drivers/xen/xenbus/xenbus_client.c b/drivers/xen/xenbus/xenbus_client.c
index 23edf53..9586c24 100644
--- a/drivers/xen/xenbus/xenbus_client.c
+++ b/drivers/xen/xenbus/xenbus_client.c
@@ -115,7 +115,7 @@ EXPORT_SYMBOL_GPL(xenbus_strstate);
 int xenbus_watch_path(struct xenbus_device *dev, const char *path,
 		      struct xenbus_watch *watch,
 		      void (*callback)(struct xenbus_watch *,
-				       const char **, unsigned int))
+				       const char *, const char *))
 {
 	int err;
 
@@ -153,7 +153,7 @@ EXPORT_SYMBOL_GPL(xenbus_watch_path);
 int xenbus_watch_pathfmt(struct xenbus_device *dev,
 			 struct xenbus_watch *watch,
 			 void (*callback)(struct xenbus_watch *,
-					const char **, unsigned int),
+					  const char *, const char *),
 			 const char *pathfmt, ...)
 {
 	int err;
diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c
index e2bc9b3..e4b9847 100644
--- a/drivers/xen/xenbus/xenbus_dev_frontend.c
+++ b/drivers/xen/xenbus/xenbus_dev_frontend.c
@@ -258,26 +258,23 @@ static struct watch_adapter *alloc_watch_adapter(const char *path,
 }
 
 static void watch_fired(struct xenbus_watch *watch,
-			const char **vec,
-			unsigned int len)
+			const char *path,
+			const char *token)
 {
 	struct watch_adapter *adap;
 	struct xsd_sockmsg hdr;
-	const char *path, *token;
-	int path_len, tok_len, body_len, data_len = 0;
+	const char *token_caller;
+	int path_len, tok_len, body_len;
 	int ret;
 	LIST_HEAD(staging_q);
 
 	adap = container_of(watch, struct watch_adapter, watch);
 
-	path = vec[XS_WATCH_PATH];
-	token = adap->token;
+	token_caller = adap->token;
 
 	path_len = strlen(path) + 1;
-	tok_len = strlen(token) + 1;
-	if (len > 2)
-		data_len = vec[len] - vec[2] + 1;
-	body_len = path_len + tok_len + data_len;
+	tok_len = strlen(token_caller) + 1;
+	body_len = path_len + tok_len;
 
 	hdr.type = XS_WATCH_EVENT;
 	hdr.len = body_len;
@@ -288,9 +285,7 @@ static void watch_fired(struct xenbus_watch *watch,
 	if (!ret)
 		ret = queue_reply(&staging_q, path, path_len);
 	if (!ret)
-		ret = queue_reply(&staging_q, token, tok_len);
-	if (!ret && len > 2)
-		ret = queue_reply(&staging_q, vec[2], data_len);
+		ret = queue_reply(&staging_q, token_caller, tok_len);
 
 	if (!ret) {
 		/* success: pass reply list onto watcher */
diff --git a/drivers/xen/xenbus/xenbus_probe.c b/drivers/xen/xenbus/xenbus_probe.c
index 6baffbb..74888ca 100644
--- a/drivers/xen/xenbus/xenbus_probe.c
+++ b/drivers/xen/xenbus/xenbus_probe.c
@@ -169,7 +169,7 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev,
 EXPORT_SYMBOL_GPL(xenbus_read_otherend_details);
 
 void xenbus_otherend_changed(struct xenbus_watch *watch,
-			     const char **vec, unsigned int len,
+			     const char *path, const char *token,
 			     int ignore_on_shutdown)
 {
 	struct xenbus_device *dev =
@@ -180,18 +180,15 @@ void xenbus_otherend_changed(struct xenbus_watch *watch,
 	/* Protect us against watches firing on old details when the otherend
 	   details change, say immediately after a resume. */
 	if (!dev->otherend ||
-	    strncmp(dev->otherend, vec[XS_WATCH_PATH],
-		    strlen(dev->otherend))) {
-		dev_dbg(&dev->dev, "Ignoring watch at %s\n",
-			vec[XS_WATCH_PATH]);
+	    strncmp(dev->otherend, path, strlen(dev->otherend))) {
+		dev_dbg(&dev->dev, "Ignoring watch at %s\n", path);
 		return;
 	}
 
 	state = xenbus_read_driver_state(dev->otherend);
 
 	dev_dbg(&dev->dev, "state is %d, (%s), %s, %s\n",
-		state, xenbus_strstate(state), dev->otherend_watch.node,
-		vec[XS_WATCH_PATH]);
+		state, xenbus_strstate(state), dev->otherend_watch.node, path);
 
 	/*
 	 * Ignore xenbus transitions during shutdown. This prevents us doing
diff --git a/drivers/xen/xenbus/xenbus_probe_backend.c b/drivers/xen/xenbus/xenbus_probe_backend.c
index f46b4dc..b0bed4f 100644
--- a/drivers/xen/xenbus/xenbus_probe_backend.c
+++ b/drivers/xen/xenbus/xenbus_probe_backend.c
@@ -181,9 +181,9 @@ static int xenbus_probe_backend(struct xen_bus_type *bus, const char *type,
 }
 
 static void frontend_changed(struct xenbus_watch *watch,
-			    const char **vec, unsigned int len)
+			     const char *path, const char *token)
 {
-	xenbus_otherend_changed(watch, vec, len, 0);
+	xenbus_otherend_changed(watch, path, token, 0);
 }
 
 static struct xen_bus_type xenbus_backend = {
@@ -204,11 +204,11 @@ static struct xen_bus_type xenbus_backend = {
 };
 
 static void backend_changed(struct xenbus_watch *watch,
-			    const char **vec, unsigned int len)
+			    const char *path, const char *token)
 {
 	DPRINTK("");
 
-	xenbus_dev_changed(vec[XS_WATCH_PATH], &xenbus_backend);
+	xenbus_dev_changed(path, &xenbus_backend);
 }
 
 static struct xenbus_watch be_watch = {
diff --git a/drivers/xen/xenbus/xenbus_probe_frontend.c b/drivers/xen/xenbus/xenbus_probe_frontend.c
index d7b77a6..19e45ce 100644
--- a/drivers/xen/xenbus/xenbus_probe_frontend.c
+++ b/drivers/xen/xenbus/xenbus_probe_frontend.c
@@ -86,9 +86,9 @@ static int xenbus_uevent_frontend(struct device *_dev,
 
 
 static void backend_changed(struct xenbus_watch *watch,
-			    const char **vec, unsigned int len)
+			    const char *path, const char *token)
 {
-	xenbus_otherend_changed(watch, vec, len, 1);
+	xenbus_otherend_changed(watch, path, token, 1);
 }
 
 static void xenbus_frontend_delayed_resume(struct work_struct *w)
@@ -153,11 +153,11 @@ static struct xen_bus_type xenbus_frontend = {
 };
 
 static void frontend_changed(struct xenbus_watch *watch,
-			     const char **vec, unsigned int len)
+			     const char *path, const char *token)
 {
 	DPRINTK("");
 
-	xenbus_dev_changed(vec[XS_WATCH_PATH], &xenbus_frontend);
+	xenbus_dev_changed(path, &xenbus_frontend);
 }
 
 
@@ -332,13 +332,13 @@ static DECLARE_WAIT_QUEUE_HEAD(backend_state_wq);
 static int backend_state;
 
 static void xenbus_reset_backend_state_changed(struct xenbus_watch *w,
-					const char **v, unsigned int l)
+					const char *path, const char *token)
 {
-	if (xenbus_scanf(XBT_NIL, v[XS_WATCH_PATH], "", "%i",
+	if (xenbus_scanf(XBT_NIL, path, "", "%i",
 			 &backend_state) != 1)
 		backend_state = XenbusStateUnknown;
 	printk(KERN_DEBUG "XENBUS: backend %s %s\n",
-			v[XS_WATCH_PATH], xenbus_strstate(backend_state));
+	       path, xenbus_strstate(backend_state));
 	wake_up(&backend_state_wq);
 }
 
diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
index 4c49d87..ebc768f 100644
--- a/drivers/xen/xenbus/xenbus_xs.c
+++ b/drivers/xen/xenbus/xenbus_xs.c
@@ -64,8 +64,8 @@ struct xs_stored_msg {
 		/* Queued watch events. */
 		struct {
 			struct xenbus_watch *handle;
-			char **vec;
-			unsigned int vec_size;
+			const char *path;
+			const char *token;
 		} watch;
 	} u;
 };
@@ -765,7 +765,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
 		if (msg->u.watch.handle != watch)
 			continue;
 		list_del(&msg->list);
-		kfree(msg->u.watch.vec);
+		kfree(msg->u.watch.path);
 		kfree(msg);
 	}
 	spin_unlock(&watch_events_lock);
@@ -833,11 +833,10 @@ static int xenwatch_thread(void *unused)
 
 		if (ent != &watch_events) {
 			msg = list_entry(ent, struct xs_stored_msg, list);
-			msg->u.watch.handle->callback(
-				msg->u.watch.handle,
-				(const char **)msg->u.watch.vec,
-				msg->u.watch.vec_size);
-			kfree(msg->u.watch.vec);
+			msg->u.watch.handle->callback(msg->u.watch.handle,
+						      msg->u.watch.path,
+						      msg->u.watch.token);
+			kfree(msg->u.watch.path);
 			kfree(msg);
 		}
 
@@ -903,24 +902,24 @@ static int process_msg(void)
 	body[msg->hdr.len] = '\0';
 
 	if (msg->hdr.type == XS_WATCH_EVENT) {
-		msg->u.watch.vec = split(body, msg->hdr.len,
-					 &msg->u.watch.vec_size);
-		if (IS_ERR(msg->u.watch.vec)) {
-			err = PTR_ERR(msg->u.watch.vec);
+		if (count_strings(body, msg->hdr.len) != 2) {
+			err = -EINVAL;
 			kfree(msg);
+			kfree(body);
 			goto out;
 		}
+		msg->u.watch.path = (const char *)body;
+		msg->u.watch.token = (const char *)strchr(body, '\0') + 1;
 
 		spin_lock(&watches_lock);
-		msg->u.watch.handle = find_watch(
-			msg->u.watch.vec[XS_WATCH_TOKEN]);
+		msg->u.watch.handle = find_watch(msg->u.watch.token);
 		if (msg->u.watch.handle != NULL) {
 			spin_lock(&watch_events_lock);
 			list_add_tail(&msg->list, &watch_events);
 			wake_up(&watch_events_waitq);
 			spin_unlock(&watch_events_lock);
 		} else {
-			kfree(msg->u.watch.vec);
+			kfree(body);
 			kfree(msg);
 		}
 		spin_unlock(&watches_lock);
diff --git a/include/xen/xenbus.h b/include/xen/xenbus.h
index 98f73a2..869c816 100644
--- a/include/xen/xenbus.h
+++ b/include/xen/xenbus.h
@@ -61,7 +61,7 @@ struct xenbus_watch
 
 	/* Callback (executed in a process context with no locks held). */
 	void (*callback)(struct xenbus_watch *,
-			 const char **vec, unsigned int len);
+			 const char *path, const char *token);
 };
 
 
@@ -193,11 +193,11 @@ void xenbus_probe(struct work_struct *);
 int xenbus_watch_path(struct xenbus_device *dev, const char *path,
 		      struct xenbus_watch *watch,
 		      void (*callback)(struct xenbus_watch *,
-				       const char **, unsigned int));
+				       const char *, const char *));
 __printf(4, 5)
 int xenbus_watch_pathfmt(struct xenbus_device *dev, struct xenbus_watch *watch,
 			 void (*callback)(struct xenbus_watch *,
-					  const char **, unsigned int),
+					  const char *, const char *),
 			 const char *pathfmt, ...);
 
 int xenbus_switch_state(struct xenbus_device *dev, enum xenbus_state new_state);
-- 
2.10.2


* [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-06 15:05 [PATCH 0/3] xen: optimize xenbus performance Juergen Gross
  2017-01-06 15:05 ` [PATCH 1/3] xen: clean up xenbus internal headers Juergen Gross
  2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
@ 2017-01-06 15:05 ` Juergen Gross
  2017-01-09 21:17   ` Boris Ostrovsky
                     ` (2 more replies)
  2 siblings, 3 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-06 15:05 UTC (permalink / raw)
  To: linux-kernel, xen-devel; +Cc: boris.ostrovsky, Juergen Gross

Handling of multiple concurrent Xenstore accesses through the xenbus
driver, either from the kernel or from user land, is rather lame today:
xenbus can have only one access active at any point in time.

Rewrite xenbus to handle multiple requests concurrently by making use
of the request id of the Xenstore protocol. This requires the following
changes:

- Instead of blocking inside xb_read() when trying to read data from
  the xenstore ring buffer, block only in the main loop of
  xenbus_thread().

- Instead of doing writes to the xenstore ring buffer in the context of
  the caller, just queue the request and do the write in the dedicated
  xenbus thread.

- Instead of just forwarding the request id specified by the caller of
  xenbus to xenstore, use a xenbus-internal unique request id. This
  allows multiple outstanding requests.

- Modify the locking scheme in order to allow multiple requests to be
  active in parallel.

- Instead of waiting for the reply to a user's xenstore request after
  writing the request to the xenstore ring buffer, return directly to
  the caller and do the waiting in the read path.
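
The request id item above can be sketched in isolation. This is only a
user-space illustration under simplifying assumptions, not the code of
this patch: the names pending_req, req_submit(), req_complete() and
req_release() and the fixed-size table are hypothetical, and all locking
(done via xb_write_mutex in the patch) is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_PENDING 8	/* hypothetical table size */

enum req_state { REQ_FREE, REQ_WAIT_REPLY, REQ_GOT_REPLY };

struct pending_req {
	uint32_t req_id;	/* internal id, unique per request */
	enum req_state state;
	const char *reply;	/* reply body once it has arrived */
};

static struct pending_req pending[MAX_PENDING];
static uint32_t next_req_id;

/* Queue a request under a fresh internal id; NULL if the table is full. */
static struct pending_req *req_submit(void)
{
	size_t i;

	for (i = 0; i < MAX_PENDING; i++) {
		if (pending[i].state == REQ_FREE) {
			pending[i].req_id = next_req_id++;
			pending[i].state = REQ_WAIT_REPLY;
			pending[i].reply = NULL;
			return &pending[i];
		}
	}
	return NULL;
}

/* Match an incoming reply by id; 0 on success, -1 if nobody waits for it. */
static int req_complete(uint32_t req_id, const char *reply)
{
	size_t i;

	for (i = 0; i < MAX_PENDING; i++) {
		if (pending[i].state == REQ_WAIT_REPLY &&
		    pending[i].req_id == req_id) {
			pending[i].reply = reply;
			pending[i].state = REQ_GOT_REPLY;
			return 0;
		}
	}
	return -1;
}

/* Release a slot once the caller has consumed the reply. */
static void req_release(struct pending_req *req)
{
	assert(req->state != REQ_FREE);
	req->state = REQ_FREE;
}
```

Because each reply is matched by its id, replies may arrive in any
order, and a caller only ever sees the reply belonging to its own
request.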

Additionally, signal handling was optimized by not waking up the
xenbus thread or sending an event to Xenstore when the addressed
entity is known to be running already.

As a result, communication with Xenstore is sped up by a factor of up
to 5: depending on the request type (read or write) and the amount of
data transferred, the gain was at least 20% (small reads) and went up
to a factor of 5 for large writes.

Finally, some more rough edges of xenbus have been smoothed:

- Handling of memory shortage when reading from the xenstore ring
  buffer in the xenbus driver was not optimal: it was busy looping and
  issuing a warning in each loop iteration.

- If xenstore is not running in dom0 but in a stubdom, we end up with
  two xenbus threads running, as the initialization of xenbus in dom0,
  which expects a local xenstored, is redone later when connecting to
  the xenstore domain. Up to now this was no problem, as locking would
  prevent the two xenbus threads from interfering with each other, but
  it was just a waste of kernel resources.

- An out-of-memory situation while writing to or reading from the
  xenstore ring buffer will no longer lead to a possible loss of
  synchronization with xenstore.

- The user read and write parts are now interruptible by signals.

Signed-off-by: Juergen Gross <jgross@suse.com>
---
I'm aware that the changes are quite large. I thought about sending a
version split into multiple patches, but a lot of lines would have been
touched by more than one patch. I still have the multiple-patch variant
lying around - this patch is split into 11 smaller ones. While all
steps of this larger series are operational, some steps are not optimal,
as they are even slower than the original version of xenbus.

Nevertheless I can send the large series if there are requests for it.
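
For readers who want the "block only in the main loop" idea from the
commit message in isolation, here is a rough user-space sketch. It is
not taken from the patch: struct msg_hdr, struct read_state,
read_state_init(), read_step() and BODY_MAX are hypothetical names, and
the header is reduced to a single length field. The reader keeps its
progress in a state structure and simply returns when no more bytes are
available, so only the caller's loop ever needs to wait.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define BODY_MAX 64	/* hypothetical payload limit */

struct msg_hdr {
	uint32_t len;	/* number of body bytes following the header */
};

struct read_state {
	struct msg_hdr hdr;
	char body[BODY_MAX + 1];
	int in_hdr;	/* still collecting header bytes? */
	size_t used;	/* bytes collected for the current part */
};

static void read_state_init(struct read_state *st)
{
	memset(st, 0, sizeof(*st));
	st->in_hdr = 1;
}

/*
 * Consume the bytes that are available right now and never block.
 * Returns 1 when a complete message is in st->body, 0 when more data
 * is needed and -1 on an oversized message (the caller should then
 * re-initialize the state). *consumed is set to the bytes taken.
 */
static int read_step(struct read_state *st, const char *data,
		     size_t avail, size_t *consumed)
{
	size_t want, n;

	*consumed = 0;

	if (st->in_hdr) {
		want = sizeof(st->hdr) - st->used;
		n = avail < want ? avail : want;
		memcpy((char *)&st->hdr + st->used, data, n);
		st->used += n;
		*consumed += n;
		data += n;
		avail -= n;
		if (st->used != sizeof(st->hdr))
			return 0;	/* header incomplete, try later */
		if (st->hdr.len > BODY_MAX)
			return -1;
		st->in_hdr = 0;
		st->used = 0;
	}

	want = st->hdr.len - st->used;
	n = avail < want ? avail : want;
	memcpy(st->body + st->used, data, n);
	st->used += n;
	*consumed += n;
	if (st->used != st->hdr.len)
		return 0;	/* body incomplete, try later */

	st->body[st->hdr.len] = '\0';
	st->in_hdr = 1;		/* ready for the next message */
	st->used = 0;
	return 1;
}
```

A caller would invoke read_step() whenever data shows up and go back to
sleep on a return value of 0, which is roughly the role the main loop
of the xenbus thread plays for process_msg() in the patch.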
---
 drivers/xen/xenbus/xenbus.h              |  48 ++-
 drivers/xen/xenbus/xenbus_comms.c        | 317 ++++++++++++++++--
 drivers/xen/xenbus/xenbus_dev_frontend.c | 188 +++++++----
 drivers/xen/xenbus/xenbus_xs.c           | 529 ++++++++++++++-----------------
 4 files changed, 680 insertions(+), 402 deletions(-)

diff --git a/drivers/xen/xenbus/xenbus.h b/drivers/xen/xenbus/xenbus.h
index bd95c21..d693b5f 100644
--- a/drivers/xen/xenbus/xenbus.h
+++ b/drivers/xen/xenbus/xenbus.h
@@ -31,6 +31,10 @@
 #ifndef _XENBUS_XENBUS_H
 #define _XENBUS_XENBUS_H
 
+#include <linux/mutex.h>
+#include <linux/uio.h>
+#include <xen/xenbus.h>
+
 #define XEN_BUS_ID_SIZE			20
 
 struct xen_bus_type {
@@ -51,16 +55,49 @@ enum xenstore_init {
 	XS_LOCAL,
 };
 
+struct xs_watch_event {
+	struct list_head list;
+	unsigned int len;
+	struct xenbus_watch *handle;
+	const char *path;
+	const char *token;
+	char body[];
+};
+
+enum xb_req_state {
+	xb_req_state_queued,
+	xb_req_state_wait_reply,
+	xb_req_state_got_reply,
+	xb_req_state_aborted
+};
+
+struct xb_req_data {
+	struct list_head list;
+	wait_queue_head_t wq;
+	struct xsd_sockmsg msg;
+	enum xsd_sockmsg_type type;
+	char *body;
+	const struct kvec *vec;
+	int num_vecs;
+	int err;
+	enum xb_req_state state;
+	void (*cb)(struct xb_req_data *);
+	void *par;
+};
+
 extern enum xenstore_init xen_store_domain_type;
 extern const struct attribute_group *xenbus_dev_groups[];
+extern struct mutex xs_response_mutex;
+extern struct list_head xs_reply_list;
+extern struct list_head xb_write_list;
+extern wait_queue_head_t xb_waitq;
+extern struct mutex xb_write_mutex;
 
 int xs_init(void);
 int xb_init_comms(void);
 void xb_deinit_comms(void);
-int xb_write(const void *data, unsigned int len);
-int xb_read(void *data, unsigned int len);
-int xb_data_to_read(void);
-int xb_wait_for_data_to_read(void);
+int xs_watch_msg(struct xs_watch_event *event);
+void xs_request_exit(struct xb_req_data *req);
 
 int xenbus_match(struct device *_dev, struct device_driver *_drv);
 int xenbus_dev_probe(struct device *_dev);
@@ -91,6 +128,7 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev,
 
 void xenbus_ring_ops_init(void);
 
-void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
+int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par);
+void xenbus_dev_queue_reply(struct xb_req_data *req);
 
 #endif
diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
index c21ec02..fa054ca 100644
--- a/drivers/xen/xenbus/xenbus_comms.c
+++ b/drivers/xen/xenbus/xenbus_comms.c
@@ -34,6 +34,7 @@
 
 #include <linux/wait.h>
 #include <linux/interrupt.h>
+#include <linux/kthread.h>
 #include <linux/sched.h>
 #include <linux/err.h>
 #include <xen/xenbus.h>
@@ -42,11 +43,40 @@
 #include <xen/page.h>
 #include "xenbus.h"
 
+struct xs_thread_state_write {
+	struct xb_req_data *req;
+	int idx;
+	unsigned int used;
+};
+
+struct xs_thread_state_read {
+	struct xsd_sockmsg msg;
+	char *body;
+	union {
+		void *alloc;
+		struct xs_watch_event *watch;
+	};
+	bool in_msg;
+	bool in_hdr;
+	unsigned int used;
+};
+
+/* A list of replies. Currently only one will ever be outstanding. */
+LIST_HEAD(xs_reply_list);
+
+/* A list of write requests. */
+LIST_HEAD(xb_write_list);
+DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
+DEFINE_MUTEX(xb_write_mutex);
+
+/* Protect xenbus reader thread against save/restore. */
+DEFINE_MUTEX(xs_response_mutex);
+
 static int xenbus_irq;
+static struct task_struct *xenbus_task;
 
 static DECLARE_WORK(probe_work, xenbus_probe);
 
-static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
 
 static irqreturn_t wake_waiting(int irq, void *unused)
 {
@@ -84,30 +114,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons,
 	return buf + MASK_XENSTORE_IDX(cons);
 }
 
+static int xb_data_to_write(void)
+{
+	struct xenstore_domain_interface *intf = xen_store_interface;
+
+	return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
+		!list_empty(&xb_write_list);
+}
+
 /**
  * xb_write - low level write
  * @data: buffer to send
  * @len: length of buffer
  *
- * Returns 0 on success, error otherwise.
+ * Returns number of bytes written or -err.
  */
-int xb_write(const void *data, unsigned len)
+static int xb_write(const void *data, unsigned int len)
 {
 	struct xenstore_domain_interface *intf = xen_store_interface;
 	XENSTORE_RING_IDX cons, prod;
-	int rc;
+	unsigned int bytes = 0;
 
 	while (len != 0) {
 		void *dst;
 		unsigned int avail;
 
-		rc = wait_event_interruptible(
-			xb_waitq,
-			(intf->req_prod - intf->req_cons) !=
-			XENSTORE_RING_SIZE);
-		if (rc < 0)
-			return rc;
-
 		/* Read indexes, then verify. */
 		cons = intf->req_cons;
 		prod = intf->req_prod;
@@ -115,59 +146,57 @@ int xb_write(const void *data, unsigned len)
 			intf->req_cons = intf->req_prod = 0;
 			return -EIO;
 		}
-
-		dst = get_output_chunk(cons, prod, intf->req, &avail);
-		if (avail == 0)
-			continue;
-		if (avail > len)
-			avail = len;
+		if (!xb_data_to_write())
+			return bytes;
 
 		/* Must write data /after/ reading the consumer index. */
 		virt_mb();
 
+		dst = get_output_chunk(cons, prod, intf->req, &avail);
+		if (avail == 0)
+			continue;
+		if (avail > len)
+			avail = len;
+
 		memcpy(dst, data, avail);
 		data += avail;
 		len -= avail;
+		bytes += avail;
 
 		/* Other side must not see new producer until data is there. */
 		virt_wmb();
 		intf->req_prod += avail;
 
 		/* Implies mb(): other side will see the updated producer. */
-		notify_remote_via_evtchn(xen_store_evtchn);
+		if (prod <= intf->req_cons)
+			notify_remote_via_evtchn(xen_store_evtchn);
 	}
 
-	return 0;
+	return bytes;
 }
 
-int xb_data_to_read(void)
+static int xb_data_to_read(void)
 {
 	struct xenstore_domain_interface *intf = xen_store_interface;
 	return (intf->rsp_cons != intf->rsp_prod);
 }
 
-int xb_wait_for_data_to_read(void)
-{
-	return wait_event_interruptible(xb_waitq, xb_data_to_read());
-}
-
-int xb_read(void *data, unsigned len)
+static int xb_read(void *data, unsigned int len)
 {
 	struct xenstore_domain_interface *intf = xen_store_interface;
 	XENSTORE_RING_IDX cons, prod;
-	int rc;
+	unsigned int bytes = 0;
 
 	while (len != 0) {
 		unsigned int avail;
 		const char *src;
 
-		rc = xb_wait_for_data_to_read();
-		if (rc < 0)
-			return rc;
-
 		/* Read indexes, then verify. */
 		cons = intf->rsp_cons;
 		prod = intf->rsp_prod;
+		if (cons == prod)
+			return bytes;
+
 		if (!check_indexes(cons, prod)) {
 			intf->rsp_cons = intf->rsp_prod = 0;
 			return -EIO;
@@ -185,17 +214,229 @@ int xb_read(void *data, unsigned len)
 		memcpy(data, src, avail);
 		data += avail;
 		len -= avail;
+		bytes += avail;
 
 		/* Other side must not see free space until we've copied out */
 		virt_mb();
 		intf->rsp_cons += avail;
 
-		pr_debug("Finished read of %i bytes (%i to go)\n", avail, len);
-
 		/* Implies mb(): other side will see the updated consumer. */
-		notify_remote_via_evtchn(xen_store_evtchn);
+		if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
+			notify_remote_via_evtchn(xen_store_evtchn);
 	}
 
+	return bytes;
+}
+
+static int process_msg(void)
+{
+	static struct xs_thread_state_read state;
+	struct xb_req_data *req;
+	int err;
+	unsigned int len;
+
+	if (!state.in_msg) {
+		state.in_msg = true;
+		state.in_hdr = true;
+		state.used = 0;
+
+		/*
+		 * We must disallow save/restore while reading a message.
+		 * A partial read across s/r leaves us out of sync with
+		 * xenstored.
+		 */
+		mutex_lock(&xs_response_mutex);
+
+		if (!xb_data_to_read()) {
+			/* We raced with save/restore: pending data 'gone'. */
+			mutex_unlock(&xs_response_mutex);
+			state.in_msg = false;
+			return 0;
+		}
+	}
+
+	if (state.in_hdr) {
+		if (state.used != sizeof(state.msg)) {
+			err = xb_read((void *)&state.msg + state.used,
+				      sizeof(state.msg) - state.used);
+			if (err < 0)
+				goto out;
+			state.used += err;
+			if (state.used != sizeof(state.msg))
+				return 0;
+			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
+				err = -EINVAL;
+				goto out;
+			}
+		}
+
+		len = state.msg.len + 1;
+		if (state.msg.type == XS_WATCH_EVENT)
+			len += sizeof(*state.watch);
+
+		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
+		if (!state.alloc)
+			return -ENOMEM;
+
+		if (state.msg.type == XS_WATCH_EVENT)
+			state.body = state.watch->body;
+		else
+			state.body = state.alloc;
+		state.in_hdr = false;
+		state.used = 0;
+	}
+
+	err = xb_read(state.body + state.used, state.msg.len - state.used);
+	if (err < 0)
+		goto out;
+
+	state.used += err;
+	if (state.used != state.msg.len)
+		return 0;
+
+	state.body[state.msg.len] = '\0';
+
+	if (state.msg.type == XS_WATCH_EVENT) {
+		state.watch->len = state.msg.len;
+		err = xs_watch_msg(state.watch);
+	} else {
+		err = -ENOENT;
+		mutex_lock(&xb_write_mutex);
+		list_for_each_entry(req, &xs_reply_list, list) {
+			if (req->msg.req_id == state.msg.req_id) {
+				if (req->state == xb_req_state_wait_reply) {
+					req->msg.type = state.msg.type;
+					req->msg.len = state.msg.len;
+					req->body = state.body;
+					req->state = xb_req_state_got_reply;
+					list_del(&req->list);
+					req->cb(req);
+				} else {
+					list_del(&req->list);
+					kfree(req);
+				}
+				err = 0;
+				break;
+			}
+		}
+		mutex_unlock(&xb_write_mutex);
+		if (err)
+			goto out;
+	}
+
+	mutex_unlock(&xs_response_mutex);
+
+	state.in_msg = false;
+	state.alloc = NULL;
+	return err;
+
+ out:
+	mutex_unlock(&xs_response_mutex);
+	state.in_msg = false;
+	kfree(state.alloc);
+	state.alloc = NULL;
+	return err;
+}
+
+static int process_writes(void)
+{
+	static struct xs_thread_state_write state;
+	void *base;
+	unsigned int len;
+	int err = 0;
+
+	if (!xb_data_to_write())
+		return 0;
+
+	mutex_lock(&xb_write_mutex);
+
+	if (!state.req) {
+		state.req = list_first_entry(&xb_write_list,
+					     struct xb_req_data, list);
+		state.idx = -1;
+		state.used = 0;
+	}
+
+	if (state.req->state == xb_req_state_aborted)
+		goto out_err;
+
+	while (state.idx < state.req->num_vecs) {
+		if (state.idx < 0) {
+			base = &state.req->msg;
+			len = sizeof(state.req->msg);
+		} else {
+			base = state.req->vec[state.idx].iov_base;
+			len = state.req->vec[state.idx].iov_len;
+		}
+		err = xb_write(base + state.used, len - state.used);
+		if (err < 0)
+			goto out_err;
+		state.used += err;
+		if (state.used != len)
+			goto out;
+
+		state.idx++;
+		state.used = 0;
+	}
+
+	/*
+	 * You would expect the following to be racy, but as the response is
+	 * being read by our thread there is no risk of req being freed
+	 * under our feet.
+	 */
+	list_del(&state.req->list);
+	state.req->state = xb_req_state_wait_reply;
+	list_add_tail(&state.req->list, &xs_reply_list);
+	state.req = NULL;
+
+ out:
+	mutex_unlock(&xb_write_mutex);
+
+	return 0;
+
+ out_err:
+	state.req->msg.type = XS_ERROR;
+	state.req->err = err;
+	list_del(&state.req->list);
+	if (state.req->state == xb_req_state_aborted)
+		kfree(state.req);
+	else {
+		state.req->state = xb_req_state_got_reply;
+		wake_up(&state.req->wq);
+	}
+
+	mutex_unlock(&xb_write_mutex);
+
+	state.req = NULL;
+
+	return err;
+}
+
+static int xb_thread_work(void)
+{
+	return xb_data_to_read() || xb_data_to_write();
+}
+
+static int xenbus_thread(void *unused)
+{
+	int err;
+
+	while (!kthread_should_stop()) {
+		if (wait_event_interruptible(xb_waitq, xb_thread_work()))
+			continue;
+
+		err = process_msg();
+		if (err == -ENOMEM)
+			schedule();
+		else if (err)
+			pr_warn("error %d while reading message\n", err);
+
+		err = process_writes();
+		if (err)
+			pr_warn("error %d while writing message\n", err);
+	}
+
+	xenbus_task = NULL;
 	return 0;
 }
 
@@ -223,6 +464,7 @@ int xb_init_comms(void)
 		rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
 	} else {
 		int err;
+
 		err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
 						0, "xenbus", &xb_waitq);
 		if (err < 0) {
@@ -231,6 +473,13 @@ int xb_init_comms(void)
 		}
 
 		xenbus_irq = err;
+
+		if (!xenbus_task) {
+			xenbus_task = kthread_run(xenbus_thread, NULL,
+						  "xenbus");
+			if (IS_ERR(xenbus_task))
+				return PTR_ERR(xenbus_task);
+		}
 	}
 
 	return 0;
diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c
index e4b9847..4d343ee 100644
--- a/drivers/xen/xenbus/xenbus_dev_frontend.c
+++ b/drivers/xen/xenbus/xenbus_dev_frontend.c
@@ -113,6 +113,7 @@ struct xenbus_file_priv {
 	struct list_head read_buffers;
 	wait_queue_head_t read_waitq;
 
+	struct kref kref;
 };
 
 /* Read out any raw xenbus messages queued up. */
@@ -297,6 +298,107 @@ static void watch_fired(struct xenbus_watch *watch,
 	mutex_unlock(&adap->dev_data->reply_mutex);
 }
 
+static void xenbus_file_free(struct kref *kref)
+{
+	struct xenbus_file_priv *u;
+	struct xenbus_transaction_holder *trans, *tmp;
+	struct watch_adapter *watch, *tmp_watch;
+	struct read_buffer *rb, *tmp_rb;
+
+	u = container_of(kref, struct xenbus_file_priv, kref);
+
+	/*
+	 * No need for locking here because there are no other users,
+	 * by definition.
+	 */
+
+	list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
+		xenbus_transaction_end(trans->handle, 1);
+		list_del(&trans->list);
+		kfree(trans);
+	}
+
+	list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) {
+		unregister_xenbus_watch(&watch->watch);
+		list_del(&watch->list);
+		free_watch_adapter(watch);
+	}
+
+	list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) {
+		list_del(&rb->list);
+		kfree(rb);
+	}
+	kfree(u);
+}
+
+static struct xenbus_transaction_holder *xenbus_get_transaction(
+	struct xenbus_file_priv *u, uint32_t tx_id)
+{
+	struct xenbus_transaction_holder *trans;
+
+	list_for_each_entry(trans, &u->transactions, list)
+		if (trans->handle.id == tx_id)
+			return trans;
+
+	return NULL;
+}
+
+void xenbus_dev_queue_reply(struct xb_req_data *req)
+{
+	struct xenbus_file_priv *u = req->par;
+	struct xenbus_transaction_holder *trans = NULL;
+	int rc;
+	LIST_HEAD(staging_q);
+
+	xs_request_exit(req);
+
+	mutex_lock(&u->msgbuffer_mutex);
+
+	if (req->type == XS_TRANSACTION_START) {
+		trans = xenbus_get_transaction(u, 0);
+		if (WARN_ON(!trans))
+			goto out;
+		if (req->msg.type == XS_ERROR) {
+			list_del(&trans->list);
+			kfree(trans);
+		} else {
+			rc = kstrtou32(req->body, 10, &trans->handle.id);
+			if (WARN_ON(rc))
+				goto out;
+		}
+	} else if (req->msg.type == XS_TRANSACTION_END) {
+		trans = xenbus_get_transaction(u, req->msg.tx_id);
+		if (WARN_ON(!trans))
+			goto out;
+		list_del(&trans->list);
+		kfree(trans);
+	}
+
+	mutex_unlock(&u->msgbuffer_mutex);
+
+	mutex_lock(&u->reply_mutex);
+	rc = queue_reply(&staging_q, &req->msg, sizeof(req->msg));
+	if (!rc)
+		rc = queue_reply(&staging_q, req->body, req->msg.len);
+	if (!rc) {
+		list_splice_tail(&staging_q, &u->read_buffers);
+		wake_up(&u->read_waitq);
+	} else {
+		queue_cleanup(&staging_q);
+	}
+	mutex_unlock(&u->reply_mutex);
+
+	kfree(req->body);
+	kfree(req);
+
+	kref_put(&u->kref, xenbus_file_free);
+
+	return;
+
+ out:
+	mutex_unlock(&u->msgbuffer_mutex);
+}
+
 static int xenbus_command_reply(struct xenbus_file_priv *u,
 				unsigned int msg_type, const char *reply)
 {
@@ -317,6 +419,9 @@ static int xenbus_command_reply(struct xenbus_file_priv *u,
 	wake_up(&u->read_waitq);
 	mutex_unlock(&u->reply_mutex);
 
+	if (!rc)
+		kref_put(&u->kref, xenbus_file_free);
+
 	return rc;
 }
 
@@ -324,57 +429,22 @@ static int xenbus_write_transaction(unsigned msg_type,
 				    struct xenbus_file_priv *u)
 {
 	int rc;
-	void *reply;
 	struct xenbus_transaction_holder *trans = NULL;
-	LIST_HEAD(staging_q);
 
 	if (msg_type == XS_TRANSACTION_START) {
-		trans = kmalloc(sizeof(*trans), GFP_KERNEL);
+		trans = kzalloc(sizeof(*trans), GFP_KERNEL);
 		if (!trans) {
 			rc = -ENOMEM;
 			goto out;
 		}
-	} else if (u->u.msg.tx_id != 0) {
-		list_for_each_entry(trans, &u->transactions, list)
-			if (trans->handle.id == u->u.msg.tx_id)
-				break;
-		if (&trans->list == &u->transactions)
-			return xenbus_command_reply(u, XS_ERROR, "ENOENT");
-	}
+		list_add(&trans->list, &u->transactions);
+	} else if (u->u.msg.tx_id != 0 &&
+		   !xenbus_get_transaction(u, u->u.msg.tx_id))
+		return xenbus_command_reply(u, XS_ERROR, "ENOENT");
 
-	reply = xenbus_dev_request_and_reply(&u->u.msg);
-	if (IS_ERR(reply)) {
-		if (msg_type == XS_TRANSACTION_START)
-			kfree(trans);
-		rc = PTR_ERR(reply);
-		goto out;
-	}
-
-	if (msg_type == XS_TRANSACTION_START) {
-		if (u->u.msg.type == XS_ERROR)
-			kfree(trans);
-		else {
-			trans->handle.id = simple_strtoul(reply, NULL, 0);
-			list_add(&trans->list, &u->transactions);
-		}
-	} else if (u->u.msg.type == XS_TRANSACTION_END) {
-		list_del(&trans->list);
+	rc = xenbus_dev_request_and_reply(&u->u.msg, u);
+	if (rc)
 		kfree(trans);
-	}
-
-	mutex_lock(&u->reply_mutex);
-	rc = queue_reply(&staging_q, &u->u.msg, sizeof(u->u.msg));
-	if (!rc)
-		rc = queue_reply(&staging_q, reply, u->u.msg.len);
-	if (!rc) {
-		list_splice_tail(&staging_q, &u->read_buffers);
-		wake_up(&u->read_waitq);
-	} else {
-		queue_cleanup(&staging_q);
-	}
-	mutex_unlock(&u->reply_mutex);
-
-	kfree(reply);
 
 out:
 	return rc;
@@ -506,6 +576,8 @@ static ssize_t xenbus_file_write(struct file *filp,
 	 * OK, now we have a complete message.  Do something with it.
 	 */
 
+	kref_get(&u->kref);
+
 	msg_type = u->u.msg.type;
 
 	switch (msg_type) {
@@ -520,8 +592,10 @@ static ssize_t xenbus_file_write(struct file *filp,
 		ret = xenbus_write_transaction(msg_type, u);
 		break;
 	}
-	if (ret != 0)
+	if (ret != 0) {
 		rc = ret;
+		kref_put(&u->kref, xenbus_file_free);
+	}
 
 	/* Buffered message consumed */
 	u->len = 0;
@@ -546,6 +620,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp)
 	if (u == NULL)
 		return -ENOMEM;
 
+	kref_init(&u->kref);
+
 	INIT_LIST_HEAD(&u->transactions);
 	INIT_LIST_HEAD(&u->watches);
 	INIT_LIST_HEAD(&u->read_buffers);
@@ -562,32 +638,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp)
 static int xenbus_file_release(struct inode *inode, struct file *filp)
 {
 	struct xenbus_file_priv *u = filp->private_data;
-	struct xenbus_transaction_holder *trans, *tmp;
-	struct watch_adapter *watch, *tmp_watch;
-	struct read_buffer *rb, *tmp_rb;
 
-	/*
-	 * No need for locking here because there are no other users,
-	 * by definition.
-	 */
-
-	list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
-		xenbus_transaction_end(trans->handle, 1);
-		list_del(&trans->list);
-		kfree(trans);
-	}
-
-	list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) {
-		unregister_xenbus_watch(&watch->watch);
-		list_del(&watch->list);
-		free_watch_adapter(watch);
-	}
-
-	list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) {
-		list_del(&rb->list);
-		kfree(rb);
-	}
-	kfree(u);
+	kref_put(&u->kref, xenbus_file_free);
 
 	return 0;
 }
diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
index ebc768f..ebdfbee 100644
--- a/drivers/xen/xenbus/xenbus_xs.c
+++ b/drivers/xen/xenbus/xenbus_xs.c
@@ -43,6 +43,7 @@
 #include <linux/slab.h>
 #include <linux/fcntl.h>
 #include <linux/kthread.h>
+#include <linux/reboot.h>
 #include <linux/rwsem.h>
 #include <linux/mutex.h>
 #include <asm/xen/hypervisor.h>
@@ -50,61 +51,27 @@
 #include <xen/xen.h>
 #include "xenbus.h"
 
-struct xs_stored_msg {
-	struct list_head list;
-
-	struct xsd_sockmsg hdr;
-
-	union {
-		/* Queued replies. */
-		struct {
-			char *body;
-		} reply;
-
-		/* Queued watch events. */
-		struct {
-			struct xenbus_watch *handle;
-			const char *path;
-			const char *token;
-		} watch;
-	} u;
-};
-
-struct xs_handle {
-	/* A list of replies. Currently only one will ever be outstanding. */
-	struct list_head reply_list;
-	spinlock_t reply_lock;
-	wait_queue_head_t reply_waitq;
-
-	/*
-	 * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex.
-	 * response_mutex is never taken simultaneously with the other three.
-	 *
-	 * transaction_mutex must be held before incrementing
-	 * transaction_count. The mutex is held when a suspend is in
-	 * progress to prevent new transactions starting.
-	 *
-	 * When decrementing transaction_count to zero the wait queue
-	 * should be woken up, the suspend code waits for count to
-	 * reach zero.
-	 */
-
-	/* One request at a time. */
-	struct mutex request_mutex;
-
-	/* Protect xenbus reader thread against save/restore. */
-	struct mutex response_mutex;
-
-	/* Protect transactions against save/restore. */
-	struct mutex transaction_mutex;
-	atomic_t transaction_count;
-	wait_queue_head_t transaction_wq;
-
-	/* Protect watch (de)register against save/restore. */
-	struct rw_semaphore watch_mutex;
-};
-
-static struct xs_handle xs_state;
+/*
+ * Framework to protect suspend/resume handling against normal Xenstore
+ * message handling:
+ * During suspend/resume there must be no open transaction and no pending
+ * Xenstore request.
+ * New watch events happening in this time can be ignored by firing all watches
+ * after resume.
+ */
+/* Lock protecting enter/exit critical region. */
+static DEFINE_SPINLOCK(xs_state_lock);
+/* Wait queue for all callers waiting for critical region to become usable. */
+static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq);
+/* Wait queue for suspend handling waiting for critical region being empty. */
+static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq);
+/* Number of users in critical region. */
+static unsigned int xs_state_users;
+/* Suspend handler waiting or already active? */
+static int xs_suspend_active;
+
+/* Unique Xenstore request id. */
+static uint32_t xs_request_id;
 
 /* List of registered watches, and a lock to protect it. */
 static LIST_HEAD(watches);
@@ -114,6 +81,9 @@ static DEFINE_SPINLOCK(watches_lock);
 static LIST_HEAD(watch_events);
 static DEFINE_SPINLOCK(watch_events_lock);
 
+/* Protect watch (de)register against save/restore. */
+static DECLARE_RWSEM(xs_watch_rwsem);
+
 /*
  * Details of the xenwatch callback kernel thread. The thread waits on the
  * watch_events_waitq for work to do (queued on watch_events list). When it
@@ -124,6 +94,57 @@ static pid_t xenwatch_pid;
 static DEFINE_MUTEX(xenwatch_mutex);
 static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
 
+static void xs_suspend_enter(void)
+{
+	spin_lock(&xs_state_lock);
+	xs_suspend_active++;
+	spin_unlock(&xs_state_lock);
+	wait_event(xs_state_exit_wq, xs_state_users == 0);
+}
+
+static void xs_suspend_exit(void)
+{
+	spin_lock(&xs_state_lock);
+	xs_suspend_active--;
+	spin_unlock(&xs_state_lock);
+	wake_up_all(&xs_state_enter_wq);
+}
+
+static void xs_request_enter(struct xb_req_data *req)
+{
+	req->type = req->msg.type;
+
+	spin_lock(&xs_state_lock);
+	for (;;) {
+		if (req->msg.tx_id != 0)
+			break;
+		if (xs_suspend_active) {
+			spin_unlock(&xs_state_lock);
+			wait_event(xs_state_enter_wq, xs_suspend_active == 0);
+			spin_lock(&xs_state_lock);
+			continue;
+		}
+		if (req->type == XS_TRANSACTION_START)
+			xs_state_users++;
+		break;
+	}
+	xs_state_users++;
+	spin_unlock(&xs_state_lock);
+}
+
+void xs_request_exit(struct xb_req_data *req)
+{
+	spin_lock(&xs_state_lock);
+	xs_state_users--;
+	if ((req->type == XS_TRANSACTION_START && req->msg.type == XS_ERROR) ||
+	    req->msg.type == XS_TRANSACTION_END)
+		xs_state_users--;
+	spin_unlock(&xs_state_lock);
+
+	if (xs_suspend_active && !xs_state_users)
+		wake_up(&xs_state_exit_wq);
+}
+
 static int get_error(const char *errorstring)
 {
 	unsigned int i;
@@ -161,21 +182,24 @@ static bool xenbus_ok(void)
 	}
 	return false;
 }
-static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
+
+static bool test_reply(struct xb_req_data *req)
+{
+	if (req->state == xb_req_state_got_reply || !xenbus_ok())
+		return true;
+
+	/* Make sure to reread req->state each time. */
+	cpu_relax();
+
+	return false;
+}
+
+static void *read_reply(struct xb_req_data *req)
 {
-	struct xs_stored_msg *msg;
-	char *body;
-
-	spin_lock(&xs_state.reply_lock);
-
-	while (list_empty(&xs_state.reply_list)) {
-		spin_unlock(&xs_state.reply_lock);
-		if (xenbus_ok())
-			/* XXX FIXME: Avoid synchronous wait for response here. */
-			wait_event_timeout(xs_state.reply_waitq,
-					   !list_empty(&xs_state.reply_list),
-					   msecs_to_jiffies(500));
-		else {
+	while (req->state != xb_req_state_got_reply) {
+		wait_event(req->wq, test_reply(req));
+
+		if (!xenbus_ok())
 			/*
 			 * If we are in the process of being shut-down there is
 			 * no point of trying to contact XenBus - it is either
@@ -183,77 +207,85 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
 			 * has been killed or is unreachable.
 			 */
 			return ERR_PTR(-EIO);
-		}
-		spin_lock(&xs_state.reply_lock);
+		if (req->err)
+			return ERR_PTR(req->err);
+
 	}
 
-	msg = list_entry(xs_state.reply_list.next,
-			 struct xs_stored_msg, list);
-	list_del(&msg->list);
-
-	spin_unlock(&xs_state.reply_lock);
-
-	*type = msg->hdr.type;
-	if (len)
-		*len = msg->hdr.len;
-	body = msg->u.reply.body;
-
-	kfree(msg);
-
-	return body;
+	return req->body;
 }
 
-static void transaction_start(void)
+static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
 {
-	mutex_lock(&xs_state.transaction_mutex);
-	atomic_inc(&xs_state.transaction_count);
-	mutex_unlock(&xs_state.transaction_mutex);
-}
+	bool notify;
 
-static void transaction_end(void)
-{
-	if (atomic_dec_and_test(&xs_state.transaction_count))
-		wake_up(&xs_state.transaction_wq);
-}
+	req->msg = *msg;
+	req->err = 0;
+	req->state = xb_req_state_queued;
+	init_waitqueue_head(&req->wq);
 
-static void transaction_suspend(void)
-{
-	mutex_lock(&xs_state.transaction_mutex);
-	wait_event(xs_state.transaction_wq,
-		   atomic_read(&xs_state.transaction_count) == 0);
-}
+	xs_request_enter(req);
 
-static void transaction_resume(void)
-{
-	mutex_unlock(&xs_state.transaction_mutex);
+	req->msg.req_id = xs_request_id++;
+
+	mutex_lock(&xb_write_mutex);
+	list_add_tail(&req->list, &xb_write_list);
+	notify = list_is_singular(&xb_write_list);
+	mutex_unlock(&xb_write_mutex);
+
+	if (notify)
+		wake_up(&xb_waitq);
 }
 
-void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
+static void *xs_wait_for_reply(struct xb_req_data *req, struct xsd_sockmsg *msg)
 {
 	void *ret;
-	enum xsd_sockmsg_type type = msg->type;
-	int err;
 
-	if (type == XS_TRANSACTION_START)
-		transaction_start();
+	ret = read_reply(req);
 
-	mutex_lock(&xs_state.request_mutex);
+	xs_request_exit(req);
 
-	err = xb_write(msg, sizeof(*msg) + msg->len);
-	if (err) {
-		msg->type = XS_ERROR;
-		ret = ERR_PTR(err);
-	} else
-		ret = read_reply(&msg->type, &msg->len);
+	msg->type = req->msg.type;
+	msg->len = req->msg.len;
 
-	mutex_unlock(&xs_state.request_mutex);
-
-	if ((msg->type == XS_TRANSACTION_END) ||
-	    ((type == XS_TRANSACTION_START) && (msg->type == XS_ERROR)))
-		transaction_end();
+	mutex_lock(&xb_write_mutex);
+	if (req->state == xb_req_state_queued ||
+	    req->state == xb_req_state_wait_reply)
+		req->state = xb_req_state_aborted;
+	else
+		kfree(req);
+	mutex_unlock(&xb_write_mutex);
 
 	return ret;
 }
+
+static void xs_wake_up(struct xb_req_data *req)
+{
+	wake_up(&req->wq);
+}
+
+int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par)
+{
+	struct xb_req_data *req;
+	struct kvec *vec;
+
+	req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	vec = (struct kvec *)(req + 1);
+	vec->iov_len = msg->len;
+	vec->iov_base = msg + 1;
+
+	req->vec = vec;
+	req->num_vecs = 1;
+	req->cb = xenbus_dev_queue_reply;
+	req->par = par;
+
+	xs_send(req, msg);
+
+	return 0;
+}
 EXPORT_SYMBOL(xenbus_dev_request_and_reply);
 
 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
@@ -263,11 +295,20 @@ static void *xs_talkv(struct xenbus_transaction t,
 		      unsigned int num_vecs,
 		      unsigned int *len)
 {
+	struct xb_req_data *req;
 	struct xsd_sockmsg msg;
 	void *ret = NULL;
 	unsigned int i;
 	int err;
 
+	req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH);
+	if (!req)
+		return ERR_PTR(-ENOMEM);
+
+	req->vec = iovec;
+	req->num_vecs = num_vecs;
+	req->cb = xs_wake_up;
+
 	msg.tx_id = t.id;
 	msg.req_id = 0;
 	msg.type = type;
@@ -275,25 +316,11 @@ static void *xs_talkv(struct xenbus_transaction t,
 	for (i = 0; i < num_vecs; i++)
 		msg.len += iovec[i].iov_len;
 
-	mutex_lock(&xs_state.request_mutex);
+	xs_send(req, &msg);
 
-	err = xb_write(&msg, sizeof(msg));
-	if (err) {
-		mutex_unlock(&xs_state.request_mutex);
-		return ERR_PTR(err);
-	}
-
-	for (i = 0; i < num_vecs; i++) {
-		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
-		if (err) {
-			mutex_unlock(&xs_state.request_mutex);
-			return ERR_PTR(err);
-		}
-	}
-
-	ret = read_reply(&msg.type, len);
-
-	mutex_unlock(&xs_state.request_mutex);
+	ret = xs_wait_for_reply(req, &msg);
+	if (len)
+		*len = msg.len;
 
 	if (IS_ERR(ret))
 		return ret;
@@ -500,13 +527,9 @@ int xenbus_transaction_start(struct xenbus_transaction *t)
 {
 	char *id_str;
 
-	transaction_start();
-
 	id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
-	if (IS_ERR(id_str)) {
-		transaction_end();
+	if (IS_ERR(id_str))
 		return PTR_ERR(id_str);
-	}
 
 	t->id = simple_strtoul(id_str, NULL, 0);
 	kfree(id_str);
@@ -520,18 +543,13 @@ EXPORT_SYMBOL_GPL(xenbus_transaction_start);
 int xenbus_transaction_end(struct xenbus_transaction t, int abort)
 {
 	char abortstr[2];
-	int err;
 
 	if (abort)
 		strcpy(abortstr, "F");
 	else
 		strcpy(abortstr, "T");
 
-	err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
-
-	transaction_end();
-
-	return err;
+	return xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
 }
 EXPORT_SYMBOL_GPL(xenbus_transaction_end);
 
@@ -664,6 +682,30 @@ static struct xenbus_watch *find_watch(const char *token)
 
 	return NULL;
 }
+
+int xs_watch_msg(struct xs_watch_event *event)
+{
+	if (count_strings(event->body, event->len) != 2) {
+		kfree(event);
+		return -EINVAL;
+	}
+	event->path = (const char *)event->body;
+	event->token = (const char *)strchr(event->body, '\0') + 1;
+
+	spin_lock(&watches_lock);
+	event->handle = find_watch(event->token);
+	if (event->handle != NULL) {
+		spin_lock(&watch_events_lock);
+		list_add_tail(&event->list, &watch_events);
+		wake_up(&watch_events_waitq);
+		spin_unlock(&watch_events_lock);
+	} else
+		kfree(event);
+	spin_unlock(&watches_lock);
+
+	return 0;
+}
+
 /*
  * Certain older XenBus toolstack cannot handle reading values that are
  * not populated. Some Xen 3.4 installation are incapable of doing this
@@ -712,7 +754,7 @@ int register_xenbus_watch(struct xenbus_watch *watch)
 
 	sprintf(token, "%lX", (long)watch);
 
-	down_read(&xs_state.watch_mutex);
+	down_read(&xs_watch_rwsem);
 
 	spin_lock(&watches_lock);
 	BUG_ON(find_watch(token));
@@ -727,7 +769,7 @@ int register_xenbus_watch(struct xenbus_watch *watch)
 		spin_unlock(&watches_lock);
 	}
 
-	up_read(&xs_state.watch_mutex);
+	up_read(&xs_watch_rwsem);
 
 	return err;
 }
@@ -735,13 +777,13 @@ EXPORT_SYMBOL_GPL(register_xenbus_watch);
 
 void unregister_xenbus_watch(struct xenbus_watch *watch)
 {
-	struct xs_stored_msg *msg, *tmp;
+	struct xs_watch_event *event, *tmp;
 	char token[sizeof(watch) * 2 + 1];
 	int err;
 
 	sprintf(token, "%lX", (long)watch);
 
-	down_read(&xs_state.watch_mutex);
+	down_read(&xs_watch_rwsem);
 
 	spin_lock(&watches_lock);
 	BUG_ON(!find_watch(token));
@@ -752,7 +794,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
 	if (err)
 		pr_warn("Failed to release watch %s: %i\n", watch->node, err);
 
-	up_read(&xs_state.watch_mutex);
+	up_read(&xs_watch_rwsem);
 
 	/* Make sure there are no callbacks running currently (unless
 	   its us) */
@@ -761,12 +803,11 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
 
 	/* Cancel pending watch events. */
 	spin_lock(&watch_events_lock);
-	list_for_each_entry_safe(msg, tmp, &watch_events, list) {
-		if (msg->u.watch.handle != watch)
+	list_for_each_entry_safe(event, tmp, &watch_events, list) {
+		if (event->handle != watch)
 			continue;
-		list_del(&msg->list);
-		kfree(msg->u.watch.path);
-		kfree(msg);
+		list_del(&event->list);
+		kfree(event);
 	}
 	spin_unlock(&watch_events_lock);
 
@@ -777,10 +818,10 @@ EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
 
 void xs_suspend(void)
 {
-	transaction_suspend();
-	down_write(&xs_state.watch_mutex);
-	mutex_lock(&xs_state.request_mutex);
-	mutex_lock(&xs_state.response_mutex);
+	xs_suspend_enter();
+
+	down_write(&xs_watch_rwsem);
+	mutex_lock(&xs_response_mutex);
 }
 
 void xs_resume(void)
@@ -790,31 +831,31 @@ void xs_resume(void)
 
 	xb_init_comms();
 
-	mutex_unlock(&xs_state.response_mutex);
-	mutex_unlock(&xs_state.request_mutex);
-	transaction_resume();
+	mutex_unlock(&xs_response_mutex);
 
-	/* No need for watches_lock: the watch_mutex is sufficient. */
+	xs_suspend_exit();
+
+	/* No need for watches_lock: the xs_watch_rwsem is sufficient. */
 	list_for_each_entry(watch, &watches, list) {
 		sprintf(token, "%lX", (long)watch);
 		xs_watch(watch->node, token);
 	}
 
-	up_write(&xs_state.watch_mutex);
+	up_write(&xs_watch_rwsem);
 }
 
 void xs_suspend_cancel(void)
 {
-	mutex_unlock(&xs_state.response_mutex);
-	mutex_unlock(&xs_state.request_mutex);
-	up_write(&xs_state.watch_mutex);
-	mutex_unlock(&xs_state.transaction_mutex);
+	mutex_unlock(&xs_response_mutex);
+	up_write(&xs_watch_rwsem);
+
+	xs_suspend_exit();
 }
 
 static int xenwatch_thread(void *unused)
 {
 	struct list_head *ent;
-	struct xs_stored_msg *msg;
+	struct xs_watch_event *event;
 
 	for (;;) {
 		wait_event_interruptible(watch_events_waitq,
@@ -832,12 +873,10 @@ static int xenwatch_thread(void *unused)
 		spin_unlock(&watch_events_lock);
 
 		if (ent != &watch_events) {
-			msg = list_entry(ent, struct xs_stored_msg, list);
-			msg->u.watch.handle->callback(msg->u.watch.handle,
-						      msg->u.watch.path,
-						      msg->u.watch.token);
-			kfree(msg->u.watch.path);
-			kfree(msg);
+			event = list_entry(ent, struct xs_watch_event, list);
+			event->handle->callback(event->handle, event->path,
+						event->token);
+			kfree(event);
 		}
 
 		mutex_unlock(&xenwatch_mutex);
@@ -846,126 +885,30 @@ static int xenwatch_thread(void *unused)
 	return 0;
 }
 
-static int process_msg(void)
+static int xs_reboot_notify(struct notifier_block *nb,
+			    unsigned long code, void *unused)
 {
-	struct xs_stored_msg *msg;
-	char *body;
-	int err;
-
-	/*
-	 * We must disallow save/restore while reading a xenstore message.
-	 * A partial read across s/r leaves us out of sync with xenstored.
-	 */
-	for (;;) {
-		err = xb_wait_for_data_to_read();
-		if (err)
-			return err;
-		mutex_lock(&xs_state.response_mutex);
-		if (xb_data_to_read())
-			break;
-		/* We raced with save/restore: pending data 'disappeared'. */
-		mutex_unlock(&xs_state.response_mutex);
-	}
-
-
-	msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH);
-	if (msg == NULL) {
-		err = -ENOMEM;
-		goto out;
-	}
-
-	err = xb_read(&msg->hdr, sizeof(msg->hdr));
-	if (err) {
-		kfree(msg);
-		goto out;
-	}
-
-	if (msg->hdr.len > XENSTORE_PAYLOAD_MAX) {
-		kfree(msg);
-		err = -EINVAL;
-		goto out;
-	}
-
-	body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH);
-	if (body == NULL) {
-		kfree(msg);
-		err = -ENOMEM;
-		goto out;
-	}
-
-	err = xb_read(body, msg->hdr.len);
-	if (err) {
-		kfree(body);
-		kfree(msg);
-		goto out;
-	}
-	body[msg->hdr.len] = '\0';
-
-	if (msg->hdr.type == XS_WATCH_EVENT) {
-		if (count_strings(body, msg->hdr.len) != 2) {
-			err = -EINVAL;
-			kfree(msg);
-			kfree(body);
-			goto out;
-		}
-		msg->u.watch.path = (const char *)body;
-		msg->u.watch.token = (const char *)strchr(body, '\0') + 1;
-
-		spin_lock(&watches_lock);
-		msg->u.watch.handle = find_watch(msg->u.watch.token);
-		if (msg->u.watch.handle != NULL) {
-			spin_lock(&watch_events_lock);
-			list_add_tail(&msg->list, &watch_events);
-			wake_up(&watch_events_waitq);
-			spin_unlock(&watch_events_lock);
-		} else {
-			kfree(body);
-			kfree(msg);
-		}
-		spin_unlock(&watches_lock);
-	} else {
-		msg->u.reply.body = body;
-		spin_lock(&xs_state.reply_lock);
-		list_add_tail(&msg->list, &xs_state.reply_list);
-		spin_unlock(&xs_state.reply_lock);
-		wake_up(&xs_state.reply_waitq);
-	}
-
- out:
-	mutex_unlock(&xs_state.response_mutex);
-	return err;
+	struct xb_req_data *req;
+
+	mutex_lock(&xb_write_mutex);
+	list_for_each_entry(req, &xs_reply_list, list)
+		wake_up(&req->wq);
+	list_for_each_entry(req, &xb_write_list, list)
+		wake_up(&req->wq);
+	mutex_unlock(&xb_write_mutex);
+	return NOTIFY_DONE;
 }
 
-static int xenbus_thread(void *unused)
-{
-	int err;
-
-	for (;;) {
-		err = process_msg();
-		if (err)
-			pr_warn("error %d while reading message\n", err);
-		if (kthread_should_stop())
-			break;
-	}
-
-	return 0;
-}
+static struct notifier_block xs_reboot_nb = {
+	.notifier_call = xs_reboot_notify,
+};
 
 int xs_init(void)
 {
 	int err;
 	struct task_struct *task;
 
-	INIT_LIST_HEAD(&xs_state.reply_list);
-	spin_lock_init(&xs_state.reply_lock);
-	init_waitqueue_head(&xs_state.reply_waitq);
-
-	mutex_init(&xs_state.request_mutex);
-	mutex_init(&xs_state.response_mutex);
-	mutex_init(&xs_state.transaction_mutex);
-	init_rwsem(&xs_state.watch_mutex);
-	atomic_set(&xs_state.transaction_count, 0);
-	init_waitqueue_head(&xs_state.transaction_wq);
+	register_reboot_notifier(&xs_reboot_nb);
 
 	/* Initialize the shared memory rings to talk to xenstored */
 	err = xb_init_comms();
@@ -977,10 +920,6 @@ int xs_init(void)
 		return PTR_ERR(task);
 	xenwatch_pid = task->pid;
 
-	task = kthread_run(xenbus_thread, NULL, "xenbus");
-	if (IS_ERR(task))
-		return PTR_ERR(task);
-
 	/* shutdown watches for kexec boot */
 	xs_reset_watches();
 
-- 
2.10.2
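
For readers following the new xs_watch_msg() above: the body of a watch event is two NUL-terminated strings packed back to back, the path followed by the token. Below is a minimal userspace sketch of that split, not the kernel code. The names watch_event_view, count_packed_strings() and parse_watch_body() are hypothetical, and unlike the kernel's count_strings() helper this stand-in also rejects a body whose last string is unterminated.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical userspace mirror of the fields xs_watch_msg() fills in. */
struct watch_event_view {
	const char *path;
	const char *token;
};

/* Count the NUL-terminated strings packed into body[0..len). */
static unsigned int count_packed_strings(const char *body, size_t len)
{
	unsigned int num = 0;
	const char *p = body;
	const char *end = body + len;

	while (p < end) {
		const char *nul = memchr(p, '\0', (size_t)(end - p));

		if (!nul)
			return 0;	/* last string is not terminated */
		num++;
		p = nul + 1;
	}
	return num;
}

/* Return 0 and fill *ev on success, -1 unless body holds exactly two strings. */
static int parse_watch_body(const char *body, size_t len,
			    struct watch_event_view *ev)
{
	if (count_packed_strings(body, len) != 2)
		return -1;

	ev->path = body;			/* first string */
	ev->token = strchr(body, '\0') + 1;	/* string after the first NUL */
	return 0;
}
```

Both pointers refer into the same buffer, which is presumably why the patch can release a queued event with a single kfree(event).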

^ permalink raw reply related	[flat|nested] 21+ messages in thread

* RE: [PATCH 2/3] xen: modify xenstore watch event interface
  2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
@ 2017-01-06 15:38   ` Paul Durrant
  2017-01-06 16:29   ` Wei Liu
                     ` (2 subsequent siblings)
  3 siblings, 0 replies; 21+ messages in thread
From: Paul Durrant @ 2017-01-06 15:38 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel
  Cc: boris.ostrovsky, konrad.wilk, Roger Pau Monne, Wei Liu, netdev

> -----Original Message-----
> From: Juergen Gross [mailto:jgross@suse.com]
> Sent: 06 January 2017 15:06
> To: linux-kernel@vger.kernel.org; xen-devel@lists.xenproject.org
> Cc: boris.ostrovsky@oracle.com; Juergen Gross <jgross@suse.com>;
> konrad.wilk@oracle.com; Roger Pau Monne <roger.pau@citrix.com>; Wei Liu
> <wei.liu2@citrix.com>; Paul Durrant <Paul.Durrant@citrix.com>;
> netdev@vger.kernel.org
> Subject: [PATCH 2/3] xen: modify xenstore watch event interface
> 
> Today a Xenstore watch event is delivered via a callback function
> declared as:
> 
> void (*callback)(struct xenbus_watch *,
>                  const char **vec, unsigned int len);
> 
> As all watch events only ever come with two parameters (path and token)
> changing the prototype to:
> 
> void (*callback)(struct xenbus_watch *,
>                  const char *path, const char *token);
> 
> is the natural thing to do.
> 
> Apply this change and adapt all users.
> 
> Cc: konrad.wilk@oracle.com
> Cc: roger.pau@citrix.com
> Cc: wei.liu2@citrix.com
> Cc: paul.durrant@citrix.com
> Cc: netdev@vger.kernel.org
> 
> Signed-off-by: Juergen Gross <jgross@suse.com>

xen-netback changes...

Reviewed-by: Paul Durrant <paul.durrant@citrix.com>

> ---
>  drivers/block/xen-blkback/xenbus.c         |  6 +++---
>  drivers/net/xen-netback/xenbus.c           |  8 ++++----
>  drivers/xen/cpu_hotplug.c                  |  5 ++---
>  drivers/xen/manage.c                       |  6 +++---
>  drivers/xen/xen-balloon.c                  |  2 +-
>  drivers/xen/xen-pciback/xenbus.c           |  2 +-
>  drivers/xen/xenbus/xenbus.h                |  6 +++---
>  drivers/xen/xenbus/xenbus_client.c         |  4 ++--
>  drivers/xen/xenbus/xenbus_dev_frontend.c   | 21 ++++++++-------------
>  drivers/xen/xenbus/xenbus_probe.c          | 11 ++++-------
>  drivers/xen/xenbus/xenbus_probe_backend.c  |  8 ++++----
>  drivers/xen/xenbus/xenbus_probe_frontend.c | 14 +++++++-------
>  drivers/xen/xenbus/xenbus_xs.c             | 29 ++++++++++++++---------------
>  include/xen/xenbus.h                       |  6 +++---
>  14 files changed, 59 insertions(+), 69 deletions(-)
> 
> diff --git a/drivers/block/xen-blkback/xenbus.c b/drivers/block/xen-blkback/xenbus.c
> index 415e79b..8fe61b5 100644
> --- a/drivers/block/xen-blkback/xenbus.c
> +++ b/drivers/block/xen-blkback/xenbus.c
> @@ -38,8 +38,8 @@ struct backend_info {
>  static struct kmem_cache *xen_blkif_cachep;
>  static void connect(struct backend_info *);
>  static int connect_ring(struct backend_info *);
> -static void backend_changed(struct xenbus_watch *, const char **,
> -			    unsigned int);
> +static void backend_changed(struct xenbus_watch *, const char *,
> +			    const char *);
>  static void xen_blkif_free(struct xen_blkif *blkif);
>  static void xen_vbd_free(struct xen_vbd *vbd);
> 
> @@ -661,7 +661,7 @@ static int xen_blkbk_probe(struct xenbus_device *dev,
>   * ready, connect.
>   */
>  static void backend_changed(struct xenbus_watch *watch,
> -			    const char **vec, unsigned int len)
> +			    const char *path, const char *token)
>  {
>  	int err;
>  	unsigned major;
> diff --git a/drivers/net/xen-netback/xenbus.c b/drivers/net/xen-netback/xenbus.c
> index 3124eae..d8a40fa 100644
> --- a/drivers/net/xen-netback/xenbus.c
> +++ b/drivers/net/xen-netback/xenbus.c
> @@ -723,7 +723,7 @@ static int xen_net_read_mac(struct xenbus_device *dev, u8 mac[])
>  }
> 
>  static void xen_net_rate_changed(struct xenbus_watch *watch,
> -				const char **vec, unsigned int len)
> +				 const char *path, const char *token)
>  {
>  	struct xenvif *vif = container_of(watch, struct xenvif, credit_watch);
>  	struct xenbus_device *dev = xenvif_to_xenbus_device(vif);
> @@ -780,7 +780,7 @@ static void xen_unregister_credit_watch(struct xenvif *vif)
>  }
> 
>  static void xen_mcast_ctrl_changed(struct xenbus_watch *watch,
> -				   const char **vec, unsigned int len)
> +				   const char *path, const char *token)
>  {
>  	struct xenvif *vif = container_of(watch, struct xenvif,
>  					  mcast_ctrl_watch);
> @@ -855,8 +855,8 @@ static void unregister_hotplug_status_watch(struct backend_info *be)
>  }
> 
>  static void hotplug_status_changed(struct xenbus_watch *watch,
> -				   const char **vec,
> -				   unsigned int vec_size)
> +				   const char *path,
> +				   const char *token)
>  {
>  	struct backend_info *be = container_of(watch,
>  					       struct backend_info,
> diff --git a/drivers/xen/cpu_hotplug.c b/drivers/xen/cpu_hotplug.c
> index 5676aef..7a4daa2 100644
> --- a/drivers/xen/cpu_hotplug.c
> +++ b/drivers/xen/cpu_hotplug.c
> @@ -68,13 +68,12 @@ static void vcpu_hotplug(unsigned int cpu)
>  }
> 
>  static void handle_vcpu_hotplug_event(struct xenbus_watch *watch,
> -					const char **vec, unsigned int len)
> +				      const char *path, const char *token)
>  {
>  	unsigned int cpu;
>  	char *cpustr;
> -	const char *node = vec[XS_WATCH_PATH];
> 
> -	cpustr = strstr(node, "cpu/");
> +	cpustr = strstr(path, "cpu/");
>  	if (cpustr != NULL) {
>  		sscanf(cpustr, "cpu/%u", &cpu);
>  		vcpu_hotplug(cpu);
> diff --git a/drivers/xen/manage.c b/drivers/xen/manage.c
> index 26e5e85..ca62c09 100644
> --- a/drivers/xen/manage.c
> +++ b/drivers/xen/manage.c
> @@ -218,7 +218,7 @@ static struct shutdown_handler shutdown_handlers[] = {
>  };
> 
>  static void shutdown_handler(struct xenbus_watch *watch,
> -			     const char **vec, unsigned int len)
> +			     const char *path, const char *token)
>  {
>  	char *str;
>  	struct xenbus_transaction xbt;
> @@ -266,8 +266,8 @@ static void shutdown_handler(struct xenbus_watch *watch,
>  }
> 
>  #ifdef CONFIG_MAGIC_SYSRQ
> -static void sysrq_handler(struct xenbus_watch *watch, const char **vec,
> -			  unsigned int len)
> +static void sysrq_handler(struct xenbus_watch *watch, const char *path,
> +			  const char *token)
>  {
>  	char sysrq_key = '\0';
>  	struct xenbus_transaction xbt;
> diff --git a/drivers/xen/xen-balloon.c b/drivers/xen/xen-balloon.c
> index 79865b8..e7715cb 100644
> --- a/drivers/xen/xen-balloon.c
> +++ b/drivers/xen/xen-balloon.c
> @@ -55,7 +55,7 @@ static int register_balloon(struct device *dev);
> 
>  /* React to a change in the target key */
>  static void watch_target(struct xenbus_watch *watch,
> -			 const char **vec, unsigned int len)
> +			 const char *path, const char *token)
>  {
>  	unsigned long long new_target;
>  	int err;
> diff --git a/drivers/xen/xen-pciback/xenbus.c b/drivers/xen/xen-pciback/xenbus.c
> index 3f0aee0..3814b44 100644
> --- a/drivers/xen/xen-pciback/xenbus.c
> +++ b/drivers/xen/xen-pciback/xenbus.c
> @@ -652,7 +652,7 @@ static int xen_pcibk_setup_backend(struct xen_pcibk_device *pdev)
>  }
> 
>  static void xen_pcibk_be_watch(struct xenbus_watch *watch,
> -			     const char **vec, unsigned int len)
> +			       const char *path, const char *token)
>  {
>  	struct xen_pcibk_device *pdev =
>  	    container_of(watch, struct xen_pcibk_device, be_watch);
> diff --git a/drivers/xen/xenbus/xenbus.h b/drivers/xen/xenbus/xenbus.h
> index 6a80c1e..bd95c21 100644
> --- a/drivers/xen/xenbus/xenbus.h
> +++ b/drivers/xen/xenbus/xenbus.h
> @@ -39,8 +39,8 @@ struct xen_bus_type {
>  	int (*get_bus_id)(char bus_id[XEN_BUS_ID_SIZE], const char *nodename);
>  	int (*probe)(struct xen_bus_type *bus, const char *type,
>  		     const char *dir);
> -	void (*otherend_changed)(struct xenbus_watch *watch, const char **vec,
> -				 unsigned int len);
> +	void (*otherend_changed)(struct xenbus_watch *watch, const char *path,
> +				 const char *token);
>  	struct bus_type bus;
>  };
> 
> @@ -83,7 +83,7 @@ int xenbus_dev_resume(struct device *dev);
>  int xenbus_dev_cancel(struct device *dev);
> 
>  void xenbus_otherend_changed(struct xenbus_watch *watch,
> -			     const char **vec, unsigned int len,
> +			     const char *path, const char *token,
>  			     int ignore_on_shutdown);
> 
>  int xenbus_read_otherend_details(struct xenbus_device *xendev,
> diff --git a/drivers/xen/xenbus/xenbus_client.c b/drivers/xen/xenbus/xenbus_client.c
> index 23edf53..9586c24 100644
> --- a/drivers/xen/xenbus/xenbus_client.c
> +++ b/drivers/xen/xenbus/xenbus_client.c
> @@ -115,7 +115,7 @@ EXPORT_SYMBOL_GPL(xenbus_strstate);
>  int xenbus_watch_path(struct xenbus_device *dev, const char *path,
>  		      struct xenbus_watch *watch,
>  		      void (*callback)(struct xenbus_watch *,
> -				       const char **, unsigned int))
> +				       const char *, const char *))
>  {
>  	int err;
> 
> @@ -153,7 +153,7 @@ EXPORT_SYMBOL_GPL(xenbus_watch_path);
>  int xenbus_watch_pathfmt(struct xenbus_device *dev,
>  			 struct xenbus_watch *watch,
>  			 void (*callback)(struct xenbus_watch *,
> -					const char **, unsigned int),
> +					  const char *, const char *),
>  			 const char *pathfmt, ...)
>  {
>  	int err;
> diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c
> index e2bc9b3..e4b9847 100644
> --- a/drivers/xen/xenbus/xenbus_dev_frontend.c
> +++ b/drivers/xen/xenbus/xenbus_dev_frontend.c
> @@ -258,26 +258,23 @@ static struct watch_adapter *alloc_watch_adapter(const char *path,
>  }
> 
>  static void watch_fired(struct xenbus_watch *watch,
> -			const char **vec,
> -			unsigned int len)
> +			const char *path,
> +			const char *token)
>  {
>  	struct watch_adapter *adap;
>  	struct xsd_sockmsg hdr;
> -	const char *path, *token;
> -	int path_len, tok_len, body_len, data_len = 0;
> +	const char *token_caller;
> +	int path_len, tok_len, body_len;
>  	int ret;
>  	LIST_HEAD(staging_q);
> 
>  	adap = container_of(watch, struct watch_adapter, watch);
> 
> -	path = vec[XS_WATCH_PATH];
> -	token = adap->token;
> +	token_caller = adap->token;
> 
>  	path_len = strlen(path) + 1;
> -	tok_len = strlen(token) + 1;
> -	if (len > 2)
> -		data_len = vec[len] - vec[2] + 1;
> -	body_len = path_len + tok_len + data_len;
> +	tok_len = strlen(token_caller) + 1;
> +	body_len = path_len + tok_len;
> 
>  	hdr.type = XS_WATCH_EVENT;
>  	hdr.len = body_len;
> @@ -288,9 +285,7 @@ static void watch_fired(struct xenbus_watch *watch,
>  	if (!ret)
>  		ret = queue_reply(&staging_q, path, path_len);
>  	if (!ret)
> -		ret = queue_reply(&staging_q, token, tok_len);
> -	if (!ret && len > 2)
> -		ret = queue_reply(&staging_q, vec[2], data_len);
> +		ret = queue_reply(&staging_q, token_caller, tok_len);
> 
>  	if (!ret) {
>  		/* success: pass reply list onto watcher */
> diff --git a/drivers/xen/xenbus/xenbus_probe.c b/drivers/xen/xenbus/xenbus_probe.c
> index 6baffbb..74888ca 100644
> --- a/drivers/xen/xenbus/xenbus_probe.c
> +++ b/drivers/xen/xenbus/xenbus_probe.c
> @@ -169,7 +169,7 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev,
>  EXPORT_SYMBOL_GPL(xenbus_read_otherend_details);
> 
>  void xenbus_otherend_changed(struct xenbus_watch *watch,
> -			     const char **vec, unsigned int len,
> +			     const char *path, const char *token,
>  			     int ignore_on_shutdown)
>  {
>  	struct xenbus_device *dev =
> @@ -180,18 +180,15 @@ void xenbus_otherend_changed(struct xenbus_watch *watch,
>  	/* Protect us against watches firing on old details when the otherend
>  	   details change, say immediately after a resume. */
>  	if (!dev->otherend ||
> -	    strncmp(dev->otherend, vec[XS_WATCH_PATH],
> -		    strlen(dev->otherend))) {
> -		dev_dbg(&dev->dev, "Ignoring watch at %s\n",
> -			vec[XS_WATCH_PATH]);
> +	    strncmp(dev->otherend, path, strlen(dev->otherend))) {
> +		dev_dbg(&dev->dev, "Ignoring watch at %s\n", path);
>  		return;
>  	}
> 
>  	state = xenbus_read_driver_state(dev->otherend);
> 
>  	dev_dbg(&dev->dev, "state is %d, (%s), %s, %s\n",
> -		state, xenbus_strstate(state), dev->otherend_watch.node,
> -		vec[XS_WATCH_PATH]);
> +		state, xenbus_strstate(state), dev->otherend_watch.node, path);
> 
>  	/*
>  	 * Ignore xenbus transitions during shutdown. This prevents us doing
> diff --git a/drivers/xen/xenbus/xenbus_probe_backend.c b/drivers/xen/xenbus/xenbus_probe_backend.c
> index f46b4dc..b0bed4f 100644
> --- a/drivers/xen/xenbus/xenbus_probe_backend.c
> +++ b/drivers/xen/xenbus/xenbus_probe_backend.c
> @@ -181,9 +181,9 @@ static int xenbus_probe_backend(struct xen_bus_type *bus, const char *type,
>  }
> 
>  static void frontend_changed(struct xenbus_watch *watch,
> -			    const char **vec, unsigned int len)
> +			     const char *path, const char *token)
>  {
> -	xenbus_otherend_changed(watch, vec, len, 0);
> +	xenbus_otherend_changed(watch, path, token, 0);
>  }
> 
>  static struct xen_bus_type xenbus_backend = {
> @@ -204,11 +204,11 @@ static struct xen_bus_type xenbus_backend = {
>  };
> 
>  static void backend_changed(struct xenbus_watch *watch,
> -			    const char **vec, unsigned int len)
> +			    const char *path, const char *token)
>  {
>  	DPRINTK("");
> 
> -	xenbus_dev_changed(vec[XS_WATCH_PATH], &xenbus_backend);
> +	xenbus_dev_changed(path, &xenbus_backend);
>  }
> 
>  static struct xenbus_watch be_watch = {
> diff --git a/drivers/xen/xenbus/xenbus_probe_frontend.c b/drivers/xen/xenbus/xenbus_probe_frontend.c
> index d7b77a6..19e45ce 100644
> --- a/drivers/xen/xenbus/xenbus_probe_frontend.c
> +++ b/drivers/xen/xenbus/xenbus_probe_frontend.c
> @@ -86,9 +86,9 @@ static int xenbus_uevent_frontend(struct device *_dev,
> 
> 
>  static void backend_changed(struct xenbus_watch *watch,
> -			    const char **vec, unsigned int len)
> +			    const char *path, const char *token)
>  {
> -	xenbus_otherend_changed(watch, vec, len, 1);
> +	xenbus_otherend_changed(watch, path, token, 1);
>  }
> 
>  static void xenbus_frontend_delayed_resume(struct work_struct *w)
> @@ -153,11 +153,11 @@ static struct xen_bus_type xenbus_frontend = {
>  };
> 
>  static void frontend_changed(struct xenbus_watch *watch,
> -			     const char **vec, unsigned int len)
> +			     const char *path, const char *token)
>  {
>  	DPRINTK("");
> 
> -	xenbus_dev_changed(vec[XS_WATCH_PATH], &xenbus_frontend);
> +	xenbus_dev_changed(path, &xenbus_frontend);
>  }
> 
> 
> @@ -332,13 +332,13 @@ static DECLARE_WAIT_QUEUE_HEAD(backend_state_wq);
>  static int backend_state;
> 
>  static void xenbus_reset_backend_state_changed(struct xenbus_watch *w,
> -					const char **v, unsigned int l)
> +					const char *path, const char *token)
>  {
> -	if (xenbus_scanf(XBT_NIL, v[XS_WATCH_PATH], "", "%i",
> +	if (xenbus_scanf(XBT_NIL, path, "", "%i",
>  			 &backend_state) != 1)
>  		backend_state = XenbusStateUnknown;
>  	printk(KERN_DEBUG "XENBUS: backend %s %s\n",
> -			v[XS_WATCH_PATH], xenbus_strstate(backend_state));
> +	       path, xenbus_strstate(backend_state));
>  	wake_up(&backend_state_wq);
>  }
> 
> diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
> index 4c49d87..ebc768f 100644
> --- a/drivers/xen/xenbus/xenbus_xs.c
> +++ b/drivers/xen/xenbus/xenbus_xs.c
> @@ -64,8 +64,8 @@ struct xs_stored_msg {
>  		/* Queued watch events. */
>  		struct {
>  			struct xenbus_watch *handle;
> -			char **vec;
> -			unsigned int vec_size;
> +			const char *path;
> +			const char *token;
>  		} watch;
>  	} u;
>  };
> @@ -765,7 +765,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
>  		if (msg->u.watch.handle != watch)
>  			continue;
>  		list_del(&msg->list);
> -		kfree(msg->u.watch.vec);
> +		kfree(msg->u.watch.path);
>  		kfree(msg);
>  	}
>  	spin_unlock(&watch_events_lock);
> @@ -833,11 +833,10 @@ static int xenwatch_thread(void *unused)
> 
>  		if (ent != &watch_events) {
>  			msg = list_entry(ent, struct xs_stored_msg, list);
> -			msg->u.watch.handle->callback(
> -				msg->u.watch.handle,
> -				(const char **)msg->u.watch.vec,
> -				msg->u.watch.vec_size);
> -			kfree(msg->u.watch.vec);
> +			msg->u.watch.handle->callback(msg->u.watch.handle,
> +						      msg->u.watch.path,
> +						      msg->u.watch.token);
> +			kfree(msg->u.watch.path);
>  			kfree(msg);
>  		}
> 
> @@ -903,24 +902,24 @@ static int process_msg(void)
>  	body[msg->hdr.len] = '\0';
> 
>  	if (msg->hdr.type == XS_WATCH_EVENT) {
> -		msg->u.watch.vec = split(body, msg->hdr.len,
> -					 &msg->u.watch.vec_size);
> -		if (IS_ERR(msg->u.watch.vec)) {
> -			err = PTR_ERR(msg->u.watch.vec);
> +		if (count_strings(body, msg->hdr.len) != 2) {
> +			err = -EINVAL;
>  			kfree(msg);
> +			kfree(body);
>  			goto out;
>  		}
> +		msg->u.watch.path = (const char *)body;
> +		msg->u.watch.token = (const char *)strchr(body, '\0') + 1;
> 
>  		spin_lock(&watches_lock);
> -		msg->u.watch.handle = find_watch(
> -			msg->u.watch.vec[XS_WATCH_TOKEN]);
> +		msg->u.watch.handle = find_watch(msg->u.watch.token);
>  		if (msg->u.watch.handle != NULL) {
>  			spin_lock(&watch_events_lock);
>  			list_add_tail(&msg->list, &watch_events);
>  			wake_up(&watch_events_waitq);
>  			spin_unlock(&watch_events_lock);
>  		} else {
> -			kfree(msg->u.watch.vec);
> +			kfree(body);
>  			kfree(msg);
>  		}
>  		spin_unlock(&watches_lock);
> diff --git a/include/xen/xenbus.h b/include/xen/xenbus.h
> index 98f73a2..869c816 100644
> --- a/include/xen/xenbus.h
> +++ b/include/xen/xenbus.h
> @@ -61,7 +61,7 @@ struct xenbus_watch
> 
>  	/* Callback (executed in a process context with no locks held). */
>  	void (*callback)(struct xenbus_watch *,
> -			 const char **vec, unsigned int len);
> +			 const char *path, const char *token);
>  };
> 
> 
> @@ -193,11 +193,11 @@ void xenbus_probe(struct work_struct *);
>  int xenbus_watch_path(struct xenbus_device *dev, const char *path,
>  		      struct xenbus_watch *watch,
>  		      void (*callback)(struct xenbus_watch *,
> -				       const char **, unsigned int));
> +				       const char *, const char *));
>  __printf(4, 5)
>  int xenbus_watch_pathfmt(struct xenbus_device *dev, struct xenbus_watch *watch,
>  			 void (*callback)(struct xenbus_watch *,
> -					  const char **, unsigned int),
> +					  const char *, const char *),
>  			 const char *pathfmt, ...);
> 
>  int xenbus_switch_state(struct xenbus_device *dev, enum xenbus_state new_state);
> --
> 2.10.2

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/3] xen: modify xenstore watch event interface
  2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
  2017-01-06 15:38   ` Paul Durrant
@ 2017-01-06 16:29   ` Wei Liu
  2017-01-06 16:37   ` Roger Pau Monné
  2017-01-06 21:57   ` Boris Ostrovsky
  3 siblings, 0 replies; 21+ messages in thread
From: Wei Liu @ 2017-01-06 16:29 UTC (permalink / raw)
  To: Juergen Gross
  Cc: linux-kernel, xen-devel, boris.ostrovsky, konrad.wilk, roger.pau,
	wei.liu2, paul.durrant, netdev

On Fri, Jan 06, 2017 at 04:05:43PM +0100, Juergen Gross wrote:
> Today a Xenstore watch event is delivered via a callback function
> declared as:
> 
> void (*callback)(struct xenbus_watch *,
>                  const char **vec, unsigned int len);
> 
> As all watch events only ever come with two parameters (path and token)
> changing the prototype to:
> 
> void (*callback)(struct xenbus_watch *,
>                  const char *path, const char *token);
> 
> is the natural thing to do.
> 
> Apply this change and adapt all users.
> 
> Cc: konrad.wilk@oracle.com
> Cc: roger.pau@citrix.com
> Cc: wei.liu2@citrix.com
> Cc: paul.durrant@citrix.com
> Cc: netdev@vger.kernel.org
> 
> Signed-off-by: Juergen Gross <jgross@suse.com>

Reviewed-by: Wei Liu <wei.liu2@citrix.com>
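
For illustration only, a handler written against the new prototype might look roughly like the sketch below. It builds in userspace: struct xenbus_watch here is a cut-down stand-in for the kernel structure, and demo_dev, demo_container_of() and demo_cpu_watch() are hypothetical names. The path parsing is modelled on handle_vcpu_hotplug_event() from the quoted patch.

```c
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for the kernel's struct xenbus_watch. */
struct xenbus_watch {
	const char *node;
	void (*callback)(struct xenbus_watch *watch,
			 const char *path, const char *token);
};

/* Hypothetical driver state embedding its watch. */
struct demo_dev {
	int last_cpu;			/* -1 if the last event named no CPU */
	struct xenbus_watch watch;
};

/* Userspace equivalent of the kernel's container_of(). */
#define demo_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* New-style callback: path and token arrive as plain strings, no vec[]. */
static void demo_cpu_watch(struct xenbus_watch *watch,
			   const char *path, const char *token)
{
	struct demo_dev *dev = demo_container_of(watch, struct demo_dev, watch);
	const char *cpustr = strstr(path, "cpu/");
	unsigned int cpu;

	(void)token;			/* not needed by this handler */

	dev->last_cpu = -1;
	if (cpustr && sscanf(cpustr, "cpu/%u", &cpu) == 1)
		dev->last_cpu = (int)cpu;
}
```

With the old prototype the same handler had to index vec[XS_WATCH_PATH]; here the path is used directly and the unused token is simply ignored.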

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/3] xen: modify xenstore watch event interface
  2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
  2017-01-06 15:38   ` Paul Durrant
  2017-01-06 16:29   ` Wei Liu
@ 2017-01-06 16:37   ` Roger Pau Monné
  2017-01-06 21:57   ` Boris Ostrovsky
  3 siblings, 0 replies; 21+ messages in thread
From: Roger Pau Monné @ 2017-01-06 16:37 UTC (permalink / raw)
  To: Juergen Gross
  Cc: linux-kernel, xen-devel, boris.ostrovsky, konrad.wilk, wei.liu2,
	paul.durrant, netdev

On Fri, Jan 06, 2017 at 04:05:43PM +0100, Juergen Gross wrote:
> Today a Xenstore watch event is delivered via a callback function
> declared as:
> 
> void (*callback)(struct xenbus_watch *,
>                  const char **vec, unsigned int len);
> 
> As all watch events only ever come with two parameters (path and token)
> changing the prototype to:
> 
> void (*callback)(struct xenbus_watch *,
>                  const char *path, const char *token);
> 
> is the natural thing to do.
> 
> Apply this change and adapt all users.
> 
> Cc: konrad.wilk@oracle.com
> Cc: roger.pau@citrix.com
> Cc: wei.liu2@citrix.com
> Cc: paul.durrant@citrix.com
> Cc: netdev@vger.kernel.org
> 
> Signed-off-by: Juergen Gross <jgross@suse.com>

blkback changes:

Reviewed-by: Roger Pau Monné <roger.pau@citrix.com>

Thanks, Roger.

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 1/3] xen: clean up xenbus internal headers
  2017-01-06 15:05 ` [PATCH 1/3] xen: clean up xenbus internal headers Juergen Gross
@ 2017-01-06 20:52   ` Boris Ostrovsky
  2017-01-09  7:07     ` Juergen Gross
  0 siblings, 1 reply; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-06 20:52 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel

On 01/06/2017 10:05 AM, Juergen Gross wrote:
> The xenbus driver has an awful mixture of internal and global visible
> headers: some of the internal used only stuff is defined in the
> global header include/xen/xenbus.h while some stuff defined in internal
> headers is used by other drivers, too.
>
> Clean this up by moving the external used symbols to
> include/xen/xenbus.h and the symbols used internal only to a new

I think

s/external/externally/
s/internal/internally/

> header drivers/xen/xenbus/xenbus.h

... and remove xenbus_comms.h

>
> Signed-off-by: Juergen Gross <jgross@suse.com>
> ---
>  drivers/xen/xenbus/{xenbus_probe.h => xenbus.h} | 64 ++++++++++++++-----------
>  drivers/xen/xenbus/xenbus_client.c              |  2 +-
>  drivers/xen/xenbus/xenbus_comms.c               |  2 +-
>  drivers/xen/xenbus/xenbus_comms.h               | 51 --------------------
>  drivers/xen/xenbus/xenbus_dev_backend.c         |  2 +-
>  drivers/xen/xenbus/xenbus_dev_frontend.c        |  4 +-
>  drivers/xen/xenbus/xenbus_probe.c               |  3 +-
>  drivers/xen/xenbus/xenbus_probe_backend.c       |  3 +-
>  drivers/xen/xenbus/xenbus_probe_frontend.c      |  3 +-
>  drivers/xen/xenbus/xenbus_xs.c                  |  3 +-
>  drivers/xen/xenfs/super.c                       |  2 +-
>  drivers/xen/xenfs/xenstored.c                   |  2 +-
>  include/xen/xenbus.h                            | 12 ++---
>  13 files changed, 52 insertions(+), 101 deletions(-)
>  rename drivers/xen/xenbus/{xenbus_probe.h => xenbus.h} (58%)
>  delete mode 100644 drivers/xen/xenbus/xenbus_comms.h
>
> diff --git a/drivers/xen/xenbus/xenbus_probe.h b/drivers/xen/xenbus/xenbus.h
> similarity index 58%
> rename from drivers/xen/xenbus/xenbus_probe.h
> rename to drivers/xen/xenbus/xenbus.h
> index c9ec7ca..6a80c1e 100644
> --- a/drivers/xen/xenbus/xenbus_probe.h
> +++ b/drivers/xen/xenbus/xenbus.h
> @@ -1,10 +1,7 @@
> -/******************************************************************************
> - * xenbus_probe.h
> - *
> - * Talks to Xen Store to figure out what devices we have.
> +/*
> + * Private include for xenbus communications.
>   *
>   * Copyright (C) 2005 Rusty Russell, IBM Corporation
> - * Copyright (C) 2005 XenSource Ltd.

Why is this?

-boris

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/3] xen: modify xenstore watch event interface
  2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
                     ` (2 preceding siblings ...)
  2017-01-06 16:37   ` Roger Pau Monné
@ 2017-01-06 21:57   ` Boris Ostrovsky
  2017-01-09  7:12     ` Juergen Gross
  3 siblings, 1 reply; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-06 21:57 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel
  Cc: konrad.wilk, roger.pau, wei.liu2, paul.durrant, netdev

On 01/06/2017 10:05 AM, Juergen Gross wrote:
> Today a Xenstore watch event is delivered via a callback function
> declared as:
>
> void (*callback)(struct xenbus_watch *,
>                  const char **vec, unsigned int len);
>
> As all watch events only ever come with two parameters (path and token)
> changing the prototype to:
>
> void (*callback)(struct xenbus_watch *,
>                  const char *path, const char *token);
>
> is the natural thing to do.
>
> Apply this change and adapt all users.
>
> Cc: konrad.wilk@oracle.com
> Cc: roger.pau@citrix.com
> Cc: wei.liu2@citrix.com
> Cc: paul.durrant@citrix.com
> Cc: netdev@vger.kernel.org
>
> Signed-off-by: Juergen Gross <jgross@suse.com>


>  
> @@ -903,24 +902,24 @@ static int process_msg(void)
>  	body[msg->hdr.len] = '\0';
>  
>  	if (msg->hdr.type == XS_WATCH_EVENT) {
> -		msg->u.watch.vec = split(body, msg->hdr.len,
> -					 &msg->u.watch.vec_size);
> -		if (IS_ERR(msg->u.watch.vec)) {
> -			err = PTR_ERR(msg->u.watch.vec);
> +		if (count_strings(body, msg->hdr.len) != 2) {
> +			err = -EINVAL;

xenbus_write_watch() returns -EILSEQ when this type of error is
encountered so perhaps we should return the same error here.

Either way

Reviewed-by: Boris Ostrovsky <boris.ostrovsky@oracle.com>
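
For illustration, the sanity check in the hunk above can be sketched in
user-space C. This is only a sketch under assumptions: count_strings() here
is a stand-alone reimplementation written for this example (the kernel's
helper may differ in detail), and watch_body_is_valid() is a hypothetical
name. It shows why a well-formed watch event body, which carries a path and
a token as two NUL-terminated strings, must yield a count of exactly 2.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Count the NUL-terminated strings packed back to back in buf[0..len).
 * Assumption: the caller guarantees buf[len] == '\0', as process_msg()
 * does by terminating the body, so strlen() cannot run off the end.
 */
static unsigned int count_strings(const char *buf, size_t len)
{
	unsigned int num = 0;
	const char *p;

	for (p = buf; p < buf + len; p += strlen(p) + 1)
		num++;

	return num;
}

/* Hypothetical helper: a watch event body must be "path\0token\0". */
static bool watch_body_is_valid(const char *body, size_t len)
{
	return count_strings(body, len) == 2;
}
```

Which errno the caller reports for a failed check is a separate choice and
is not modelled here.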

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 1/3] xen: clean up xenbus internal headers
  2017-01-06 20:52   ` Boris Ostrovsky
@ 2017-01-09  7:07     ` Juergen Gross
  0 siblings, 0 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-09  7:07 UTC (permalink / raw)
  To: Boris Ostrovsky, linux-kernel, xen-devel

On 06/01/17 21:52, Boris Ostrovsky wrote:
> On 01/06/2017 10:05 AM, Juergen Gross wrote:
>> The xenbus driver has an awful mixture of internal and global visible
>> headers: some of the internal used only stuff is defined in the
>> global header include/xen/xenbus.h while some stuff defined in internal
>> headers is used by other drivers, too.
>>
>> Clean this up by moving the external used symbols to
>> include/xen/xenbus.h and the symbols used internal only to a new
> 
> I think
> 
> s/external/externally/
> s/internal/internally/

Hmm, yes.

>> header drivers/xen/xenbus/xenbus.h
> 
> ... and remove xenbus_comms.h

and xenbus_probe.h.

> 
>>
>> Signed-off-by: Juergen Gross <jgross@suse.com>
>> ---
>>  drivers/xen/xenbus/{xenbus_probe.h => xenbus.h} | 64 ++++++++++++++-----------
>>  drivers/xen/xenbus/xenbus_client.c              |  2 +-
>>  drivers/xen/xenbus/xenbus_comms.c               |  2 +-
>>  drivers/xen/xenbus/xenbus_comms.h               | 51 --------------------
>>  drivers/xen/xenbus/xenbus_dev_backend.c         |  2 +-
>>  drivers/xen/xenbus/xenbus_dev_frontend.c        |  4 +-
>>  drivers/xen/xenbus/xenbus_probe.c               |  3 +-
>>  drivers/xen/xenbus/xenbus_probe_backend.c       |  3 +-
>>  drivers/xen/xenbus/xenbus_probe_frontend.c      |  3 +-
>>  drivers/xen/xenbus/xenbus_xs.c                  |  3 +-
>>  drivers/xen/xenfs/super.c                       |  2 +-
>>  drivers/xen/xenfs/xenstored.c                   |  2 +-
>>  include/xen/xenbus.h                            | 12 ++---
>>  13 files changed, 52 insertions(+), 101 deletions(-)
>>  rename drivers/xen/xenbus/{xenbus_probe.h => xenbus.h} (58%)
>>  delete mode 100644 drivers/xen/xenbus/xenbus_comms.h
>>
>> diff --git a/drivers/xen/xenbus/xenbus_probe.h b/drivers/xen/xenbus/xenbus.h
>> similarity index 58%
>> rename from drivers/xen/xenbus/xenbus_probe.h
>> rename to drivers/xen/xenbus/xenbus.h
>> index c9ec7ca..6a80c1e 100644
>> --- a/drivers/xen/xenbus/xenbus_probe.h
>> +++ b/drivers/xen/xenbus/xenbus.h
>> @@ -1,10 +1,7 @@
>> -/******************************************************************************
>> - * xenbus_probe.h
>> - *
>> - * Talks to Xen Store to figure out what devices we have.
>> +/*
>> + * Private include for xenbus communications.
>>   *
>>   * Copyright (C) 2005 Rusty Russell, IBM Corporation
>> - * Copyright (C) 2005 XenSource Ltd.
> 
> Why is this?

Result of merging two header files. Will re-add.


Juergen

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 2/3] xen: modify xenstore watch event interface
  2017-01-06 21:57   ` Boris Ostrovsky
@ 2017-01-09  7:12     ` Juergen Gross
  0 siblings, 0 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-09  7:12 UTC (permalink / raw)
  To: Boris Ostrovsky, linux-kernel, xen-devel
  Cc: konrad.wilk, roger.pau, wei.liu2, paul.durrant, netdev

On 06/01/17 22:57, Boris Ostrovsky wrote:
> On 01/06/2017 10:05 AM, Juergen Gross wrote:
>> Today a Xenstore watch event is delivered via a callback function
>> declared as:
>>
>> void (*callback)(struct xenbus_watch *,
>>                  const char **vec, unsigned int len);
>>
>> As all watch events only ever come with two parameters (path and token)
>> changing the prototype to:
>>
>> void (*callback)(struct xenbus_watch *,
>>                  const char *path, const char *token);
>>
>> is the natural thing to do.
>>
>> Apply this change and adapt all users.
>>
>> Cc: konrad.wilk@oracle.com
>> Cc: roger.pau@citrix.com
>> Cc: wei.liu2@citrix.com
>> Cc: paul.durrant@citrix.com
>> Cc: netdev@vger.kernel.org
>>
>> Signed-off-by: Juergen Gross <jgross@suse.com>
> 
> 
>>  
>> @@ -903,24 +902,24 @@ static int process_msg(void)
>>  	body[msg->hdr.len] = '\0';
>>  
>>  	if (msg->hdr.type == XS_WATCH_EVENT) {
>> -		msg->u.watch.vec = split(body, msg->hdr.len,
>> -					 &msg->u.watch.vec_size);
>> -		if (IS_ERR(msg->u.watch.vec)) {
>> -			err = PTR_ERR(msg->u.watch.vec);
>> +		if (count_strings(body, msg->hdr.len) != 2) {
>> +			err = -EINVAL;
> 
> xenbus_write_watch() returns -EILSEQ when this type of error is
> encountered so perhaps we should return the same error here.

Not since 9a6161fe73bdd3ae4a1e18421b0b20cb7141f680. :-)

> 
> Either way
> 
> Reviewed-by: Boris Ostrovsky <boris.ostrovsky@oracle.com>

Thanks,

Juergen

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-06 15:05 ` [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses Juergen Gross
@ 2017-01-09 21:17   ` Boris Ostrovsky
  2017-01-10  6:18     ` Juergen Gross
  2017-01-10 19:17   ` Boris Ostrovsky
  2017-01-10 22:56   ` Boris Ostrovsky
  2 siblings, 1 reply; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-09 21:17 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel

On 01/06/2017 10:05 AM, Juergen Gross wrote:
> Handling of multiple concurrent Xenstore accesses through xenbus driver
> either from the kernel or user land is rather lame today: xenbus is
> capable to have one access active only at one point of time.
>
> Rewrite xenbus to handle multiple requests concurrently by making use
> of the request id of the Xenstore protocol. This requires to:
>
> - Instead of blocking inside xb_read() when trying to read data from
>   the xenstore ring buffer do so only in the main loop of
>   xenbus_thread().
>
> - Instead of doing writes to the xenstore ring buffer in the context of
>   the caller just queue the request and do the write in the dedicated
>   xenbus thread.
>
> - Instead of just forwarding the request id specified by the caller of
>   xenbus to xenstore use a xenbus internal unique request id. This will
>   allow multiple outstanding requests.
>
> - Modify the locking scheme in order to allow multiple requests being
>   active in parallel.
>
> - Instead of waiting for the reply of a user's xenstore request after
>   writing the request to the xenstore ring buffer return directly to
>   the caller and do the waiting in the read path.
>
> Additionally signal handling was optimized by avoiding waking up the
> xenbus thread or sending an event to Xenstore in case the addressed
> entity is known to be running already.
>
> As a result communication with Xenstore is sped up by a factor of up
> to 5: depending on the request type (read or write) and the amount of
> data transferred the gain was at least 20% (small reads) and went up to
> a factor of 5 for large writes.
>
> In the end some more rough edges of xenbus have been smoothed:
>
> - Handling of memory shortage when reading from xenstore ring buffer in
>   the xenbus driver was not optimal: it was busy looping and issuing a
>   warning in each loop.
>
> - In case of xenstore not running in dom0 but in a stubdom we end up
>   with two xenbus threads running as the initialization of xenbus in
>   dom0 expecting a local xenstored will be redone later when connecting
>   to the xenstore domain. Up to now this was no problem as locking
>   would prevent the two xenbus threads interfering with each other, but
>   this was just a waste of kernel resources.
>
> - An out of memory situation while writing to or reading from the
>   xenstore ring buffer no longer will lead to a possible loss of
>   synchronization with xenstore.
>
> - The user read and write part are now interruptible by signals.
>
> Signed-off-by: Juergen Gross <jgross@suse.com>
> ---
> I'm aware that the changes are quite large. I thought about sending a
> version split into multiple patches, but a lot of lines would have been
> touched by more than one patch. I still have the multiple patch variant
> lying around - this patch is split into 11 smaller ones. While all
> steps of this larger series is operational some steps are not optimal
> as they are even slower than the original version of xenbus.
>
> Nevertheless I can send the large series if there are requests for it.

I will comment only on the xenbus_comms changes for now, since otherwise I am
afraid it may be difficult to keep track of the conversation.


> diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
> index c21ec02..fa054ca 100644
> --- a/drivers/xen/xenbus/xenbus_comms.c
> +++ b/drivers/xen/xenbus/xenbus_comms.c
> @@ -34,6 +34,7 @@
>  
>  #include <linux/wait.h>
>  #include <linux/interrupt.h>
> +#include <linux/kthread.h>
>  #include <linux/sched.h>
>  #include <linux/err.h>
>  #include <xen/xenbus.h>
> @@ -42,11 +43,40 @@
>  #include <xen/page.h>
>  #include "xenbus.h"
>  
> +struct xs_thread_state_write {
> +	struct xb_req_data *req;
> +	int idx;
> +	unsigned int used;

"written" or "sent"?

> +};
> +
> +struct xs_thread_state_read {
> +	struct xsd_sockmsg msg;
> +	char *body;
> +	union {
> +		void *alloc;
> +		struct xs_watch_event *watch;
> +	};
> +	bool in_msg;
> +	bool in_hdr;

It may be better to keep track of which state we are in using a bitmap.
Otherwise it is easy to lose track of one or the other.

> +	unsigned int used;

"read" or "received"?

> +};

Both of these are private to process_msg/process_writes so perhaps they
can be declared in those routines' scopes.

> +
> +/* A list of replies. Currently only one will ever be outstanding. */
> +LIST_HEAD(xs_reply_list);
> +
> +/* A list of write requests. */
> +LIST_HEAD(xb_write_list);
> +DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
> +DEFINE_MUTEX(xb_write_mutex);
> +
> +/* Protect xenbus reader thread against save/restore. */
> +DEFINE_MUTEX(xs_response_mutex);
> +
>  static int xenbus_irq;
> +static struct task_struct *xenbus_task;
>  
>  static DECLARE_WORK(probe_work, xenbus_probe);
>  
> -static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>  
>  static irqreturn_t wake_waiting(int irq, void *unused)
>  {
> @@ -84,30 +114,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons,
>  	return buf + MASK_XENSTORE_IDX(cons);
>  }
>  
> +static int xb_data_to_write(void)
> +{
> +	struct xenstore_domain_interface *intf = xen_store_interface;
> +
> +	return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
> +		!list_empty(&xb_write_list);
> +}
> +
>  /**
>   * xb_write - low level write
>   * @data: buffer to send
>   * @len: length of buffer
>   *
> - * Returns 0 on success, error otherwise.
> + * Returns number of bytes written or -err.
>   */
> -int xb_write(const void *data, unsigned len)
> +static int xb_write(const void *data, unsigned int len)
>  {
>  	struct xenstore_domain_interface *intf = xen_store_interface;
>  	XENSTORE_RING_IDX cons, prod;
> -	int rc;
> +	unsigned int bytes = 0;
>  
>  	while (len != 0) {
>  		void *dst;
>  		unsigned int avail;
>  
> -		rc = wait_event_interruptible(
> -			xb_waitq,
> -			(intf->req_prod - intf->req_cons) !=
> -			XENSTORE_RING_SIZE);
> -		if (rc < 0)
> -			return rc;
> -
>  		/* Read indexes, then verify. */
>  		cons = intf->req_cons;
>  		prod = intf->req_prod;
> @@ -115,59 +146,57 @@ int xb_write(const void *data, unsigned len)
>  			intf->req_cons = intf->req_prod = 0;
>  			return -EIO;
>  		}
> -
> -		dst = get_output_chunk(cons, prod, intf->req, &avail);
> -		if (avail == 0)
> -			continue;
> -		if (avail > len)
> -			avail = len;
> +		if (!xb_data_to_write())
> +			return bytes;
>  
>  		/* Must write data /after/ reading the consumer index. */
>  		virt_mb();
>  
> +		dst = get_output_chunk(cons, prod, intf->req, &avail);
> +		if (avail == 0)
> +			continue;

Should we continue the loop here or return? We are waiting for the
reader to get stuff off the ring.


> +		if (avail > len)
> +			avail = len;
> +
>  		memcpy(dst, data, avail);
>  		data += avail;
>  		len -= avail;
> +		bytes += avail;
>  
>  		/* Other side must not see new producer until data is there. */
>  		virt_wmb();
>  		intf->req_prod += avail;
>  
>  		/* Implies mb(): other side will see the updated producer. */
> -		notify_remote_via_evtchn(xen_store_evtchn);
> +		if (prod <= intf->req_cons)
> +			notify_remote_via_evtchn(xen_store_evtchn);
>  	}
>  
> -	return 0;
> +	return bytes;
>  }
>  
> -int xb_data_to_read(void)
> +static int xb_data_to_read(void)
>  {
>  	struct xenstore_domain_interface *intf = xen_store_interface;
>  	return (intf->rsp_cons != intf->rsp_prod);
>  }
>  
> -int xb_wait_for_data_to_read(void)
> -{
> -	return wait_event_interruptible(xb_waitq, xb_data_to_read());
> -}
> -
> -int xb_read(void *data, unsigned len)
> +static int xb_read(void *data, unsigned int len)
>  {
>  	struct xenstore_domain_interface *intf = xen_store_interface;
>  	XENSTORE_RING_IDX cons, prod;
> -	int rc;
> +	unsigned int bytes = 0;
>  
>  	while (len != 0) {
>  		unsigned int avail;
>  		const char *src;
>  
> -		rc = xb_wait_for_data_to_read();
> -		if (rc < 0)
> -			return rc;
> -
>  		/* Read indexes, then verify. */
>  		cons = intf->rsp_cons;
>  		prod = intf->rsp_prod;
> +		if (cons == prod)
> +			return bytes;
> +
>  		if (!check_indexes(cons, prod)) {
>  			intf->rsp_cons = intf->rsp_prod = 0;
>  			return -EIO;
> @@ -185,17 +214,229 @@ int xb_read(void *data, unsigned len)
>  		memcpy(data, src, avail);
>  		data += avail;
>  		len -= avail;
> +		bytes += avail;
>  
>  		/* Other side must not see free space until we've copied out */
>  		virt_mb();
>  		intf->rsp_cons += avail;
>  
> -		pr_debug("Finished read of %i bytes (%i to go)\n", avail, len);
> -
>  		/* Implies mb(): other side will see the updated consumer. */
> -		notify_remote_via_evtchn(xen_store_evtchn);
> +		if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> +			notify_remote_via_evtchn(xen_store_evtchn);
>  	}
>  
> +	return bytes;
> +}
> +
> +static int process_msg(void)
> +{
> +	static struct xs_thread_state_read state;
> +	struct xb_req_data *req;
> +	int err;
> +	unsigned int len;
> +
> +	if (!state.in_msg) {
> +		state.in_msg = true;
> +		state.in_hdr = true;
> +		state.used = 0;
> +
> +		/*
> +		 * We must disallow save/restore while reading a message.
> +		 * A partial read across s/r leaves us out of sync with
> +		 * xenstored.
> +		 */
> +		mutex_lock(&xs_response_mutex);
> +
> +		if (!xb_data_to_read()) {
> +			/* We raced with save/restore: pending data 'gone'. */
> +			mutex_unlock(&xs_response_mutex);
> +			state.in_msg = false;
> +			return 0;
> +		}
> +	}
> +
> +	if (state.in_hdr) {
> +		if (state.used != sizeof(state.msg)) {
> +			err = xb_read((void *)&state.msg + state.used,
> +				      sizeof(state.msg) - state.used);
> +			if (err < 0)
> +				goto out;
> +			state.used += err;
> +			if (state.used != sizeof(state.msg))
> +				return 0;

Would it be possible to do locking at the caller? I understand that you
are trying to hold the lock across multiple invocations of this function
but it feels somewhat counter-intuitive and bug-prone.

If it's not possible then at least please add a comment explaining
the locking algorithm.

> +			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
> +				err = -EINVAL;
> +				goto out;
> +			}
> +		}
> +
> +		len = state.msg.len + 1;
> +		if (state.msg.type == XS_WATCH_EVENT)
> +			len += sizeof(*state.watch);
> +
> +		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);

Why can't you kmalloc to state.body only when type!=XS_WATCH_EVENT ?

> +		if (!state.alloc)
> +			return -ENOMEM;
> +
> +		if (state.msg.type == XS_WATCH_EVENT)
> +			state.body = state.watch->body;
> +		else
> +			state.body = state.alloc;
> +		state.in_hdr = false;
> +		state.used = 0;
> +	}
> +
> +	err = xb_read(state.body + state.used, state.msg.len - state.used);
> +	if (err < 0)
> +		goto out;
> +
> +	state.used += err;
> +	if (state.used != state.msg.len)
> +		return 0;
> +
> +	state.body[state.msg.len] = '\0';
> +
> +	if (state.msg.type == XS_WATCH_EVENT) {
> +		state.watch->len = state.msg.len;
> +		err = xs_watch_msg(state.watch);
> +	} else {
> +		err = -ENOENT;
> +		mutex_lock(&xb_write_mutex);
> +		list_for_each_entry(req, &xs_reply_list, list) {
> +			if (req->msg.req_id == state.msg.req_id) {
> +				if (req->state == xb_req_state_wait_reply) {
> +					req->msg.type = state.msg.type;
> +					req->msg.len = state.msg.len;
> +					req->body = state.body;
> +					req->state = xb_req_state_got_reply;
> +					list_del(&req->list);
> +					req->cb(req);
> +				} else {
> +					list_del(&req->list);
> +					kfree(req);
> +				}
> +				err = 0;
> +				break;
> +			}
> +		}
> +		mutex_unlock(&xb_write_mutex);
> +		if (err)
> +			goto out;
> +	}
> +
> +	mutex_unlock(&xs_response_mutex);
> +
> +	state.in_msg = false;
> +	state.alloc = NULL;
> +	return err;
> +
> + out:
> +	mutex_unlock(&xs_response_mutex);
> +	state.in_msg = false;
> +	kfree(state.alloc);
> +	state.alloc = NULL;
> +	return err;
> +}
> +
> +static int process_writes(void)
> +{
> +	static struct xs_thread_state_write state;
> +	void *base;
> +	unsigned int len;
> +	int err = 0;
> +
> +	if (!xb_data_to_write())
> +		return 0;
> +
> +	mutex_lock(&xb_write_mutex);
> +
> +	if (!state.req) {
> +		state.req = list_first_entry(&xb_write_list,
> +					     struct xb_req_data, list);
> +		state.idx = -1;
> +		state.used = 0;
> +	}
> +
> +	if (state.req->state == xb_req_state_aborted)
> +		goto out_err;
> +
> +	while (state.idx < state.req->num_vecs) {
> +		if (state.idx < 0) {
> +			base = &state.req->msg;
> +			len = sizeof(state.req->msg);
> +		} else {
> +			base = state.req->vec[state.idx].iov_base;
> +			len = state.req->vec[state.idx].iov_len;
> +		}
> +		err = xb_write(base + state.used, len - state.used);
> +		if (err < 0)
> +			goto out_err;
> +		state.used += err;
> +		if (state.used != len)
> +			goto out;
> +
> +		state.idx++;
> +		state.used = 0;
> +	}
> +
> +	/*
> +	 * You would expect the following to be racy, but as the response is
> +	 * being read by our thread there is no risk of req being freed
> +	 * under our feet.
> +	 */

I don't think I understand this (and it's missing a "so" or something
like that between "thread" and "there"). If this is not racy, why are we
doing this under xb_write_mutex?

> +	list_del(&state.req->list);
> +	state.req->state = xb_req_state_wait_reply;
> +	list_add_tail(&state.req->list, &xs_reply_list);
> +	state.req = NULL;
> +
> + out:
> +	mutex_unlock(&xb_write_mutex);
> +
> +	return 0;
> +
> + out_err:
> +	state.req->msg.type = XS_ERROR;
> +	state.req->err = err;

You don't seem to need this for xb_req_state_aborted since you are
freeing state.req. OTOH, why shouldn't aborted requests generate an
error reply as well?


> +	list_del(&state.req->list);
> +	if (state.req->state == xb_req_state_aborted)
> +		kfree(state.req);
> +	else {
> +		state.req->state = xb_req_state_got_reply;
> +		wake_up(&state.req->wq);
> +	}
> +
> +	mutex_unlock(&xb_write_mutex);
> +
> +	state.req = NULL;
> +
> +	return err;
> +}
> +
> +static int xb_thread_work(void)
> +{
> +	return xb_data_to_read() || xb_data_to_write();
> +}
> +
> +static int xenbus_thread(void *unused)
> +{
> +	int err;
> +
> +	while (!kthread_should_stop()) {
> +		if (wait_event_interruptible(xb_waitq, xb_thread_work()))
> +			continue;
> +
> +		err = process_msg();
> +		if (err == -ENOMEM)
> +			schedule();
> +		else if (err)
> +			pr_warn("error %d while reading message\n", err);
> +
> +		err = process_writes();
> +		if (err)
> +			pr_warn("error %d while writing message\n", err);

Is there a chance that errors are persistent and you then spam the log?


-boris

> +	}
> +
> +	xenbus_task = NULL;
>  	return 0;
>  }
>  
> @@ -223,6 +464,7 @@ int xb_init_comms(void)
>  		rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
>  	} else {
>  		int err;
> +
>  		err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
>  						0, "xenbus", &xb_waitq);
>  		if (err < 0) {
> @@ -231,6 +473,13 @@ int xb_init_comms(void)
>  		}
>  
>  		xenbus_irq = err;
> +
> +		if (!xenbus_task) {
> +			xenbus_task = kthread_run(xenbus_thread, NULL,
> +						  "xenbus");
> +			if (IS_ERR(xenbus_task))
> +				return PTR_ERR(xenbus_task);
> +		}
>  	}
>  
>  	return 0;

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-09 21:17   ` Boris Ostrovsky
@ 2017-01-10  6:18     ` Juergen Gross
  2017-01-10 16:36       ` Boris Ostrovsky
  0 siblings, 1 reply; 21+ messages in thread
From: Juergen Gross @ 2017-01-10  6:18 UTC (permalink / raw)
  To: Boris Ostrovsky, linux-kernel, xen-devel

On 09/01/17 22:17, Boris Ostrovsky wrote:
> On 01/06/2017 10:05 AM, Juergen Gross wrote:
>> Handling of multiple concurrent Xenstore accesses through xenbus driver
>> either from the kernel or user land is rather lame today: xenbus is
>> capable to have one access active only at one point of time.
>>
>> Rewrite xenbus to handle multiple requests concurrently by making use
>> of the request id of the Xenstore protocol. This requires to:
>>
>> - Instead of blocking inside xb_read() when trying to read data from
>>   the xenstore ring buffer do so only in the main loop of
>>   xenbus_thread().
>>
>> - Instead of doing writes to the xenstore ring buffer in the context of
>>   the caller just queue the request and do the write in the dedicated
>>   xenbus thread.
>>
>> - Instead of just forwarding the request id specified by the caller of
>>   xenbus to xenstore use a xenbus internal unique request id. This will
>>   allow multiple outstanding requests.
>>
>> - Modify the locking scheme in order to allow multiple requests being
>>   active in parallel.
>>
>> - Instead of waiting for the reply of a user's xenstore request after
>>   writing the request to the xenstore ring buffer return directly to
>>   the caller and do the waiting in the read path.
>>
>> Additionally signal handling was optimized by avoiding waking up the
>> xenbus thread or sending an event to Xenstore in case the addressed
>> entity is known to be running already.
>>
>> As a result communication with Xenstore is sped up by a factor of up
>> to 5: depending on the request type (read or write) and the amount of
>> data transferred the gain was at least 20% (small reads) and went up to
>> a factor of 5 for large writes.
>>
>> In the end some more rough edges of xenbus have been smoothed:
>>
>> - Handling of memory shortage when reading from xenstore ring buffer in
>>   the xenbus driver was not optimal: it was busy looping and issuing a
>>   warning in each loop.
>>
>> - In case of xenstore not running in dom0 but in a stubdom we end up
>>   with two xenbus threads running as the initialization of xenbus in
>>   dom0 expecting a local xenstored will be redone later when connecting
>>   to the xenstore domain. Up to now this was no problem as locking
>>   would prevent the two xenbus threads interfering with each other, but
>>   this was just a waste of kernel resources.
>>
>> - An out of memory situation while writing to or reading from the
>>   xenstore ring buffer no longer will lead to a possible loss of
>>   synchronization with xenstore.
>>
>> - The user read and write part are now interruptible by signals.
>>
>> Signed-off-by: Juergen Gross <jgross@suse.com>
>> ---
>> I'm aware that the changes are quite large. I thought about sending a
>> version split into multiple patches, but a lot of lines would have been
>> touched by more than one patch. I still have the multiple patch variant
>> lying around - this patch is split into 11 smaller ones. While all
>> steps of this larger series is operational some steps are not optimal
>> as they are even slower than the original version of xenbus.
>>
>> Nevertheless I can send the large series if there are requests for it.
> 
> I will comment only on the xenbus_comms changes for now, since otherwise I am
> afraid it may be difficult to keep track of the conversation.

Okay.

>> diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
>> index c21ec02..fa054ca 100644
>> --- a/drivers/xen/xenbus/xenbus_comms.c
>> +++ b/drivers/xen/xenbus/xenbus_comms.c
>> @@ -34,6 +34,7 @@
>>  
>>  #include <linux/wait.h>
>>  #include <linux/interrupt.h>
>> +#include <linux/kthread.h>
>>  #include <linux/sched.h>
>>  #include <linux/err.h>
>>  #include <xen/xenbus.h>
>> @@ -42,11 +43,40 @@
>>  #include <xen/page.h>
>>  #include "xenbus.h"
>>  
>> +struct xs_thread_state_write {
>> +	struct xb_req_data *req;
>> +	int idx;
>> +	unsigned int used;
> 
> "written" or "sent"?

I don't mind.

>> +};
>> +
>> +struct xs_thread_state_read {
>> +	struct xsd_sockmsg msg;
>> +	char *body;
>> +	union {
>> +		void *alloc;
>> +		struct xs_watch_event *watch;
>> +	};
>> +	bool in_msg;
>> +	bool in_hdr;
> 
> It may be better to keep track of which state we are in using a bitmap.
> Otherwise it is easy to lose track of one or the other.

Hmm, really? It's rather easy:
in_msg: are we processing any message?
in_hdr: are we processing the message header (in_msg is true)?
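
To make the two flags concrete, here is a small user-space sketch. It is not
taken from the patch: enum read_phase and read_phase_of() are hypothetical
names. It maps the (in_msg, in_hdr) pair onto the three phases the reader
can be in, which is the same information a single state variable would hold.

```c
#include <stdbool.h>

/* Hypothetical names, used for illustration only. */
enum read_phase {
	READ_IDLE,	/* in_msg == false: no message in progress */
	READ_HEADER,	/* in_msg && in_hdr: still collecting the header */
	READ_BODY,	/* in_msg && !in_hdr: header done, reading the body */
};

/* in_hdr is only meaningful while in_msg is true. */
static enum read_phase read_phase_of(bool in_msg, bool in_hdr)
{
	if (!in_msg)
		return READ_IDLE;
	return in_hdr ? READ_HEADER : READ_BODY;
}
```

Of the four flag combinations only three are distinct phases; the pair
(in_msg false, in_hdr true) collapses to idle.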

>> +	unsigned int used;
> 
> "read" or "received"?

Sure, can change.

>> +};
> 
> Both of these are private to process_msg/process_writes so perhaps they
> can be declared in those routines' scopes.

I can do this if you want.

>>  		/* Read indexes, then verify. */
>>  		cons = intf->req_cons;
>>  		prod = intf->req_prod;
>> @@ -115,59 +146,57 @@ int xb_write(const void *data, unsigned len)
>>  			intf->req_cons = intf->req_prod = 0;
>>  			return -EIO;
>>  		}
>> -
>> -		dst = get_output_chunk(cons, prod, intf->req, &avail);
>> -		if (avail == 0)
>> -			continue;
>> -		if (avail > len)
>> -			avail = len;
>> +		if (!xb_data_to_write())
>> +			return bytes;
>>  
>>  		/* Must write data /after/ reading the consumer index. */
>>  		virt_mb();
>>  
>> +		dst = get_output_chunk(cons, prod, intf->req, &avail);
>> +		if (avail == 0)
>> +			continue;
> 
> Should we continue the loop here or return? We are waiting for the
> reader to get stuff off the ring.

avail == 0 can happen only if the reader modified req_cons between
us reading it into cons and testing for free space via
xb_data_to_write(). So the (local) retry should happen only very
rarely, and then only once before any further bytes are written. We
could return, of course, but then the retry would happen anyway
via the main thread loop.
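
The race can be illustrated with a stand-alone sketch of the index
arithmetic. The assumptions are mine: a 1024-byte power-of-two ring with
free-running 32-bit indexes, and the hypothetical helpers ring_has_space()
and ring_output_chunk(), which mirror the roles of the space test in
xb_data_to_write() and of get_output_chunk() without being copies of them.

```c
#include <stdint.h>

#define RING_SIZE 1024u			/* assumed; must be a power of two */
#define RING_MASK(idx) ((idx) & (RING_SIZE - 1))

/* Space test on free-running indexes: full when prod is RING_SIZE ahead. */
static int ring_has_space(uint32_t cons, uint32_t prod)
{
	return (prod - cons) != RING_SIZE;
}

/*
 * Bytes writable in one contiguous chunk: limited both by the distance
 * to the end of the buffer and by the total free space.
 */
static uint32_t ring_output_chunk(uint32_t cons, uint32_t prod)
{
	uint32_t to_end = RING_SIZE - RING_MASK(prod);
	uint32_t free_bytes = RING_SIZE - (prod - cons);

	return to_end < free_bytes ? to_end : free_bytes;
}
```

With a stale snapshot cons = 0 and prod = 1024, ring_output_chunk() yields 0
even though a fresh read of the consumer index (say 16) already passes
ring_has_space(); re-reading the indexes on the next loop iteration resolves
the mismatch.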

>> +static int process_msg(void)
>> +{
>> +	static struct xs_thread_state_read state;
>> +	struct xb_req_data *req;
>> +	int err;
>> +	unsigned int len;
>> +
>> +	if (!state.in_msg) {
>> +		state.in_msg = true;
>> +		state.in_hdr = true;
>> +		state.used = 0;
>> +
>> +		/*
>> +		 * We must disallow save/restore while reading a message.
>> +		 * A partial read across s/r leaves us out of sync with
>> +		 * xenstored.
>> +		 */
>> +		mutex_lock(&xs_response_mutex);
>> +
>> +		if (!xb_data_to_read()) {
>> +			/* We raced with save/restore: pending data 'gone'. */
>> +			mutex_unlock(&xs_response_mutex);
>> +			state.in_msg = false;
>> +			return 0;
>> +		}
>> +	}
>> +
>> +	if (state.in_hdr) {
>> +		if (state.used != sizeof(state.msg)) {
>> +			err = xb_read((void *)&state.msg + state.used,
>> +				      sizeof(state.msg) - state.used);
>> +			if (err < 0)
>> +				goto out;
>> +			state.used += err;
>> +			if (state.used != sizeof(state.msg))
>> +				return 0;
> 
> Would it be possible to do locking at the caller? I understand that you
> are trying to hold the lock across multiple invocations of this function
> but it feels somewhat counter-intuitive and bug-prone.

I think that would be difficult.

> If it's not possible then at least please add a comment explaining
> the locking algorithm.

Okay. Something like:

/*
 * xs_response_mutex is locked as long as we are processing one
 * message. state.in_msg will be true as long as we are holding the
 * lock in process_msg().
 */
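
The pattern that comment describes, a lock taken when a message starts and
released only when it is complete, possibly several calls later, might look
roughly like this. It is a single-threaded sketch under stated assumptions:
struct toy_lock stands in for the mutex, and process_step() and its
parameters are hypothetical, not the driver's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for a mutex; single-threaded illustration only. */
struct toy_lock {
	bool held;
};

struct read_state {
	bool in_msg;    /* true while this reader owns the lock */
	size_t used;    /* bytes of the current message consumed so far */
};

/*
 * Consume up to avail bytes of a msg_len byte message. Returns true once
 * the message is complete, false if more calls are needed.
 */
static bool process_step(struct toy_lock *lock, struct read_state *st,
			 size_t msg_len, size_t avail)
{
	size_t want;

	if (!st->in_msg) {
		lock->held = true;      /* mutex_lock() in real code */
		st->in_msg = true;
		st->used = 0;
	}

	assert(st->used <= msg_len);
	want = msg_len - st->used;
	st->used += avail < want ? avail : want;
	if (st->used != msg_len)
		return false;           /* partial read: keep the lock */

	st->in_msg = false;
	lock->held = false;             /* mutex_unlock() in real code */
	return true;
}
```

Here in_msg records that this reader took the lock, which is a different
question from whether the lock is held by anyone at all.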

>> +			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
>> +				err = -EINVAL;
>> +				goto out;
>> +			}
>> +		}
>> +
>> +		len = state.msg.len + 1;
>> +		if (state.msg.type == XS_WATCH_EVENT)
>> +			len += sizeof(*state.watch);
>> +
>> +		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
> 
> Why can't you kmalloc to state.body only when type!=XS_WATCH_EVENT ?

I need to read the watch data, too.

>> +		if (!state.alloc)
>> +			return -ENOMEM;
>> +
>> +		if (state.msg.type == XS_WATCH_EVENT)
>> +			state.body = state.watch->body;
>> +		else
>> +			state.body = state.alloc;
>> +		state.in_hdr = false;
>> +		state.used = 0;
>> +	}

>> +static int process_writes(void)
>> +{
>> +	static struct xs_thread_state_write state;
>> +	void *base;
>> +	unsigned int len;
>> +	int err = 0;
>> +
>> +	if (!xb_data_to_write())
>> +		return 0;
>> +
>> +	mutex_lock(&xb_write_mutex);
>> +
>> +	if (!state.req) {
>> +		state.req = list_first_entry(&xb_write_list,
>> +					     struct xb_req_data, list);
>> +		state.idx = -1;
>> +		state.used = 0;
>> +	}
>> +
>> +	if (state.req->state == xb_req_state_aborted)
>> +		goto out_err;
>> +
>> +	while (state.idx < state.req->num_vecs) {
>> +		if (state.idx < 0) {
>> +			base = &state.req->msg;
>> +			len = sizeof(state.req->msg);
>> +		} else {
>> +			base = state.req->vec[state.idx].iov_base;
>> +			len = state.req->vec[state.idx].iov_len;
>> +		}
>> +		err = xb_write(base + state.used, len - state.used);
>> +		if (err < 0)
>> +			goto out_err;
>> +		state.used += err;
>> +		if (state.used != len)
>> +			goto out;
>> +
>> +		state.idx++;
>> +		state.used = 0;
>> +	}
>> +
>> +	/*
>> +	 * You would expect the following to be racy, but as the response is
>> +	 * being read by our thread there is no risk of req being freed
>> +	 * under our feet.
>> +	 */
> 
> I don't think I understand this (and it's missing a "so" or something
> like that between "thread" and "there"). If this is not racy, why are we
> doing this under xb_write_mutex?

You are right. This was a problem in an intermediate stage of
development, but now the freeing of req is done with xb_write_mutex
held. I'll remove the comment.

>> +	list_del(&state.req->list);
>> +	state.req->state = xb_req_state_wait_reply;
>> +	list_add_tail(&state.req->list, &xs_reply_list);
>> +	state.req = NULL;
>> +
>> + out:
>> +	mutex_unlock(&xb_write_mutex);
>> +
>> +	return 0;
>> +
>> + out_err:
>> +	state.req->msg.type = XS_ERROR;
>> +	state.req->err = err;
> 
> You don't seem to need this for xb_req_state_aborted since you are
> freeing state.req. OTOH, why shouldn't aborted requests generate an
> error reply as well?

They do. Before xb_req_state_aborted is set, a possible error
is taken from req (see xs_wait_for_reply()). In case of an
early error return (EIO in read_reply()) there is nobody
waiting for (another) response.

>> +	list_del(&state.req->list);
>> +	if (state.req->state == xb_req_state_aborted)
>> +		kfree(state.req);
>> +	else {
>> +		state.req->state = xb_req_state_got_reply;
>> +		wake_up(&state.req->wq);
>> +	}
>> +
>> +	mutex_unlock(&xb_write_mutex);
>> +
>> +	state.req = NULL;
>> +
>> +	return err;
>> +}
>> +
>> +static int xb_thread_work(void)
>> +{
>> +	return xb_data_to_read() || xb_data_to_write();
>> +}
>> +
>> +static int xenbus_thread(void *unused)
>> +{
>> +	int err;
>> +
>> +	while (!kthread_should_stop()) {
>> +		if (wait_event_interruptible(xb_waitq, xb_thread_work()))
>> +			continue;
>> +
>> +		err = process_msg();
>> +		if (err == -ENOMEM)
>> +			schedule();
>> +		else if (err)
>> +			pr_warn("error %d while reading message\n", err);
>> +
>> +		err = process_writes();
>> +		if (err)
>> +			pr_warn("error %d while writing message\n", err);
> 
> Is there a chance that errors are persistent and you then spam the log?

Only in case xenstored is spamming the ring buffer with illegal data.
I believe this is rather improbable and we are doomed in this case
anyway. OTOH it doesn't hurt to switch to pr_warn_ratelimited().
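
What rate limiting buys here can be shown with a small user-space sketch.
The kernel helper keeps its own state internally; the struct and function
below are hypothetical and only illustrate the interval/burst idea, with
time passed in as a plain number.

```c
#include <assert.h>
#include <stdbool.h>

struct ratelimit {
	long interval;  /* window length, in arbitrary time units */
	int burst;      /* messages allowed per window */
	long begin;     /* start of the current window */
	int printed;    /* messages emitted in the current window */
};

/* Return true if a message may be emitted at time now. */
static bool ratelimit_ok(struct ratelimit *rl, long now)
{
	assert(rl->burst > 0);

	if (now - rl->begin >= rl->interval) {
		/* A new window starts: reset the budget. */
		rl->begin = now;
		rl->printed = 0;
	}
	if (rl->printed >= rl->burst)
		return false;   /* budget used up: drop the message */

	rl->printed++;
	return true;
}
```

A persistent error then costs at most burst log lines per interval instead
of one line per loop iteration.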

> -boris

Thanks for the comments!


Juergen


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-10  6:18     ` Juergen Gross
@ 2017-01-10 16:36       ` Boris Ostrovsky
  2017-01-10 16:38         ` Boris Ostrovsky
  2017-01-10 16:46         ` Juergen Gross
  0 siblings, 2 replies; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-10 16:36 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel


>>> +static int process_msg(void)
>>> +{
>>> +	static struct xs_thread_state_read state;
>>> +	struct xb_req_data *req;
>>> +	int err;
>>> +	unsigned int len;
>>> +
>>> +	if (!state.in_msg) {
>>> +		state.in_msg = true;
>>> +		state.in_hdr = true;
>>> +		state.used = 0;
>>> +
>>> +		/*
>>> +		 * We must disallow save/restore while reading a message.
>>> +		 * A partial read across s/r leaves us out of sync with
>>> +		 * xenstored.
>>> +		 */
>>> +		mutex_lock(&xs_response_mutex);
>>> +
>>> +		if (!xb_data_to_read()) {
>>> +			/* We raced with save/restore: pending data 'gone'. */
>>> +			mutex_unlock(&xs_response_mutex);
>>> +			state.in_msg = false;

Just noticed: should in_hdr be set to false here as well?

>>> +			return 0;
>>> +		}

Or set it to true here.

>>> +	}
>>> +
>>> +	if (state.in_hdr) {
>>> +		if (state.used != sizeof(state.msg)) {
>>> +			err = xb_read((void *)&state.msg + state.used,
>>> +				      sizeof(state.msg) - state.used);
>>> +			if (err < 0)
>>> +				goto out;
>>> +			state.used += err;
>>> +			if (state.used != sizeof(state.msg))
>>> +				return 0;
>> Would it be possible to do locking at the caller? I understand that you
>> are trying to hold the lock across multiple invocations of this function
>> but it feels somewhat counter-intuitive and bug-prone.
> I think that would be difficult.
>
>> If it's not possible then at least please add a comment explaining
>> locking algorithm.
> Okay. Something like:
>
> /*
>  * xs_response_mutex is locked as long as we are processing one
>  * message. state.in_msg will be true as long as we are holding the
>  * lock in process_msg().


Then in_msg is the same as mutex_is_locked(&xs_response_mutex). And if
so, do we really need it?


-boris


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-10 16:36       ` Boris Ostrovsky
@ 2017-01-10 16:38         ` Boris Ostrovsky
  2017-01-10 16:46         ` Juergen Gross
  1 sibling, 0 replies; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-10 16:38 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel


>> /*
>>  * xs_response_mutex is locked as long as we are processing one
>>  * message. state.in_msg will be true as long as we are holding the
>>  * lock in process_msg().
>
> Then in_msg is the same as mutex_is_locked(&xs_response_mutex). And if
> so, do we really need it?


Never mind this. The lock can be held by others, obviously.

-boris


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-10 16:36       ` Boris Ostrovsky
  2017-01-10 16:38         ` Boris Ostrovsky
@ 2017-01-10 16:46         ` Juergen Gross
  1 sibling, 0 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-10 16:46 UTC (permalink / raw)
  To: Boris Ostrovsky, linux-kernel, xen-devel

On 10/01/17 17:36, Boris Ostrovsky wrote:
> 
>>>> +static int process_msg(void)
>>>> +{
>>>> +	static struct xs_thread_state_read state;
>>>> +	struct xb_req_data *req;
>>>> +	int err;
>>>> +	unsigned int len;
>>>> +
>>>> +	if (!state.in_msg) {
>>>> +		state.in_msg = true;
>>>> +		state.in_hdr = true;
>>>> +		state.used = 0;
>>>> +
>>>> +		/*
>>>> +		 * We must disallow save/restore while reading a message.
>>>> +		 * A partial read across s/r leaves us out of sync with
>>>> +		 * xenstored.
>>>> +		 */
>>>> +		mutex_lock(&xs_response_mutex);
>>>> +
>>>> +		if (!xb_data_to_read()) {
>>>> +			/* We raced with save/restore: pending data 'gone'. */
>>>> +			mutex_unlock(&xs_response_mutex);
>>>> +			state.in_msg = false;
> 
> Just noticed: should in_hdr be set to false here as well?

Doesn't matter: It is valid only if in_msg is true.

> 
>>>> +			return 0;
>>>> +		}
> 
> Or set it to true here.
> 
>>>> +	}
>>>> +
>>>> +	if (state.in_hdr) {
>>>> +		if (state.used != sizeof(state.msg)) {
>>>> +			err = xb_read((void *)&state.msg + state.used,
>>>> +				      sizeof(state.msg) - state.used);
>>>> +			if (err < 0)
>>>> +				goto out;
>>>> +			state.used += err;
>>>> +			if (state.used != sizeof(state.msg))
>>>> +				return 0;
>>> Would it be possible to do locking at the caller? I understand that you
>>> are trying to hold the lock across multiple invocations of this function
>>> but it feels somewhat counter-intuitive and bug-prone.
>> I think that would be difficult.
>>
>>> If it's not possible then at least please add a comment explaining
>>> locking algorithm.
>> Okay. Something like:
>>
>> /*
>>  * xs_response_mutex is locked as long as we are processing one
>>  * message. state.in_msg will be true as long as we are holding the
>>  * lock in process_msg().
> 
> 
> Then in_msg is the same as mutex_is_locked(&xs_response_mutex). And if
> so, do we really need it?

Yes. xs_response_mutex is used in suspend path, too.


Juergen


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-06 15:05 ` [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses Juergen Gross
  2017-01-09 21:17   ` Boris Ostrovsky
@ 2017-01-10 19:17   ` Boris Ostrovsky
  2017-01-10 22:56   ` Boris Ostrovsky
  2 siblings, 0 replies; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-10 19:17 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel


> diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c
> index e4b9847..4d343ee 100644
> --- a/drivers/xen/xenbus/xenbus_dev_frontend.c
> +++ b/drivers/xen/xenbus/xenbus_dev_frontend.c
>


LGTM.

-boris


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-06 15:05 ` [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses Juergen Gross
  2017-01-09 21:17   ` Boris Ostrovsky
  2017-01-10 19:17   ` Boris Ostrovsky
@ 2017-01-10 22:56   ` Boris Ostrovsky
  2017-01-11  5:26     ` Juergen Gross
  2 siblings, 1 reply; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-10 22:56 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel




> diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
> index ebc768f..ebdfbee 100644
> --- a/drivers/xen/xenbus/xenbus_xs.c
> +++ b/drivers/xen/xenbus/xenbus_xs.c


> -
> -static struct xs_handle xs_state;
> +/*
> + * Framework to protect suspend/resume handling against normal Xenstore
> + * message handling:
> + * During suspend/resume there must be no open transaction and no pending
> + * Xenstore request.
> + * New watch events happening in this time can be ignored by firing all watches
> + * after resume.
> + */
> +/* Lock protecting enter/exit critical region. */
> +static DEFINE_SPINLOCK(xs_state_lock);
> +/* Wait queue for all callers waiting for critical region to become usable. */
> +static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq);
> +/* Wait queue for suspend handling waiting for critical region being empty. */
> +static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq);
> +/* Number of users in critical region. */
> +static unsigned int xs_state_users;
> +/* Suspend handler waiting or already active? */
> +static int xs_suspend_active;

I think these two should be declared next to xs_state_lock since they
are protected by it. Or maybe even put them into some sort of a state
struct.


> +
> +
> +static bool test_reply(struct xb_req_data *req)
> +{
> +	if (req->state == xb_req_state_got_reply || !xenbus_ok())
> +		return true;
> +
> +	/* Make sure to reread req->state each time. */
> +	cpu_relax();

I don't think I understand why this is needed.

> +
> +	return false;
> +}
> +


> +static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
>  {
> -	mutex_lock(&xs_state.transaction_mutex);
> -	atomic_inc(&xs_state.transaction_count);
> -	mutex_unlock(&xs_state.transaction_mutex);
> -}
> +	bool notify;
>  
> -static void transaction_end(void)
> -{
> -	if (atomic_dec_and_test(&xs_state.transaction_count))
> -		wake_up(&xs_state.transaction_wq);
> -}
> +	req->msg = *msg;
> +	req->err = 0;
> +	req->state = xb_req_state_queued;
> +	init_waitqueue_head(&req->wq);
>  
> -static void transaction_suspend(void)
> -{
> -	mutex_lock(&xs_state.transaction_mutex);
> -	wait_event(xs_state.transaction_wq,
> -		   atomic_read(&xs_state.transaction_count) == 0);
> -}
> +	xs_request_enter(req);
>  
> -static void transaction_resume(void)
> -{
> -	mutex_unlock(&xs_state.transaction_mutex);
> +	req->msg.req_id = xs_request_id++;

Is it safe to do this without a lock?

> +
> +int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par)
> +{
> +	struct xb_req_data *req;
> +	struct kvec *vec;
> +
> +	req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL);

Is there a reason why you are using different flags here?

> @@ -263,11 +295,20 @@ static void *xs_talkv(struct xenbus_transaction t,
>  		      unsigned int num_vecs,
>  		      unsigned int *len)
>  {
> +	struct xb_req_data *req;
>  	struct xsd_sockmsg msg;
>  	void *ret = NULL;
>  	unsigned int i;
>  	int err;
>  
> +	req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH);
> +	if (!req)
> +		return ERR_PTR(-ENOMEM);
> +
> +	req->vec = iovec;
> +	req->num_vecs = num_vecs;
> +	req->cb = xs_wake_up;
> +
>  	msg.tx_id = t.id;
>  	msg.req_id = 0;

Is this still needed? You are assigning it in xs_send().

> +static int xs_reboot_notify(struct notifier_block *nb,
> +			    unsigned long code, void *unused)
>  {
> -	struct xs_stored_msg *msg;



> +	struct xb_req_data *req;
> +
> +	mutex_lock(&xb_write_mutex);
> +	list_for_each_entry(req, &xs_reply_list, list)
> +		wake_up(&req->wq);
> +	list_for_each_entry(req, &xb_write_list, list)
> +		wake_up(&req->wq);

We are waking up waiters here but there is no guarantee that waiting
threads will have a chance to run, is there?


-boris


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-10 22:56   ` Boris Ostrovsky
@ 2017-01-11  5:26     ` Juergen Gross
  2017-01-11 15:29       ` Boris Ostrovsky
  0 siblings, 1 reply; 21+ messages in thread
From: Juergen Gross @ 2017-01-11  5:26 UTC (permalink / raw)
  To: Boris Ostrovsky, linux-kernel, xen-devel

On 10/01/17 23:56, Boris Ostrovsky wrote:
> 
> 
> 
>> diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
>> index ebc768f..ebdfbee 100644
>> --- a/drivers/xen/xenbus/xenbus_xs.c
>> +++ b/drivers/xen/xenbus/xenbus_xs.c
> 
> 
>> -
>> -static struct xs_handle xs_state;
>> +/*
>> + * Framework to protect suspend/resume handling against normal Xenstore
>> + * message handling:
>> + * During suspend/resume there must be no open transaction and no pending
>> + * Xenstore request.
>> + * New watch events happening in this time can be ignored by firing all watches
>> + * after resume.
>> + */
>> +/* Lock protecting enter/exit critical region. */
>> +static DEFINE_SPINLOCK(xs_state_lock);
>> +/* Wait queue for all callers waiting for critical region to become usable. */
>> +static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq);
>> +/* Wait queue for suspend handling waiting for critical region being empty. */
>> +static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq);
>> +/* Number of users in critical region. */
>> +static unsigned int xs_state_users;
>> +/* Suspend handler waiting or already active? */
>> +static int xs_suspend_active;
> 
> I think these two should be declared next to xs_state_lock since they
> are protected by it. Or maybe even put them into some sort of a state
> struct.

I think placing them near the lock and adding a comment is enough.

>> +
>> +
>> +static bool test_reply(struct xb_req_data *req)
>> +{
>> +	if (req->state == xb_req_state_got_reply || !xenbus_ok())
>> +		return true;
>> +
>> +	/* Make sure to reread req->state each time. */
>> +	cpu_relax();
> 
> I don't think I understand why this is needed.

I need a compiler barrier. Otherwise the compiler might read req->state
only once, outside the while loop.

>> +
>> +	return false;
>> +}
>> +
> 
> 
>> +static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
>>  {
>> -	mutex_lock(&xs_state.transaction_mutex);
>> -	atomic_inc(&xs_state.transaction_count);
>> -	mutex_unlock(&xs_state.transaction_mutex);
>> -}
>> +	bool notify;
>>  
>> -static void transaction_end(void)
>> -{
>> -	if (atomic_dec_and_test(&xs_state.transaction_count))
>> -		wake_up(&xs_state.transaction_wq);
>> -}
>> +	req->msg = *msg;
>> +	req->err = 0;
>> +	req->state = xb_req_state_queued;
>> +	init_waitqueue_head(&req->wq);
>>  
>> -static void transaction_suspend(void)
>> -{
>> -	mutex_lock(&xs_state.transaction_mutex);
>> -	wait_event(xs_state.transaction_wq,
>> -		   atomic_read(&xs_state.transaction_count) == 0);
>> -}
>> +	xs_request_enter(req);
>>  
>> -static void transaction_resume(void)
>> -{
>> -	mutex_unlock(&xs_state.transaction_mutex);
>> +	req->msg.req_id = xs_request_id++;
> 
> Is it safe to do this without a lock?

You are right: I should move this to xs_request_enter() inside the
lock. I think I'll let xs_request_enter() return the request id.
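
The idea of handing out the id inside the locked region can be sketched in
user space as follows. This is an illustration under assumptions, not the
planned patch: a pthread mutex stands in for the spinlock, and state_lock,
state_users, next_request_id, request_enter() and request_exit() are
hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

/* State protected by state_lock, declared next to it. */
static pthread_mutex_t state_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned int state_users;        /* requests currently in flight */
static uint32_t next_request_id;        /* next id to hand out */

/* Enter the critical region and return the id assigned to this request. */
static uint32_t request_enter(void)
{
	uint32_t id;

	pthread_mutex_lock(&state_lock);
	state_users++;
	id = next_request_id++;         /* read-modify-write under the lock */
	pthread_mutex_unlock(&state_lock);

	return id;
}

/* Leave the critical region. */
static void request_exit(void)
{
	pthread_mutex_lock(&state_lock);
	assert(state_users > 0);
	state_users--;
	pthread_mutex_unlock(&state_lock);
}
```

Because the increment happens while the lock is held, two concurrent
callers cannot observe the same id.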

>> +
>> +int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par)
>> +{
>> +	struct xb_req_data *req;
>> +	struct kvec *vec;
>> +
>> +	req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL);
> 
> Is there a reason why you are using different flags here?

Yes. This function is always called in user context. No need to be
more restrictive.

>> @@ -263,11 +295,20 @@ static void *xs_talkv(struct xenbus_transaction t,
>>  		      unsigned int num_vecs,
>>  		      unsigned int *len)
>>  {
>> +	struct xb_req_data *req;
>>  	struct xsd_sockmsg msg;
>>  	void *ret = NULL;
>>  	unsigned int i;
>>  	int err;
>>  
>> +	req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH);
>> +	if (!req)
>> +		return ERR_PTR(-ENOMEM);
>> +
>> +	req->vec = iovec;
>> +	req->num_vecs = num_vecs;
>> +	req->cb = xs_wake_up;
>> +
>>  	msg.tx_id = t.id;
>>  	msg.req_id = 0;
> 
> Is this still needed? You are assigning it in xs_send().

Right. Can be removed.

>> +static int xs_reboot_notify(struct notifier_block *nb,
>> +			    unsigned long code, void *unused)
>>  {
>> -	struct xs_stored_msg *msg;
> 
> 
> 
>> +	struct xb_req_data *req;
>> +
>> +	mutex_lock(&xb_write_mutex);
>> +	list_for_each_entry(req, &xs_reply_list, list)
>> +		wake_up(&req->wq);
>> +	list_for_each_entry(req, &xb_write_list, list)
>> +		wake_up(&req->wq);
> 
> We are waking up waiters here but there is no guarantee that waiting
> threads will have a chance to run, is there?

You are right. But this isn't the point. We want to avoid blocking a
reboot due to some needed thread waiting for xenstore. And this task
is being accomplished here.


Juergen


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-11  5:26     ` Juergen Gross
@ 2017-01-11 15:29       ` Boris Ostrovsky
  2017-01-11 16:50         ` Juergen Gross
  0 siblings, 1 reply; 21+ messages in thread
From: Boris Ostrovsky @ 2017-01-11 15:29 UTC (permalink / raw)
  To: Juergen Gross, linux-kernel, xen-devel


>>> +
>>> +
>>> +static bool test_reply(struct xb_req_data *req)
>>> +{
>>> +	if (req->state == xb_req_state_got_reply || !xenbus_ok())
>>> +		return true;
>>> +
>>> +	/* Make sure to reread req->state each time. */
>>> +	cpu_relax();
>> I don't think I understand why this is needed.
> I need a compiler barrier. Otherwise the compiler might read req->state
> only once, outside the while loop.


Then barrier() looks like the right primitive to use here. cpu_relax(),
while doing what you want, is intended for other purposes.
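
For illustration, a pure compiler barrier in user space looks roughly like
the sketch below (GCC/Clang inline-asm syntax). compiler_barrier(),
struct request and reply_ready() are hypothetical names chosen for this
example; only the empty asm with a "memory" clobber is the point.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Emits no instructions; tells the compiler memory may have changed. */
#define compiler_barrier() __asm__ __volatile__("" ::: "memory")

enum req_state { REQ_QUEUED, REQ_GOT_REPLY };   /* hypothetical states */

struct request {
	enum req_state state;
};

/*
 * Poll helper. The barrier on the false path keeps a caller that loops on
 * this function from reusing a value of r->state cached in a register.
 */
static bool reply_ready(struct request *r)
{
	assert(r != NULL);

	if (r->state == REQ_GOT_REPLY)
		return true;

	compiler_barrier();
	return false;
}
```

The barrier constrains only the compiler; it does not order memory
accesses between CPUs.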


>
>>> +
>>> +	return false;
>>> +}
>>> +
>>
>>> +static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
>>>  {
>>> -	mutex_lock(&xs_state.transaction_mutex);
>>> -	atomic_inc(&xs_state.transaction_count);
>>> -	mutex_unlock(&xs_state.transaction_mutex);
>>> -}
>>> +	bool notify;
>>>  
>>> -static void transaction_end(void)
>>> -{
>>> -	if (atomic_dec_and_test(&xs_state.transaction_count))
>>> -		wake_up(&xs_state.transaction_wq);
>>> -}
>>> +	req->msg = *msg;
>>> +	req->err = 0;
>>> +	req->state = xb_req_state_queued;
>>> +	init_waitqueue_head(&req->wq);
>>>  
>>> -static void transaction_suspend(void)
>>> -{
>>> -	mutex_lock(&xs_state.transaction_mutex);
>>> -	wait_event(xs_state.transaction_wq,
>>> -		   atomic_read(&xs_state.transaction_count) == 0);
>>> -}
>>> +	xs_request_enter(req);
>>>  
>>> -static void transaction_resume(void)
>>> -{
>>> -	mutex_unlock(&xs_state.transaction_mutex);
>>> +	req->msg.req_id = xs_request_id++;
>> Is it safe to do this without a lock?
> You are right: I should move this to xs_request_enter() inside the
> lock. I think I'll let xs_request_enter() return the request id.


Then please move xs_request_id's declaration close to xs_state_lock's
declaration (just like you are going to move the two other state variables).


>
>>> +static int xs_reboot_notify(struct notifier_block *nb,
>>> +			    unsigned long code, void *unused)
>>>  {
>>> -	struct xs_stored_msg *msg;
>>
>>
>>> +	struct xb_req_data *req;
>>> +
>>> +	mutex_lock(&xb_write_mutex);
>>> +	list_for_each_entry(req, &xs_reply_list, list)
>>> +		wake_up(&req->wq);
>>> +	list_for_each_entry(req, &xb_write_list, list)
>>> +		wake_up(&req->wq);
>> We are waking up waiters here but there is no guarantee that waiting
>> threads will have a chance to run, is there?
> You are right. But this isn't the point. We want to avoid blocking a
> reboot due to some needed thread waiting for xenstore. And this task
> is being accomplished here.


I think it's worth adding a comment mentioning this.

-boris


* Re: [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
  2017-01-11 15:29       ` Boris Ostrovsky
@ 2017-01-11 16:50         ` Juergen Gross
  0 siblings, 0 replies; 21+ messages in thread
From: Juergen Gross @ 2017-01-11 16:50 UTC (permalink / raw)
  To: Boris Ostrovsky, linux-kernel, xen-devel

On 11/01/17 16:29, Boris Ostrovsky wrote:
> 
>>>> +
>>>> +
>>>> +static bool test_reply(struct xb_req_data *req)
>>>> +{
>>>> +	if (req->state == xb_req_state_got_reply || !xenbus_ok())
>>>> +		return true;
>>>> +
>>>> +	/* Make sure to reread req->state each time. */
>>>> +	cpu_relax();
>>> I don't think I understand why this is needed.
>> I need a compiler barrier. Otherwise the compiler might read req->state
>> only once, outside the while loop.
> 
> 
> Then barrier() looks like the right primitive to use here. cpu_relax(),
> while doing what you want, is intended for other purposes.

Hmm, yes, this sounds better.

>>
>>>> +
>>>> +	return false;
>>>> +}
>>>> +
>>>
>>>> +static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
>>>>  {
>>>> -	mutex_lock(&xs_state.transaction_mutex);
>>>> -	atomic_inc(&xs_state.transaction_count);
>>>> -	mutex_unlock(&xs_state.transaction_mutex);
>>>> -}
>>>> +	bool notify;
>>>>  
>>>> -static void transaction_end(void)
>>>> -{
>>>> -	if (atomic_dec_and_test(&xs_state.transaction_count))
>>>> -		wake_up(&xs_state.transaction_wq);
>>>> -}
>>>> +	req->msg = *msg;
>>>> +	req->err = 0;
>>>> +	req->state = xb_req_state_queued;
>>>> +	init_waitqueue_head(&req->wq);
>>>>  
>>>> -static void transaction_suspend(void)
>>>> -{
>>>> -	mutex_lock(&xs_state.transaction_mutex);
>>>> -	wait_event(xs_state.transaction_wq,
>>>> -		   atomic_read(&xs_state.transaction_count) == 0);
>>>> -}
>>>> +	xs_request_enter(req);
>>>>  
>>>> -static void transaction_resume(void)
>>>> -{
>>>> -	mutex_unlock(&xs_state.transaction_mutex);
>>>> +	req->msg.req_id = xs_request_id++;
>>> Is it safe to do this without a lock?
>> You are right: I should move this to xs_request_enter() inside the
>> lock. I think I'll let xs_request_enter() return the request id.
> 
> 
> Then please move xs_request_id's declaration close to xs_state_lock's
> declaration (just like you are going to move the two other state variables).

Already done. :-)

>>
>>>> +static int xs_reboot_notify(struct notifier_block *nb,
>>>> +			    unsigned long code, void *unused)
>>>>  {
>>>> -	struct xs_stored_msg *msg;
>>>
>>>
>>>> +	struct xb_req_data *req;
>>>> +
>>>> +	mutex_lock(&xb_write_mutex);
>>>> +	list_for_each_entry(req, &xs_reply_list, list)
>>>> +		wake_up(&req->wq);
>>>> +	list_for_each_entry(req, &xb_write_list, list)
>>>> +		wake_up(&req->wq);
>>> We are waking up waiters here but there is no guarantee that waiting
>>> threads will have a chance to run, is there?
>> You are right. But this isn't the point. We want to avoid blocking a
>> reboot due to some needed thread waiting for xenstore. And this task
>> is being accomplished here.
> 
> 
> I think it's worth adding a comment mentioning this.

Okay.


Juergen


Thread overview: 21+ messages
2017-01-06 15:05 [PATCH 0/3] xen: optimize xenbus performance Juergen Gross
2017-01-06 15:05 ` [PATCH 1/3] xen: clean up xenbus internal headers Juergen Gross
2017-01-06 20:52   ` Boris Ostrovsky
2017-01-09  7:07     ` Juergen Gross
2017-01-06 15:05 ` [PATCH 2/3] xen: modify xenstore watch event interface Juergen Gross
2017-01-06 15:38   ` Paul Durrant
2017-01-06 16:29   ` Wei Liu
2017-01-06 16:37   ` Roger Pau Monné
2017-01-06 21:57   ` Boris Ostrovsky
2017-01-09  7:12     ` Juergen Gross
2017-01-06 15:05 ` [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses Juergen Gross
2017-01-09 21:17   ` Boris Ostrovsky
2017-01-10  6:18     ` Juergen Gross
2017-01-10 16:36       ` Boris Ostrovsky
2017-01-10 16:38         ` Boris Ostrovsky
2017-01-10 16:46         ` Juergen Gross
2017-01-10 19:17   ` Boris Ostrovsky
2017-01-10 22:56   ` Boris Ostrovsky
2017-01-11  5:26     ` Juergen Gross
2017-01-11 15:29       ` Boris Ostrovsky
2017-01-11 16:50         ` Juergen Gross
