All of lore.kernel.org
 help / color / mirror / Atom feed
* [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare
@ 2016-03-30  8:35 Zhang Chen
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization Zhang Chen
                   ` (3 more replies)
  0 siblings, 4 replies; 26+ messages in thread
From: Zhang Chen @ 2016-03-30  8:35 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Li Zhijian, Gui jianfeng, eddie.dong, zhanghailiang,
	Dr. David Alan Gilbert, Zhang Chen, Yang Hongyang

COLO-compare is a part of the COLO project. It is used
to compare network packets to help COLO decide
whether to do a checkpoint.

v2:
 - add jhash.h

v1:
 - initial patch


Zhang Chen (3):
  colo-compare: introduce colo compare initlization
  colo-compare: track connection and enqueue packet
  colo-compare: introduce packet comparison thread

 include/qemu/jhash.h |  59 ++++
 net/Makefile.objs    |   1 +
 net/colo-compare.c   | 782 +++++++++++++++++++++++++++++++++++++++++++++++++++
 vl.c                 |   3 +-
 4 files changed, 844 insertions(+), 1 deletion(-)
 create mode 100644 include/qemu/jhash.h
 create mode 100644 net/colo-compare.c

-- 
1.9.1

^ permalink raw reply	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-30  8:35 [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Zhang Chen
@ 2016-03-30  8:35 ` Zhang Chen
  2016-03-30  9:25   ` Dr. David Alan Gilbert
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet Zhang Chen
                   ` (2 subsequent siblings)
  3 siblings, 1 reply; 26+ messages in thread
From: Zhang Chen @ 2016-03-30  8:35 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Li Zhijian, Gui jianfeng, eddie.dong, zhanghailiang,
	Dr. David Alan Gilbert, Zhang Chen, Yang Hongyang

Packets coming from the primary char indev will be sent to
outdev; packets coming from the secondary char dev will be dropped.

Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/Makefile.objs  |   1 +
 net/colo-compare.c | 344 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 vl.c               |   3 +-
 3 files changed, 347 insertions(+), 1 deletion(-)
 create mode 100644 net/colo-compare.c

diff --git a/net/Makefile.objs b/net/Makefile.objs
index b7c22fd..ba92f73 100644
--- a/net/Makefile.objs
+++ b/net/Makefile.objs
@@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
 common-obj-y += filter.o
 common-obj-y += filter-buffer.o
 common-obj-y += filter-mirror.o
+common-obj-y += colo-compare.o
diff --git a/net/colo-compare.c b/net/colo-compare.c
new file mode 100644
index 0000000..62c66df
--- /dev/null
+++ b/net/colo-compare.c
@@ -0,0 +1,344 @@
+/*
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+#include "qapi/qmp/qerror.h"
+#include "qemu/error-report.h"
+
+#include "net/net.h"
+#include "net/vhost_net.h"
+#include "qom/object_interfaces.h"
+#include "qemu/iov.h"
+#include "qom/object.h"
+#include "qemu/typedefs.h"
+#include "net/queue.h"
+#include "sysemu/char.h"
+#include "qemu/sockets.h"
+
+#define TYPE_COLO_COMPARE "colo-compare"
+#define COLO_COMPARE(obj) \
+    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
+
+#define COMPARE_READ_LEN_MAX NET_BUFSIZE
+
+static QTAILQ_HEAD(, CompareState) net_compares =
+       QTAILQ_HEAD_INITIALIZER(net_compares);
+
+typedef struct ReadState {
+    int state; /* 0 = getting length, 1 = getting data */
+    unsigned int index;
+    unsigned int packet_len;
+    uint8_t buf[COMPARE_READ_LEN_MAX];
+} ReadState;
+
+typedef struct CompareState {
+    Object parent;
+
+    char *pri_indev;
+    char *sec_indev;
+    char *outdev;
+    CharDriverState *chr_pri_in;
+    CharDriverState *chr_sec_in;
+    CharDriverState *chr_out;
+    QTAILQ_ENTRY(CompareState) next;
+    ReadState pri_rs;
+    ReadState sec_rs;
+} CompareState;
+
+static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
+{
+    int ret = 0;
+    uint32_t len = htonl(size);
+
+    if (!size) {
+        return 0;
+    }
+
+    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
+    if (ret != sizeof(len)) {
+        goto err;
+    }
+
+    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
+    if (ret != size) {
+        goto err;
+    }
+
+    return 0;
+
+err:
+    return ret < 0 ? ret : -EIO;
+}
+
+static int compare_chr_can_read(void *opaque)
+{
+    return COMPARE_READ_LEN_MAX;
+}
+
+/* Returns
+ * 0: readstate is not ready
+ * 1: readstate is ready
+ * otherwise error occurs
+ */
+static int compare_chr_fill_rstate(ReadState *rs, const uint8_t *buf, int size)
+{
+    unsigned int l;
+    while (size > 0) {
+        /* reassemble a packet from the network */
+        switch (rs->state) { /* 0 = getting length, 1 = getting data */
+        case 0:
+            l = 4 - rs->index;
+            if (l > size) {
+                l = size;
+            }
+            memcpy(rs->buf + rs->index, buf, l);
+            buf += l;
+            size -= l;
+            rs->index += l;
+            if (rs->index == 4) {
+                /* got length */
+                rs->packet_len = ntohl(*(uint32_t *)rs->buf);
+                rs->index = 0;
+                rs->state = 1;
+            }
+            break;
+        case 1:
+            l = rs->packet_len - rs->index;
+            if (l > size) {
+                l = size;
+            }
+            if (rs->index + l <= sizeof(rs->buf)) {
+                memcpy(rs->buf + rs->index, buf, l);
+            } else {
+                error_report("serious error: oversized packet received.");
+                rs->index = rs->state = 0;
+                return -1;
+            }
+
+            rs->index += l;
+            buf += l;
+            size -= l;
+            if (rs->index >= rs->packet_len) {
+                rs->index = 0;
+                rs->state = 0;
+                return 1;
+            }
+            break;
+        }
+    }
+    return 0;
+}
+
+static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
+    if (ret == 1) {
+        /* FIXME: enqueue to primary packet list */
+        compare_chr_send(s->chr_out, buf, size);
+    } else if (ret == -1) {
+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+    }
+}
+
+static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
+    if (ret == 1) {
+        /* TODO: enqueue to secondary packet list*/
+    } else if (ret == -1) {
+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+    }
+}
+
+static char *compare_get_pri_indev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->pri_indev);
+}
+
+static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->pri_indev);
+    s->pri_indev = g_strdup(value);
+}
+
+static char *compare_get_sec_indev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->sec_indev);
+}
+
+static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->sec_indev);
+    s->sec_indev = g_strdup(value);
+}
+
+static char *compare_get_outdev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->outdev);
+}
+
+static void compare_set_outdev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->outdev);
+    s->outdev = g_strdup(value);
+}
+
+static void colo_compare_complete(UserCreatable *uc, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(uc);
+
+    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
+        error_setg(errp, "colo compare needs 'primary_in' ,"
+                   "'secondary_in','outdev' property set");
+        return;
+    } else if (!strcmp(s->pri_indev, s->outdev) ||
+               !strcmp(s->sec_indev, s->outdev) ||
+               !strcmp(s->pri_indev, s->sec_indev)) {
+        error_setg(errp, "'indev' and 'outdev' could not be same "
+                   "for compare module");
+        return;
+    }
+
+    s->chr_pri_in = qemu_chr_find(s->pri_indev);
+    if (s->chr_pri_in == NULL) {
+        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
+                  "IN Device '%s' not found", s->pri_indev);
+        goto out;
+    }
+
+    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
+    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
+                          compare_pri_chr_in, NULL, s);
+
+    s->chr_sec_in = qemu_chr_find(s->sec_indev);
+    if (s->chr_sec_in == NULL) {
+        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
+                  "IN Device '%s' not found", s->sec_indev);
+        goto out;
+    }
+
+    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
+    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
+                          compare_sec_chr_in, NULL, s);
+
+    s->chr_out = qemu_chr_find(s->outdev);
+    if (s->chr_out == NULL) {
+        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
+                  "OUT Device '%s' not found", s->outdev);
+        goto out;
+    }
+    qemu_chr_fe_claim_no_fail(s->chr_out);
+
+    QTAILQ_INSERT_TAIL(&net_compares, s, next);
+
+    return;
+
+out:
+    if (s->chr_pri_in) {
+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_release(s->chr_pri_in);
+        s->chr_pri_in = NULL;
+    }
+    if (s->chr_sec_in) {
+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_release(s->chr_sec_in);
+        s->chr_pri_in = NULL;
+    }
+}
+
+static void colo_compare_class_init(ObjectClass *oc, void *data)
+{
+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
+
+    ucc->complete = colo_compare_complete;
+}
+
+static void colo_compare_class_finalize(ObjectClass *oc, void *data)
+{
+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
+    CompareState *s = COLO_COMPARE(ucc);
+
+    if (s->chr_pri_in) {
+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_release(s->chr_pri_in);
+    }
+    if (s->chr_sec_in) {
+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_release(s->chr_sec_in);
+    }
+    if (s->chr_out) {
+        qemu_chr_fe_release(s->chr_out);
+    }
+
+    if (!QTAILQ_EMPTY(&net_compares)) {
+        QTAILQ_REMOVE(&net_compares, s, next);
+    }
+}
+
+static void colo_compare_init(Object *obj)
+{
+    object_property_add_str(obj, "primary_in",
+                            compare_get_pri_indev, compare_set_pri_indev,
+                            NULL);
+    object_property_add_str(obj, "secondary_in",
+                            compare_get_sec_indev, compare_set_sec_indev,
+                            NULL);
+    object_property_add_str(obj, "outdev",
+                            compare_get_outdev, compare_set_outdev,
+                            NULL);
+}
+
+static void colo_compare_finalize(Object *obj)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->pri_indev);
+    g_free(s->sec_indev);
+    g_free(s->outdev);
+}
+
+static const TypeInfo colo_compare_info = {
+    .name = TYPE_COLO_COMPARE,
+    .parent = TYPE_OBJECT,
+    .instance_size = sizeof(CompareState),
+    .instance_init = colo_compare_init,
+    .instance_finalize = colo_compare_finalize,
+    .class_size = sizeof(CompareState),
+    .class_init = colo_compare_class_init,
+    .class_finalize = colo_compare_class_finalize,
+    .interfaces = (InterfaceInfo[]) {
+        { TYPE_USER_CREATABLE },
+        { }
+    }
+};
+
+static void register_types(void)
+{
+    type_register_static(&colo_compare_info);
+}
+
+type_init(register_types);
diff --git a/vl.c b/vl.c
index dc6e63a..70064ad 100644
--- a/vl.c
+++ b/vl.c
@@ -2842,7 +2842,8 @@ static bool object_create_initial(const char *type)
     if (g_str_equal(type, "filter-buffer") ||
         g_str_equal(type, "filter-dump") ||
         g_str_equal(type, "filter-mirror") ||
-        g_str_equal(type, "filter-redirector")) {
+        g_str_equal(type, "filter-redirector") ||
+        g_str_equal(type, "colo-compare")) {
         return false;
     }
 
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-30  8:35 [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Zhang Chen
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization Zhang Chen
@ 2016-03-30  8:35 ` Zhang Chen
  2016-03-30 10:36   ` Dr. David Alan Gilbert
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread Zhang Chen
  2016-03-30 12:05 ` [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Dr. David Alan Gilbert
  3 siblings, 1 reply; 26+ messages in thread
From: Zhang Chen @ 2016-03-30  8:35 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Li Zhijian, Gui jianfeng, eddie.dong, zhanghailiang,
	Dr. David Alan Gilbert, Zhang Chen, Yang Hongyang

In this patch we use the jhash copied from the Linux kernel to track
connections in a hash table, and then enqueue network packets like this:

+ CompareState ++
|               |
+---------------+   +---------------+         +---------------+
|conn list      +--->conn           +--------->conn           |
+---------------+   +---------------+         +---------------+
|               |     |           |             |          |
+---------------+ +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+
                      |           |             |          |
                  +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+
                      |           |             |          |
                  +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 include/qemu/jhash.h |  59 ++++++++++
 net/colo-compare.c   | 324 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 380 insertions(+), 3 deletions(-)
 create mode 100644 include/qemu/jhash.h

diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
new file mode 100644
index 0000000..8a8ff0f
--- /dev/null
+++ b/include/qemu/jhash.h
@@ -0,0 +1,59 @@
+/* jhash.h: Jenkins hash support.
+  *
+  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
+  *
+  * http://burtleburtle.net/bob/hash/
+  *
+  * These are the credits from Bob's sources:
+  *
+  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
+  *
+  * These are functions for producing 32-bit hashes for hash table lookup.
+  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
+  * are externally useful functions.  Routines to test the hash are included
+  * if SELF_TEST is defined.  You can use this free for any purpose.It's in
+  * the public domain.  It has no warranty.
+  *
+  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
+  *
+  * I've modified Bob's hash to be useful in the Linux kernel, and
+  * any bugs present are my fault.
+  * Jozsef
+  */
+
+#ifndef QEMU_JHASH_H__
+#define QEMU_JHASH_H__
+
+#include "qemu/bitops.h"
+
+/*
+ * hashtable related is copied from linux kernel jhash
+ */
+
+/* __jhash_mix -- mix 3 32-bit values reversibly. */
+#define __jhash_mix(a, b, c)                \
+{                                           \
+    a -= c;  a ^= rol32(c, 4);  c += b;     \
+    b -= a;  b ^= rol32(a, 6);  a += c;     \
+    c -= b;  c ^= rol32(b, 8);  b += a;     \
+    a -= c;  a ^= rol32(c, 16); c += b;     \
+    b -= a;  b ^= rol32(a, 19); a += c;     \
+    c -= b;  c ^= rol32(b, 4);  b += a;     \
+}
+
+/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
+#define __jhash_final(a, b, c)  \
+{                               \
+    c ^= b; c -= rol32(b, 14);  \
+    a ^= c; a -= rol32(c, 11);  \
+    b ^= a; b -= rol32(a, 25);  \
+    c ^= b; c -= rol32(b, 16);  \
+    a ^= c; a -= rol32(c, 4);   \
+    b ^= a; b -= rol32(a, 14);  \
+    c ^= b; c -= rol32(b, 24);  \
+}
+
+/* An arbitrary initial parameter */
+#define JHASH_INITVAL           0xdeadbeef
+
+#endif /* QEMU_JHASH_H__ */
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 62c66df..0bb5a51 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -20,15 +20,22 @@
 #include "net/queue.h"
 #include "sysemu/char.h"
 #include "qemu/sockets.h"
+#include <sys/sysinfo.h>
+#include "slirp/slirp.h"
+#include "qemu/jhash.h"
+#include <sys/sysinfo.h>
 
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
 
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
+#define PAGE_SIZE 4096
+#define ETH_HLEN 14
 
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
+static ssize_t hashtable_max_size;
 
 typedef struct ReadState {
     int state; /* 0 = getting length, 1 = getting data */
@@ -37,6 +44,28 @@ typedef struct ReadState {
     uint8_t buf[COMPARE_READ_LEN_MAX];
 } ReadState;
 
+/*
+  + CompareState ++
+  |               |
+  +---------------+   +---------------+         +---------------+
+  |conn list      +--->conn           +--------->conn           |
+  +---------------+   +---------------+         +---------------+
+  |               |     |           |             |          |
+  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
+                    |primary |  |secondary    |primary | |secondary
+                    |packet  |  |packet  +    |packet  | |packet  +
+                    +--------+  +--------+    +--------+ +--------+
+                        |           |             |          |
+                    +---v----+  +---v----+    +---v----+ +---v----+
+                    |primary |  |secondary    |primary | |secondary
+                    |packet  |  |packet  +    |packet  | |packet  +
+                    +--------+  +--------+    +--------+ +--------+
+                        |           |             |          |
+                    +---v----+  +---v----+    +---v----+ +---v----+
+                    |primary |  |secondary    |primary | |secondary
+                    |packet  |  |packet  +    |packet  | |packet  +
+                    +--------+  +--------+    +--------+ +--------+
+*/
 typedef struct CompareState {
     Object parent;
 
@@ -49,8 +78,268 @@ typedef struct CompareState {
     QTAILQ_ENTRY(CompareState) next;
     ReadState pri_rs;
     ReadState sec_rs;
+
+    /* connection list: the connections belonged to this NIC could be found
+     * in this list.
+     * element type: Connection
+     */
+    GQueue conn_list;
+    QemuMutex conn_list_lock; /* to protect conn_list */
+    /* hashtable to save connection */
+    GHashTable *connection_track_table;
+    /* to save unprocessed_connections */
+    GQueue unprocessed_connections;
+    /* proxy current hash size */
+    ssize_t hashtable_size;
 } CompareState;
 
+typedef struct Packet {
+    void *data;
+    union {
+        uint8_t *network_layer;
+        struct ip *ip;
+    };
+    uint8_t *transport_layer;
+    int size;
+    CompareState *s;
+} Packet;
+
+typedef struct ConnectionKey {
+    /* (src, dst) must be grouped, in the same way than in IP header */
+    struct in_addr src;
+    struct in_addr dst;
+    uint16_t src_port;
+    uint16_t dst_port;
+    uint8_t ip_proto;
+} QEMU_PACKED ConnectionKey;
+
+typedef struct Connection {
+    QemuMutex list_lock;
+    /* connection primary send queue: element type: Packet */
+    GQueue primary_list;
+    /* connection secondary send queue: element type: Packet */
+    GQueue secondary_list;
+    /* flag to enqueue unprocessed_connections */
+    bool processing;
+    int ip_proto;
+} Connection;
+
+enum {
+    PRIMARY_IN = 0,
+    SECONDARY_IN,
+};
+
+static void packet_destroy(void *opaque, void *user_data);
+static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
+
+static uint32_t connection_key_hash(const void *opaque)
+{
+    const ConnectionKey *key = opaque;
+    uint32_t a, b, c;
+
+    /* Jenkins hash */
+    a = b = c = JHASH_INITVAL + sizeof(*key);
+    a += key->src.s_addr;
+    b += key->dst.s_addr;
+    c += (key->src_port | key->dst_port << 16);
+    __jhash_mix(a, b, c);
+
+    a += key->ip_proto;
+    __jhash_final(a, b, c);
+
+    return c;
+}
+
+static int connection_key_equal(const void *opaque1, const void *opaque2)
+{
+    return memcmp(opaque1, opaque2, sizeof(ConnectionKey)) == 0;
+}
+
+/*
+ *  initialize connection_key for packet
+ *  Return 0 on success; if 1 is returned the pkt will be sent later
+ */
+static int connection_key_init(Packet *pkt, ConnectionKey *key)
+{
+    int network_length;
+    uint8_t *data = pkt->data;
+    uint16_t l3_proto;
+    uint32_t tmp_ports;
+    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
+
+    pkt->network_layer = data + ETH_HLEN;
+    l3_proto = eth_get_l3_proto(data, l2hdr_len);
+    if (l3_proto != ETH_P_IP) {
+        return 1;
+    }
+
+    network_length = pkt->ip->ip_hl * 4;
+    pkt->transport_layer = pkt->network_layer + network_length;
+    key->ip_proto = pkt->ip->ip_p;
+    key->src = pkt->ip->ip_src;
+    key->dst = pkt->ip->ip_dst;
+
+    switch (key->ip_proto) {
+    case IPPROTO_TCP:
+    case IPPROTO_UDP:
+    case IPPROTO_DCCP:
+    case IPPROTO_ESP:
+    case IPPROTO_SCTP:
+    case IPPROTO_UDPLITE:
+        tmp_ports = *(uint32_t *)(pkt->transport_layer);
+        key->src_port = tmp_ports & 0xffff;
+        key->dst_port = tmp_ports >> 16;
+        break;
+    case IPPROTO_AH:
+        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
+        key->src_port = tmp_ports & 0xffff;
+        key->dst_port = tmp_ports >> 16;
+        break;
+    default:
+        break;
+    }
+
+    return 0;
+}
+
+static Connection *connection_new(ConnectionKey *key)
+{
+    Connection *conn = g_slice_new(Connection);
+
+    qemu_mutex_init(&conn->list_lock);
+    conn->ip_proto = key->ip_proto;
+    conn->processing = false;
+    g_queue_init(&conn->primary_list);
+    g_queue_init(&conn->secondary_list);
+
+    return conn;
+}
+
+/*
+ * Clear hashtable, stop this hash growing really huge
+ */
+static void connection_hashtable_reset(CompareState *s)
+{
+    s->hashtable_size = 0;
+    g_hash_table_remove_all(s->connection_track_table);
+}
+
+/* if not found, create a new connection and add to hash table */
+static Connection *connection_get(CompareState *s, ConnectionKey *key)
+{
+    /* FIXME: protect connection_track_table */
+    Connection *conn = g_hash_table_lookup(s->connection_track_table, key);
+
+    if (conn == NULL) {
+        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
+
+        conn = connection_new(key);
+
+        s->hashtable_size++;
+        if (s->hashtable_size > hashtable_max_size) {
+            error_report("colo proxy connection hashtable full, clear it");
+            connection_hashtable_reset(s);
+            /* TODO:clear conn_list */
+        } else {
+            g_hash_table_insert(s->connection_track_table, new_key, conn);
+        }
+    }
+
+     return conn;
+}
+
+static void connection_destroy(void *opaque)
+{
+    Connection *conn = opaque;
+
+    qemu_mutex_lock(&conn->list_lock);
+    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
+    g_queue_free(&conn->primary_list);
+    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
+    g_queue_free(&conn->secondary_list);
+    qemu_mutex_unlock(&conn->list_lock);
+    qemu_mutex_destroy(&conn->list_lock);
+    g_slice_free(Connection, conn);
+}
+
+static Packet *packet_new(CompareState *s, const void *data,
+                              int size, ConnectionKey *key)
+{
+    Packet *pkt = g_slice_new(Packet);
+
+    pkt->data = g_memdup(data, size);
+    pkt->size = size;
+    pkt->s = s;
+
+    if (connection_key_init(pkt, key)) {
+        packet_destroy(pkt, NULL);
+        pkt = NULL;
+    }
+
+    return pkt;
+}
+
+static int packet_enqueue(CompareState *s, int mode)
+{
+    ConnectionKey key = {{ 0 } };
+    Packet *pkt = NULL;
+    Connection *conn;
+
+    /* arp packet will be sent */
+    if (mode == PRIMARY_IN) {
+        pkt = packet_new(s, s->pri_rs.buf, s->pri_rs.packet_len, &key);
+    } else {
+        pkt = packet_new(s, s->sec_rs.buf, s->sec_rs.packet_len, &key);
+    }
+    if (!pkt) {
+        return -1;
+    }
+
+    conn = connection_get(s, &key);
+    if (!conn->processing) {
+        qemu_mutex_lock(&s->conn_list_lock);
+        g_queue_push_tail(&s->conn_list, conn);
+        qemu_mutex_unlock(&s->conn_list_lock);
+        conn->processing = true;
+    }
+
+    qemu_mutex_lock(&conn->list_lock);
+    if (mode == PRIMARY_IN) {
+        g_queue_push_tail(&conn->primary_list, pkt);
+    } else {
+        g_queue_push_tail(&conn->secondary_list, pkt);
+    }
+    qemu_mutex_unlock(&conn->list_lock);
+
+    return 0;
+}
+
+static void packet_destroy(void *opaque, void *user_data)
+{
+    Packet *pkt = opaque;
+
+    g_free(pkt->data);
+    g_slice_free(Packet, pkt);
+}
+
+static inline void colo_flush_connection(void *opaque, void *user_data)
+{
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+
+    qemu_mutex_lock(&conn->list_lock);
+    while (!g_queue_is_empty(&conn->primary_list)) {
+        pkt = g_queue_pop_head(&conn->primary_list);
+        compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
+        /* FIXME: destroy pkt ?*/
+    }
+    while (!g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_head(&conn->secondary_list);
+        packet_destroy(pkt, NULL);
+    }
+    qemu_mutex_unlock(&conn->list_lock);
+}
+
 static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
 {
     int ret = 0;
@@ -142,8 +431,10 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
 
     ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
     if (ret == 1) {
-        /* FIXME: enqueue to primary packet list */
-        compare_chr_send(s->chr_out, buf, size);
+        if (packet_enqueue(s, PRIMARY_IN)) {
+            error_report("primary: unsupported packet in");
+            compare_chr_send(s->chr_out, buf, size);
+        }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
     }
@@ -156,7 +447,9 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
 
     ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
     if (ret == 1) {
-        /* TODO: enqueue to secondary packet list*/
+        if (packet_enqueue(s, SECONDARY_IN)) {
+            error_report("secondary: unsupported packet in");
+        }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
     }
@@ -210,6 +503,7 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
+    struct sysinfo si;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -255,6 +549,29 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
 
     QTAILQ_INSERT_TAIL(&net_compares, s, next);
 
+    g_queue_init(&s->conn_list);
+    qemu_mutex_init(&s->conn_list_lock);
+
+    s->hashtable_size = 0;
+    /*
+     * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
+     * machine has 512 buckets. >= 1GB machines have 16384 buckets.
+     */
+    sysinfo(&si);
+    hashtable_max_size = si.totalram / 16384;
+    if (si.totalram > (1024 * 1024 * 1024 / PAGE_SIZE)) {
+        hashtable_max_size = 16384;
+    }
+    if (hashtable_max_size < 32) {
+        hashtable_max_size = 32;
+    }
+    hashtable_max_size = hashtable_max_size * 8; /* default factor = 8 */
+
+    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
+                                                      connection_key_equal,
+                                                      g_free,
+                                                      connection_destroy);
+
     return;
 
 out:
@@ -297,6 +614,7 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
     if (!QTAILQ_EMPTY(&net_compares)) {
         QTAILQ_REMOVE(&net_compares, s, next);
     }
+    qemu_mutex_destroy(&s->conn_list_lock);
 }
 
 static void colo_compare_init(Object *obj)
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread
  2016-03-30  8:35 [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Zhang Chen
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization Zhang Chen
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet Zhang Chen
@ 2016-03-30  8:35 ` Zhang Chen
  2016-03-30 11:41   ` Dr. David Alan Gilbert
  2016-03-30 12:05 ` [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Dr. David Alan Gilbert
  3 siblings, 1 reply; 26+ messages in thread
From: Zhang Chen @ 2016-03-30  8:35 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Li Zhijian, Gui jianfeng, eddie.dong, zhanghailiang,
	Dr. David Alan Gilbert, Zhang Chen, Yang Hongyang

If the packets are the same, we send the primary packet and drop the secondary
packet; otherwise we notify COLO to do a checkpoint.

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 121 insertions(+), 1 deletion(-)

diff --git a/net/colo-compare.c b/net/colo-compare.c
index 0bb5a51..1debc0e 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -36,6 +36,7 @@
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
 static ssize_t hashtable_max_size;
+static int colo_need_checkpoint;
 
 typedef struct ReadState {
     int state; /* 0 = getting length, 1 = getting data */
@@ -91,6 +92,13 @@ typedef struct CompareState {
     GQueue unprocessed_connections;
     /* proxy current hash size */
     ssize_t hashtable_size;
+
+    /* notify compare thread */
+    QemuEvent event;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    int thread_status;
+
 } CompareState;
 
 typedef struct Packet {
@@ -129,6 +137,15 @@ enum {
     SECONDARY_IN,
 };
 
+enum {
+    /* compare thread isn't started */
+    COMPARE_THREAD_NONE,
+    /* compare thread is running */
+    COMPARE_THREAD_RUNNING,
+    /* compare thread exit */
+    COMPARE_THREAD_EXIT,
+};
+
 static void packet_destroy(void *opaque, void *user_data);
 static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
 
@@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data)
     qemu_mutex_unlock(&conn->list_lock);
 }
 
+static void colo_notify_checkpoint(void)
+{
+    colo_need_checkpoint = true;
+}
+
+/* TODO colo_do_checkpoint() {
+ * we flush the connections and reset 'colo_need_checkpoint'
+ * }
+ */
+
+static inline void colo_dump_packet(Packet *pkt)
+{
+    int i;
+    for (i = 0; i < pkt->size; i++) {
+        printf("%02x ", ((uint8_t *)pkt->data)[i]);
+    }
+    printf("\n");
+}
+
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return:    0  means packet same
+ *            > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    colo_dump_packet(ppkt);
+    colo_dump_packet(spkt);
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    int ret;
+
+    qemu_mutex_lock(&conn->list_lock);
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_head(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare);
+
+        if (result) {
+            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            g_queue_remove(&conn->secondary_list, result);
+        } else {
+            g_queue_push_head(&conn->primary_list, pkt);
+            colo_notify_checkpoint();
+            break;
+        }
+    }
+    qemu_mutex_unlock(&conn->list_lock);
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    CompareState *s = opaque;
+
+    while (s->thread_status == COMPARE_THREAD_RUNNING) {
+        qemu_event_wait(&s->event);
+        qemu_event_reset(&s->event);
+        qemu_mutex_lock(&s->conn_list_lock);
+        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
+        qemu_mutex_unlock(&s->conn_list_lock);
+    }
+
+    return NULL;
+}
+
 static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
 {
     int ret = 0;
@@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
     if (ret == 1) {
         if (packet_enqueue(s, PRIMARY_IN)) {
             error_report("primary: unsupported packet in");
-            compare_chr_send(s->chr_out, buf, size);
+            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
+        } else {
+            qemu_event_set(&s->event);
         }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
@@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
     if (ret == 1) {
         if (packet_enqueue(s, SECONDARY_IN)) {
             error_report("secondary: unsupported packet in");
+        } else {
+            qemu_event_set(&s->event);
         }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
@@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
     struct sysinfo si;
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
     g_queue_init(&s->conn_list);
     qemu_mutex_init(&s->conn_list_lock);
 
+    colo_need_checkpoint = false;
     s->hashtable_size = 0;
     /*
      * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
@@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    s->thread_status = COMPARE_THREAD_RUNNING;
+    sprintf(thread_name, "proxy compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
     return;
 
 out:
@@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
         QTAILQ_REMOVE(&net_compares, s, next);
     }
     qemu_mutex_destroy(&s->conn_list_lock);
+
+    if (s->thread.thread) {
+        s->thread_status = COMPARE_THREAD_EXIT;
+        qemu_event_set(&s->event);
+        qemu_thread_join(&s->thread);
+    }
+    qemu_event_destroy(&s->event);
 }
 
 static void colo_compare_init(Object *obj)
-- 
1.9.1

^ permalink raw reply related	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization Zhang Chen
@ 2016-03-30  9:25   ` Dr. David Alan Gilbert
  2016-03-31  1:41     ` Zhang Chen
  2016-04-13  2:02     ` Zhang Chen
  0 siblings, 2 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-30  9:25 UTC (permalink / raw)
  To: Zhang Chen
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> packet come from primary char indev will be send to
> outdev - packet come from secondary char dev will be drop

Please put in the description an example of how you invoke
the filter on the primary and secondary.

> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>  net/Makefile.objs  |   1 +
>  net/colo-compare.c | 344 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  vl.c               |   3 +-
>  3 files changed, 347 insertions(+), 1 deletion(-)
>  create mode 100644 net/colo-compare.c
> 
> diff --git a/net/Makefile.objs b/net/Makefile.objs
> index b7c22fd..ba92f73 100644
> --- a/net/Makefile.objs
> +++ b/net/Makefile.objs
> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>  common-obj-y += filter.o
>  common-obj-y += filter-buffer.o
>  common-obj-y += filter-mirror.o
> +common-obj-y += colo-compare.o
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> new file mode 100644
> index 0000000..62c66df
> --- /dev/null
> +++ b/net/colo-compare.c
> @@ -0,0 +1,344 @@
> +/*
> + * Copyright (c) 2016 FUJITSU LIMITED
> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu-common.h"
> +#include "qapi/qmp/qerror.h"
> +#include "qemu/error-report.h"
> +
> +#include "net/net.h"
> +#include "net/vhost_net.h"
> +#include "qom/object_interfaces.h"
> +#include "qemu/iov.h"
> +#include "qom/object.h"
> +#include "qemu/typedefs.h"
> +#include "net/queue.h"
> +#include "sysemu/char.h"
> +#include "qemu/sockets.h"
> +
> +#define TYPE_COLO_COMPARE "colo-compare"
> +#define COLO_COMPARE(obj) \
> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
> +
> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
> +
> +static QTAILQ_HEAD(, CompareState) net_compares =
> +       QTAILQ_HEAD_INITIALIZER(net_compares);
> +
> +typedef struct ReadState {
> +    int state; /* 0 = getting length, 1 = getting data */
> +    unsigned int index;
> +    unsigned int packet_len;

Please make packet_len and index uint32_t, since they're sent over the wire
as 32-bit values.

> +    uint8_t buf[COMPARE_READ_LEN_MAX];
> +} ReadState;
> +
> +typedef struct CompareState {
> +    Object parent;
> +
> +    char *pri_indev;
> +    char *sec_indev;
> +    char *outdev;
> +    CharDriverState *chr_pri_in;
> +    CharDriverState *chr_sec_in;
> +    CharDriverState *chr_out;
> +    QTAILQ_ENTRY(CompareState) next;
> +    ReadState pri_rs;
> +    ReadState sec_rs;
> +} CompareState;
> +
> +static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
> +{
> +    int ret = 0;
> +    uint32_t len = htonl(size);
> +

Similarly, make 'size' uint32_t - anything that gets converted into a uint32_t
is probably best declared as a uint32_t.

> +    if (!size) {
> +        return 0;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
> +    if (ret != sizeof(len)) {
> +        goto err;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
> +    if (ret != size) {
> +        goto err;
> +    }
> +

You can make this slightly simpler and save the return 0;

> +    return 0;
> +
> +err:
> +    return ret < 0 ? ret : -EIO;

err:
       return ret <= 0 ? ret : -EIO;

> +}
> +
> +static int compare_chr_can_read(void *opaque)
> +{
> +    return COMPARE_READ_LEN_MAX;
> +}
> +
> +/* Returns
> + * 0: readstate is not ready
> + * 1: readstate is ready
> + * otherwise error occurs
> + */
> +static int compare_chr_fill_rstate(ReadState *rs, const uint8_t *buf, int size)
> +{
> +    unsigned int l;
> +    while (size > 0) {
> +        /* reassemble a packet from the network */
> +        switch (rs->state) { /* 0 = getting length, 1 = getting data */
> +        case 0:
> +            l = 4 - rs->index;
> +            if (l > size) {
> +                l = size;
> +            }
> +            memcpy(rs->buf + rs->index, buf, l);
> +            buf += l;
> +            size -= l;
> +            rs->index += l;
> +            if (rs->index == 4) {
> +                /* got length */
> +                rs->packet_len = ntohl(*(uint32_t *)rs->buf);
> +                rs->index = 0;
> +                rs->state = 1;
> +            }
> +            break;
> +        case 1:
> +            l = rs->packet_len - rs->index;
> +            if (l > size) {
> +                l = size;
> +            }
> +            if (rs->index + l <= sizeof(rs->buf)) {
> +                memcpy(rs->buf + rs->index, buf, l);
> +            } else {
> +                error_report("serious error: oversized packet received.");

Isn't it easier to do this check above in the 'got length' if ?
Instead of 'serious error' say where it's coming from;
  'colo-compare: Received oversized packet over socket'

that makes it a lot easier when people see the error for the first time.
Also, should you check for the error case of receiving a packet where
packet_len == 0 ?

> +                rs->index = rs->state = 0;
> +                return -1;
> +            }
> +
> +            rs->index += l;
> +            buf += l;
> +            size -= l;
> +            if (rs->index >= rs->packet_len) {
> +                rs->index = 0;
> +                rs->state = 0;
> +                return 1;
> +            }
> +            break;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
> +    if (ret == 1) {
> +        /* FIXME: enqueue to primary packet list */
> +        compare_chr_send(s->chr_out, buf, size);
> +    } else if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> +    }
> +}
> +
> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
> +    if (ret == 1) {
> +        /* TODO: enqueue to secondary packet list*/
> +    } else if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +    }
> +}
> +
> +static char *compare_get_pri_indev(Object *obj, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    return g_strdup(s->pri_indev);
> +}
> +
> +static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->pri_indev);
> +    s->pri_indev = g_strdup(value);
> +}
> +
> +static char *compare_get_sec_indev(Object *obj, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    return g_strdup(s->sec_indev);
> +}
> +
> +static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->sec_indev);
> +    s->sec_indev = g_strdup(value);
> +}
> +
> +static char *compare_get_outdev(Object *obj, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    return g_strdup(s->outdev);
> +}
> +
> +static void compare_set_outdev(Object *obj, const char *value, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->outdev);
> +    s->outdev = g_strdup(value);
> +}
> +
> +static void colo_compare_complete(UserCreatable *uc, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(uc);
> +
> +    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
> +        error_setg(errp, "colo compare needs 'primary_in' ,"
> +                   "'secondary_in','outdev' property set");
> +        return;
> +    } else if (!strcmp(s->pri_indev, s->outdev) ||
> +               !strcmp(s->sec_indev, s->outdev) ||
> +               !strcmp(s->pri_indev, s->sec_indev)) {
> +        error_setg(errp, "'indev' and 'outdev' could not be same "
> +                   "for compare module");
> +        return;
> +    }
> +
> +    s->chr_pri_in = qemu_chr_find(s->pri_indev);
> +    if (s->chr_pri_in == NULL) {
> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,

I think error_set seems to be discouraged these days, just use error_setg
(see the comment in include/qapi/error.h just above error_set).

> +                  "IN Device '%s' not found", s->pri_indev);
> +        goto out;
> +    }
> +
> +    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
> +                          compare_pri_chr_in, NULL, s);
> +
> +    s->chr_sec_in = qemu_chr_find(s->sec_indev);
> +    if (s->chr_sec_in == NULL) {
> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
> +                  "IN Device '%s' not found", s->sec_indev);
> +        goto out;
> +    }

Can you explain/give an example of the way you create one of these
filters?
Would you ever have a pri_indev and sec_indev on the same filter?
If not, then why not just have an 'indev' option rather than the
two separate configs.
If you can have both then you need to change the error 'IN Device'
to say either 'Primary IN device' or 'Secondary IN device'.

> +    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
> +                          compare_sec_chr_in, NULL, s);
> +
> +    s->chr_out = qemu_chr_find(s->outdev);
> +    if (s->chr_out == NULL) {
> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
> +                  "OUT Device '%s' not found", s->outdev);
> +        goto out;
> +    }
> +    qemu_chr_fe_claim_no_fail(s->chr_out);
> +
> +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
> +
> +    return;
> +
> +out:
> +    if (s->chr_pri_in) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> +        qemu_chr_fe_release(s->chr_pri_in);
> +        s->chr_pri_in = NULL;
> +    }
> +    if (s->chr_sec_in) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +        qemu_chr_fe_release(s->chr_sec_in);
> +        s->chr_pri_in = NULL;
> +    }

Can't you avoid this by making the code:

     s->chr_pri_in = qemu_chr_find(...)
     if (s->chr_pri_in == NULL) {
        error (...)
     }
     s->chr_sec_in = qemu_chr_find(...)
     if (s->chr_sec_in == NULL) {
        error (...)
     }
     s->chr_out = qemu_chr_find(...)
     if (s->chr_out == NULL) {
        error (...)
     }

     qemu_chr_fe_claim_no_fail(pri)
     add_handlers(pri...)
     qemu_chr_fe_claim_no_fail(sec)
     add_handlers(sec...)
     qemu_chr_fe_claim_no_fail(out)
     add_handlers(out...)

so you don't have to clean up the handlers/release in the out: ?

> +}
> +
> +static void colo_compare_class_init(ObjectClass *oc, void *data)
> +{
> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
> +
> +    ucc->complete = colo_compare_complete;
> +}
> +
> +static void colo_compare_class_finalize(ObjectClass *oc, void *data)
> +{
> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
> +    CompareState *s = COLO_COMPARE(ucc);
> +
> +    if (s->chr_pri_in) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> +        qemu_chr_fe_release(s->chr_pri_in);
> +    }
> +    if (s->chr_sec_in) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +        qemu_chr_fe_release(s->chr_sec_in);
> +    }
> +    if (s->chr_out) {
> +        qemu_chr_fe_release(s->chr_out);
> +    }
> +
> +    if (!QTAILQ_EMPTY(&net_compares)) {
> +        QTAILQ_REMOVE(&net_compares, s, next);
> +    }
> +}
> +
> +static void colo_compare_init(Object *obj)
> +{
> +    object_property_add_str(obj, "primary_in",
> +                            compare_get_pri_indev, compare_set_pri_indev,
> +                            NULL);
> +    object_property_add_str(obj, "secondary_in",
> +                            compare_get_sec_indev, compare_set_sec_indev,
> +                            NULL);
> +    object_property_add_str(obj, "outdev",
> +                            compare_get_outdev, compare_set_outdev,
> +                            NULL);
> +}
> +
> +static void colo_compare_finalize(Object *obj)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->pri_indev);
> +    g_free(s->sec_indev);
> +    g_free(s->outdev);
> +}
> +
> +static const TypeInfo colo_compare_info = {
> +    .name = TYPE_COLO_COMPARE,
> +    .parent = TYPE_OBJECT,
> +    .instance_size = sizeof(CompareState),
> +    .instance_init = colo_compare_init,
> +    .instance_finalize = colo_compare_finalize,
> +    .class_size = sizeof(CompareState),
> +    .class_init = colo_compare_class_init,
> +    .class_finalize = colo_compare_class_finalize,
> +    .interfaces = (InterfaceInfo[]) {
> +        { TYPE_USER_CREATABLE },
> +        { }
> +    }
> +};
> +
> +static void register_types(void)
> +{
> +    type_register_static(&colo_compare_info);
> +}
> +
> +type_init(register_types);
> diff --git a/vl.c b/vl.c
> index dc6e63a..70064ad 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -2842,7 +2842,8 @@ static bool object_create_initial(const char *type)
>      if (g_str_equal(type, "filter-buffer") ||
>          g_str_equal(type, "filter-dump") ||
>          g_str_equal(type, "filter-mirror") ||
> -        g_str_equal(type, "filter-redirector")) {
> +        g_str_equal(type, "filter-redirector") ||
> +        g_str_equal(type, "colo-compare")) {
>          return false;
>      }
>  
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet Zhang Chen
@ 2016-03-30 10:36   ` Dr. David Alan Gilbert
  2016-03-31  2:09     ` Li Zhijian
  2016-03-31  4:06     ` Zhang Chen
  0 siblings, 2 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-30 10:36 UTC (permalink / raw)
  To: Zhang Chen
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> In this patch we use kernel jhash table to track
> connection, and then enqueue net packet like this:
> 
> + CompareState ++
> |               |
> +---------------+   +---------------+         +---------------+
> |conn list      +--->conn           +--------->conn           |
> +---------------+   +---------------+         +---------------+
> |               |     |           |             |          |
> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>                   |primary |  |secondary    |primary | |secondary
>                   |packet  |  |packet  +    |packet  | |packet  +
>                   +--------+  +--------+    +--------+ +--------+
>                       |           |             |          |
>                   +---v----+  +---v----+    +---v----+ +---v----+
>                   |primary |  |secondary    |primary | |secondary
>                   |packet  |  |packet  +    |packet  | |packet  +
>                   +--------+  +--------+    +--------+ +--------+
>                       |           |             |          |
>                   +---v----+  +---v----+    +---v----+ +---v----+
>                   |primary |  |secondary    |primary | |secondary
>                   |packet  |  |packet  +    |packet  | |packet  +
>                   +--------+  +--------+    +--------+ +--------+
> 
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>  include/qemu/jhash.h |  59 ++++++++++
>  net/colo-compare.c   | 324 ++++++++++++++++++++++++++++++++++++++++++++++++++-
>  2 files changed, 380 insertions(+), 3 deletions(-)
>  create mode 100644 include/qemu/jhash.h
> 
> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
> new file mode 100644
> index 0000000..8a8ff0f
> --- /dev/null
> +++ b/include/qemu/jhash.h
> @@ -0,0 +1,59 @@
> +/* jhash.h: Jenkins hash support.
> +  *
> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
> +  *
> +  * http://burtleburtle.net/bob/hash/
> +  *
> +  * These are the credits from Bob's sources:
> +  *
> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
> +  *
> +  * These are functions for producing 32-bit hashes for hash table lookup.
> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
> +  * are externally useful functions.  Routines to test the hash are included
> +  * if SELF_TEST is defined.  You can use this free for any purpose.It's in
> +  * the public domain.  It has no warranty.
> +  *
> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
> +  *
> +  * I've modified Bob's hash to be useful in the Linux kernel, and
> +  * any bugs present are my fault.
> +  * Jozsef
> +  */
> +
> +#ifndef QEMU_JHASH_H__
> +#define QEMU_JHASH_H__
> +
> +#include "qemu/bitops.h"
> +
> +/*
> + * hashtable related is copied from linux kernel jhash
> + */
> +
> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
> +#define __jhash_mix(a, b, c)                \
> +{                                           \
> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
> +    a -= c;  a ^= rol32(c, 16); c += b;     \
> +    b -= a;  b ^= rol32(a, 19); a += c;     \
> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
> +}
> +
> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
> +#define __jhash_final(a, b, c)  \
> +{                               \
> +    c ^= b; c -= rol32(b, 14);  \
> +    a ^= c; a -= rol32(c, 11);  \
> +    b ^= a; b -= rol32(a, 25);  \
> +    c ^= b; c -= rol32(b, 16);  \
> +    a ^= c; a -= rol32(c, 4);   \
> +    b ^= a; b -= rol32(a, 14);  \
> +    c ^= b; c -= rol32(b, 24);  \
> +}
> +
> +/* An arbitrary initial parameter */
> +#define JHASH_INITVAL           0xdeadbeef
> +
> +#endif /* QEMU_JHASH_H__ */
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 62c66df..0bb5a51 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -20,15 +20,22 @@
>  #include "net/queue.h"
>  #include "sysemu/char.h"
>  #include "qemu/sockets.h"
> +#include <sys/sysinfo.h>
> +#include "slirp/slirp.h"
> +#include "qemu/jhash.h"
> +#include <sys/sysinfo.h>
>  
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
>      OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>  
>  #define COMPARE_READ_LEN_MAX NET_BUFSIZE
> +#define PAGE_SIZE 4096
> +#define ETH_HLEN 14

PAGE_SIZE is not just 4k; use one of the system headers.
Also, don't define ETH_HLEN - include net/eth.h

>  static QTAILQ_HEAD(, CompareState) net_compares =
>         QTAILQ_HEAD_INITIALIZER(net_compares);
> +static ssize_t hashtable_max_size;
>  
>  typedef struct ReadState {
>      int state; /* 0 = getting length, 1 = getting data */
> @@ -37,6 +44,28 @@ typedef struct ReadState {
>      uint8_t buf[COMPARE_READ_LEN_MAX];
>  } ReadState;
>  
> +/*
> +  + CompareState ++
> +  |               |
> +  +---------------+   +---------------+         +---------------+
> +  |conn list      +--->conn           +--------->conn           |
> +  +---------------+   +---------------+         +---------------+
> +  |               |     |           |             |          |
> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +*/
>  typedef struct CompareState {
>      Object parent;
>  
> @@ -49,8 +78,268 @@ typedef struct CompareState {
>      QTAILQ_ENTRY(CompareState) next;
>      ReadState pri_rs;
>      ReadState sec_rs;
> +
> +    /* connection list: the connections belonged to this NIC could be found
> +     * in this list.
> +     * element type: Connection
> +     */
> +    GQueue conn_list;
> +    QemuMutex conn_list_lock; /* to protect conn_list */
> +    /* hashtable to save connection */
> +    GHashTable *connection_track_table;
> +    /* to save unprocessed_connections */
> +    GQueue unprocessed_connections;
> +    /* proxy current hash size */
> +    ssize_t hashtable_size;
>  } CompareState;
>  
> +typedef struct Packet {
> +    void *data;
> +    union {
> +        uint8_t *network_layer;
> +        struct ip *ip;
> +    };
> +    uint8_t *transport_layer;
> +    int size;
> +    CompareState *s;
> +} Packet;
> +
> +typedef struct ConnectionKey {
> +    /* (src, dst) must be grouped, in the same way than in IP header */
> +    struct in_addr src;
> +    struct in_addr dst;
> +    uint16_t src_port;
> +    uint16_t dst_port;
> +    uint8_t ip_proto;
> +} QEMU_PACKED ConnectionKey;

Someone will want IPv6 at some point, so think about that, but not
too worried for now.

> +typedef struct Connection {
> +    QemuMutex list_lock;
> +    /* connection primary send queue: element type: Packet */
> +    GQueue primary_list;
> +    /* connection secondary send queue: element type: Packet */
> +    GQueue secondary_list;
> +    /* flag to enqueue unprocessed_connections */
> +    bool processing;
> +    int ip_proto;

in ConnectionKey you use uint8_t for ip_proto  - should
be consistent?

> +} Connection;
> +
> +enum {
> +    PRIMARY_IN = 0,
> +    SECONDARY_IN,
> +};
> +
> +static void packet_destroy(void *opaque, void *user_data);
> +static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
> +
> +static uint32_t connection_key_hash(const void *opaque)
> +{
> +    const ConnectionKey *key = opaque;
> +    uint32_t a, b, c;
> +
> +    /* Jenkins hash */
> +    a = b = c = JHASH_INITVAL + sizeof(*key);
> +    a += key->src.s_addr;
> +    b += key->dst.s_addr;
> +    c += (key->src_port | key->dst_port << 16);
> +    __jhash_mix(a, b, c);
> +
> +    a += key->ip_proto;
> +    __jhash_final(a, b, c);
> +
> +    return c;
> +}
> +
> +static int connection_key_equal(const void *opaque1, const void *opaque2)
> +{
> +    return memcmp(opaque1, opaque2, sizeof(ConnectionKey)) == 0;
> +}
> +
> +/*
> + *  initialize connecon_key for packet
                        ^ti

> + *  Return 0 on success, if return 1 the pkt will be sent later
> + */
> +static int connection_key_init(Packet *pkt, ConnectionKey *key)
> +{
> +    int network_length;
> +    uint8_t *data = pkt->data;
> +    uint16_t l3_proto;
> +    uint32_t tmp_ports;
> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
> +
> +    pkt->network_layer = data + ETH_HLEN;
> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
> +    if (l3_proto != ETH_P_IP) {
> +        return 1;
> +    }
> +
> +    network_length = pkt->ip->ip_hl * 4;
> +    pkt->transport_layer = pkt->network_layer + network_length;

Have we checked that this is valid - this is guest/external network
data, so is that 'network_length' actually pointing to valid data
or off the end of the packet?

> +    key->ip_proto = pkt->ip->ip_p;
> +    key->src = pkt->ip->ip_src;
> +    key->dst = pkt->ip->ip_dst;
> +
> +    switch (key->ip_proto) {
> +    case IPPROTO_TCP:
> +    case IPPROTO_UDP:
> +    case IPPROTO_DCCP:
> +    case IPPROTO_ESP:
> +    case IPPROTO_SCTP:
> +    case IPPROTO_UDPLITE:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
> +        key->src_port = tmp_ports & 0xffff;
> +        key->dst_port = tmp_ports >> 16;

Do these need ntohs - or do you want to keep them in network
order?  In my world on your older code I added ntohs's because
it made debugging make a lot more sense when you print out src_port/dst_port.

> +        break;
> +    case IPPROTO_AH:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
> +        key->src_port = tmp_ports & 0xffff;
> +        key->dst_port = tmp_ports >> 16;
> +        break;
> +    default:

   Do you need to set src_port/dst_port here (to 0 ?? ) ?

> +        break;
> +    }
> +
> +    return 0;
> +}
> +
> +static Connection *connection_new(ConnectionKey *key)
> +{
> +    Connection *conn = g_slice_new(Connection);
> +
> +    qemu_mutex_init(&conn->list_lock);
> +    conn->ip_proto = key->ip_proto;
> +    conn->processing = false;
> +    g_queue_init(&conn->primary_list);
> +    g_queue_init(&conn->secondary_list);
> +
> +    return conn;
> +}
> +
> +/*
> + * Clear hashtable, stop this hash growing really huge
> + */
> +static void connection_hashtable_reset(CompareState *s)
> +{
> +    s->hashtable_size = 0;
> +    g_hash_table_remove_all(s->connection_track_table);
> +}
> +
> +/* if not found, creata a new connection and add to hash table */

 Typo                    ^

> +static Connection *connection_get(CompareState *s, ConnectionKey *key)
> +{
> +    /* FIXME: protect connection_track_table */
> +    Connection *conn = g_hash_table_lookup(s->connection_track_table, key);
> +
> +    if (conn == NULL) {
> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
> +
> +        conn = connection_new(key);
> +
> +        s->hashtable_size++;
> +        if (s->hashtable_size > hashtable_max_size) {
> +            error_report("colo proxy connection hashtable full, clear it");
> +            connection_hashtable_reset(s);
> +            /* TODO:clear conn_list */

> +        } else {

This feels wrong; should this actually be in an else? If you've just cleared
the hash table, then you probably want to add this new connection to the empty
table? (And for example at the moment the 'new_key' is not used if we go down
this if).

> +            g_hash_table_insert(s->connection_track_table, new_key, conn);
> +        }
> +    }
> +
> +     return conn;
> +}
> +
> +static void connection_destroy(void *opaque)
> +{
> +    Connection *conn = opaque;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->primary_list);
> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->secondary_list);
> +    qemu_mutex_unlock(&conn->list_lock);
> +    qemu_mutex_destroy(&conn->list_lock);
> +    g_slice_free(Connection, conn);
> +}
> +
> +static Packet *packet_new(CompareState *s, const void *data,
> +                              int size, ConnectionKey *key)
> +{
> +    Packet *pkt = g_slice_new(Packet);
> +
> +    pkt->data = g_memdup(data, size);
> +    pkt->size = size;
> +    pkt->s = s;
> +
> +    if (connection_key_init(pkt, key)) {
> +        packet_destroy(pkt, NULL);
> +        pkt = NULL;
> +    }
> +
> +    return pkt;
> +}
> +
> +static int packet_enqueue(CompareState *s, int mode)
> +{
> +    ConnectionKey key = {{ 0 } };
> +    Packet *pkt = NULL;
> +    Connection *conn;
> +
> +    /* arp packet will be sent */

Can you add some more detail about that - what do the return
values of packet_enqueue mean; what happens to things like IPv6 or ARP packets?

> +    if (mode == PRIMARY_IN) {
> +        pkt = packet_new(s, s->pri_rs.buf, s->pri_rs.packet_len, &key);
> +    } else {
> +        pkt = packet_new(s, s->sec_rs.buf, s->sec_rs.packet_len, &key);
> +    }
> +    if (!pkt) {
> +        return -1;
> +    }
> +
> +    conn = connection_get(s, &key);
> +    if (!conn->processing) {
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_push_tail(&s->conn_list, conn);
> +        qemu_mutex_unlock(&s->conn_list_lock);
> +        conn->processing = true;
> +    }
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    if (mode == PRIMARY_IN) {
> +        g_queue_push_tail(&conn->primary_list, pkt);
> +    } else {
> +        g_queue_push_tail(&conn->secondary_list, pkt);
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +
> +    return 0;
> +}
> +
> +static void packet_destroy(void *opaque, void *user_data)
> +{
> +    Packet *pkt = opaque;
> +
> +    g_free(pkt->data);
> +    g_slice_free(Packet, pkt);
> +}
> +
> +static inline void colo_flush_connection(void *opaque, void *user_data)
> +{

Is this used?

> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +        /* FIXME: destroy pkt ?*/
> +    }
> +    while (!g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->secondary_list);
> +        packet_destroy(pkt, NULL);
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>  {
>      int ret = 0;
> @@ -142,8 +431,10 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>  
>      ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
>      if (ret == 1) {
> -        /* FIXME: enqueue to primary packet list */
> -        compare_chr_send(s->chr_out, buf, size);
> +        if (packet_enqueue(s, PRIMARY_IN)) {
> +            error_report("primary: unsupported packet in");

Is this for non-IP packets?  If so you don't want an error_report - because non-IP are
quite common; a trace would be useful giving the packet type etc

> +            compare_chr_send(s->chr_out, buf, size);
> +        }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>      }
> @@ -156,7 +447,9 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>  
>      ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
>      if (ret == 1) {
> -        /* TODO: enqueue to secondary packet list*/
> +        if (packet_enqueue(s, SECONDARY_IN)) {
> +            error_report("secondary: unsupported packet in");
> +        }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>      }
> @@ -210,6 +503,7 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>  static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
> +    struct sysinfo si;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -255,6 +549,29 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  
>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
>  
> +    g_queue_init(&s->conn_list);
> +    qemu_mutex_init(&s->conn_list_lock);
> +
> +    s->hashtable_size = 0;
> +    /*
> +     * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
> +     * machine has 512 buckets. >= 1GB machines have 16384 buckets.
> +     */
> +    sysinfo(&si);
> +    hashtable_max_size = si.totalram / 16384;
> +    if (si.totalram > (1024 * 1024 * 1024 / PAGE_SIZE)) {
> +        hashtable_max_size = 16384;
> +    }
> +    if (hashtable_max_size < 32) {
> +        hashtable_max_size = 32;
> +    }
> +    hashtable_max_size = hashtable_max_size * 8; /* default factor = 8 */

Make this a lot simpler; just pick a size and if it's a problem then we'll worry
about it later, or make it an option on the filter if you want it changeable.

> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
> +                                                      connection_key_equal,
> +                                                      g_free,
> +                                                      connection_destroy);
> +
>      return;
>  
>  out:
> @@ -297,6 +614,7 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>      if (!QTAILQ_EMPTY(&net_compares)) {
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
> +    qemu_mutex_destroy(&s->conn_list_lock);
>  }
>  
>  static void colo_compare_init(Object *obj)
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread Zhang Chen
@ 2016-03-30 11:41   ` Dr. David Alan Gilbert
  2016-03-31  2:17     ` Li Zhijian
  2016-03-31  6:00     ` Zhang Chen
  0 siblings, 2 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-30 11:41 UTC (permalink / raw)
  To: Zhang Chen
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> if packets are same, we send primary packet and drop secondary
> packet, otherwise notify COLO do checkpoint.
> 
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>  net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 121 insertions(+), 1 deletion(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 0bb5a51..1debc0e 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -36,6 +36,7 @@
>  static QTAILQ_HEAD(, CompareState) net_compares =
>         QTAILQ_HEAD_INITIALIZER(net_compares);
>  static ssize_t hashtable_max_size;
> +static int colo_need_checkpoint;
>  
>  typedef struct ReadState {
>      int state; /* 0 = getting length, 1 = getting data */
> @@ -91,6 +92,13 @@ typedef struct CompareState {
>      GQueue unprocessed_connections;
>      /* proxy current hash size */
>      ssize_t hashtable_size;
> +
> +    /* notify compare thread */
> +    QemuEvent event;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    int thread_status;
> +
>  } CompareState;
>  
>  typedef struct Packet {
> @@ -129,6 +137,15 @@ enum {
>      SECONDARY_IN,
>  };
>  
> +enum {
> +    /* compare thread isn't started */
> +    COMPARE_THREAD_NONE,
> +    /* compare thread is running */
> +    COMPARE_THREAD_RUNNING,
> +    /* compare thread exit */
> +    COMPARE_THREAD_EXIT,
> +};
> +
>  static void packet_destroy(void *opaque, void *user_data);
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
>  
> @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data)
>      qemu_mutex_unlock(&conn->list_lock);
>  }
>  
> +static void colo_notify_checkpoint(void)
> +{
> +    colo_need_checkpoint = true;
> +}
> +
> +/* TODO colo_do_checkpoint() {
> + * we flush the connections and reset 'colo_need_checkpoint'
> + * }
> + */
> +
> +static inline void colo_dump_packet(Packet *pkt)
> +{
> +    int i;
> +    for (i = 0; i < pkt->size; i++) {
> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
> +    }
> +    printf("\n");
> +}
> +
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    colo_dump_packet(ppkt);
> +    colo_dump_packet(spkt);

Obviously those need to become conditional on something.

> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare);

I think the order of parameters passed to the colo_packet_compare
is the wrong way around - although it doesn't really matter with your current
simple comparison;  https://developer.gnome.org/glib/stable/glib-Double-ended-Queues.html
says that

    'The function takes two gconstpointer arguments, the GQueue element's data as the
     first argument and the given user data as the second argument'

  so that makes the first argument the element out of the secondary_list and
the second argument the 'pkt' that you popped off the primary.

> +
> +        if (result) {
> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            g_queue_remove(&conn->secondary_list, result);
> +        } else {
> +            g_queue_push_head(&conn->primary_list, pkt);
> +            colo_notify_checkpoint();
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
> +        qemu_event_wait(&s->event);
> +        qemu_event_reset(&s->event);
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
> +        qemu_mutex_unlock(&s->conn_list_lock);

Interesting; holding the 'conn_list_lock' around the whole of the comparison
is probably quite expensive if you've got a lot of packets coming in then
the lock could be held for most of the time.
I'm not sure of a better solution; maybe use the qemu/rcu_queue.h ?

> +    }
> +
> +    return NULL;
> +}
> +
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>  {
>      int ret = 0;
> @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>      if (ret == 1) {
>          if (packet_enqueue(s, PRIMARY_IN)) {
>              error_report("primary: unsupported packet in");
> -            compare_chr_send(s->chr_out, buf, size);
> +            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);

Doesn't that change belong in an earlier patch?

> +        } else {
> +            qemu_event_set(&s->event);

Also these - why are these in this patch?

>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> @@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>      if (ret == 1) {
>          if (packet_enqueue(s, SECONDARY_IN)) {
>              error_report("secondary: unsupported packet in");
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
>      struct sysinfo si;
> +    char thread_name[64];
> +    static int compare_id;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>      g_queue_init(&s->conn_list);
>      qemu_mutex_init(&s->conn_list_lock);
>  
> +    colo_need_checkpoint = false;
>      s->hashtable_size = 0;
>      /*
>       * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
> @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>                                                        g_free,
>                                                        connection_destroy);
>  
> +    s->thread_status = COMPARE_THREAD_RUNNING;
> +    sprintf(thread_name, "proxy compare %d", compare_id);

As with my comment from last month; the thread names are limited
to 14 characters on Linux (and most other Unixes) so keep this short;
I use "proxy:%s" and the device name.

> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
>      return;
>  
>  out:
> @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
>      qemu_mutex_destroy(&s->conn_list_lock);
> +
> +    if (s->thread.thread) {
> +        s->thread_status = COMPARE_THREAD_EXIT;
> +        qemu_event_set(&s->event);
> +        qemu_thread_join(&s->thread);
> +    }
> +    qemu_event_destroy(&s->event);
>  }
>  
>  static void colo_compare_init(Object *obj)
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare
  2016-03-30  8:35 [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Zhang Chen
                   ` (2 preceding siblings ...)
  2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread Zhang Chen
@ 2016-03-30 12:05 ` Dr. David Alan Gilbert
  2016-03-31  3:01   ` Li Zhijian
  2016-03-31  6:48   ` Zhang Chen
  3 siblings, 2 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-30 12:05 UTC (permalink / raw)
  To: Zhang Chen
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> COLO-compare is a part of COLO project. It is used
> to compare the network package to help COLO decide
> whether to do checkpoint.

Hi Zhang Chen,
  I've put comments on the individual patches, but some more general things:

  1) Please add a comment giving the example of the command line for the primary
    and secondary use of this module - it helps make it easier to understand the patches.

  2) There's no tracing in here - please add some; I found when I tried to get
    COLO working I needed to use lots of tracing and debugging to understand the
    packet flow.

  3) Add comments; e.g. for each function say which thread is using it and where
     the packets are coming from; e.g. 
        called from the main thread on the primary for packets arriving over the socket
        from the secondary.

     There are just so many packets going in so many directions that such comments
     would make it easier to follow.

  4) A more fundamental problem is what happens if the secondary never sends anything
     on the socket, the result is you end up running until the end of the long COLO
     checkpoint without triggering a discompare - in my world I added a timeout (400ms)
     for an unmatched packet from the primary, where if no matching packet was received
     a checkpoint would be triggered.

  5) I see the packet comparison is still the simple memcmp that you had in December;
     are you planning on doing anything more complicated?  You must be seeing most
     packets miscompare.

You can see my current world at; https://github.com/orbitfp7/qemu/commits/orbit-wp4-colo-mar16
which has my basic TCP comparison (it's only tracking incoming connections) and I know it's
not complete either.  It mostly works OK, although I've got an occasional segfault
(which makes me wonder if I need to add the conn_list_lock I see you added).  I'm also
not doing any TCP reassembly which is probably needed.

Dave
    
> v2:
>  - add jhash.h
> 
> v1:
>  - initial patch
> 
> 
> Zhang Chen (3):
>   colo-compare: introduce colo compare initlization
>   colo-compare: track connection and enqueue packet
>   colo-compare: introduce packet comparison thread
> 
>  include/qemu/jhash.h |  59 ++++
>  net/Makefile.objs    |   1 +
>  net/colo-compare.c   | 782 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  vl.c                 |   3 +-
>  4 files changed, 844 insertions(+), 1 deletion(-)
>  create mode 100644 include/qemu/jhash.h
>  create mode 100644 net/colo-compare.c
> 
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-30  9:25   ` Dr. David Alan Gilbert
@ 2016-03-31  1:41     ` Zhang Chen
  2016-03-31  7:25       ` Zhang Chen
  2016-03-31  9:24       ` Dr. David Alan Gilbert
  2016-04-13  2:02     ` Zhang Chen
  1 sibling, 2 replies; 26+ messages in thread
From: Zhang Chen @ 2016-03-31  1:41 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang



On 03/30/2016 05:25 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> packet come from primary char indev will be send to
>> outdev - packet come from secondary char dev will be drop
> Please put in the description an example of how you invoke
> the filter on the primary and secondary.

OK.
colo-compare gets packets from two chardevs (primary_in, secondary_in)
and outputs to another chardev (outdev), so we can use it with the
help of filter-mirror and filter-redirector, like this:

primary:
-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
-device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
-chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
-chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
-chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
-chardev socket,id=compare0-0,host=3.3.3.3,port=9001
-chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
-chardev socket,id=compare_out0,host=3.3.3.3,port=9005
-object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
-object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
-object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
-object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0


secondary:
-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
-device e1000,netdev=hn0,mac=52:a4:00:12:78:66
-chardev socket,id=red0,host=3.3.3.3,port=9003
-chardev socket,id=red1,host=3.3.3.3,port=9004
-object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
-object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1



>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/Makefile.objs  |   1 +
>>   net/colo-compare.c | 344 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   vl.c               |   3 +-
>>   3 files changed, 347 insertions(+), 1 deletion(-)
>>   create mode 100644 net/colo-compare.c
>>
>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>> index b7c22fd..ba92f73 100644
>> --- a/net/Makefile.objs
>> +++ b/net/Makefile.objs
>> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>>   common-obj-y += filter.o
>>   common-obj-y += filter-buffer.o
>>   common-obj-y += filter-mirror.o
>> +common-obj-y += colo-compare.o
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> new file mode 100644
>> index 0000000..62c66df
>> --- /dev/null
>> +++ b/net/colo-compare.c
>> @@ -0,0 +1,344 @@
>> +/*
>> + * Copyright (c) 2016 FUJITSU LIMITED
>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "qemu/osdep.h"
>> +#include "qemu-common.h"
>> +#include "qapi/qmp/qerror.h"
>> +#include "qemu/error-report.h"
>> +
>> +#include "net/net.h"
>> +#include "net/vhost_net.h"
>> +#include "qom/object_interfaces.h"
>> +#include "qemu/iov.h"
>> +#include "qom/object.h"
>> +#include "qemu/typedefs.h"
>> +#include "net/queue.h"
>> +#include "sysemu/char.h"
>> +#include "qemu/sockets.h"
>> +
>> +#define TYPE_COLO_COMPARE "colo-compare"
>> +#define COLO_COMPARE(obj) \
>> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>> +
>> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
>> +
>> +static QTAILQ_HEAD(, CompareState) net_compares =
>> +       QTAILQ_HEAD_INITIALIZER(net_compares);
>> +
>> +typedef struct ReadState {
>> +    int state; /* 0 = getting length, 1 = getting data */
>> +    unsigned int index;
>> +    unsigned int packet_len;
> Please make packet_len and index  uint32_t, since they're sent over the wire
> as 32bit.
>
>> +    uint8_t buf[COMPARE_READ_LEN_MAX];
>> +} ReadState;
>> +
>> +typedef struct CompareState {
>> +    Object parent;
>> +
>> +    char *pri_indev;
>> +    char *sec_indev;
>> +    char *outdev;
>> +    CharDriverState *chr_pri_in;
>> +    CharDriverState *chr_sec_in;
>> +    CharDriverState *chr_out;
>> +    QTAILQ_ENTRY(CompareState) next;
>> +    ReadState pri_rs;
>> +    ReadState sec_rs;
>> +} CompareState;
>> +
>> +static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>> +{
>> +    int ret = 0;
>> +    uint32_t len = htonl(size);
>> +
> Similarly, make 'size' uint32_t - everything that gets converted into a uint32_t
> it's probably best to make a uint32_t.
>
>> +    if (!size) {
>> +        return 0;
>> +    }
>> +
>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
>> +    if (ret != sizeof(len)) {
>> +        goto err;
>> +    }
>> +
>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
>> +    if (ret != size) {
>> +        goto err;
>> +    }
>> +
> You can make this slightly simpler and save the return 0;
>
>> +    return 0;
>> +
>> +err:
>> +    return ret < 0 ? ret : -EIO;
> err:
>         return ret <= 0 ? ret : -EIO;
>
>> +}
>> +
>> +static int compare_chr_can_read(void *opaque)
>> +{
>> +    return COMPARE_READ_LEN_MAX;
>> +}
>> +
>> +/* Returns
>> + * 0: readstate is not ready
>> + * 1: readstate is ready
>> + * otherwise error occurs
>> + */
>> +static int compare_chr_fill_rstate(ReadState *rs, const uint8_t *buf, int size)
>> +{
>> +    unsigned int l;
>> +    while (size > 0) {
>> +        /* reassemble a packet from the network */
>> +        switch (rs->state) { /* 0 = getting length, 1 = getting data */
>> +        case 0:
>> +            l = 4 - rs->index;
>> +            if (l > size) {
>> +                l = size;
>> +            }
>> +            memcpy(rs->buf + rs->index, buf, l);
>> +            buf += l;
>> +            size -= l;
>> +            rs->index += l;
>> +            if (rs->index == 4) {
>> +                /* got length */
>> +                rs->packet_len = ntohl(*(uint32_t *)rs->buf);
>> +                rs->index = 0;
>> +                rs->state = 1;
>> +            }
>> +            break;
>> +        case 1:
>> +            l = rs->packet_len - rs->index;
>> +            if (l > size) {
>> +                l = size;
>> +            }
>> +            if (rs->index + l <= sizeof(rs->buf)) {
>> +                memcpy(rs->buf + rs->index, buf, l);
>> +            } else {
>> +                error_report("serious error: oversized packet received.");
> Isn't it easier to do this check above in the 'got length' if ?
> Instead of 'serious error' say where it's coming from;
>    'colo-compare: Received oversized packet over socket'
>
> that makes it a lot easier when people see the error for the first time.
> Also, should you check for the error case of receiving a packet where
> packet_len == 0 ?
>
>> +                rs->index = rs->state = 0;
>> +                return -1;
>> +            }
>> +
>> +            rs->index += l;
>> +            buf += l;
>> +            size -= l;
>> +            if (rs->index >= rs->packet_len) {
>> +                rs->index = 0;
>> +                rs->state = 0;
>> +                return 1;
>> +            }
>> +            break;
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +
>> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>> +{
>> +    CompareState *s = COLO_COMPARE(opaque);
>> +    int ret;
>> +
>> +    ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
>> +    if (ret == 1) {
>> +        /* FIXME: enqueue to primary packet list */
>> +        compare_chr_send(s->chr_out, buf, size);
>> +    } else if (ret == -1) {
>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> +    }
>> +}
>> +
>> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>> +{
>> +    CompareState *s = COLO_COMPARE(opaque);
>> +    int ret;
>> +
>> +    ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
>> +    if (ret == 1) {
>> +        /* TODO: enqueue to secondary packet list*/
>> +    } else if (ret == -1) {
>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> +    }
>> +}
>> +
>> +static char *compare_get_pri_indev(Object *obj, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    return g_strdup(s->pri_indev);
>> +}
>> +
>> +static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->pri_indev);
>> +    s->pri_indev = g_strdup(value);
>> +}
>> +
>> +static char *compare_get_sec_indev(Object *obj, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    return g_strdup(s->sec_indev);
>> +}
>> +
>> +static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->sec_indev);
>> +    s->sec_indev = g_strdup(value);
>> +}
>> +
>> +static char *compare_get_outdev(Object *obj, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    return g_strdup(s->outdev);
>> +}
>> +
>> +static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->outdev);
>> +    s->outdev = g_strdup(value);
>> +}
>> +
>> +static void colo_compare_complete(UserCreatable *uc, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(uc);
>> +
>> +    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>> +        error_setg(errp, "colo compare needs 'primary_in' ,"
>> +                   "'secondary_in','outdev' property set");
>> +        return;
>> +    } else if (!strcmp(s->pri_indev, s->outdev) ||
>> +               !strcmp(s->sec_indev, s->outdev) ||
>> +               !strcmp(s->pri_indev, s->sec_indev)) {
>> +        error_setg(errp, "'indev' and 'outdev' could not be same "
>> +                   "for compare module");
>> +        return;
>> +    }
>> +
>> +    s->chr_pri_in = qemu_chr_find(s->pri_indev);
>> +    if (s->chr_pri_in == NULL) {
>> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
> I think error_set seems to be discouraged these days, just use error_setg
> (see the comment in include/qapi/error.h just above error_set).
>
>> +                  "IN Device '%s' not found", s->pri_indev);
>> +        goto out;
>> +    }
>> +
>> +    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
>> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
>> +                          compare_pri_chr_in, NULL, s);
>> +
>> +    s->chr_sec_in = qemu_chr_find(s->sec_indev);
>> +    if (s->chr_sec_in == NULL) {
>> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
>> +                  "IN Device '%s' not found", s->sec_indev);
>> +        goto out;
>> +    }
> Can you explain/give an example of the way you create one of these
> filters?
> Would you ever have a pri_indev and sec_indev on the same filter?
> If not, then why not just have an 'indev' option rather than the
> two separate configs.
> If you can have both then you need to change the error 'IN Device'
> to say either 'Primary IN device' or secondary.
>
>> +    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
>> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
>> +                          compare_sec_chr_in, NULL, s);
>> +
>> +    s->chr_out = qemu_chr_find(s->outdev);
>> +    if (s->chr_out == NULL) {
>> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
>> +                  "OUT Device '%s' not found", s->outdev);
>> +        goto out;
>> +    }
>> +    qemu_chr_fe_claim_no_fail(s->chr_out);
>> +
>> +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
>> +
>> +    return;
>> +
>> +out:
>> +    if (s->chr_pri_in) {
>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> +        qemu_chr_fe_release(s->chr_pri_in);
>> +        s->chr_pri_in = NULL;
>> +    }
>> +    if (s->chr_sec_in) {
>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> +        qemu_chr_fe_release(s->chr_sec_in);
>> +        s->chr_pri_in = NULL;
>> +    }
> Can't you avoid this by making the code:
>
>       s->chr_pri_in = qemu_chr_find(...)
>       if (s->chr_pri_in == NULL) {
>          error (...)
>       }
>       s->chr_sec_in = qemu_chr_find(...)
>       if (s->chr_sec_in == NULL) {
>          error (...)
>       }
>       s->chr_out = qemu_chr_find(...)
>       if (s->chr_out == NULL) {
>          error (...)
>       }
>
>       qemu_chr_fe_claim_no_fail(pri)
>       add_handlers(pri...)
>       qemu_chr_fe_claim_no_fail(sec)
>       add_handlers(sec...)
>       qemu_chr_fe_claim_no_fail(out)
>       add_handlers(out...)
>
> so you don't have to clean up the handlers/release in the out: ?
>
>> +}
>> +
>> +static void colo_compare_class_init(ObjectClass *oc, void *data)
>> +{
>> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
>> +
>> +    ucc->complete = colo_compare_complete;
>> +}
>> +
>> +static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>> +{
>> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
>> +    CompareState *s = COLO_COMPARE(ucc);
>> +
>> +    if (s->chr_pri_in) {
>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> +        qemu_chr_fe_release(s->chr_pri_in);
>> +    }
>> +    if (s->chr_sec_in) {
>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> +        qemu_chr_fe_release(s->chr_sec_in);
>> +    }
>> +    if (s->chr_out) {
>> +        qemu_chr_fe_release(s->chr_out);
>> +    }
>> +
>> +    if (!QTAILQ_EMPTY(&net_compares)) {
>> +        QTAILQ_REMOVE(&net_compares, s, next);
>> +    }
>> +}
>> +
>> +static void colo_compare_init(Object *obj)
>> +{
>> +    object_property_add_str(obj, "primary_in",
>> +                            compare_get_pri_indev, compare_set_pri_indev,
>> +                            NULL);
>> +    object_property_add_str(obj, "secondary_in",
>> +                            compare_get_sec_indev, compare_set_sec_indev,
>> +                            NULL);
>> +    object_property_add_str(obj, "outdev",
>> +                            compare_get_outdev, compare_set_outdev,
>> +                            NULL);
>> +}
>> +
>> +static void colo_compare_finalize(Object *obj)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->pri_indev);
>> +    g_free(s->sec_indev);
>> +    g_free(s->outdev);
>> +}
>> +
>> +static const TypeInfo colo_compare_info = {
>> +    .name = TYPE_COLO_COMPARE,
>> +    .parent = TYPE_OBJECT,
>> +    .instance_size = sizeof(CompareState),
>> +    .instance_init = colo_compare_init,
>> +    .instance_finalize = colo_compare_finalize,
>> +    .class_size = sizeof(CompareState),
>> +    .class_init = colo_compare_class_init,
>> +    .class_finalize = colo_compare_class_finalize,
>> +    .interfaces = (InterfaceInfo[]) {
>> +        { TYPE_USER_CREATABLE },
>> +        { }
>> +    }
>> +};
>> +
>> +static void register_types(void)
>> +{
>> +    type_register_static(&colo_compare_info);
>> +}
>> +
>> +type_init(register_types);
>> diff --git a/vl.c b/vl.c
>> index dc6e63a..70064ad 100644
>> --- a/vl.c
>> +++ b/vl.c
>> @@ -2842,7 +2842,8 @@ static bool object_create_initial(const char *type)
>>       if (g_str_equal(type, "filter-buffer") ||
>>           g_str_equal(type, "filter-dump") ||
>>           g_str_equal(type, "filter-mirror") ||
>> -        g_str_equal(type, "filter-redirector")) {
>> +        g_str_equal(type, "filter-redirector") ||
>> +        g_str_equal(type, "colo-compare")) {
>>           return false;
>>       }
>>   
>> -- 
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-30 10:36   ` Dr. David Alan Gilbert
@ 2016-03-31  2:09     ` Li Zhijian
  2016-03-31  8:47       ` Dr. David Alan Gilbert
  2016-03-31  4:06     ` Zhang Chen
  1 sibling, 1 reply; 26+ messages in thread
From: Li Zhijian @ 2016-03-31  2:09 UTC (permalink / raw)
  To: Dr. David Alan Gilbert, Zhang Chen
  Cc: zhanghailiang, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang



On 03/30/2016 06:36 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> In this patch we use kernel jhash table to track
>> connection, and then enqueue net packet like this:
>>
>> + CompareState ++
>> |               |
>> +---------------+   +---------------+         +---------------+
>> |conn list      +--->conn           +--------->conn           |
>> +---------------+   +---------------+         +---------------+
>> |               |     |           |             |          |
>> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet  +
>>                    +--------+  +--------+    +--------+ +--------+
>>                        |           |             |          |
>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet  +
>>                    +--------+  +--------+    +--------+ +--------+
>>                        |           |             |          |
>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet  +
>>                    +--------+  +--------+    +--------+ +--------+
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   include/qemu/jhash.h |  59 ++++++++++
>>   net/colo-compare.c   | 324 ++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   2 files changed, 380 insertions(+), 3 deletions(-)
>>   create mode 100644 include/qemu/jhash.h
>>
>> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
>> new file mode 100644
>> index 0000000..8a8ff0f
>> --- /dev/null
>> +++ b/include/qemu/jhash.h
>> @@ -0,0 +1,59 @@
>> +/* jhash.h: Jenkins hash support.
>> +  *
>> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
>> +  *
>> +  * http://burtleburtle.net/bob/hash/
>> +  *
>> +  * These are the credits from Bob's sources:
>> +  *
>> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
>> +  *
>> +  * These are functions for producing 32-bit hashes for hash table lookup.
>> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
>> +  * are externally useful functions.  Routines to test the hash are included
>> +  * if SELF_TEST is defined.  You can use this free for any purpose.It's in
>> +  * the public domain.  It has no warranty.
>> +  *
>> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
>> +  *
>> +  * I've modified Bob's hash to be useful in the Linux kernel, and
>> +  * any bugs present are my fault.
>> +  * Jozsef
>> +  */
>> +
>> +#ifndef QEMU_JHASH_H__
>> +#define QEMU_JHASH_H__
>> +
>> +#include "qemu/bitops.h"
>> +
>> +/*
>> + * hashtable related is copied from linux kernel jhash
>> + */
>> +
>> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
>> +#define __jhash_mix(a, b, c)                \
>> +{                                           \
>> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
>> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
>> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
>> +    a -= c;  a ^= rol32(c, 16); c += b;     \
>> +    b -= a;  b ^= rol32(a, 19); a += c;     \
>> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
>> +}
>> +
>> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
>> +#define __jhash_final(a, b, c)  \
>> +{                               \
>> +    c ^= b; c -= rol32(b, 14);  \
>> +    a ^= c; a -= rol32(c, 11);  \
>> +    b ^= a; b -= rol32(a, 25);  \
>> +    c ^= b; c -= rol32(b, 16);  \
>> +    a ^= c; a -= rol32(c, 4);   \
>> +    b ^= a; b -= rol32(a, 14);  \
>> +    c ^= b; c -= rol32(b, 24);  \
>> +}
>> +
>> +/* An arbitrary initial parameter */
>> +#define JHASH_INITVAL           0xdeadbeef
>> +
>> +#endif /* QEMU_JHASH_H__ */
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 62c66df..0bb5a51 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -20,15 +20,22 @@
>>   #include "net/queue.h"
>>   #include "sysemu/char.h"
>>   #include "qemu/sockets.h"
>> +#include <sys/sysinfo.h>
>> +#include "slirp/slirp.h"
>> +#include "qemu/jhash.h"
>> +#include <sys/sysinfo.h>
>>
>>   #define TYPE_COLO_COMPARE "colo-compare"
>>   #define COLO_COMPARE(obj) \
>>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>
>>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>> +#define PAGE_SIZE 4096
>> +#define ETH_HLEN 14
>
> PAGE_SIZE is not just 4k; use one of the system headers.
> Also, don't define ETH_HLEN - include net/eth.h
>
>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>> +static ssize_t hashtable_max_size;
>>
>>   typedef struct ReadState {
>>       int state; /* 0 = getting length, 1 = getting data */
>> @@ -37,6 +44,28 @@ typedef struct ReadState {
>>       uint8_t buf[COMPARE_READ_LEN_MAX];
>>   } ReadState;
>>
>> +/*
>> +  + CompareState ++
>> +  |               |
>> +  +---------------+   +---------------+         +---------------+
>> +  |conn list      +--->conn           +--------->conn           |
>> +  +---------------+   +---------------+         +---------------+
>> +  |               |     |           |             |          |
>> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +                        |           |             |          |
>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +                        |           |             |          |
>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +*/
>>   typedef struct CompareState {
>>       Object parent;
>>
>> @@ -49,8 +78,268 @@ typedef struct CompareState {
>>       QTAILQ_ENTRY(CompareState) next;
>>       ReadState pri_rs;
>>       ReadState sec_rs;
>> +
>> +    /* connection list: the connections belonged to this NIC could be found
>> +     * in this list.
>> +     * element type: Connection
>> +     */
>> +    GQueue conn_list;
>> +    QemuMutex conn_list_lock; /* to protect conn_list */
>> +    /* hashtable to save connection */
>> +    GHashTable *connection_track_table;
>> +    /* to save unprocessed_connections */
>> +    GQueue unprocessed_connections;
>> +    /* proxy current hash size */
>> +    ssize_t hashtable_size;
>>   } CompareState;
>>
>> +typedef struct Packet {
>> +    void *data;
>> +    union {
>> +        uint8_t *network_layer;
>> +        struct ip *ip;
>> +    };
>> +    uint8_t *transport_layer;
>> +    int size;
>> +    CompareState *s;
>> +} Packet;
>> +
>> +typedef struct ConnectionKey {
>> +    /* (src, dst) must be grouped, in the same way than in IP header */
>> +    struct in_addr src;
>> +    struct in_addr dst;
>> +    uint16_t src_port;
>> +    uint16_t dst_port;
>> +    uint8_t ip_proto;
>> +} QEMU_PACKED ConnectionKey;
>
> Someone will want IPv6 at some point, so think about that, but not
> too worried for now.
>
>> +typedef struct Connection {
>> +    QemuMutex list_lock;
>> +    /* connection primary send queue: element type: Packet */
>> +    GQueue primary_list;
>> +    /* connection secondary send queue: element type: Packet */
>> +    GQueue secondary_list;
>> +    /* flag to enqueue unprocessed_connections */
>> +    bool processing;
>> +    int ip_proto;
>
> in ConnectionKey you use uint8_t for ip_proto  - should
> be consistent?
>
>> +} Connection;
>> +
>> +enum {
>> +    PRIMARY_IN = 0,
>> +    SECONDARY_IN,
>> +};
>> +
>> +static void packet_destroy(void *opaque, void *user_data);
>> +static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
>> +
>> +static uint32_t connection_key_hash(const void *opaque)
>> +{
>> +    const ConnectionKey *key = opaque;
>> +    uint32_t a, b, c;
>> +
>> +    /* Jenkins hash */
>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>> +    a += key->src.s_addr;
>> +    b += key->dst.s_addr;
>> +    c += (key->src_port | key->dst_port << 16);
>> +    __jhash_mix(a, b, c);
>> +
>> +    a += key->ip_proto;
>> +    __jhash_final(a, b, c);
>> +
>> +    return c;
>> +}
>> +
>> +static int connection_key_equal(const void *opaque1, const void *opaque2)
>> +{
>> +    return memcmp(opaque1, opaque2, sizeof(ConnectionKey)) == 0;
>> +}
>> +
>> +/*
>> + *  initialize connecon_key for packet
>                          ^ti
>
>> + *  Return 0 on success, if return 1 the pkt will be sent later
>> + */
>> +static int connection_key_init(Packet *pkt, ConnectionKey *key)
>> +{
>> +    int network_length;
>> +    uint8_t *data = pkt->data;
>> +    uint16_t l3_proto;
>> +    uint32_t tmp_ports;
>> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
>> +
>> +    pkt->network_layer = data + ETH_HLEN;
>> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
>> +    if (l3_proto != ETH_P_IP) {
>> +        return 1;
>> +    }
>> +
>> +    network_length = pkt->ip->ip_hl * 4;
>> +    pkt->transport_layer = pkt->network_layer + network_length;
>
> Have we checked that this is valid - this is guest/external network
> data, so is that 'network_length' actually pointing to valid data
> or off the end of the packet?
>
>> +    key->ip_proto = pkt->ip->ip_p;
>> +    key->src = pkt->ip->ip_src;
>> +    key->dst = pkt->ip->ip_dst;
>> +
>> +    switch (key->ip_proto) {
>> +    case IPPROTO_TCP:
>> +    case IPPROTO_UDP:
>> +    case IPPROTO_DCCP:
>> +    case IPPROTO_ESP:
>> +    case IPPROTO_SCTP:
>> +    case IPPROTO_UDPLITE:
>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
>> +        key->src_port = tmp_ports & 0xffff;
>> +        key->dst_port = tmp_ports >> 16;
>
> Do these need ntohs - or do you want to keep them in network
> order?  In my world on your older code I added ntohs's because
> it made debugging make a lot more sense when you print out src_port/dst_port.

Agree.

>
>> +        break;
>> +    case IPPROTO_AH:
>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
>> +        key->src_port = tmp_ports & 0xffff;
>> +        key->dst_port = tmp_ports >> 16;
>> +        break;
>> +    default:
>
>     Do you need to set src_port/dst_port here (to 0 ?? ) ?
>
>> +        break;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static Connection *connection_new(ConnectionKey *key)
>> +{
>> +    Connection *conn = g_slice_new(Connection);
>> +
>> +    qemu_mutex_init(&conn->list_lock);
>> +    conn->ip_proto = key->ip_proto;
>> +    conn->processing = false;
>> +    g_queue_init(&conn->primary_list);
>> +    g_queue_init(&conn->secondary_list);
>> +
>> +    return conn;
>> +}
>> +
>> +/*
>> + * Clear hashtable, stop this hash growing really huge
>> + */
>> +static void connection_hashtable_reset(CompareState *s)
>> +{
>> +    s->hashtable_size = 0;
>> +    g_hash_table_remove_all(s->connection_track_table);
>> +}
>> +
>> +/* if not found, creata a new connection and add to hash table */
>
>   Typo                    ^
>
>> +static Connection *connection_get(CompareState *s, ConnectionKey *key)
>> +{
>> +    /* FIXME: protect connection_track_table */
>> +    Connection *conn = g_hash_table_lookup(s->connection_track_table, key);
>> +
>> +    if (conn == NULL) {
>> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
>> +
>> +        conn = connection_new(key);
>> +
>> +        s->hashtable_size++;
>> +        if (s->hashtable_size > hashtable_max_size) {
>> +            error_report("colo proxy connection hashtable full, clear it");
>> +            connection_hashtable_reset(s);
>> +            /* TODO:clear conn_list */
>
>> +        } else {
>
> This feels wrong;
Agree.

> should this actually be in an else? If you've just cleared
> the hash table, then you probably want to add this new connection to the empty
> table? (And for example at the moment the 'new_key' is not used if we go down
> this if).
>
>> +            g_hash_table_insert(s->connection_track_table, new_key, conn);
>> +        }
>> +    }
>> +
>> +     return conn;
>> +}
>> +
>> +static void connection_destroy(void *opaque)
>> +{
>> +    Connection *conn = opaque;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
>> +    g_queue_free(&conn->primary_list);
>> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
>> +    g_queue_free(&conn->secondary_list);
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +    qemu_mutex_destroy(&conn->list_lock);
>> +    g_slice_free(Connection, conn);
>> +}
>> +
>> +static Packet *packet_new(CompareState *s, const void *data,
>> +                              int size, ConnectionKey *key)
>> +{
>> +    Packet *pkt = g_slice_new(Packet);
>> +
>> +    pkt->data = g_memdup(data, size);
>> +    pkt->size = size;
>> +    pkt->s = s;
>> +
>> +    if (connection_key_init(pkt, key)) {
>> +        packet_destroy(pkt, NULL);
>> +        pkt = NULL;
>> +    }
>> +
>> +    return pkt;
>> +}
>> +
>> +static int packet_enqueue(CompareState *s, int mode)
>> +{
>> +    ConnectionKey key = {{ 0 } };
>> +    Packet *pkt = NULL;
>> +    Connection *conn;
>> +
>> +    /* arp packet will be sent */
>
> Can you add some more detail about that - what do the return
> values of packet_enqueue mean; what happens to things like IPv6 or ARP packets?
>
>> +    if (mode == PRIMARY_IN) {
>> +        pkt = packet_new(s, s->pri_rs.buf, s->pri_rs.packet_len, &key);
>> +    } else {
>> +        pkt = packet_new(s, s->sec_rs.buf, s->sec_rs.packet_len, &key);
>> +    }
>> +    if (!pkt) {
>> +        return -1;
>> +    }
>> +
>> +    conn = connection_get(s, &key);
>> +    if (!conn->processing) {
>> +        qemu_mutex_lock(&s->conn_list_lock);
>> +        g_queue_push_tail(&s->conn_list, conn);
>> +        qemu_mutex_unlock(&s->conn_list_lock);
>> +        conn->processing = true;
>> +    }
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    if (mode == PRIMARY_IN) {
>> +        g_queue_push_tail(&conn->primary_list, pkt);
>> +    } else {
>> +        g_queue_push_tail(&conn->secondary_list, pkt);
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +
>> +    return 0;
>> +}
>> +
>> +static void packet_destroy(void *opaque, void *user_data)
>> +{
>> +    Packet *pkt = opaque;
>> +
>> +    g_free(pkt->data);
>> +    g_slice_free(Packet, pkt);
>> +}
>> +
>> +static inline void colo_flush_connection(void *opaque, void *user_data)
>> +{
>
> Is this used?
Right, it isn't used currently.
It will be needed once the compare module is integrated into the COLO framework.


>
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    while (!g_queue_is_empty(&conn->primary_list)) {
>> +        pkt = g_queue_pop_head(&conn->primary_list);
>> +        compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
>> +        /* FIXME: destroy pkt ?*/
>> +    }
>> +    while (!g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_head(&conn->secondary_list);
>> +        packet_destroy(pkt, NULL);
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>>   {
>>       int ret = 0;
>> @@ -142,8 +431,10 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>>
>>       ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
>>       if (ret == 1) {
>> -        /* FIXME: enqueue to primary packet list */
>> -        compare_chr_send(s->chr_out, buf, size);
>> +        if (packet_enqueue(s, PRIMARY_IN)) {
>> +            error_report("primary: unsupported packet in");
>
> Is this for non-IP packets?  If so you don't want an error_report - because non-IP are
> quite common; a trace would be useful giving the packet type etc
Agreed. Furthermore, IMO releasing the packet to the client is not always correct for all non-IP
packets, but at the current stage this looks fine.

Thanks
Li Zhijian

>
>> +            compare_chr_send(s->chr_out, buf, size);
>> +        }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>       }
>> @@ -156,7 +447,9 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>>
>>       ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
>>       if (ret == 1) {
>> -        /* TODO: enqueue to secondary packet list*/
>> +        if (packet_enqueue(s, SECONDARY_IN)) {
>> +            error_report("secondary: unsupported packet in");
>> +        }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>>       }
>> @@ -210,6 +503,7 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>>   static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>> +    struct sysinfo si;
>>
>>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -255,6 +549,29 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>
>>       QTAILQ_INSERT_TAIL(&net_compares, s, next);
>>
>> +    g_queue_init(&s->conn_list);
>> +    qemu_mutex_init(&s->conn_list_lock);
>> +
>> +    s->hashtable_size = 0;
>> +    /*
>> +     * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
>> +     * machine has 512 buckets. >= 1GB machines have 16384 buckets.
>> +     */
>> +    sysinfo(&si);
>> +    hashtable_max_size = si.totalram / 16384;
>> +    if (si.totalram > (1024 * 1024 * 1024 / PAGE_SIZE)) {
>> +        hashtable_max_size = 16384;
>> +    }
>> +    if (hashtable_max_size < 32) {
>> +        hashtable_max_size = 32;
>> +    }
>> +    hashtable_max_size = hashtable_max_size * 8; /* default factor = 8 */
>
> Make this a lot simpler; just pick a size and if it's a problem then we'll worry
> about it later, or make it an option on the filter if you want it changeable.
>
>> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
>> +                                                      connection_key_equal,
>> +                                                      g_free,
>> +                                                      connection_destroy);
>> +
>>       return;
>>
>>   out:
>> @@ -297,6 +614,7 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>>       if (!QTAILQ_EMPTY(&net_compares)) {
>>           QTAILQ_REMOVE(&net_compares, s, next);
>>       }
>> +    qemu_mutex_destroy(&s->conn_list_lock);
>>   }
>>
>>   static void colo_compare_init(Object *obj)
>> --
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread
  2016-03-30 11:41   ` Dr. David Alan Gilbert
@ 2016-03-31  2:17     ` Li Zhijian
  2016-03-31  8:50       ` Dr. David Alan Gilbert
  2016-03-31  6:00     ` Zhang Chen
  1 sibling, 1 reply; 26+ messages in thread
From: Li Zhijian @ 2016-03-31  2:17 UTC (permalink / raw)
  To: Dr. David Alan Gilbert, Zhang Chen
  Cc: zhanghailiang, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang



On 03/30/2016 07:41 PM, Dr. David Alan Gilbert wrote:
[...]

>> >@@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>> >      if (ret == 1) {
>> >          if (packet_enqueue(s, PRIMARY_IN)) {
>> >              error_report("primary: unsupported packet in");
>> >-            compare_chr_send(s->chr_out, buf, size);
>> >+            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> Doesn't that change belong in an earlier patch?
>
>> >+        } else {
>> >+            qemu_event_set(&s->event);
> Also these - why are these in this patch?
This event wakes up the comparison thread so that it can do the comparison.
Do you think we should move the event-related code to patch 2?

Thanks
Li
>
>> >          }

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare
  2016-03-30 12:05 ` [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Dr. David Alan Gilbert
@ 2016-03-31  3:01   ` Li Zhijian
  2016-03-31  9:43     ` Dr. David Alan Gilbert
  2016-03-31  6:48   ` Zhang Chen
  1 sibling, 1 reply; 26+ messages in thread
From: Li Zhijian @ 2016-03-31  3:01 UTC (permalink / raw)
  To: Dr. David Alan Gilbert, Zhang Chen
  Cc: zhanghailiang, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang



On 03/30/2016 08:05 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> COLO-compare is a part of COLO project. It is used
>> to compare the network package to help COLO decide
>> whether to do checkpoint.
>
> Hi Zhang Chen,
>    I've put comments on the individual patches, but some more general things:
>
>    1) Please add a comment giving the example of the command line for the primary
>      and secondary use of this module - it helps make it easier to understand the patches.
>
>    2) There's no tracing in here - please add some; I found when I tried to get
>      COLO working I needed to use lots of tracing and debugging to understand the
>      packet flow.
>
>    3) Add comments; e.g. for each function say which thread is using it and where
>       the packets are coming from; e.g.
>          called from the main thread on the primary for packets arriving over the socket
>          from the secondary.
>
>       There's just so many packets going in so many directions it would make it
>       easier to follow.
>
>    4) A more fundamental problem is what happens if the secondary never sends anything
>       on the socket, the result is you end up running until the end of the long COLO
>       checkpoint without triggering a discompare - in my world I added a timeout (400ms)
>       for an unmatched packet from the primary, where if no matching packet was received
>       a checkpoint would be triggered.
>
>    5) I see the packet comparison is still the simple memcmp that you had in December;
>       are you planning on doing anything more complicated; you must be seeing most packets
>       miscompare?
>
> You can see my current world at; https://github.com/orbitfp7/qemu/commits/orbit-wp4-colo-mar16
> which has my basic TCP comparison (it's only tracking incoming connections) and I know it's
> not complete either.  It mostly works OK, although I've got an occasional seg
> (which makes me wonder if I need to add the conn_list_lock I see you added).  I'm also
> not doing any TCP reassembly which is probably needed.
>
Thank you very much for your comments.
I just looked at your tree; you have put in a lot of work (TCP comparison enhancements,
sequence/acknowledge number rewriting, timeout...).

Actually, this compare module is still at the RFC stage (it only includes the compare framework);
there is still a lot of work to be done:

1) Integrate into the COLO framework (and get the COLO primary and secondary into the running state)

2) IP fragment defragmentation

3) comparison based on the sequence number (TCP and UDP) if the packet has one,
    because TCP re-transmission is quite common. IIRC, your code will compare the whole TCP
     packet (so the sequence number will be compared as well)

4) packets belonging to the same connection are sorted by sequence number

5) Out-of-order packet handling

6) clean up inactive connections in conn_list, which may have been closed. The simple way is to
    introduce a timer to record whether a connection has received a packet within a timeout;
     connections idle beyond this timeout should be cleaned up.

7) The issue Dave pointed out in (4) above

8) anything else I have missed...

For various reasons, not all of this work can be done immediately, so we hope to discuss and
decide which functions have the highest priority.
Any comments and suggestions are welcome.

IMO, a compare framework plus a COLO framework hack patch could be simple enough.

Thanks
Li

> Dave
>
>> v2:
>>   - add jhash.h
>>
>> v1:
>>   - initial patch
>>
>>
>> Zhang Chen (3):
>>    colo-compare: introduce colo compare initlization
>>    colo-compare: track connection and enqueue packet
>>    colo-compare: introduce packet comparison thread
>>
>>   include/qemu/jhash.h |  59 ++++
>>   net/Makefile.objs    |   1 +
>>   net/colo-compare.c   | 782 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>   vl.c                 |   3 +-
>>   4 files changed, 844 insertions(+), 1 deletion(-)
>>   create mode 100644 include/qemu/jhash.h
>>   create mode 100644 net/colo-compare.c
>>
>> --
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

-- 
Best regards.
Li Zhijian (8555)

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-30 10:36   ` Dr. David Alan Gilbert
  2016-03-31  2:09     ` Li Zhijian
@ 2016-03-31  4:06     ` Zhang Chen
  2016-03-31  4:23       ` Li Zhijian
  1 sibling, 1 reply; 26+ messages in thread
From: Zhang Chen @ 2016-03-31  4:06 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang



On 03/30/2016 06:36 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> In this patch we use kernel jhash table to track
>> connection, and then enqueue net packet like this:
>>
>> + CompareState ++
>> |               |
>> +---------------+   +---------------+         +---------------+
>> |conn list      +--->conn           +--------->conn           |
>> +---------------+   +---------------+         +---------------+
>> |               |     |           |             |          |
>> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet  +
>>                    +--------+  +--------+    +--------+ +--------+
>>                        |           |             |          |
>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet  +
>>                    +--------+  +--------+    +--------+ +--------+
>>                        |           |             |          |
>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet  +
>>                    +--------+  +--------+    +--------+ +--------+
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   include/qemu/jhash.h |  59 ++++++++++
>>   net/colo-compare.c   | 324 ++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   2 files changed, 380 insertions(+), 3 deletions(-)
>>   create mode 100644 include/qemu/jhash.h
>>
>> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
>> new file mode 100644
>> index 0000000..8a8ff0f
>> --- /dev/null
>> +++ b/include/qemu/jhash.h
>> @@ -0,0 +1,59 @@
>> +/* jhash.h: Jenkins hash support.
>> +  *
>> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
>> +  *
>> +  * http://burtleburtle.net/bob/hash/
>> +  *
>> +  * These are the credits from Bob's sources:
>> +  *
>> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
>> +  *
>> +  * These are functions for producing 32-bit hashes for hash table lookup.
>> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
>> +  * are externally useful functions.  Routines to test the hash are included
>> +  * if SELF_TEST is defined.  You can use this free for any purpose.It's in
>> +  * the public domain.  It has no warranty.
>> +  *
>> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
>> +  *
>> +  * I've modified Bob's hash to be useful in the Linux kernel, and
>> +  * any bugs present are my fault.
>> +  * Jozsef
>> +  */
>> +
>> +#ifndef QEMU_JHASH_H__
>> +#define QEMU_JHASH_H__
>> +
>> +#include "qemu/bitops.h"
>> +
>> +/*
>> + * hashtable related is copied from linux kernel jhash
>> + */
>> +
>> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
>> +#define __jhash_mix(a, b, c)                \
>> +{                                           \
>> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
>> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
>> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
>> +    a -= c;  a ^= rol32(c, 16); c += b;     \
>> +    b -= a;  b ^= rol32(a, 19); a += c;     \
>> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
>> +}
>> +
>> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
>> +#define __jhash_final(a, b, c)  \
>> +{                               \
>> +    c ^= b; c -= rol32(b, 14);  \
>> +    a ^= c; a -= rol32(c, 11);  \
>> +    b ^= a; b -= rol32(a, 25);  \
>> +    c ^= b; c -= rol32(b, 16);  \
>> +    a ^= c; a -= rol32(c, 4);   \
>> +    b ^= a; b -= rol32(a, 14);  \
>> +    c ^= b; c -= rol32(b, 24);  \
>> +}
>> +
>> +/* An arbitrary initial parameter */
>> +#define JHASH_INITVAL           0xdeadbeef
>> +
>> +#endif /* QEMU_JHASH_H__ */
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 62c66df..0bb5a51 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -20,15 +20,22 @@
>>   #include "net/queue.h"
>>   #include "sysemu/char.h"
>>   #include "qemu/sockets.h"
>> +#include <sys/sysinfo.h>
>> +#include "slirp/slirp.h"
>> +#include "qemu/jhash.h"
>> +#include <sys/sysinfo.h>
>>   
>>   #define TYPE_COLO_COMPARE "colo-compare"
>>   #define COLO_COMPARE(obj) \
>>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>   
>>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>> +#define PAGE_SIZE 4096
>> +#define ETH_HLEN 14
> PAGE_SIZE is not just 4k; use one of the system headers.

OK, I will fix it with include/exec/cpu-all.h TARGET_PAGE_SIZE

> Also, don't define ETH_HLEN - include net/eth.h

OK

>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>> +static ssize_t hashtable_max_size;
>>   
>>   typedef struct ReadState {
>>       int state; /* 0 = getting length, 1 = getting data */
>> @@ -37,6 +44,28 @@ typedef struct ReadState {
>>       uint8_t buf[COMPARE_READ_LEN_MAX];
>>   } ReadState;
>>   
>> +/*
>> +  + CompareState ++
>> +  |               |
>> +  +---------------+   +---------------+         +---------------+
>> +  |conn list      +--->conn           +--------->conn           |
>> +  +---------------+   +---------------+         +---------------+
>> +  |               |     |           |             |          |
>> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +                        |           |             |          |
>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +                        |           |             |          |
>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +*/
>>   typedef struct CompareState {
>>       Object parent;
>>   
>> @@ -49,8 +78,268 @@ typedef struct CompareState {
>>       QTAILQ_ENTRY(CompareState) next;
>>       ReadState pri_rs;
>>       ReadState sec_rs;
>> +
>> +    /* connection list: the connections belonged to this NIC could be found
>> +     * in this list.
>> +     * element type: Connection
>> +     */
>> +    GQueue conn_list;
>> +    QemuMutex conn_list_lock; /* to protect conn_list */
>> +    /* hashtable to save connection */
>> +    GHashTable *connection_track_table;
>> +    /* to save unprocessed_connections */
>> +    GQueue unprocessed_connections;
>> +    /* proxy current hash size */
>> +    ssize_t hashtable_size;
>>   } CompareState;
>>   
>> +typedef struct Packet {
>> +    void *data;
>> +    union {
>> +        uint8_t *network_layer;
>> +        struct ip *ip;
>> +    };
>> +    uint8_t *transport_layer;
>> +    int size;
>> +    CompareState *s;
>> +} Packet;
>> +
>> +typedef struct ConnectionKey {
>> +    /* (src, dst) must be grouped, in the same way than in IP header */
>> +    struct in_addr src;
>> +    struct in_addr dst;
>> +    uint16_t src_port;
>> +    uint16_t dst_port;
>> +    uint8_t ip_proto;
>> +} QEMU_PACKED ConnectionKey;
> Someone will want IPv6 at some point, so think about that, but not
> too worried for now.
>
>> +typedef struct Connection {
>> +    QemuMutex list_lock;
>> +    /* connection primary send queue: element type: Packet */
>> +    GQueue primary_list;
>> +    /* connection secondary send queue: element type: Packet */
>> +    GQueue secondary_list;
>> +    /* flag to enqueue unprocessed_connections */
>> +    bool processing;
>> +    int ip_proto;
> in ConnectionKey you use uint8_t for ip_proto  - should
> be consistent?

OK, will fix it

>
>> +} Connection;
>> +
>> +enum {
>> +    PRIMARY_IN = 0,
>> +    SECONDARY_IN,
>> +};
>> +
>> +static void packet_destroy(void *opaque, void *user_data);
>> +static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
>> +
>> +static uint32_t connection_key_hash(const void *opaque)
>> +{
>> +    const ConnectionKey *key = opaque;
>> +    uint32_t a, b, c;
>> +
>> +    /* Jenkins hash */
>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>> +    a += key->src.s_addr;
>> +    b += key->dst.s_addr;
>> +    c += (key->src_port | key->dst_port << 16);
>> +    __jhash_mix(a, b, c);
>> +
>> +    a += key->ip_proto;
>> +    __jhash_final(a, b, c);
>> +
>> +    return c;
>> +}
>> +
>> +static int connection_key_equal(const void *opaque1, const void *opaque2)
>> +{
>> +    return memcmp(opaque1, opaque2, sizeof(ConnectionKey)) == 0;
>> +}
>> +
>> +/*
>> + *  initialize connecon_key for packet
>                          ^ti
>
>> + *  Return 0 on success, if return 1 the pkt will be sent later
>> + */
>> +static int connection_key_init(Packet *pkt, ConnectionKey *key)
>> +{
>> +    int network_length;
>> +    uint8_t *data = pkt->data;
>> +    uint16_t l3_proto;
>> +    uint32_t tmp_ports;
>> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
>> +
>> +    pkt->network_layer = data + ETH_HLEN;
>> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
>> +    if (l3_proto != ETH_P_IP) {
>> +        return 1;
>> +    }
>> +
>> +    network_length = pkt->ip->ip_hl * 4;
>> +    pkt->transport_layer = pkt->network_layer + network_length;
> Have we checked that this is valid - this is guest/external network
> data, so is that 'network_length' actually pointing to valid data
> or off the end of the packet?

I will check it before use in next version.

>
>> +    key->ip_proto = pkt->ip->ip_p;
>> +    key->src = pkt->ip->ip_src;
>> +    key->dst = pkt->ip->ip_dst;
>> +
>> +    switch (key->ip_proto) {
>> +    case IPPROTO_TCP:
>> +    case IPPROTO_UDP:
>> +    case IPPROTO_DCCP:
>> +    case IPPROTO_ESP:
>> +    case IPPROTO_SCTP:
>> +    case IPPROTO_UDPLITE:
>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
>> +        key->src_port = tmp_ports & 0xffff;
>> +        key->dst_port = tmp_ports >> 16;
> Do these need ntohs - or do you want to keep them in network
> order?  In my world on your older code I added ntohs's because
> it made debugging make a lot more sense when you print out src_port/dst_port.

Make sense, I will fix it.

>
>> +        break;
>> +    case IPPROTO_AH:
>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
>> +        key->src_port = tmp_ports & 0xffff;
>> +        key->dst_port = tmp_ports >> 16;
>> +        break;
>> +    default:
>     Do you need to set src_port/dst_port here (to 0 ?? ) ?

Yes,will fix it.

>
>> +        break;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static Connection *connection_new(ConnectionKey *key)
>> +{
>> +    Connection *conn = g_slice_new(Connection);
>> +
>> +    qemu_mutex_init(&conn->list_lock);
>> +    conn->ip_proto = key->ip_proto;
>> +    conn->processing = false;
>> +    g_queue_init(&conn->primary_list);
>> +    g_queue_init(&conn->secondary_list);
>> +
>> +    return conn;
>> +}
>> +
>> +/*
>> + * Clear hashtable, stop this hash growing really huge
>> + */
>> +static void connection_hashtable_reset(CompareState *s)
>> +{
>> +    s->hashtable_size = 0;
>> +    g_hash_table_remove_all(s->connection_track_table);
>> +}
>> +
>> +/* if not found, creata a new connection and add to hash table */
>   Typo                    ^

will fix.

>> +static Connection *connection_get(CompareState *s, ConnectionKey *key)
>> +{
>> +    /* FIXME: protect connection_track_table */
>> +    Connection *conn = g_hash_table_lookup(s->connection_track_table, key);
>> +
>> +    if (conn == NULL) {
>> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
>> +
>> +        conn = connection_new(key);
>> +
>> +        s->hashtable_size++;
>> +        if (s->hashtable_size > hashtable_max_size) {
>> +            error_report("colo proxy connection hashtable full, clear it");
>> +            connection_hashtable_reset(s);
>> +            /* TODO:clear conn_list */
>> +        } else {
> This feels wrong; should this actually be in an else? If you've just cleared
> the hash table, then you probably want to add this new connection to the empty
> table? (And for example at the moment the 'new_key' is not used if we go down
> this if).
>

you are right, I will fix.

>> +            g_hash_table_insert(s->connection_track_table, new_key, conn);
>> +        }
>> +    }
>> +
>> +     return conn;
>> +}
>> +
>> +static void connection_destroy(void *opaque)
>> +{
>> +    Connection *conn = opaque;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
>> +    g_queue_free(&conn->primary_list);
>> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
>> +    g_queue_free(&conn->secondary_list);
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +    qemu_mutex_destroy(&conn->list_lock);
>> +    g_slice_free(Connection, conn);
>> +}
>> +
>> +static Packet *packet_new(CompareState *s, const void *data,
>> +                              int size, ConnectionKey *key)
>> +{
>> +    Packet *pkt = g_slice_new(Packet);
>> +
>> +    pkt->data = g_memdup(data, size);
>> +    pkt->size = size;
>> +    pkt->s = s;
>> +
>> +    if (connection_key_init(pkt, key)) {
>> +        packet_destroy(pkt, NULL);
>> +        pkt = NULL;
>> +    }
>> +
>> +    return pkt;
>> +}
>> +
>> +static int packet_enqueue(CompareState *s, int mode)
>> +{
>> +    ConnectionKey key = {{ 0 } };
>> +    Packet *pkt = NULL;
>> +    Connection *conn;
>> +
>> +    /* arp packet will be sent */
> Can you add some more detail about that - what do the return
> values of packet_enqueue mean; what happens to things like IPv6 or ARP packets?

OK, will add some comments.
If the primary input packet is not an IP packet, we will just send it for now.


>
>> +    if (mode == PRIMARY_IN) {
>> +        pkt = packet_new(s, s->pri_rs.buf, s->pri_rs.packet_len, &key);
>> +    } else {
>> +        pkt = packet_new(s, s->sec_rs.buf, s->sec_rs.packet_len, &key);
>> +    }
>> +    if (!pkt) {
>> +        return -1;
>> +    }
>> +
>> +    conn = connection_get(s, &key);
>> +    if (!conn->processing) {
>> +        qemu_mutex_lock(&s->conn_list_lock);
>> +        g_queue_push_tail(&s->conn_list, conn);
>> +        qemu_mutex_unlock(&s->conn_list_lock);
>> +        conn->processing = true;
>> +    }
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    if (mode == PRIMARY_IN) {
>> +        g_queue_push_tail(&conn->primary_list, pkt);
>> +    } else {
>> +        g_queue_push_tail(&conn->secondary_list, pkt);
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +
>> +    return 0;
>> +}
>> +
>> +static void packet_destroy(void *opaque, void *user_data)
>> +{
>> +    Packet *pkt = opaque;
>> +
>> +    g_free(pkt->data);
>> +    g_slice_free(Packet, pkt);
>> +}
>> +
>> +static inline void colo_flush_connection(void *opaque, void *user_data)
>> +{
> Is this used?

will be used, like lizhijian said.

>
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    while (!g_queue_is_empty(&conn->primary_list)) {
>> +        pkt = g_queue_pop_head(&conn->primary_list);
>> +        compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
>> +        /* FIXME: destroy pkt ?*/
>> +    }
>> +    while (!g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_head(&conn->secondary_list);
>> +        packet_destroy(pkt, NULL);
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>>   {
>>       int ret = 0;
>> @@ -142,8 +431,10 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>>   
>>       ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
>>       if (ret == 1) {
>> -        /* FIXME: enqueue to primary packet list */
>> -        compare_chr_send(s->chr_out, buf, size);
>> +        if (packet_enqueue(s, PRIMARY_IN)) {
>> +            error_report("primary: unsupported packet in");
> Is this for non-IP packets?  If so you don't want an error_report - because non-IP are
> quite common; a trace would be useful giving the packet type etc
>
>> +            compare_chr_send(s->chr_out, buf, size);
>> +        }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>       }
>> @@ -156,7 +447,9 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>>   
>>       ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
>>       if (ret == 1) {
>> -        /* TODO: enqueue to secondary packet list*/
>> +        if (packet_enqueue(s, SECONDARY_IN)) {
>> +            error_report("secondary: unsupported packet in");
>> +        }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>>       }
>> @@ -210,6 +503,7 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>>   static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>> +    struct sysinfo si;
>>   
>>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -255,6 +549,29 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   
>>       QTAILQ_INSERT_TAIL(&net_compares, s, next);
>>   
>> +    g_queue_init(&s->conn_list);
>> +    qemu_mutex_init(&s->conn_list_lock);
>> +
>> +    s->hashtable_size = 0;
>> +    /*
>> +     * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
>> +     * machine has 512 buckets. >= 1GB machines have 16384 buckets.
>> +     */
>> +    sysinfo(&si);
>> +    hashtable_max_size = si.totalram / 16384;
>> +    if (si.totalram > (1024 * 1024 * 1024 / PAGE_SIZE)) {
>> +        hashtable_max_size = 16384;
>> +    }
>> +    if (hashtable_max_size < 32) {
>> +        hashtable_max_size = 32;
>> +    }
>> +    hashtable_max_size = hashtable_max_size * 8; /* default factor = 8 */
> Make this a lot simpler; just pick a size and if it's a problem then we'll worry
> about it later, or make it an option on the filter if you want it changeable.
>

OK, I will add hashtable_max_size to make it changeable.

Thanks
zhangchen

>> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
>> +                                                      connection_key_equal,
>> +                                                      g_free,
>> +                                                      connection_destroy);
>> +
>>       return;
>>   
>>   out:
>> @@ -297,6 +614,7 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>>       if (!QTAILQ_EMPTY(&net_compares)) {
>>           QTAILQ_REMOVE(&net_compares, s, next);
>>       }
>> +    qemu_mutex_destroy(&s->conn_list_lock);
>>   }
>>   
>>   static void colo_compare_init(Object *obj)
>> -- 
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-31  4:06     ` Zhang Chen
@ 2016-03-31  4:23       ` Li Zhijian
  2016-03-31  4:44         ` Zhang Chen
  0 siblings, 1 reply; 26+ messages in thread
From: Li Zhijian @ 2016-03-31  4:23 UTC (permalink / raw)
  To: Zhang Chen, Dr. David Alan Gilbert
  Cc: zhanghailiang, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang



On 03/31/2016 12:06 PM, Zhang Chen wrote:
>>>
>>> +static void packet_destroy(void *opaque, void *user_data)
>>> +{
>>> +    Packet *pkt = opaque;
>>> +
>>> +    g_free(pkt->data);
>>> +    g_slice_free(Packet, pkt);
>>> +}
>>> +
>>> +static inline void colo_flush_connection(void *opaque, void *user_data)
>>> +{
>> Is this used?
>
> will be used, like lizhijian said.

I mean you should remove this now, and re-introduce it when you integrate with the COLO frame

Thanks
Li Zhijian

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-31  4:23       ` Li Zhijian
@ 2016-03-31  4:44         ` Zhang Chen
  0 siblings, 0 replies; 26+ messages in thread
From: Zhang Chen @ 2016-03-31  4:44 UTC (permalink / raw)
  To: Li Zhijian, Dr. David Alan Gilbert
  Cc: zhanghailiang, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang



On 03/31/2016 12:23 PM, Li Zhijian wrote:
>
>
> On 03/31/2016 12:06 PM, Zhang Chen wrote:
>>>>
>>>> +static void packet_destroy(void *opaque, void *user_data)
>>>> +{
>>>> +    Packet *pkt = opaque;
>>>> +
>>>> +    g_free(pkt->data);
>>>> +    g_slice_free(Packet, pkt);
>>>> +}
>>>> +
>>>> +static inline void colo_flush_connection(void *opaque, void 
>>>> *user_data)
>>>> +{
>>> Is this used?
>>
>> will be used, like lizhijian said.
>
> I mean you should remove this now, and re-introduce when you integrate 
> to COLO frame
>

OK ~~

> Thanks
> Li Zhijian
> .
>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread
  2016-03-30 11:41   ` Dr. David Alan Gilbert
  2016-03-31  2:17     ` Li Zhijian
@ 2016-03-31  6:00     ` Zhang Chen
  1 sibling, 0 replies; 26+ messages in thread
From: Zhang Chen @ 2016-03-31  6:00 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang



On 03/30/2016 07:41 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> if packets are same, we send primary packet and drop secondary
>> packet, otherwise notify COLO do checkpoint.
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 121 insertions(+), 1 deletion(-)
>>
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 0bb5a51..1debc0e 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -36,6 +36,7 @@
>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>>   static ssize_t hashtable_max_size;
>> +static int colo_need_checkpoint;
>>   
>>   typedef struct ReadState {
>>       int state; /* 0 = getting length, 1 = getting data */
>> @@ -91,6 +92,13 @@ typedef struct CompareState {
>>       GQueue unprocessed_connections;
>>       /* proxy current hash size */
>>       ssize_t hashtable_size;
>> +
>> +    /* notify compare thread */
>> +    QemuEvent event;
>> +    /* compare thread, a thread for each NIC */
>> +    QemuThread thread;
>> +    int thread_status;
>> +
>>   } CompareState;
>>   
>>   typedef struct Packet {
>> @@ -129,6 +137,15 @@ enum {
>>       SECONDARY_IN,
>>   };
>>   
>> +enum {
>> +    /* compare thread isn't started */
>> +    COMPARE_THREAD_NONE,
>> +    /* compare thread is running */
>> +    COMPARE_THREAD_RUNNING,
>> +    /* compare thread exit */
>> +    COMPARE_THREAD_EXIT,
>> +};
>> +
>>   static void packet_destroy(void *opaque, void *user_data);
>>   static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
>>   
>> @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data)
>>       qemu_mutex_unlock(&conn->list_lock);
>>   }
>>   
>> +static void colo_notify_checkpoint(void)
>> +{
>> +    colo_need_checkpoint = true;
>> +}
>> +
>> +/* TODO colo_do_checkpoint() {
>> + * we flush the connections and reset 'colo_need_checkpoint'
>> + * }
>> + */
>> +
>> +static inline void colo_dump_packet(Packet *pkt)
>> +{
>> +    int i;
>> +    for (i = 0; i < pkt->size; i++) {
>> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
>> +    }
>> +    printf("\n");
>> +}
>> +
>> +/*
>> + * The IP packets sent by primary and secondary
>> + * will be compared in here
>> + * TODO support ip fragment, Out-Of-Order
>> + * return:    0  means packet same
>> + *            > 0 || < 0 means packet different
>> + */
>> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>> +{
>> +    colo_dump_packet(ppkt);
>> +    colo_dump_packet(spkt);
> Obviously those need to become conditional on something.

OK, I will add tracing in the next version.

>
>> +    if (ppkt->size == spkt->size) {
>> +        return memcmp(ppkt->data, spkt->data, spkt->size);
>> +    } else {
>> +        return -1;
>> +    }
>> +}
>> +
>> +static void colo_compare_connection(void *opaque, void *user_data)
>> +{
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +    GList *result = NULL;
>> +    int ret;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>> +           !g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_head(&conn->primary_list);
>> +        result = g_queue_find_custom(&conn->secondary_list,
>> +                              pkt, (GCompareFunc)colo_packet_compare);
> I think the order of parameters passed to the colo_packet_compare
> is the wrong way around - although it doesn't really matter with your current
> simple comparison;  https://developer.gnome.org/glib/stable/glib-Double-ended-Queues.html
> says that
>
>      'The function takes two gconstpointer arguments, the GQueue element's data as the
>       first argument and the given user data as the second argument'
>
>    so that makes the first argument the element out of the secondary_list and
> the second argument the 'pkt' that you popped off the primary.

OK, thanks a lot. Will fix.

>
>> +
>> +        if (result) {
>> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
>> +            if (ret < 0) {
>> +                error_report("colo_send_primary_packet failed");
>> +            }
>> +            g_queue_remove(&conn->secondary_list, result);
>> +        } else {
>> +            g_queue_push_head(&conn->primary_list, pkt);
>> +            colo_notify_checkpoint();
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +}
>> +
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
>> +        qemu_event_wait(&s->event);
>> +        qemu_event_reset(&s->event);
>> +        qemu_mutex_lock(&s->conn_list_lock);
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
>> +        qemu_mutex_unlock(&s->conn_list_lock);
> Interesting; holding the 'conn_list_lock' around the whole of the comparison
> is probably quite expensive if you've got a lot of packets coming in then
> the lock could be held for most of the time.
> I'm not sure of a better solution; maybe use the qemu/rcu_queue.h ?
>
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>>   {
>>       int ret = 0;
>> @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>>       if (ret == 1) {
>>           if (packet_enqueue(s, PRIMARY_IN)) {
>>               error_report("primary: unsupported packet in");
>> -            compare_chr_send(s->chr_out, buf, size);
>> +            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> Doesn't that change belong in an earlier patch?

Yes, will fix

>
>> +        } else {
>> +            qemu_event_set(&s->event);
> Also these - why are these in this patch?

will fix

>
>>           }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> @@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>>       if (ret == 1) {
>>           if (packet_enqueue(s, SECONDARY_IN)) {
>>               error_report("secondary: unsupported packet in");
>> +        } else {
>> +            qemu_event_set(&s->event);
>>           }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>>       struct sysinfo si;
>> +    char thread_name[64];
>> +    static int compare_id;
>>   
>>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>       g_queue_init(&s->conn_list);
>>       qemu_mutex_init(&s->conn_list_lock);
>>   
>> +    colo_need_checkpoint = false;
>>       s->hashtable_size = 0;
>>       /*
>>        * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
>> @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>                                                         g_free,
>>                                                         connection_destroy);
>>   
>> +    s->thread_status = COMPARE_THREAD_RUNNING;
>> +    sprintf(thread_name, "proxy compare %d", compare_id);
> As with my comment from last month; the thread names are limited
> to 14 characters on Linux (and most other Unixes) so keep this short;
> I use "proxy:%s" and the device name.

OK, I will fix it.

>
>> +    qemu_thread_create(&s->thread, thread_name,
>> +                       colo_compare_thread, s,
>> +                       QEMU_THREAD_JOINABLE);
>> +    compare_id++;
>> +
>>       return;
>>   
>>   out:
>> @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>>           QTAILQ_REMOVE(&net_compares, s, next);
>>       }
>>       qemu_mutex_destroy(&s->conn_list_lock);
>> +
>> +    if (s->thread.thread) {
>> +        s->thread_status = COMPARE_THREAD_EXIT;
>> +        qemu_event_set(&s->event);
>> +        qemu_thread_join(&s->thread);
>> +    }
>> +    qemu_event_destroy(&s->event);
>>   }
>>   
>>   static void colo_compare_init(Object *obj)
>> -- 
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare
  2016-03-30 12:05 ` [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Dr. David Alan Gilbert
  2016-03-31  3:01   ` Li Zhijian
@ 2016-03-31  6:48   ` Zhang Chen
  1 sibling, 0 replies; 26+ messages in thread
From: Zhang Chen @ 2016-03-31  6:48 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang



On 03/30/2016 08:05 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> COLO-compare is a part of COLO project. It is used
>> to compare the network package to help COLO decide
>> whether to do checkpoint.
> Hi Zhang Chen,
>    I've put comments on the individual patches, but some more general things:
>
>    1) Please add a coment giving the example of the command line for the primary
>      and secondary use of this module - it helps make it easier to understand the patches.

Yes, I have replied in
Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare 
initlization

>
>    2) There's no tracing in here - please add some; I found when I tried to get
>      COLO working I needed to use lots of tracing and debugging to understand the
>      packet flow.

I will add some tracing in the next version.

>
>    3) Add comments; e.g. for each function say which thread is using it and where
>       the packets are coming from; e.g.
>          called from the main thread on the primary for packets arriving over the socket
>          from the secondary.
>
>       There's just so many packets going in so many directions it would make it
>       easier to follow.

OK~

>
>    4) A more fundamental problem is what happens if the secondary never sends anything
>       on the socket, the result is you end up running until the end of the long COLO
>       checkpoint without triggering a discompare - in my world I added a timeout (400ms)
>       for an unmatched packet from the primary, where if no matching packet was received
>       a checkpoint would be triggered.

OK, I will do this in the future.

>
>    5) I see the packet comparison is still the simple memcmpy that you had in December;
>       are you planning on doing anything more complicated; you must be seing most packets
>       miscompare?

Yes, this is just a compare-frame RFC. We will continue to improve it in
the future.

Thanks
Zhang Chen


>
> You can see my current world at; https://github.com/orbitfp7/qemu/commits/orbit-wp4-colo-mar16
> which has my basic TCP comparison (it's only tracking incoming connections) and I know it's
> not complete either.  It mostly works OK, although I've got an occasional seg
> (which makes me wonder if I need to add the conn_list_lock I see you added).  I'm also
> not doing any TCP reassembly which is probably needed.
>
> Dave
>      
>> v2:
>>   - add jhash.h
>>
>> v1:
>>   - initial patch
>>
>>
>> Zhang Chen (3):
>>    colo-compare: introduce colo compare initlization
>>    colo-compare: track connection and enqueue packet
>>    colo-compare: introduce packet comparison thread
>>
>>   include/qemu/jhash.h |  59 ++++
>>   net/Makefile.objs    |   1 +
>>   net/colo-compare.c   | 782 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>   vl.c                 |   3 +-
>>   4 files changed, 844 insertions(+), 1 deletion(-)
>>   create mode 100644 include/qemu/jhash.h
>>   create mode 100644 net/colo-compare.c
>>
>> -- 
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-31  1:41     ` Zhang Chen
@ 2016-03-31  7:25       ` Zhang Chen
  2016-03-31  9:24       ` Dr. David Alan Gilbert
  1 sibling, 0 replies; 26+ messages in thread
From: Zhang Chen @ 2016-03-31  7:25 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: zhanghailiang, Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong,
	qemu devel, Yang Hongyang



On 03/31/2016 09:41 AM, Zhang Chen wrote:
>
>
> On 03/30/2016 05:25 PM, Dr. David Alan Gilbert wrote:
>> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>> packet come from primary char indev will be send to
>>> outdev - packet come from secondary char dev will be drop
>> Please put in the description an example of how you invoke
>> the filter on the primary and secondary.
>
> OK.
> colo-compare get packet from chardev(primary_in,secondary_in),
> and output to other chardev(outdev), so, we can use it with the
> help of filter-mirror and filter-redirector. like that:
>
> primary:
> -netdev 
> tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
> -object 
> filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
> -object 
> colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0 
>
>
> secondary:
> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,down 
> script=/etc/qemu-ifdown
> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=red0,host=3.3.3.3,port=9003
> -chardev socket,id=red1,host=3.3.3.3,port=9004
> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>
>
>
>>
>>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>>> ---
>>>   net/Makefile.objs  |   1 +
>>>   net/colo-compare.c | 344 
>>> +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>   vl.c               |   3 +-
>>>   3 files changed, 347 insertions(+), 1 deletion(-)
>>>   create mode 100644 net/colo-compare.c
>>>
>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>> index b7c22fd..ba92f73 100644
>>> --- a/net/Makefile.objs
>>> +++ b/net/Makefile.objs
>>> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>>>   common-obj-y += filter.o
>>>   common-obj-y += filter-buffer.o
>>>   common-obj-y += filter-mirror.o
>>> +common-obj-y += colo-compare.o
>>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>>> new file mode 100644
>>> index 0000000..62c66df
>>> --- /dev/null
>>> +++ b/net/colo-compare.c
>>> @@ -0,0 +1,344 @@
>>> +/*
>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> + *
>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>> + * later.  See the COPYING file in the top-level directory.
>>> + */
>>> +
>>> +#include "qemu/osdep.h"
>>> +#include "qemu-common.h"
>>> +#include "qapi/qmp/qerror.h"
>>> +#include "qemu/error-report.h"
>>> +
>>> +#include "net/net.h"
>>> +#include "net/vhost_net.h"
>>> +#include "qom/object_interfaces.h"
>>> +#include "qemu/iov.h"
>>> +#include "qom/object.h"
>>> +#include "qemu/typedefs.h"
>>> +#include "net/queue.h"
>>> +#include "sysemu/char.h"
>>> +#include "qemu/sockets.h"
>>> +
>>> +#define TYPE_COLO_COMPARE "colo-compare"
>>> +#define COLO_COMPARE(obj) \
>>> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>> +
>>> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
>>> +
>>> +static QTAILQ_HEAD(, CompareState) net_compares =
>>> +       QTAILQ_HEAD_INITIALIZER(net_compares);
>>> +
>>> +typedef struct ReadState {
>>> +    int state; /* 0 = getting length, 1 = getting data */
>>> +    unsigned int index;
>>> +    unsigned int packet_len;
>> Please make packet_len and index  uint32_t, since they're sent over 
>> the wire
>> as 32bit.

OK~ will fix.

>>
>>> +    uint8_t buf[COMPARE_READ_LEN_MAX];
>>> +} ReadState;
>>> +
>>> +typedef struct CompareState {
>>> +    Object parent;
>>> +
>>> +    char *pri_indev;
>>> +    char *sec_indev;
>>> +    char *outdev;
>>> +    CharDriverState *chr_pri_in;
>>> +    CharDriverState *chr_sec_in;
>>> +    CharDriverState *chr_out;
>>> +    QTAILQ_ENTRY(CompareState) next;
>>> +    ReadState pri_rs;
>>> +    ReadState sec_rs;
>>> +} CompareState;
>>> +
>>> +static int compare_chr_send(CharDriverState *out, const uint8_t 
>>> *buf, int size)
>>> +{
>>> +    int ret = 0;
>>> +    uint32_t len = htonl(size);
>>> +
>> Similarly, make 'size' uint32_t - everything that gets converted into 
>> a uint32_t
>> it's probably best to make a uint32_t.

OK

>>
>>> +    if (!size) {
>>> +        return 0;
>>> +    }
>>> +
>>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
>>> +    if (ret != sizeof(len)) {
>>> +        goto err;
>>> +    }
>>> +
>>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
>>> +    if (ret != size) {
>>> +        goto err;
>>> +    }
>>> +
>> You can make this slightly simpler and save the return 0;
>>

will fix~~

>>> +    return 0;
>>> +
>>> +err:
>>> +    return ret < 0 ? ret : -EIO;
>> err:
>>         return ret <= 0 ? ret : -EIO;
>>
>>> +}
>>> +
>>> +static int compare_chr_can_read(void *opaque)
>>> +{
>>> +    return COMPARE_READ_LEN_MAX;
>>> +}
>>> +
>>> +/* Returns
>>> + * 0: readstate is not ready
>>> + * 1: readstate is ready
>>> + * otherwise error occurs
>>> + */
>>> +static int compare_chr_fill_rstate(ReadState *rs, const uint8_t 
>>> *buf, int size)
>>> +{
>>> +    unsigned int l;
>>> +    while (size > 0) {
>>> +        /* reassemble a packet from the network */
>>> +        switch (rs->state) { /* 0 = getting length, 1 = getting 
>>> data */
>>> +        case 0:
>>> +            l = 4 - rs->index;
>>> +            if (l > size) {
>>> +                l = size;
>>> +            }
>>> +            memcpy(rs->buf + rs->index, buf, l);
>>> +            buf += l;
>>> +            size -= l;
>>> +            rs->index += l;
>>> +            if (rs->index == 4) {
>>> +                /* got length */
>>> +                rs->packet_len = ntohl(*(uint32_t *)rs->buf);
>>> +                rs->index = 0;
>>> +                rs->state = 1;
>>> +            }
>>> +            break;
>>> +        case 1:
>>> +            l = rs->packet_len - rs->index;
>>> +            if (l > size) {
>>> +                l = size;
>>> +            }
>>> +            if (rs->index + l <= sizeof(rs->buf)) {
>>> +                memcpy(rs->buf + rs->index, buf, l);
>>> +            } else {
>>> +                error_report("serious error: oversized packet 
>>> received.");
>> Isn't it easier to do this check above in the 'got length' if ?
>> Instead of 'serious error' say where it's coming from;
>>    'colo-compare: Received oversized packet over socket'
>>
>> that makes it a lot easier when people see the error for the first time.
>> Also, should you check for the error case of receiving a packet where
>> packet_len == 0 ?

Yes, will fix it in the next version.

>>
>>> +                rs->index = rs->state = 0;
>>> +                return -1;
>>> +            }
>>> +
>>> +            rs->index += l;
>>> +            buf += l;
>>> +            size -= l;
>>> +            if (rs->index >= rs->packet_len) {
>>> +                rs->index = 0;
>>> +                rs->state = 0;
>>> +                return 1;
>>> +            }
>>> +            break;
>>> +        }
>>> +    }
>>> +    return 0;
>>> +}
>>> +
>>> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, 
>>> int size)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(opaque);
>>> +    int ret;
>>> +
>>> +    ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
>>> +    if (ret == 1) {
>>> +        /* FIXME: enqueue to primary packet list */
>>> +        compare_chr_send(s->chr_out, buf, size);
>>> +    } else if (ret == -1) {
>>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>> +    }
>>> +}
>>> +
>>> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, 
>>> int size)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(opaque);
>>> +    int ret;
>>> +
>>> +    ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
>>> +    if (ret == 1) {
>>> +        /* TODO: enqueue to secondary packet list*/
>>> +    } else if (ret == -1) {
>>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>>> +    }
>>> +}
>>> +
>>> +static char *compare_get_pri_indev(Object *obj, Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    return g_strdup(s->pri_indev);
>>> +}
>>> +
>>> +static void compare_set_pri_indev(Object *obj, const char *value, 
>>> Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    g_free(s->pri_indev);
>>> +    s->pri_indev = g_strdup(value);
>>> +}
>>> +
>>> +static char *compare_get_sec_indev(Object *obj, Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    return g_strdup(s->sec_indev);
>>> +}
>>> +
>>> +static void compare_set_sec_indev(Object *obj, const char *value, 
>>> Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    g_free(s->sec_indev);
>>> +    s->sec_indev = g_strdup(value);
>>> +}
>>> +
>>> +static char *compare_get_outdev(Object *obj, Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    return g_strdup(s->outdev);
>>> +}
>>> +
>>> +static void compare_set_outdev(Object *obj, const char *value, 
>>> Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    g_free(s->outdev);
>>> +    s->outdev = g_strdup(value);
>>> +}
>>> +
>>> +static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(uc);
>>> +
>>> +    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>> +        error_setg(errp, "colo compare needs 'primary_in' ,"
>>> +                   "'secondary_in','outdev' property set");
>>> +        return;
>>> +    } else if (!strcmp(s->pri_indev, s->outdev) ||
>>> +               !strcmp(s->sec_indev, s->outdev) ||
>>> +               !strcmp(s->pri_indev, s->sec_indev)) {
>>> +        error_setg(errp, "'indev' and 'outdev' could not be same "
>>> +                   "for compare module");
>>> +        return;
>>> +    }
>>> +
>>> +    s->chr_pri_in = qemu_chr_find(s->pri_indev);
>>> +    if (s->chr_pri_in == NULL) {
>>> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
>> I think error_set seems to be discouraged these days, just use 
>> error_setg
>> (see the comment in include/qapi/error.h just above error_set).
>>

OK

>>> +                  "IN Device '%s' not found", s->pri_indev);
>>> +        goto out;
>>> +    }
>>> +
>>> +    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
>>> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
>>> +                          compare_pri_chr_in, NULL, s);
>>> +
>>> +    s->chr_sec_in = qemu_chr_find(s->sec_indev);
>>> +    if (s->chr_sec_in == NULL) {
>>> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
>>> +                  "IN Device '%s' not found", s->sec_indev);
>>> +        goto out;
>>> +    }
>> Can you explain/give an example of the way you create one of these
>> filters?

Yes, see the example at the top of this mail.

>> Would you ever have a pri_indev and sec_indev on the same filter?

Not same one.

>> If not, then why not just have an 'indev' option rather than the
>> two separate configs.
>> If you cna have both then you need to change hte error 'IN Device'
>> to say either 'Primary IN device' or secondary.

will fix


>>
>>> + qemu_chr_fe_claim_no_fail(s->chr_sec_in);
>>> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
>>> +                          compare_sec_chr_in, NULL, s);
>>> +
>>> +    s->chr_out = qemu_chr_find(s->outdev);
>>> +    if (s->chr_out == NULL) {
>>> +        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
>>> +                  "OUT Device '%s' not found", s->outdev);
>>> +        goto out;
>>> +    }
>>> +    qemu_chr_fe_claim_no_fail(s->chr_out);
>>> +
>>> +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
>>> +
>>> +    return;
>>> +
>>> +out:
>>> +    if (s->chr_pri_in) {
>>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>> +        qemu_chr_fe_release(s->chr_pri_in);
>>> +        s->chr_pri_in = NULL;
>>> +    }
>>> +    if (s->chr_sec_in) {
>>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>>> +        qemu_chr_fe_release(s->chr_sec_in);
>>> +        s->chr_pri_in = NULL;
>>> +    }
>> Can't you avoid this by making the code:
>>
>>       s->chr_pri_in = qemu_chr_find(...)
>>       if (s->chr_pri_in == NULL) {
>>          error (...)
>>       }
>>       s->chr_sec_in = qemu_chr_find(...)
>>       if (s->chr_sec_in == NULL) {
>>          error (...)
>>       }
>>       s->chr_out = qemu_chr_find(...)
>>       if (s->chr_out == NULL) {
>>          error (...)
>>       }
>>
>>       qemu_chr_fe_claim_no_fail(pri)
>>       add_handlers(pri...)
>>       qemu_chr_fe_claim_no_fail(sec)
>>       add_handlers(sec...)
>>       qemu_chr_fe_claim_no_fail(out)
>>       add_handlers(out...)
>>
>> so you don't have to clean up the handlers/release in the out: ?

I will restructure the code this way.
We don't need to set a handler for out, because we only send packets to out,
so we don't have to clean up out.

Thanks
Zhang Chen

>>
>>> +}
>>> +
>>> +static void colo_compare_class_init(ObjectClass *oc, void *data)
>>> +{
>>> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
>>> +
>>> +    ucc->complete = colo_compare_complete;
>>> +}
>>> +
>>> +static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>>> +{
>>> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
>>> +    CompareState *s = COLO_COMPARE(ucc);
>>> +
>>> +    if (s->chr_pri_in) {
>>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>> +        qemu_chr_fe_release(s->chr_pri_in);
>>> +    }
>>> +    if (s->chr_sec_in) {
>>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>>> +        qemu_chr_fe_release(s->chr_sec_in);
>>> +    }
>>> +    if (s->chr_out) {
>>> +        qemu_chr_fe_release(s->chr_out);
>>> +    }
>>> +
>>> +    if (!QTAILQ_EMPTY(&net_compares)) {
>>> +        QTAILQ_REMOVE(&net_compares, s, next);
>>> +    }
>>> +}
>>> +
>>> +static void colo_compare_init(Object *obj)
>>> +{
>>> +    object_property_add_str(obj, "primary_in",
>>> +                            compare_get_pri_indev, 
>>> compare_set_pri_indev,
>>> +                            NULL);
>>> +    object_property_add_str(obj, "secondary_in",
>>> +                            compare_get_sec_indev, 
>>> compare_set_sec_indev,
>>> +                            NULL);
>>> +    object_property_add_str(obj, "outdev",
>>> +                            compare_get_outdev, compare_set_outdev,
>>> +                            NULL);
>>> +}
>>> +
>>> +static void colo_compare_finalize(Object *obj)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    g_free(s->pri_indev);
>>> +    g_free(s->sec_indev);
>>> +    g_free(s->outdev);
>>> +}
>>> +
>>> +static const TypeInfo colo_compare_info = {
>>> +    .name = TYPE_COLO_COMPARE,
>>> +    .parent = TYPE_OBJECT,
>>> +    .instance_size = sizeof(CompareState),
>>> +    .instance_init = colo_compare_init,
>>> +    .instance_finalize = colo_compare_finalize,
>>> +    .class_size = sizeof(CompareState),
>>> +    .class_init = colo_compare_class_init,
>>> +    .class_finalize = colo_compare_class_finalize,
>>> +    .interfaces = (InterfaceInfo[]) {
>>> +        { TYPE_USER_CREATABLE },
>>> +        { }
>>> +    }
>>> +};
>>> +
>>> +static void register_types(void)
>>> +{
>>> +    type_register_static(&colo_compare_info);
>>> +}
>>> +
>>> +type_init(register_types);
>>> diff --git a/vl.c b/vl.c
>>> index dc6e63a..70064ad 100644
>>> --- a/vl.c
>>> +++ b/vl.c
>>> @@ -2842,7 +2842,8 @@ static bool object_create_initial(const char 
>>> *type)
>>>       if (g_str_equal(type, "filter-buffer") ||
>>>           g_str_equal(type, "filter-dump") ||
>>>           g_str_equal(type, "filter-mirror") ||
>>> -        g_str_equal(type, "filter-redirector")) {
>>> +        g_str_equal(type, "filter-redirector") ||
>>> +        g_str_equal(type, "colo-compare")) {
>>>           return false;
>>>       }
>>>   --
>>> 1.9.1
>>>
>>>
>>>
>> -- 
>> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>>
>>
>> .
>>
>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet
  2016-03-31  2:09     ` Li Zhijian
@ 2016-03-31  8:47       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-31  8:47 UTC (permalink / raw)
  To: Li Zhijian
  Cc: Zhang Chen, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Li Zhijian (lizhijian@cn.fujitsu.com) wrote:
> 
> 
> On 03/30/2016 06:36 PM, Dr. David Alan Gilbert wrote:
> >* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:

...

> >>+static inline void colo_flush_connection(void *opaque, void *user_data)
> >>+{
> >
> >Is this used?
> Yes, it isn't used currently.
> Actually this is needed after compare module is integrated to COLO frame.

OK, leave it out for the moment; I think some compilers complain when
you compile with an unused static.

Dave

> 
> 
> >
> >>+    Connection *conn = opaque;
> >>+    Packet *pkt = NULL;
> >>+
> >>+    qemu_mutex_lock(&conn->list_lock);
> >>+    while (!g_queue_is_empty(&conn->primary_list)) {
> >>+        pkt = g_queue_pop_head(&conn->primary_list);
> >>+        compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> >>+        /* FIXME: destroy pkt ?*/
> >>+    }
> >>+    while (!g_queue_is_empty(&conn->secondary_list)) {
> >>+        pkt = g_queue_pop_head(&conn->secondary_list);
> >>+        packet_destroy(pkt, NULL);
> >>+    }
> >>+    qemu_mutex_unlock(&conn->list_lock);
> >>+}
> >>+
> >>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
> >>  {
> >>      int ret = 0;
> >>@@ -142,8 +431,10 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> >>
> >>      ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
> >>      if (ret == 1) {
> >>-        /* FIXME: enqueue to primary packet list */
> >>-        compare_chr_send(s->chr_out, buf, size);
> >>+        if (packet_enqueue(s, PRIMARY_IN)) {
> >>+            error_report("primary: unsupported packet in");
> >
> >Is this for non-IP packets?  If so you don't want an error_report - because non-IP are
> >quite common; a trace would be useful giving the packet type etc
> Agree. And further more, IMO release the packet to client is not always correct for all non-IP
> but at current stage, this looks fine.
> 
> Thanks
> Li Zhijian
> 
> >
> >>+            compare_chr_send(s->chr_out, buf, size);
> >>+        }
> >>      } else if (ret == -1) {
> >>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> >>      }
> >>@@ -156,7 +447,9 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> >>
> >>      ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
> >>      if (ret == 1) {
> >>-        /* TODO: enqueue to secondary packet list*/
> >>+        if (packet_enqueue(s, SECONDARY_IN)) {
> >>+            error_report("secondary: unsupported packet in");
> >>+        }
> >>      } else if (ret == -1) {
> >>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> >>      }
> >>@@ -210,6 +503,7 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
> >>  static void colo_compare_complete(UserCreatable *uc, Error **errp)
> >>  {
> >>      CompareState *s = COLO_COMPARE(uc);
> >>+    struct sysinfo si;
> >>
> >>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
> >>          error_setg(errp, "colo compare needs 'primary_in' ,"
> >>@@ -255,6 +549,29 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
> >>
> >>      QTAILQ_INSERT_TAIL(&net_compares, s, next);
> >>
> >>+    g_queue_init(&s->conn_list);
> >>+    qemu_mutex_init(&s->conn_list_lock);
> >>+
> >>+    s->hashtable_size = 0;
> >>+    /*
> >>+     * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
> >>+     * machine has 512 buckets. >= 1GB machines have 16384 buckets.
> >>+     */
> >>+    sysinfo(&si);
> >>+    hashtable_max_size = si.totalram / 16384;
> >>+    if (si.totalram > (1024 * 1024 * 1024 / PAGE_SIZE)) {
> >>+        hashtable_max_size = 16384;
> >>+    }
> >>+    if (hashtable_max_size < 32) {
> >>+        hashtable_max_size = 32;
> >>+    }
> >>+    hashtable_max_size = hashtable_max_size * 8; /* default factor = 8 */
> >
> >Make this a lot simpler; just pick a size and if it's a problem then we'll worry
> >about it later, or make it an option on the filter if you want it changeable.
> >
> >>+    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
> >>+                                                      connection_key_equal,
> >>+                                                      g_free,
> >>+                                                      connection_destroy);
> >>+
> >>      return;
> >>
> >>  out:
> >>@@ -297,6 +614,7 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
> >>      if (!QTAILQ_EMPTY(&net_compares)) {
> >>          QTAILQ_REMOVE(&net_compares, s, next);
> >>      }
> >>+    qemu_mutex_destroy(&s->conn_list_lock);
> >>  }
> >>
> >>  static void colo_compare_init(Object *obj)
> >>--
> >>1.9.1
> >>
> >>
> >>
> >--
> >Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> >
> >
> >.
> >
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread
  2016-03-31  2:17     ` Li Zhijian
@ 2016-03-31  8:50       ` Dr. David Alan Gilbert
  0 siblings, 0 replies; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-31  8:50 UTC (permalink / raw)
  To: Li Zhijian
  Cc: Zhang Chen, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Li Zhijian (lizhijian@cn.fujitsu.com) wrote:
> 
> 
> On 03/30/2016 07:41 PM, Dr. David Alan Gilbert wrote:
> [...]
> 
> >>>@@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> >>>      if (ret == 1) {
> >>>          if (packet_enqueue(s, PRIMARY_IN)) {
> >>>              error_report("primary: unsupported packet in");
> >>>-            compare_chr_send(s->chr_out, buf, size);
> >>>+            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> >Doesn't that change belong in an earlier patch?
> >
> >>>+        } else {
> >>>+            qemu_event_set(&s->event);
> >Also these - why are these in this patch?
> This event is to wakeup comparison thread to do compare.
> Do you think we should put event related code to patch 2 ?

Ah OK; yes the event_set makes sense in this patch.

Dave

> Thanks
> Li
> >
> >>>          }
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-31  1:41     ` Zhang Chen
  2016-03-31  7:25       ` Zhang Chen
@ 2016-03-31  9:24       ` Dr. David Alan Gilbert
  2016-04-01  5:11         ` Jason Wang
  1 sibling, 1 reply; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-31  9:24 UTC (permalink / raw)
  To: Zhang Chen
  Cc: Li Zhijian, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> 
> 
> On 03/30/2016 05:25 PM, Dr. David Alan Gilbert wrote:
> >* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> >>packet come from primary char indev will be send to
> >>outdev - packet come from secondary char dev will be drop
> >Please put in the description an example of how you invoke
> >the filter on the primary and secondary.
> 
> OK.
> colo-compare get packet from chardev(primary_in,secondary_in),
> and output to other chardev(outdev), so, we can use it with the
> help of filter-mirror and filter-redirector. like that:

Wow, this is a bit more complicated than I expected - I was expecting just one
socket.  So let me draw this out; see comments below.

> primary:
> -netdev
> tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0

            ----> mirror0: socket:secondary:9003
            |
        mirror-m0     <-- e1000
            |
            v
        redirector-redire1 ---> compare0 socket:primary:9001 (to compare0-0)
                          
            -----< compare0-0 socket:primary:9001 (to compare0)
            |  primary_in
            |
        compare-comp0       -----> compare_out0 socket:primary:9005
            |
            |  secondary_in
            -----< compare1   socket:secondary:9004

tap <-- redirector-redire0 <--- compare_out socket:primary:9005 (from compare_out0)


> secondary:
> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,down
> script=/etc/qemu-ifdown
> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=red0,host=3.3.3.3,port=9003
> -chardev socket,id=red1,host=3.3.3.3,port=9004
> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1

            ----> red0 socket::9003
            |
tap <-- redirector-f1 <--
                           e1000
    --> redirector-f2 -->
            |
            ----< red1 socket::9004

OK, so I think we need to find a way to fix three things:
   a) There must be an easier way of connecting two filters within the same
      qemu than going up to the kernel's socket code, around its TCP stack
      and back.  Is there a better type of chardev to use we can short circuit
      with - it shouldn't need to leave QEMU (although I can see it's easier
      for debugging like this).  Jason/Dan - any ideas?

   b) We should only need one socket for the connection between primary and
      secondary - I'm not sure how to change it to let it do that.

   c) You need to be able to turn off nagling (socket no delay) on the sockets;
     I found a noticeable benefit of doing this on the connection between primary
     and secondary in my world.

Dave

> 
> 
> 
> >
> >>Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> >>Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> >>Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> >>---
> >>  net/Makefile.objs  |   1 +
> >>  net/colo-compare.c | 344 +++++++++++++++++++++++++++++++++++++++++++++++++++++
> >>  vl.c               |   3 +-
> >>  3 files changed, 347 insertions(+), 1 deletion(-)
> >>  create mode 100644 net/colo-compare.c
> >>
> >>diff --git a/net/Makefile.objs b/net/Makefile.objs
> >>index b7c22fd..ba92f73 100644
> >>--- a/net/Makefile.objs
> >>+++ b/net/Makefile.objs
> >>@@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
> >>  common-obj-y += filter.o
> >>  common-obj-y += filter-buffer.o
> >>  common-obj-y += filter-mirror.o
> >>+common-obj-y += colo-compare.o
> >>diff --git a/net/colo-compare.c b/net/colo-compare.c
> >>new file mode 100644
> >>index 0000000..62c66df
> >>--- /dev/null
> >>+++ b/net/colo-compare.c
> >>@@ -0,0 +1,344 @@
> >>+/*
> >>+ * Copyright (c) 2016 FUJITSU LIMITED
> >>+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> >>+ *
> >>+ * This work is licensed under the terms of the GNU GPL, version 2 or
> >>+ * later.  See the COPYING file in the top-level directory.
> >>+ */
> >>+
> >>+#include "qemu/osdep.h"
> >>+#include "qemu-common.h"
> >>+#include "qapi/qmp/qerror.h"
> >>+#include "qemu/error-report.h"
> >>+
> >>+#include "net/net.h"
> >>+#include "net/vhost_net.h"
> >>+#include "qom/object_interfaces.h"
> >>+#include "qemu/iov.h"
> >>+#include "qom/object.h"
> >>+#include "qemu/typedefs.h"
> >>+#include "net/queue.h"
> >>+#include "sysemu/char.h"
> >>+#include "qemu/sockets.h"
> >>+
> >>+#define TYPE_COLO_COMPARE "colo-compare"
> >>+#define COLO_COMPARE(obj) \
> >>+    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
> >>+
> >>+#define COMPARE_READ_LEN_MAX NET_BUFSIZE
> >>+
> >>+static QTAILQ_HEAD(, CompareState) net_compares =
> >>+       QTAILQ_HEAD_INITIALIZER(net_compares);
> >>+
> >>+typedef struct ReadState {
> >>+    int state; /* 0 = getting length, 1 = getting data */
> >>+    unsigned int index;
> >>+    unsigned int packet_len;
> >Please make packet_len and index  uint32_t, since they're sent over the wire
> >as 32bit.
> >
> >>+    uint8_t buf[COMPARE_READ_LEN_MAX];
> >>+} ReadState;
> >>+
> >>+typedef struct CompareState {
> >>+    Object parent;
> >>+
> >>+    char *pri_indev;
> >>+    char *sec_indev;
> >>+    char *outdev;
> >>+    CharDriverState *chr_pri_in;
> >>+    CharDriverState *chr_sec_in;
> >>+    CharDriverState *chr_out;
> >>+    QTAILQ_ENTRY(CompareState) next;
> >>+    ReadState pri_rs;
> >>+    ReadState sec_rs;
> >>+} CompareState;
> >>+
> >>+static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
> >>+{
> >>+    int ret = 0;
> >>+    uint32_t len = htonl(size);
> >>+
> >Similarly, make 'size' uint32_t - everything that gets converted into a uint32_t
> >it's probably best to make a uint32_t.
> >
> >>+    if (!size) {
> >>+        return 0;
> >>+    }
> >>+
> >>+    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
> >>+    if (ret != sizeof(len)) {
> >>+        goto err;
> >>+    }
> >>+
> >>+    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
> >>+    if (ret != size) {
> >>+        goto err;
> >>+    }
> >>+
> >You can make this slightly simpler and save the return 0;
> >
> >>+    return 0;
> >>+
> >>+err:
> >>+    return ret < 0 ? ret : -EIO;
> >err:
> >        return ret <= 0 ? ret : -EIO;
> >
> >>+}
> >>+
> >>+static int compare_chr_can_read(void *opaque)
> >>+{
> >>+    return COMPARE_READ_LEN_MAX;
> >>+}
> >>+
> >>+/* Returns
> >>+ * 0: readstate is not ready
> >>+ * 1: readstate is ready
> >>+ * otherwise error occurs
> >>+ */
> >>+static int compare_chr_fill_rstate(ReadState *rs, const uint8_t *buf, int size)
> >>+{
> >>+    unsigned int l;
> >>+    while (size > 0) {
> >>+        /* reassemble a packet from the network */
> >>+        switch (rs->state) { /* 0 = getting length, 1 = getting data */
> >>+        case 0:
> >>+            l = 4 - rs->index;
> >>+            if (l > size) {
> >>+                l = size;
> >>+            }
> >>+            memcpy(rs->buf + rs->index, buf, l);
> >>+            buf += l;
> >>+            size -= l;
> >>+            rs->index += l;
> >>+            if (rs->index == 4) {
> >>+                /* got length */
> >>+                rs->packet_len = ntohl(*(uint32_t *)rs->buf);
> >>+                rs->index = 0;
> >>+                rs->state = 1;
> >>+            }
> >>+            break;
> >>+        case 1:
> >>+            l = rs->packet_len - rs->index;
> >>+            if (l > size) {
> >>+                l = size;
> >>+            }
> >>+            if (rs->index + l <= sizeof(rs->buf)) {
> >>+                memcpy(rs->buf + rs->index, buf, l);
> >>+            } else {
> >>+                error_report("serious error: oversized packet received.");
> >Isn't it easier to do this check above in the 'got length' if ?
> >Instead of 'serious error' say where it's coming from;
> >   'colo-compare: Received oversized packet over socket'
> >
> >that makes it a lot easier when people see the error for the first time.
> >Also, should you check for the error case of receiving a packet where
> >packet_len == 0 ?
> >
> >>+                rs->index = rs->state = 0;
> >>+                return -1;
> >>+            }
> >>+
> >>+            rs->index += l;
> >>+            buf += l;
> >>+            size -= l;
> >>+            if (rs->index >= rs->packet_len) {
> >>+                rs->index = 0;
> >>+                rs->state = 0;
> >>+                return 1;
> >>+            }
> >>+            break;
> >>+        }
> >>+    }
> >>+    return 0;
> >>+}
> >>+
> >>+static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(opaque);
> >>+    int ret;
> >>+
> >>+    ret = compare_chr_fill_rstate(&s->pri_rs, buf, size);
> >>+    if (ret == 1) {
> >>+        /* FIXME: enqueue to primary packet list */
> >>+        compare_chr_send(s->chr_out, buf, size);
> >>+    } else if (ret == -1) {
> >>+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> >>+    }
> >>+}
> >>+
> >>+static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(opaque);
> >>+    int ret;
> >>+
> >>+    ret = compare_chr_fill_rstate(&s->sec_rs, buf, size);
> >>+    if (ret == 1) {
> >>+        /* TODO: enqueue to secondary packet list*/
> >>+    } else if (ret == -1) {
> >>+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> >>+    }
> >>+}
> >>+
> >>+static char *compare_get_pri_indev(Object *obj, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    return g_strdup(s->pri_indev);
> >>+}
> >>+
> >>+static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    g_free(s->pri_indev);
> >>+    s->pri_indev = g_strdup(value);
> >>+}
> >>+
> >>+static char *compare_get_sec_indev(Object *obj, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    return g_strdup(s->sec_indev);
> >>+}
> >>+
> >>+static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    g_free(s->sec_indev);
> >>+    s->sec_indev = g_strdup(value);
> >>+}
> >>+
> >>+static char *compare_get_outdev(Object *obj, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    return g_strdup(s->outdev);
> >>+}
> >>+
> >>+static void compare_set_outdev(Object *obj, const char *value, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    g_free(s->outdev);
> >>+    s->outdev = g_strdup(value);
> >>+}
> >>+
> >>+static void colo_compare_complete(UserCreatable *uc, Error **errp)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(uc);
> >>+
> >>+    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
> >>+        error_setg(errp, "colo compare needs 'primary_in' ,"
> >>+                   "'secondary_in','outdev' property set");
> >>+        return;
> >>+    } else if (!strcmp(s->pri_indev, s->outdev) ||
> >>+               !strcmp(s->sec_indev, s->outdev) ||
> >>+               !strcmp(s->pri_indev, s->sec_indev)) {
> >>+        error_setg(errp, "'indev' and 'outdev' could not be same "
> >>+                   "for compare module");
> >>+        return;
> >>+    }
> >>+
> >>+    s->chr_pri_in = qemu_chr_find(s->pri_indev);
> >>+    if (s->chr_pri_in == NULL) {
> >>+        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
> >I think error_set seems to be discouraged these days, just use error_setg
> >(see the comment in include/qapi/error.h just above error_set).
> >
> >>+                  "IN Device '%s' not found", s->pri_indev);
> >>+        goto out;
> >>+    }
> >>+
> >>+    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
> >>+    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
> >>+                          compare_pri_chr_in, NULL, s);
> >>+
> >>+    s->chr_sec_in = qemu_chr_find(s->sec_indev);
> >>+    if (s->chr_sec_in == NULL) {
> >>+        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
> >>+                  "IN Device '%s' not found", s->sec_indev);
> >>+        goto out;
> >>+    }
> >Can you explain/give an example of the way you create one of these
> >filters?
> >Would you ever have a pri_indev and sec_indev on the same filter?
> >If not, then why not just have an 'indev' option rather than the
> >two separate configs.
> >If you cna have both then you need to change hte error 'IN Device'
> >to say either 'Primary IN device' or secondary.
> >
> >>+    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
> >>+    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
> >>+                          compare_sec_chr_in, NULL, s);
> >>+
> >>+    s->chr_out = qemu_chr_find(s->outdev);
> >>+    if (s->chr_out == NULL) {
> >>+        error_set(errp, ERROR_CLASS_DEVICE_NOT_FOUND,
> >>+                  "OUT Device '%s' not found", s->outdev);
> >>+        goto out;
> >>+    }
> >>+    qemu_chr_fe_claim_no_fail(s->chr_out);
> >>+
> >>+    QTAILQ_INSERT_TAIL(&net_compares, s, next);
> >>+
> >>+    return;
> >>+
> >>+out:
> >>+    if (s->chr_pri_in) {
> >>+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> >>+        qemu_chr_fe_release(s->chr_pri_in);
> >>+        s->chr_pri_in = NULL;
> >>+    }
> >>+    if (s->chr_sec_in) {
> >>+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> >>+        qemu_chr_fe_release(s->chr_sec_in);
> >>+        s->chr_pri_in = NULL;
> >>+    }
> >Can't you avoid this by making the code:
> >
> >      s->chr_pri_in = qemu_chr_find(...)
> >      if (s->chr_pri_in == NULL) {
> >         error (...)
> >      }
> >      s->chr_sec_in = qemu_chr_find(...)
> >      if (s->chr_sec_in == NULL) {
> >         error (...)
> >      }
> >      s->chr_out = qemu_chr_find(...)
> >      if (s->chr_out == NULL) {
> >         error (...)
> >      }
> >
> >      qemu_chr_fe_claim_no_fail(pri)
> >      add_handlers(pri...)
> >      qemu_chr_fe_claim_no_fail(sec)
> >      add_handlers(sec...)
> >      qemu_chr_fe_claim_no_fail(out)
> >      add_handlers(out...)
> >
> >so you don't have to clean up the handlers/release in the out: ?
> >
> >>+}
> >>+
> >>+static void colo_compare_class_init(ObjectClass *oc, void *data)
> >>+{
> >>+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
> >>+
> >>+    ucc->complete = colo_compare_complete;
> >>+}
> >>+
> >>+static void colo_compare_class_finalize(ObjectClass *oc, void *data)
> >>+{
> >>+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
> >>+    CompareState *s = COLO_COMPARE(ucc);
> >>+
> >>+    if (s->chr_pri_in) {
> >>+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> >>+        qemu_chr_fe_release(s->chr_pri_in);
> >>+    }
> >>+    if (s->chr_sec_in) {
> >>+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> >>+        qemu_chr_fe_release(s->chr_sec_in);
> >>+    }
> >>+    if (s->chr_out) {
> >>+        qemu_chr_fe_release(s->chr_out);
> >>+    }
> >>+
> >>+    if (!QTAILQ_EMPTY(&net_compares)) {
> >>+        QTAILQ_REMOVE(&net_compares, s, next);
> >>+    }
> >>+}
> >>+
> >>+static void colo_compare_init(Object *obj)
> >>+{
> >>+    object_property_add_str(obj, "primary_in",
> >>+                            compare_get_pri_indev, compare_set_pri_indev,
> >>+                            NULL);
> >>+    object_property_add_str(obj, "secondary_in",
> >>+                            compare_get_sec_indev, compare_set_sec_indev,
> >>+                            NULL);
> >>+    object_property_add_str(obj, "outdev",
> >>+                            compare_get_outdev, compare_set_outdev,
> >>+                            NULL);
> >>+}
> >>+
> >>+static void colo_compare_finalize(Object *obj)
> >>+{
> >>+    CompareState *s = COLO_COMPARE(obj);
> >>+
> >>+    g_free(s->pri_indev);
> >>+    g_free(s->sec_indev);
> >>+    g_free(s->outdev);
> >>+}
> >>+
> >>+static const TypeInfo colo_compare_info = {
> >>+    .name = TYPE_COLO_COMPARE,
> >>+    .parent = TYPE_OBJECT,
> >>+    .instance_size = sizeof(CompareState),
> >>+    .instance_init = colo_compare_init,
> >>+    .instance_finalize = colo_compare_finalize,
> >>+    .class_size = sizeof(CompareState),
> >>+    .class_init = colo_compare_class_init,
> >>+    .class_finalize = colo_compare_class_finalize,
> >>+    .interfaces = (InterfaceInfo[]) {
> >>+        { TYPE_USER_CREATABLE },
> >>+        { }
> >>+    }
> >>+};
> >>+
> >>+static void register_types(void)
> >>+{
> >>+    type_register_static(&colo_compare_info);
> >>+}
> >>+
> >>+type_init(register_types);
> >>diff --git a/vl.c b/vl.c
> >>index dc6e63a..70064ad 100644
> >>--- a/vl.c
> >>+++ b/vl.c
> >>@@ -2842,7 +2842,8 @@ static bool object_create_initial(const char *type)
> >>      if (g_str_equal(type, "filter-buffer") ||
> >>          g_str_equal(type, "filter-dump") ||
> >>          g_str_equal(type, "filter-mirror") ||
> >>-        g_str_equal(type, "filter-redirector")) {
> >>+        g_str_equal(type, "filter-redirector") ||
> >>+        g_str_equal(type, "colo-compare")) {
> >>          return false;
> >>      }
> >>-- 
> >>1.9.1
> >>
> >>
> >>
> >--
> >Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> >
> >
> >.
> >
> 
> -- 
> Thanks
> zhangchen
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare
  2016-03-31  3:01   ` Li Zhijian
@ 2016-03-31  9:43     ` Dr. David Alan Gilbert
  2016-04-01  1:40       ` Li Zhijian
  0 siblings, 1 reply; 26+ messages in thread
From: Dr. David Alan Gilbert @ 2016-03-31  9:43 UTC (permalink / raw)
  To: Li Zhijian
  Cc: Zhang Chen, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang

* Li Zhijian (lizhijian@cn.fujitsu.com) wrote:
> 
> 
> On 03/30/2016 08:05 PM, Dr. David Alan Gilbert wrote:
> >* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> >>COLO-compare is a part of COLO project. It is used
> >>to compare the network package to help COLO decide
> >>whether to do checkpoint.
> >
> >Hi Zhang Chen,
> >   I've put comments on the individual patches, but some more general things:
> >
> >   1) Please add a coment giving the example of the command line for the primary
> >     and secondary use of this module - it helps make it easier to understand the patches.
> >
> >   2) There's no tracing in here - please add some; I found when I tried to get
> >     COLO working I needed to use lots of tracing and debugging to understand the
> >     packet flow.
> >
> >   3) Add comments; e.g. for each function say which thread is using it and where
> >      the packets are coming from; e.g.
> >         called from the main thread on the primary for packets arriving over the socket
> >         from the secondary.
> >
> >      There's just so many packets going in so many directions it would make it
> >      easier to follow.
> >
> >   4) A more fundamental problem is what happens if the secondary never sends anything
> >      on the socket, the result is you end up running until the end of the long COLO
> >      checkpoint without triggering a discompare - in my world I added a timeout (400ms)
> >      for an unmatched packet from the primary, where if no matching packet was received
> >      a checkpoint would be triggered.
> >
> >   5) I see the packet comparison is still the simple memcmpy that you had in December;
> >      are you planning on doing anything more complicated; you must be seing most packets
> >      miscompare?
> >
> >You can see my current world at; https://github.com/orbitfp7/qemu/commits/orbit-wp4-colo-mar16
> >which has my basic TCP comparison (it's only tracking incoming connections) and I know it's
> >not complete either.  It mostly works OK, although I've got an occasional seg
> >(which makes me wonder if I need to add the conn_list_lock I see you added).  I'm also
> >not doing any TCP reassembly which is probably needed.
> >
> Thank you very much for your comments.
> I just see you tree, you put in a lot of work(tcp comparison enhance, sequence/acknowledge
> number re-write, timeout...)
> 
> Actually, this compare module is just in a RFC stage(only including compare frame), there are
> many works to be done:
> 
> 1) Integrate to COLO frame(and Let COLO primary and secondary at running state)

Yes; although I think you've had most of the code for that, also feel free to use
any of the bits I've changed.

> 2) ip segment defrag

Yes, this seems the hardest bit to me. It is needed for some workloads; for example,
a simple case is just an ssh connection: with the differences in timing between the primary
and secondary, you see that the primary might send two short packets and the secondary
might send one long packet with the same data.

> 3) comparison base on the sequence number(tcp and udp) if packet has
>    Because tcp re-transmission is quit common. IRC, your code will compare the whole tcp
>     packet(sequence number will be compare)

Yes; once the secondary reworks the sequence number I found the sequence number
matched most of the time on the primary.   One problem (depending on the traffic)
is that the ack number might not match and I'm not sure of the best fix, for example,
consider:

    a) Send packet 1
    b) Send packet 2
    c) Receive response

  The order of b & c is random - if the response on the same socket arrives (c) before (b)
then the ack number in packet 2 is different; on one workload this caused a lot of miscompares
but only if the timing is just wrong.

(I also had to turn off TCP timestamping to get useful comparisons)

> 4) packet belongs to the same connection is sort by sequence number
> 
> 5) Out-Of-Oder packet handle

I think 4 & 5 both happen as part of the defrag; out of order packets were a problem for
me on some of the workloads; I had to turn off multiqueue in one case.

> 6) cleanup the un-active conn_list which maybe closed. the simple way is to introduce a
>    timer to record whether a connection have packet come within a timeout, connection gone
>     beyond this timeout should be cleanup.

At the moment that isn't a big problem because when you receive the next checkpoint you
can flush the list.
The only case where you have to deal with this is for continuous failover when the
original secondary is promoted to primary, then its connection list has to live
on longer for any connections it created prior to the failover.
Perhaps this gets more complex with defrag?

> 7) Dave point out above (4)
> 
> 8) something I miss...
> 
> For Various reasons, not all the works can be done immediately, So we hope to discuss and
> decide which function have the high priority.
> Any comments and suggestions are welcome.

Yes, there's a lot of work;  as I say, feel free to use any of my patches
from the world above, I wasn't planning on doing much more work on that set.

> IMO, a compare frame and a COLO frame hack patch could be simple enough.

I think you'd have to show that you got some useful comparison matches;
if it almost always failed the comparison then I can't see the point.

Dave

> 
> Thanks
> Li
> 
> >Dave
> >
> >>v2:
> >>  - add jhash.h
> >>
> >>v1:
> >>  - initial patch
> >>
> >>
> >>Zhang Chen (3):
> >>   colo-compare: introduce colo compare initlization
> >>   colo-compare: track connection and enqueue packet
> >>   colo-compare: introduce packet comparison thread
> >>
> >>  include/qemu/jhash.h |  59 ++++
> >>  net/Makefile.objs    |   1 +
> >>  net/colo-compare.c   | 782 +++++++++++++++++++++++++++++++++++++++++++++++++++
> >>  vl.c                 |   3 +-
> >>  4 files changed, 844 insertions(+), 1 deletion(-)
> >>  create mode 100644 include/qemu/jhash.h
> >>  create mode 100644 net/colo-compare.c
> >>
> >>--
> >>1.9.1
> >>
> >>
> >>
> >--
> >Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> >
> >
> >.
> >
> 
> -- 
> Best regards.
> Li Zhijian (8555)
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare
  2016-03-31  9:43     ` Dr. David Alan Gilbert
@ 2016-04-01  1:40       ` Li Zhijian
  0 siblings, 0 replies; 26+ messages in thread
From: Li Zhijian @ 2016-04-01  1:40 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: Zhang Chen, Gui jianfeng, Jason Wang, eddie.dong, qemu devel,
	Yang Hongyang, zhanghailiang



On 03/31/2016 05:43 PM, Dr. David Alan Gilbert wrote:
> * Li Zhijian (lizhijian@cn.fujitsu.com) wrote:
>>
>>
>> On 03/30/2016 08:05 PM, Dr. David Alan Gilbert wrote:
>>> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>>> COLO-compare is a part of COLO project. It is used
>>>> to compare the network package to help COLO decide
>>>> whether to do checkpoint.
>>>
>>> Hi Zhang Chen,
>>>    I've put comments on the individual patches, but some more general things:
>>>
>>>    1) Please add a coment giving the example of the command line for the primary
>>>      and secondary use of this module - it helps make it easier to understand the patches.
>>>
>>>    2) There's no tracing in here - please add some; I found when I tried to get
>>>      COLO working I needed to use lots of tracing and debugging to understand the
>>>      packet flow.
>>>
>>>    3) Add comments; e.g. for each function say which thread is using it and where
>>>       the packets are coming from; e.g.
>>>          called from the main thread on the primary for packets arriving over the socket
>>>          from the secondary.
>>>
>>>       There's just so many packets going in so many directions it would make it
>>>       easier to follow.
>>>
>>>    4) A more fundamental problem is what happens if the secondary never sends anything
>>>       on the socket, the result is you end up running until the end of the long COLO
>>>       checkpoint without triggering a discompare - in my world I added a timeout (400ms)
>>>       for an unmatched packet from the primary, where if no matching packet was received
>>>       a checkpoint would be triggered.
>>>
>>>    5) I see the packet comparison is still the simple memcmpy that you had in December;
>>>       are you planning on doing anything more complicated; you must be seing most packets
>>>       miscompare?
>>>
>>> You can see my current world at; https://github.com/orbitfp7/qemu/commits/orbit-wp4-colo-mar16
>>> which has my basic TCP comparison (it's only tracking incoming connections) and I know it's
>>> not complete either.  It mostly works OK, although I've got an occasional seg
>>> (which makes me wonder if I need to add the conn_list_lock I see you added).  I'm also
>>> not doing any TCP reassembly which is probably needed.
>>>
>> Thank you very much for your comments.
>> I just see you tree, you put in a lot of work(tcp comparison enhance, sequence/acknowledge
>> number re-write, timeout...)
>>
>> Actually, this compare module is just in a RFC stage(only including compare frame), there are
>> many works to be done:
>>
>> 1) Integrate to COLO frame(and Let COLO primary and secondary at running state)
>
> Yes; although I think you've had most of the code for that, also feel free to use
> any of the bits I've changed.
Thank you, we will add this part to next version.

>
>> 2) ip segment defrag
>
> Yes, this seems the hardest bit to me. It is needed for some workloads; for example
> a simple case if just an ssh connection, the differences in timing between the primary
> and secondary you see that the primary might send two short packets and the secondary
> might send one long packet with the same data.
OK, let it as a TODO issue

>
>> 3) comparison base on the sequence number(tcp and udp) if packet has
>>     Because tcp re-transmission is quit common. IRC, your code will compare the whole tcp
>>      packet(sequence number will be compare)
>
> Yes; once the secondary reworks the sequence number I found the seuence number
> matched most of the time on the primary.
well, let (3) (4) (5) as a TODO issue too or move it to an extra patchset(depending on 3)


>  One problem (depending on the traffic)
> is that the ack number might not match and I'm not sure of the best fix, for example,
> consider:
>
>      a) Send packet 1
>      b) Send packet 2
>      c) Receive response
>
>    The order of b & c is random - if the response on the same socket arrives (c) before (b)
> then the ack number in packet 2 is different; on one workload this caused a lot of miscompares
> but only if the timing is just wrong.
Yes, I've had this problem too.
The logic is a bit complicated; as in the kernel colo-proxy, we compare only the TCP payload
(excluding the sequence/ack numbers). We also introduce 'max_ack' (max_ack = MAX(primary_max_ack, secondary_max_ack))
to guarantee that a packet whose ack is <= 'max_ack' can be released to the client.

>
> (I also had to turn off TCP timestamping to get useful comparisons)
Yes, it works.

>
>> 4) packet belongs to the same connection is sort by sequence number
>>
>> 5) Out-Of-Oder packet handle
>
> I think 4 & 5 both happen as part of the defrag; out of order packets were a problem for
> me on some of the workloads; I had to turn off multiqueue in one case.
>
>> 6) cleanup the un-active conn_list which maybe closed. the simple way is to introduce a
>>     timer to record whether a connection have packet come within a timeout, connection gone
>>      beyond this timeout should be cleanup.
>
> At the moment that isn't a big problem because when you receive the next checkpoint you
> can flush the list.
> The only case where you have to deal with this is for continuous failover when the
> original secondary is promoted to primary, then it's connection list has to live
> on longer for any connections it created prior to the failover.
> Perhaps this gets more complex with defrag?
>
Yes, that needs the comparison module to support save/restore operations. Moved to the TODO list.


>> 7) Dave point out above (4)
It make sense, we will consider add this one.


>>
>> 8) something I miss...
>>
>> For Various reasons, not all the works can be done immediately, So we hope to discuss and
>> decide which function have the high priority.
>> Any comments and suggestions are welcome.
>
> Yes, there's a lot of work;  as I say, feel free to use any of my patches
> from the world above, I wasn't planning on doing much more work on that set.
>
>> IMO, a compare frame and a COLO frame hack patch could be simple enough.
>
> I think you'd have to show that you got some useful comparison matches;
> if it almost always failed the comparison then I can't see the point.
>

In short, we will enhance the comparison and try to add (1) and (7) in the next version.
PS, we will pick code from your tree. ^_^

Thanks
Li Zhijian

> Dave
>
>>
>> Thanks
>> Li
>>
>>> Dave
>>>
>>>> v2:
>>>>   - add jhash.h
>>>>
>>>> v1:
>>>>   - initial patch
>>>>
>>>>
>>>> Zhang Chen (3):
>>>>    colo-compare: introduce colo compare initlization
>>>>    colo-compare: track connection and enqueue packet
>>>>    colo-compare: introduce packet comparison thread
>>>>
>>>>   include/qemu/jhash.h |  59 ++++
>>>>   net/Makefile.objs    |   1 +
>>>>   net/colo-compare.c   | 782 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>>>   vl.c                 |   3 +-
>>>>   4 files changed, 844 insertions(+), 1 deletion(-)
>>>>   create mode 100644 include/qemu/jhash.h
>>>>   create mode 100644 net/colo-compare.c
>>>>
>>>> --
>>>> 1.9.1
>>>>
>>>>
>>>>
>>> --
>>> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>>>
>>>
>>> .
>>>
>>
>> --
>> Best regards.
>> Li Zhijian (8555)
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-31  9:24       ` Dr. David Alan Gilbert
@ 2016-04-01  5:11         ` Jason Wang
  2016-04-01  5:41           ` Li Zhijian
  0 siblings, 1 reply; 26+ messages in thread
From: Jason Wang @ 2016-04-01  5:11 UTC (permalink / raw)
  To: Dr. David Alan Gilbert, Zhang Chen
  Cc: zhanghailiang, Li Zhijian, Gui jianfeng, eddie.dong, qemu devel,
	Yang Hongyang



On 03/31/2016 05:24 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>
>> On 03/30/2016 05:25 PM, Dr. David Alan Gilbert wrote:
>>> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>>> packet come from primary char indev will be send to
>>>> outdev - packet come from secondary char dev will be drop
>>> Please put in the description an example of how you invoke
>>> the filter on the primary and secondary.
>> OK.
>> colo-compare get packet from chardev(primary_in,secondary_in),
>> and output to other chardev(outdev), so, we can use it with the
>> help of filter-mirror and filter-redirector. like that:
> Wow, this is a bit more complicated than I expected - I was expecting just one
> socket.  So let me draw this out; see comments below.
>
>> primary:
>> -netdev
>> tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
>> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
>> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
>> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
>> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
>> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
>> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
>> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
>> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
>> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
>> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>             ----> mirror0: socket:secondary:9003
>             |
>         mirror-m0     <-- e1000
>             |
>             v
>         redirector-redire1 ---> compare0 socket:primary:9001 (to compare0-0)
>                           
>             -----< compare0-0 socket:primary:9001 (to compare0)
>             |  primary_in
>             |
>         compare-comp0       -----> compare_out0 socket:primary:9005
>             |
>             |  secondary_in
>             -----< compare1   socket:secondary:9004
>
> tap <-- redirector-redire0 <--- compare_out socket:primary:9005 (from compare_out0)
>
>
>> secondary:
>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,down
>> script=/etc/qemu-ifdown
>> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
>> -chardev socket,id=red0,host=3.3.3.3,port=9003
>> -chardev socket,id=red1,host=3.3.3.3,port=9004
>> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
>> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>             ----> red0 socket::9003
>             |
> tap <-- redirector-f1 <--
>                            e1000
>     --> redirector-f2 -->
>             |
>             ----< red1 socket::9004
>
> OK, so I think we need to find a way to fix two things:
>    a) There must be an easier way of connecting two filters within the same
>       qemu than going up to the kernel's socket code, around it's TCP stack
>       and back.  Is there a better type of chardev to use we can short circuit
>       with - it shouldn't need to leave QEMU (although I can see it's easier
>       for debugging like this).  Jason/Dan - any ideas?

Looks like there's no such type of chardev. I think we could start with
the tcp socket chardev first and do the necessary optimization on top. In
fact, there are also advantages: with tcp sockets, the colo-compare code
could even be an independent program outside of qemu.

For the chardev type, this also reminds me of one thing. Since
mirror/redirector can only work with tcp sockets, we may need to inspect
the chardev type and fail if it is not a tcp socket, like vhost-user does.


>
>    b) We should only need one socket for the connection between primary and
>       secondary - I'm not sure how to change it to let it do that.

Looks like we can (e.g. for the secondary):

-device e1000,netdev=hn0,mac=52:a4:00:12:78:66
-chardev socket,id=red0,host=3.3.3.3,port=9003
-object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
-object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red0

If not, probably a bug.

>    c) You need to be able to turn off nagling (socket no delay) on the sockets;
>      I found a noticeable benefit of doing this on the connection between primary
>      and secondary in my world.

Right.

>
> Dave
>

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-04-01  5:11         ` Jason Wang
@ 2016-04-01  5:41           ` Li Zhijian
  0 siblings, 0 replies; 26+ messages in thread
From: Li Zhijian @ 2016-04-01  5:41 UTC (permalink / raw)
  To: Jason Wang, Dr. David Alan Gilbert, Zhang Chen
  Cc: Gui jianfeng, Yang Hongyang, eddie.dong, zhanghailiang, qemu devel



On 04/01/2016 01:11 PM, Jason Wang wrote:
>
>
> On 03/31/2016 05:24 PM, Dr. David Alan Gilbert wrote:
>> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>>
>>> On 03/30/2016 05:25 PM, Dr. David Alan Gilbert wrote:
>>>> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>>>> packet come from primary char indev will be send to
>>>>> outdev - packet come from secondary char dev will be drop
>>>> Please put in the description an example of how you invoke
>>>> the filter on the primary and secondary.
>>> OK.
>>> colo-compare get packet from chardev(primary_in,secondary_in),
>>> and output to other chardev(outdev), so, we can use it with the
>>> help of filter-mirror and filter-redirector. like that:
>> Wow, this is a bit more complicated than I expected - I was expecting just one
>> socket.  So let me draw this out; see comments below.
>>
>>> primary:
>>> -netdev
>>> tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>>> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
>>> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
>>> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
>>> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
>>> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
>>> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
>>> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
>>> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
>>> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
>>> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
>>> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>>              ----> mirror0: socket:secondary:9003
>>              |
>>          mirror-m0     <-- e1000
>>              |
>>              v
>>          redirector-redire1 ---> compare0 socket:primary:9001 (to compare0-0)
>>
>>              -----< compare0-0 socket:primary:9001 (to compare0)
>>              |  primary_in
>>              |
>>          compare-comp0       -----> compare_out0 socket:primary:9005
>>              |
>>              |  secondary_in
>>              -----< compare1   socket:secondary:9004
>>
>> tap <-- redirector-redire0 <--- compare_out socket:primary:9005 (from compare_out0)
>>
>>
>>> secondary:
>>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,down
>>> script=/etc/qemu-ifdown
>>> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
>>> -chardev socket,id=red0,host=3.3.3.3,port=9003
>>> -chardev socket,id=red1,host=3.3.3.3,port=9004
>>> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
>>> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>>              ----> red0 socket::9003
>>              |
>> tap <-- redirector-f1 <--
>>                             e1000
>>      --> redirector-f2 -->
>>              |
>>              ----< red1 socket::9004
>>
>> OK, so I think we need to find a way to fix two things:
>>     a) There must be an easier way of connecting two filters within the same
>>        qemu than going up to the kernel's socket code, around it's TCP stack
>>        and back.  Is there a better type of chardev to use we can short circuit
>>        with - it shouldn't need to leave QEMU (although I can see it's easier
>>        for debugging like this).  Jason/Dan - any ideas?
>
> Looks like there's no such type of chardev. I think we could start with
> tcp socket chardev first and do necessary optimization on top. In fact,
> there's also advantages, with tcp sockets, the colo-compare codes could
> even be an independent program out of qemu.
>
> For the chardev type, this also reminds me one thing. Since
> mirror/redirector can only work for tcp socket, we may need inspect the
> chardev types and fail if not a tcp socket like what vhost-user did.
>
>
>>
>>     b) We should only need one socket for the connection between primary and
>>        secondary - I'm not sure how to change it to let it do that.
>
> Looks like we can (e.g for secondary):
>
> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=red0,host=3.3.3.3,port=9003
> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red0
>
> If not, probably a bug.

Right,
but first, the 'chardev socket' needs 'mux=on' enabled if we want two filters to reference the
same socket.

Thanks

>
>>     c) You need to be able to turn off nagling (socket no delay) on the sockets;
>>       I found a noticeable benefit of doing this on the connection between primary
>>       and secondary in my world.
>
> Right.
>
>>
>> Dave
>>
>
>
>
>
> .
>

^ permalink raw reply	[flat|nested] 26+ messages in thread

* Re: [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization
  2016-03-30  9:25   ` Dr. David Alan Gilbert
  2016-03-31  1:41     ` Zhang Chen
@ 2016-04-13  2:02     ` Zhang Chen
  1 sibling, 0 replies; 26+ messages in thread
From: Zhang Chen @ 2016-04-13  2:02 UTC (permalink / raw)
  To: Dr. David Alan Gilbert
  Cc: qemu devel, Jason Wang, Li Zhijian, Gui jianfeng, Wen Congyang,
	zhanghailiang, Yang Hongyang, eddie.dong

>> +    if (!size) {
>> +        return 0;
>> +    }
>> +
>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
>> +    if (ret != sizeof(len)) {
>> +        goto err;
>> +    }
>> +
>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
>> +    if (ret != size) {
>> +        goto err;
>> +    }
>> +
> You can make this slightly simpler and save the return 0;

If we want to save the return 0, the code would have to be changed like this:

err:
     return (ret < 0 || ret == size) ? ret : -EIO;

I think that is too complex to understand, so should we keep the original?

>> +    return 0;
>> +
>> +err:
>> +    return ret < 0 ? ret : -EIO;
> err:
>         return ret <= 0 ? ret : -EIO;

This is wrong: if qemu_chr_fe_write_all succeeds, ret will equal size,
so this would return -EIO.

>> +}
>> +
>> +static int compare_chr_can_read(void *opaque)
>> +{
>> +    return COMPARE_READ_LEN_MAX;
>> +}
>>

-- 
Thanks
zhangchen

^ permalink raw reply	[flat|nested] 26+ messages in thread

end of thread, other threads:[~2016-04-13  2:01 UTC | newest]

Thread overview: 26+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-03-30  8:35 [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Zhang Chen
2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 1/3] colo-compare: introduce colo compare initlization Zhang Chen
2016-03-30  9:25   ` Dr. David Alan Gilbert
2016-03-31  1:41     ` Zhang Chen
2016-03-31  7:25       ` Zhang Chen
2016-03-31  9:24       ` Dr. David Alan Gilbert
2016-04-01  5:11         ` Jason Wang
2016-04-01  5:41           ` Li Zhijian
2016-04-13  2:02     ` Zhang Chen
2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 2/3] colo-compare: track connection and enqueue packet Zhang Chen
2016-03-30 10:36   ` Dr. David Alan Gilbert
2016-03-31  2:09     ` Li Zhijian
2016-03-31  8:47       ` Dr. David Alan Gilbert
2016-03-31  4:06     ` Zhang Chen
2016-03-31  4:23       ` Li Zhijian
2016-03-31  4:44         ` Zhang Chen
2016-03-30  8:35 ` [Qemu-devel] [PATCH V2 3/3] colo-compare: introduce packet comparison thread Zhang Chen
2016-03-30 11:41   ` Dr. David Alan Gilbert
2016-03-31  2:17     ` Li Zhijian
2016-03-31  8:50       ` Dr. David Alan Gilbert
2016-03-31  6:00     ` Zhang Chen
2016-03-30 12:05 ` [Qemu-devel] [PATCH V2 0/3] Introduce COLO-compare Dr. David Alan Gilbert
2016-03-31  3:01   ` Li Zhijian
2016-03-31  9:43     ` Dr. David Alan Gilbert
2016-04-01  1:40       ` Li Zhijian
2016-03-31  6:48   ` Zhang Chen

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.