* [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare
@ 2016-06-23 11:34 Zhang Chen
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization Zhang Chen
                   ` (4 more replies)
  0 siblings, 5 replies; 21+ messages in thread
From: Zhang Chen @ 2016-06-23 11:34 UTC
  To: qemu devel, Jason Wang
  Cc: Zhang Chen, Li Zhijian, Wen Congyang, zhanghailiang,
	eddie . dong, Dr . David Alan Gilbert

COLO-compare is a part of the COLO project. It compares
the network packets of the primary and secondary VMs to
help COLO decide whether to do a checkpoint.
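The decision described above can be pictured with a minimal C sketch. It assumes that a primary packet and the matching secondary packet are already paired up, and it compares them byte for byte; the real series compares per protocol (TCP, UDP, ICMP and others, patch 4/4). SketchPacket, SketchDecision and sketch_compare() are hypothetical names used only for illustration, not part of the patches.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical packet representation for this sketch only. */
typedef struct {
    const unsigned char *data;
    size_t size;
} SketchPacket;

/* Possible outcomes of comparing one primary/secondary packet pair. */
typedef enum {
    SKETCH_RELEASE_PRIMARY, /* outputs match: the primary packet may go out */
    SKETCH_DO_CHECKPOINT    /* outputs diverge: ask COLO for a checkpoint */
} SketchDecision;

/*
 * Byte-for-byte comparison of a primary and a secondary packet.
 * Assumption: plain memcmp stands in for the per-protocol comparison.
 */
static SketchDecision sketch_compare(const SketchPacket *pri,
                                     const SketchPacket *sec)
{
    assert(pri != NULL && sec != NULL);

    if (pri->size != sec->size) {
        return SKETCH_DO_CHECKPOINT;
    }
    if (pri->size > 0 && memcmp(pri->data, sec->data, pri->size) != 0) {
        return SKETCH_DO_CHECKPOINT;
    }
    return SKETCH_RELEASE_PRIMARY;
}
```

A strict byte comparison like this would force checkpoints on harmless differences (for example TCP header fields that legitimately differ between the two VMs), which is one reason the series compares each protocol separately.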

The full version is available on GitHub:
https://github.com/zhangckid/qemu/tree/colo-v2.7-proxy-mode-compare-with-colo-base-jun20

v5:
 p3:
    - comments from Jason:
      we poll and handle chardev input in the compare thread.
      This way, there's no need for extra synchronization
      with the main loop.
      This depends on another patch:
      qemu-char: Fix context for g_source_attach()
    - remove QemuEvent
 p2:
    - remove conn->list_lock
 p1:
    - move compare_pri/sec_chr_in to p3
    - move compare_chr_send to p2

v4:
 p4:
    - add some comments
    - fix some trace-events
    - fix tcp compare error
 p3:
    - add rcu_read_lock().
    - fix trace name
    - fix Jason's other comments
    - rebase onto some functions from Dave's branch
 p2:
    - colo_compare_connection(): change g_queue_push_head() to
      g_queue_push_tail() to match the sorted order.
    - remove pkt->s
    - move data structure to colo-base.h
    - add colo-base.c reuse codes for filter-rewriter
    - add some filter-rewriter needs struct
    - depends on previous SocketReadState patch
 p1:
    - address review comments, except moving qemu_chr_add_handlers()
      to the colo thread
    - remove class_finalize
    - remove secondary arp codes
    - depends on previous SocketReadState patch

v3:
  - rebase colo-compare to colo-frame v2.7
  - fix most of Dave's comments
    (except RCU)
  - add TCP, UDP, ICMP and other packet comparison
  - add trace-event
  - add some comments
  - other bug fix
  - add RFC index
  - add usage in patch 1/4

v2:
  - add jhash.h

v1:
  - initial patch


Zhang Chen (4):
  colo-compare: introduce colo compare initialization
  colo-compare: track connection and enqueue packet
  colo-compare: introduce packet comparison thread
  colo-compare: add TCP,UDP,ICMP packet comparison

 include/qemu/jhash.h |  61 +++++
 net/Makefile.objs    |   2 +
 net/colo-base.c      | 195 ++++++++++++++
 net/colo-base.h      |  91 +++++++
 net/colo-compare.c   | 742 +++++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-options.hx      |  34 +++
 trace-events         |  11 +
 vl.c                 |   3 +-
 8 files changed, 1138 insertions(+), 1 deletion(-)
 create mode 100644 include/qemu/jhash.h
 create mode 100644 net/colo-base.c
 create mode 100644 net/colo-base.h
 create mode 100644 net/colo-compare.c

-- 
2.7.4

* [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization
  2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
@ 2016-06-23 11:34 ` Zhang Chen
  2016-07-08  3:40   ` Jason Wang
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet Zhang Chen
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-06-23 11:34 UTC
  To: qemu devel, Jason Wang
  Cc: Zhang Chen, Li Zhijian, Wen Congyang, zhanghailiang,
	eddie . dong, Dr . David Alan Gilbert

Packets coming from the primary chardev (primary_in) will be sent to outdev.
Packets coming from the secondary chardev (secondary_in) will be dropped.
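Packets travel over these chardev sockets as length-prefixed frames: compare_chr_send() in patch 2/4 first writes the packet size as a 4-byte value in network byte order (htonl) and then the packet bytes. The C sketch below shows that framing on an in-memory buffer. frame_encode() and frame_decode() are hypothetical helpers for illustration; the real code writes directly to a chardev and parses incoming bytes with SocketReadState.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Encode one packet as [4-byte big-endian length][payload].
 * Returns the number of bytes written, or 0 if 'out' is too small.
 */
static size_t frame_encode(uint8_t *out, size_t out_cap,
                           const uint8_t *pkt, uint32_t len)
{
    assert(out != NULL && pkt != NULL);
    if (out_cap < 4 || out_cap - 4 < len) {
        return 0;
    }
    out[0] = (uint8_t)(len >> 24);   /* most significant byte first */
    out[1] = (uint8_t)(len >> 16);
    out[2] = (uint8_t)(len >> 8);
    out[3] = (uint8_t)len;
    memcpy(out + 4, pkt, len);
    return 4 + (size_t)len;
}

/*
 * Decode one frame from 'buf'. On success, stores the payload pointer
 * and length and returns the total bytes consumed. Returns 0 when 'buf'
 * does not yet hold a complete frame (the caller would wait for more).
 */
static size_t frame_decode(const uint8_t *buf, size_t buf_len,
                           const uint8_t **payload, uint32_t *len)
{
    uint32_t n;

    if (buf_len < 4) {
        return 0;
    }
    n = ((uint32_t)buf[0] << 24) | ((uint32_t)buf[1] << 16) |
        ((uint32_t)buf[2] << 8) | (uint32_t)buf[3];
    if (buf_len - 4 < n) {
        return 0;
    }
    *payload = buf + 4;
    *len = n;
    return 4 + (size_t)n;
}
```

Returning 0 for a partial frame mirrors the fact that a stream socket may deliver a packet in several reads, so the reader has to buffer until the announced length has arrived.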

usage:

primary:
-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
-device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
-chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
-chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
-chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
-chardev socket,id=compare0-0,host=3.3.3.3,port=9001
-chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
-chardev socket,id=compare_out0,host=3.3.3.3,port=9005
-object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
-object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
-object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
-object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0

secondary:
-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
-device e1000,netdev=hn0,mac=52:a4:00:12:78:66
-chardev socket,id=red0,host=3.3.3.3,port=9003
-chardev socket,id=red1,host=3.3.3.3,port=9004
-object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
-object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/Makefile.objs  |   1 +
 net/colo-compare.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-options.hx    |  34 ++++++++
 vl.c               |   3 +-
 4 files changed, 268 insertions(+), 1 deletion(-)
 create mode 100644 net/colo-compare.c

diff --git a/net/Makefile.objs b/net/Makefile.objs
index b7c22fd..ba92f73 100644
--- a/net/Makefile.objs
+++ b/net/Makefile.objs
@@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
 common-obj-y += filter.o
 common-obj-y += filter-buffer.o
 common-obj-y += filter-mirror.o
+common-obj-y += colo-compare.o
diff --git a/net/colo-compare.c b/net/colo-compare.c
new file mode 100644
index 0000000..a3e1456
--- /dev/null
+++ b/net/colo-compare.c
@@ -0,0 +1,231 @@
+/*
+ * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
+ * (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Copyright (c) 2016 Intel Corporation
+ *
+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/error-report.h"
+#include "qemu-common.h"
+#include "qapi/qmp/qerror.h"
+#include "qapi/error.h"
+#include "net/net.h"
+#include "net/vhost_net.h"
+#include "qom/object_interfaces.h"
+#include "qemu/iov.h"
+#include "qom/object.h"
+#include "qemu/typedefs.h"
+#include "net/queue.h"
+#include "sysemu/char.h"
+#include "qemu/sockets.h"
+#include "qapi-visit.h"
+#include "trace.h"
+
+#define TYPE_COLO_COMPARE "colo-compare"
+#define COLO_COMPARE(obj) \
+    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
+
+#define COMPARE_READ_LEN_MAX NET_BUFSIZE
+
+static QTAILQ_HEAD(, CompareState) net_compares =
+       QTAILQ_HEAD_INITIALIZER(net_compares);
+
+typedef struct CompareState {
+    Object parent;
+
+    char *pri_indev;
+    char *sec_indev;
+    char *outdev;
+    CharDriverState *chr_pri_in;
+    CharDriverState *chr_sec_in;
+    CharDriverState *chr_out;
+    QTAILQ_ENTRY(CompareState) next;
+    SocketReadState pri_rs;
+    SocketReadState sec_rs;
+} CompareState;
+
+typedef struct CompareClass {
+    ObjectClass parent_class;
+} CompareClass;
+
+static char *compare_get_pri_indev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->pri_indev);
+}
+
+static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->pri_indev);
+    s->pri_indev = g_strdup(value);
+}
+
+static char *compare_get_sec_indev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->sec_indev);
+}
+
+static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->sec_indev);
+    s->sec_indev = g_strdup(value);
+}
+
+static char *compare_get_outdev(Object *obj, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    return g_strdup(s->outdev);
+}
+
+static void compare_set_outdev(Object *obj, const char *value, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    g_free(s->outdev);
+    s->outdev = g_strdup(value);
+}
+
+static void compare_pri_rs_finalize(SocketReadState *pri_rs)
+{
+    /* if packet_enqueue pri pkt failed we will send unsupported packet */
+}
+
+static void compare_sec_rs_finalize(SocketReadState *sec_rs)
+{
+    /* if packet_enqueue sec pkt failed we will notify trace */
+}
+
+/*
+ * called from the main thread on the primary
+ * to setup colo-compare.
+ */
+static void colo_compare_complete(UserCreatable *uc, Error **errp)
+{
+    CompareState *s = COLO_COMPARE(uc);
+
+    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
+        error_setg(errp, "colo compare needs 'primary_in', "
+                   "'secondary_in' and 'outdev' properties set");
+        return;
+    } else if (!strcmp(s->pri_indev, s->outdev) ||
+               !strcmp(s->sec_indev, s->outdev) ||
+               !strcmp(s->pri_indev, s->sec_indev)) {
+        error_setg(errp, "'primary_in', 'secondary_in' and 'outdev' "
+                   "must be different chardevs for compare module");
+        return;
+    }
+
+    s->chr_pri_in = qemu_chr_find(s->pri_indev);
+    if (s->chr_pri_in == NULL) {
+        error_setg(errp, "Primary IN Device '%s' not found",
+                   s->pri_indev);
+        return;
+    }
+
+    s->chr_sec_in = qemu_chr_find(s->sec_indev);
+    if (s->chr_sec_in == NULL) {
+        error_setg(errp, "Secondary IN Device '%s' not found",
+                   s->sec_indev);
+        return;
+    }
+
+    s->chr_out = qemu_chr_find(s->outdev);
+    if (s->chr_out == NULL) {
+        error_setg(errp, "OUT Device '%s' not found", s->outdev);
+        return;
+    }
+
+    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
+
+    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
+
+    qemu_chr_fe_claim_no_fail(s->chr_out);
+    QTAILQ_INSERT_TAIL(&net_compares, s, next);
+
+    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
+    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
+
+    return;
+}
+
+static void colo_compare_class_init(ObjectClass *oc, void *data)
+{
+    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
+
+    ucc->complete = colo_compare_complete;
+}
+
+static void colo_compare_init(Object *obj)
+{
+    object_property_add_str(obj, "primary_in",
+                            compare_get_pri_indev, compare_set_pri_indev,
+                            NULL);
+    object_property_add_str(obj, "secondary_in",
+                            compare_get_sec_indev, compare_set_sec_indev,
+                            NULL);
+    object_property_add_str(obj, "outdev",
+                            compare_get_outdev, compare_set_outdev,
+                            NULL);
+}
+
+static void colo_compare_finalize(Object *obj)
+{
+    CompareState *s = COLO_COMPARE(obj);
+
+    if (s->chr_pri_in) {
+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_release(s->chr_pri_in);
+    }
+    if (s->chr_sec_in) {
+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+        qemu_chr_fe_release(s->chr_sec_in);
+    }
+    if (s->chr_out) {
+        qemu_chr_fe_release(s->chr_out);
+    }
+
+    if (!QTAILQ_EMPTY(&net_compares)) {
+        QTAILQ_REMOVE(&net_compares, s, next);
+    }
+
+    g_free(s->pri_indev);
+    g_free(s->sec_indev);
+    g_free(s->outdev);
+}
+
+static const TypeInfo colo_compare_info = {
+    .name = TYPE_COLO_COMPARE,
+    .parent = TYPE_OBJECT,
+    .instance_size = sizeof(CompareState),
+    .instance_init = colo_compare_init,
+    .instance_finalize = colo_compare_finalize,
+    .class_size = sizeof(CompareClass),
+    .class_init = colo_compare_class_init,
+    .interfaces = (InterfaceInfo[]) {
+        { TYPE_USER_CREATABLE },
+        { }
+    }
+};
+
+static void register_types(void)
+{
+    type_register_static(&colo_compare_info);
+}
+
+type_init(register_types);
diff --git a/qemu-options.hx b/qemu-options.hx
index 587de8f..14bade5 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -3866,6 +3866,40 @@ Dump the network traffic on netdev @var{dev} to the file specified by
 The file format is libpcap, so it can be analyzed with tools such as tcpdump
 or Wireshark.
 
+@item -object colo-compare,id=@var{id},primary_in=@var{chardevid},secondary_in=@var{chardevid},
+outdev=@var{chardevid}
+
+Colo-compare gets packets from primary_in@var{chardevid} and secondary_in@var{chardevid}, compares them,
+and sends the primary packets to outdev@var{chardevid}. It is used together with filter-mirror and filter-redirector.
+
+The simple usage:
+
+@example
+
+primary:
+-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
+-device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
+-chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
+-chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
+-chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
+-chardev socket,id=compare0-0,host=3.3.3.3,port=9001
+-chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
+-chardev socket,id=compare_out0,host=3.3.3.3,port=9005
+-object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
+-object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
+-object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
+-object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
+
+secondary:
+-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
+-device e1000,netdev=hn0,mac=52:a4:00:12:78:66
+-chardev socket,id=red0,host=3.3.3.3,port=9003
+-chardev socket,id=red1,host=3.3.3.3,port=9004
+-object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
+-object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
+
+@end example
+
 @item -object secret,id=@var{id},data=@var{string},format=@var{raw|base64}[,keyid=@var{secretid},iv=@var{string}]
 @item -object secret,id=@var{id},file=@var{filename},format=@var{raw|base64}[,keyid=@var{secretid},iv=@var{string}]
 
diff --git a/vl.c b/vl.c
index cbe51ac..c6b9a6f 100644
--- a/vl.c
+++ b/vl.c
@@ -2865,7 +2865,8 @@ static bool object_create_initial(const char *type)
     if (g_str_equal(type, "filter-buffer") ||
         g_str_equal(type, "filter-dump") ||
         g_str_equal(type, "filter-mirror") ||
-        g_str_equal(type, "filter-redirector")) {
+        g_str_equal(type, "filter-redirector") ||
+        g_str_equal(type, "colo-compare")) {
         return false;
     }
 
-- 
2.7.4

* [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet
  2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization Zhang Chen
@ 2016-06-23 11:34 ` Zhang Chen
  2016-07-08  4:07   ` Jason Wang
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread Zhang Chen
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-06-23 11:34 UTC
  To: qemu devel, Jason Wang
  Cc: Zhang Chen, Li Zhijian, Wen Congyang, zhanghailiang,
	eddie . dong, Dr . David Alan Gilbert

In this patch we use a hash table keyed with the Jenkins hash (jhash,
as used in the Linux kernel) to track connections, and then enqueue
network packets like this:

+ CompareState ++
|               |
+---------------+   +---------------+         +---------------+
|conn list      +--->conn           +--------->conn           |
+---------------+   +---------------+         +---------------+
|               |     |           |             |          |
+---------------+ +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+
                      |           |             |          |
                  +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+
                      |           |             |          |
                  +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 include/qemu/jhash.h |  61 ++++++++++++++++
 net/Makefile.objs    |   1 +
 net/colo-base.c      | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++
 net/colo-base.h      |  88 +++++++++++++++++++++++
 net/colo-compare.c   | 138 +++++++++++++++++++++++++++++++++++-
 trace-events         |   3 +
 6 files changed, 483 insertions(+), 2 deletions(-)
 create mode 100644 include/qemu/jhash.h
 create mode 100644 net/colo-base.c
 create mode 100644 net/colo-base.h

diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
new file mode 100644
index 0000000..0fcd875
--- /dev/null
+++ b/include/qemu/jhash.h
@@ -0,0 +1,61 @@
+/* jhash.h: Jenkins hash support.
+  *
+  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
+  *
+  * http://burtleburtle.net/bob/hash/
+  *
+  * These are the credits from Bob's sources:
+  *
+  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
+  *
+  * These are functions for producing 32-bit hashes for hash table lookup.
+  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
+  * are externally useful functions.  Routines to test the hash are included
+  * if SELF_TEST is defined.  You can use this free for any purpose.  It's in
+  * the public domain.  It has no warranty.
+  *
+  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
+  *
+  * I've modified Bob's hash to be useful in the Linux kernel, and
+  * any bugs present are my fault.
+  * Jozsef
+  */
+
+#ifndef QEMU_JHASH_H__
+#define QEMU_JHASH_H__
+
+#include "qemu/bitops.h"
+
+/*
+ * Hash mixing helpers copied from the Linux kernel's jhash.h
+ */
+
+/* __jhash_mix -- mix 3 32-bit values reversibly. */
+#define __jhash_mix(a, b, c)                \
+{                                           \
+    a -= c;  a ^= rol32(c, 4);  c += b;     \
+    b -= a;  b ^= rol32(a, 6);  a += c;     \
+    c -= b;  c ^= rol32(b, 8);  b += a;     \
+    a -= c;  a ^= rol32(c, 16); c += b;     \
+    b -= a;  b ^= rol32(a, 19); a += c;     \
+    c -= b;  c ^= rol32(b, 4);  b += a;     \
+}
+
+/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
+#define __jhash_final(a, b, c)  \
+{                               \
+    c ^= b; c -= rol32(b, 14);  \
+    a ^= c; a -= rol32(c, 11);  \
+    b ^= a; b -= rol32(a, 25);  \
+    c ^= b; c -= rol32(b, 16);  \
+    a ^= c; a -= rol32(c, 4);   \
+    b ^= a; b -= rol32(a, 14);  \
+    c ^= b; c -= rol32(b, 24);  \
+}
+
+/* An arbitrary initial parameter */
+#define JHASH_INITVAL           0xdeadbeef
+
+#endif /* QEMU_JHASH_H__ */
diff --git a/net/Makefile.objs b/net/Makefile.objs
index ba92f73..119589f 100644
--- a/net/Makefile.objs
+++ b/net/Makefile.objs
@@ -17,3 +17,4 @@ common-obj-y += filter.o
 common-obj-y += filter-buffer.o
 common-obj-y += filter-mirror.o
 common-obj-y += colo-compare.o
+common-obj-y += colo-base.o
diff --git a/net/colo-base.c b/net/colo-base.c
new file mode 100644
index 0000000..7e263e8
--- /dev/null
+++ b/net/colo-base.c
@@ -0,0 +1,194 @@
+/*
+ * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
+ * (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Copyright (c) 2016 Intel Corporation
+ *
+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/error-report.h"
+#include "net/colo-base.h"
+
+uint32_t connection_key_hash(const void *opaque)
+{
+    const ConnectionKey *key = opaque;
+    uint32_t a, b, c;
+
+    /* Jenkins hash */
+    a = b = c = JHASH_INITVAL + sizeof(*key);
+    a += key->src.s_addr;
+    b += key->dst.s_addr;
+    c += (key->src_port | (uint32_t)key->dst_port << 16);
+    __jhash_mix(a, b, c);
+
+    a += key->ip_proto;
+    __jhash_final(a, b, c);
+
+    return c;
+}
+
+int connection_key_equal(const void *key1, const void *key2)
+{
+    return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
+}
+
+int parse_packet_early(Packet *pkt)
+{
+    int network_length;
+    uint8_t *data = pkt->data;
+    uint16_t l3_proto;
+    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
+
+    if (pkt->size < ETH_HLEN) {
+        error_report("pkt->size < ETH_HLEN");
+        return 1;
+    }
+    pkt->network_layer = data + ETH_HLEN;
+    l3_proto = eth_get_l3_proto(data, l2hdr_len);
+    if (l3_proto != ETH_P_IP) {
+        return 1;
+    }
+
+    network_length = pkt->ip->ip_hl * 4;
+    if (pkt->size < ETH_HLEN + network_length) {
+        error_report("pkt->size < network_layer + network_length");
+        return 1;
+    }
+    pkt->transport_layer = pkt->network_layer + network_length;
+    if (!pkt->transport_layer) {
+        error_report("pkt->transport_layer is invalid");
+        return 1;
+    }
+
+    return 0;
+}
+
+void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode)
+{
+    uint32_t tmp_ports;
+
+    key->ip_proto = pkt->ip->ip_p;
+
+    switch (key->ip_proto) {
+    case IPPROTO_TCP:
+    case IPPROTO_UDP:
+    case IPPROTO_DCCP:
+    case IPPROTO_ESP:
+    case IPPROTO_SCTP:
+    case IPPROTO_UDPLITE:
+        tmp_ports = *(uint32_t *)(pkt->transport_layer);
+        if (mode) {
+            key->src = pkt->ip->ip_src;
+            key->dst = pkt->ip->ip_dst;
+            key->src_port = ntohs(tmp_ports & 0xffff);
+            key->dst_port = ntohs(tmp_ports >> 16);
+        } else {
+            key->dst = pkt->ip->ip_src;
+            key->src = pkt->ip->ip_dst;
+            key->dst_port = ntohs(tmp_ports & 0xffff);
+            key->src_port = ntohs(tmp_ports >> 16);
+        }
+        break;
+    case IPPROTO_AH:
+        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
+        if (mode) {
+            key->src = pkt->ip->ip_src;
+            key->dst = pkt->ip->ip_dst;
+            key->src_port = ntohs(tmp_ports & 0xffff);
+            key->dst_port = ntohs(tmp_ports >> 16);
+        } else {
+            key->dst = pkt->ip->ip_src;
+            key->src = pkt->ip->ip_dst;
+            key->dst_port = ntohs(tmp_ports & 0xffff);
+            key->src_port = ntohs(tmp_ports >> 16);
+        }
+        break;
+    default:
+        key->src_port = 0;
+        key->dst_port = 0;
+        break;
+    }
+}
+
+Connection *connection_new(ConnectionKey *key)
+{
+    Connection *conn = g_slice_new(Connection);
+
+    conn->ip_proto = key->ip_proto;
+    conn->processing = false;
+    g_queue_init(&conn->primary_list);
+    g_queue_init(&conn->secondary_list);
+
+    return conn;
+}
+
+void connection_destroy(void *opaque)
+{
+    Connection *conn = opaque;
+
+    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
+    g_queue_clear(&conn->primary_list);
+    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
+    g_queue_clear(&conn->secondary_list);
+    g_slice_free(Connection, conn);
+}
+
+Packet *packet_new(const void *data, int size)
+{
+    Packet *pkt = g_slice_new(Packet);
+
+    pkt->data = g_memdup(data, size);
+    pkt->size = size;
+
+    return pkt;
+}
+
+void packet_destroy(void *opaque, void *user_data)
+{
+    Packet *pkt = opaque;
+
+    g_free(pkt->data);
+    g_slice_free(Packet, pkt);
+}
+
+/*
+ * Clear hashtable, stop this hash growing really huge
+ */
+void connection_hashtable_reset(GHashTable *connection_track_table)
+{
+    g_hash_table_remove_all(connection_track_table);
+}
+
+/* if not found, create a new connection and add to hash table */
+Connection *connection_get(GHashTable *connection_track_table,
+                           ConnectionKey *key,
+                           uint32_t *hashtable_size)
+{
+    /* FIXME: protect connection_track_table */
+    Connection *conn = g_hash_table_lookup(connection_track_table, key);
+
+    if (conn == NULL) {
+        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
+
+        conn = connection_new(key);
+
+        (*hashtable_size) += 1;
+        if (*hashtable_size > HASHTABLE_MAX_SIZE) {
+            error_report("colo proxy connection hashtable full, clear it");
+            connection_hashtable_reset(connection_track_table);
+            *hashtable_size = 0;
+            /* TODO:clear conn_list */
+        }
+
+        g_hash_table_insert(connection_track_table, new_key, conn);
+    }
+
+    return conn;
+}
diff --git a/net/colo-base.h b/net/colo-base.h
new file mode 100644
index 0000000..01c1a5d
--- /dev/null
+++ b/net/colo-base.h
@@ -0,0 +1,88 @@
+/*
+ * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
+ * (a.k.a. Fault Tolerance or Continuous Replication)
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 FUJITSU LIMITED
+ * Copyright (c) 2016 Intel Corporation
+ *
+ * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_COLO_BASE_H
+#define QEMU_COLO_BASE_H
+
+#include "slirp/slirp.h"
+#include "qemu/jhash.h"
+#include "qemu/rcu.h"
+
+#define HASHTABLE_MAX_SIZE 16384
+
+typedef enum colo_conn_state {
+    COLO_CONN_IDLE,
+
+    /* States on the primary: For incoming connection */
+    COLO_CONN_PRI_IN_SYN,     /* Received Syn */
+    COLO_CONN_PRI_IN_PSYNACK, /* Received syn/ack from primary, but not
+                                 yet from secondary */
+    COLO_CONN_PRI_IN_SSYNACK, /* Received syn/ack from secondary, but
+                                 not yet from primary */
+    COLO_CONN_PRI_IN_SYNACK,  /* Received syn/ack from both */
+    COLO_CONN_PRI_IN_ESTABLISHED, /* Got the ACK */
+
+    /* States on the secondary: For incoming connection */
+    COLO_CONN_SEC_IN_SYNACK,      /* We sent a syn/ack */
+    COLO_CONN_SEC_IN_ACK,         /* Saw the ack but didn't yet see our syn/ack */
+    COLO_CONN_SEC_IN_ESTABLISHED, /* Got the ACK from the outside */
+} colo_conn_state;
+
+typedef struct Packet {
+    void *data;
+    union {
+        uint8_t *network_layer;
+        struct ip *ip;
+    };
+    uint8_t *transport_layer;
+    int size;
+} Packet;
+
+typedef struct ConnectionKey {
+    /* (src, dst) must be grouped, in the same way as in the IP header */
+    struct in_addr src;
+    struct in_addr dst;
+    uint16_t src_port;
+    uint16_t dst_port;
+    uint8_t ip_proto;
+} QEMU_PACKED ConnectionKey;
+
+typedef struct Connection {
+    /* connection primary send queue: element type: Packet */
+    GQueue primary_list;
+    /* connection secondary send queue: element type: Packet */
+    GQueue secondary_list;
+    /* flag to enqueue unprocessed_connections */
+    bool processing;
+    uint8_t ip_proto;
+    /* used by filter-rewriter */
+    colo_conn_state state;
+    tcp_seq  primary_seq;
+    tcp_seq  secondary_seq;
+} Connection;
+
+uint32_t connection_key_hash(const void *opaque);
+int connection_key_equal(const void *opaque1, const void *opaque2);
+int parse_packet_early(Packet *pkt);
+void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode);
+Connection *connection_new(ConnectionKey *key);
+void connection_destroy(void *opaque);
+Connection *connection_get(GHashTable *connection_track_table,
+                           ConnectionKey *key,
+                           uint32_t *hashtable_size);
+void connection_hashtable_reset(GHashTable *connection_track_table);
+Packet *packet_new(const void *data, int size);
+void packet_destroy(void *opaque, void *user_data);
+
+#endif /* QEMU_COLO_BASE_H */
diff --git a/net/colo-compare.c b/net/colo-compare.c
index a3e1456..4231fe7 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -28,6 +28,7 @@
 #include "qemu/sockets.h"
 #include "qapi-visit.h"
 #include "trace.h"
+#include "net/colo-base.h"
 
 #define TYPE_COLO_COMPARE "colo-compare"
 #define COLO_COMPARE(obj) \
@@ -38,6 +39,28 @@
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
 
+/*
+  + CompareState ++
+  |               |
+  +---------------+   +---------------+         +---------------+
+  |conn list      +--->conn           +--------->conn           |
+  +---------------+   +---------------+         +---------------+
+  |               |     |           |             |          |
+  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
+                    |primary |  |secondary    |primary | |secondary
+                    |packet  |  |packet  +    |packet  | |packet  +
+                    +--------+  +--------+    +--------+ +--------+
+                        |           |             |          |
+                    +---v----+  +---v----+    +---v----+ +---v----+
+                    |primary |  |secondary    |primary | |secondary
+                    |packet  |  |packet  +    |packet  | |packet  +
+                    +--------+  +--------+    +--------+ +--------+
+                        |           |             |          |
+                    +---v----+  +---v----+    +---v----+ +---v----+
+                    |primary |  |secondary    |primary | |secondary
+                    |packet  |  |packet  +    |packet  | |packet  +
+                    +--------+  +--------+    +--------+ +--------+
+*/
 typedef struct CompareState {
     Object parent;
 
@@ -50,12 +73,103 @@ typedef struct CompareState {
     QTAILQ_ENTRY(CompareState) next;
     SocketReadState pri_rs;
     SocketReadState sec_rs;
+
+    /* connection list: the connections belonging to this NIC can be found
+     * in this list.
+     * element type: Connection
+     */
+    GQueue conn_list;
+    QemuMutex conn_list_lock; /* to protect conn_list */
+    /* hashtable to save connection */
+    GHashTable *connection_track_table;
+    /* to save unprocessed_connections */
+    GQueue unprocessed_connections;
+    /* proxy current hash size */
+    uint32_t hashtable_size;
 } CompareState;
 
 typedef struct CompareClass {
     ObjectClass parent_class;
 } CompareClass;
 
+enum {
+    PRIMARY_IN = 0,
+    SECONDARY_IN,
+};
+
+static int compare_chr_send(CharDriverState *out,
+                            const uint8_t *buf,
+                            uint32_t size);
+
+/*
+ * Return 0 on success; -1 means the packet is unsupported
+ * (e.g. ARP or IPv6) and will be sent later.
+ */
+static int packet_enqueue(CompareState *s, int mode)
+{
+    ConnectionKey key = {{ 0 } };
+    Packet *pkt = NULL;
+    Connection *conn;
+
+    if (mode == PRIMARY_IN) {
+        pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
+    } else {
+        pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
+    }
+
+    if (parse_packet_early(pkt)) {
+        packet_destroy(pkt, NULL);
+        pkt = NULL;
+        return -1;
+    }
+    fill_connection_key(pkt, &key, PRIMARY_IN);
+
+    conn = connection_get(s->connection_track_table,
+                          &key,
+                          &s->hashtable_size);
+    if (!conn->processing) {
+        qemu_mutex_lock(&s->conn_list_lock);
+        g_queue_push_tail(&s->conn_list, conn);
+        qemu_mutex_unlock(&s->conn_list_lock);
+        conn->processing = true;
+    }
+
+    if (mode == PRIMARY_IN) {
+        g_queue_push_tail(&conn->primary_list, pkt);
+    } else {
+        g_queue_push_tail(&conn->secondary_list, pkt);
+    }
+
+    return 0;
+}
+
+static int compare_chr_send(CharDriverState *out,
+                            const uint8_t *buf,
+                            uint32_t size)
+{
+    int ret = 0;
+    uint32_t len = htonl(size);
+
+    if (!size) {
+        return 0;
+    }
+
+    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
+    if (ret != sizeof(len)) {
+        goto err;
+    }
+
+    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
+    if (ret != size) {
+        goto err;
+    }
+
+    return 0;
+
+err:
+    return ret < 0 ? ret : -EIO;
+}
+
 static char *compare_get_pri_indev(Object *obj, Error **errp)
 {
     CompareState *s = COLO_COMPARE(obj);
@@ -103,12 +217,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
 
 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
 {
-    /* if packet_enqueue pri pkt failed we will send unsupported packet */
+    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
+
+    if (packet_enqueue(s, PRIMARY_IN)) {
+        trace_colo_compare_main("primary: unsupported packet in");
+        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+    }
 }
 
 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 {
-    /* if packet_enqueue sec pkt failed we will notify trace */
+    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
+
+    if (packet_enqueue(s, SECONDARY_IN)) {
+        trace_colo_compare_main("secondary: unsupported packet in");
+    }
 }
 
 /*
@@ -161,6 +284,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
 
+    g_queue_init(&s->conn_list);
+    qemu_mutex_init(&s->conn_list_lock);
+    s->hashtable_size = 0;
+
+    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
+                                                      connection_key_equal,
+                                                      g_free,
+                                                      connection_destroy);
+
     return;
 }
 
@@ -203,6 +335,8 @@ static void colo_compare_finalize(Object *obj)
     if (!QTAILQ_EMPTY(&net_compares)) {
         QTAILQ_REMOVE(&net_compares, s, next);
     }
+    qemu_mutex_destroy(&s->conn_list_lock);
+    g_queue_free(&s->conn_list);
 
     g_free(s->pri_indev);
     g_free(s->sec_indev);
diff --git a/trace-events b/trace-events
index ca7211b..703de1a 100644
--- a/trace-events
+++ b/trace-events
@@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
 aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
 aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32
 aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32
+
+# net/colo-compare.c
+colo_compare_main(const char *chr) ": %s"
-- 
2.7.4
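For illustration, the framing that compare_chr_send() writes to the output chardev (a 4-byte length in network byte order, followed by the packet bytes) can be sketched in standalone C. This is a minimal sketch, not QEMU code: frame_packet() and unframe_packet() are hypothetical helpers that write to and parse from a memory buffer instead of a CharDriverState, and, like compare_chr_send(), they emit nothing for an empty packet.

```c
#include <arpa/inet.h>  /* htonl(), ntohl() */
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical helper: write a 4-byte big-endian length header followed
 * by the payload into out[]. Returns the number of bytes written, 0 for
 * an empty packet (nothing is sent), or -1 if out[] is too small.
 */
static int frame_packet(uint8_t *out, size_t cap,
                        const uint8_t *buf, uint32_t size)
{
    uint32_t len;

    assert(out != NULL && (buf != NULL || size == 0));
    if (size == 0) {
        return 0;   /* mirrors compare_chr_send(): empty packets are skipped */
    }
    if (cap < sizeof(len) + (size_t)size) {
        return -1;
    }
    len = htonl(size);
    memcpy(out, &len, sizeof(len));
    memcpy(out + sizeof(len), buf, size);
    return (int)(sizeof(len) + size);
}

/*
 * Hypothetical inverse: parse one frame from in[] (n bytes available).
 * On success, points *payload at the packet bytes and returns their
 * length; returns -1 if the header or payload is incomplete.
 */
static int unframe_packet(const uint8_t *in, size_t n,
                          const uint8_t **payload)
{
    uint32_t len;

    if (n < sizeof(len)) {
        return -1;
    }
    memcpy(&len, in, sizeof(len));
    len = ntohl(len);
    if (n - sizeof(len) < len) {
        return -1;
    }
    *payload = in + sizeof(len);
    return (int)len;
}
```

The length prefix is what lets the receiver split the byte stream back into packets; it appears to be the same convention that the SocketReadState/net_fill_rstate() path decodes on the input side.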


* [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread
  2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization Zhang Chen
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet Zhang Chen
@ 2016-06-23 11:34 ` Zhang Chen
  2016-07-08  4:23   ` Jason Wang
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison Zhang Chen
  2016-07-07  7:47 ` [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
  4 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-06-23 11:34 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Zhang Chen, Li Zhijian, Wen Congyang, zhanghailiang,
	eddie . dong, Dr . David Alan Gilbert

If the packets are the same, we send the primary packet and drop the
secondary packet; otherwise, we notify COLO to do a checkpoint.

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-base.c    |   1 +
 net/colo-base.h    |   3 +
 net/colo-compare.c | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 trace-events       |   2 +
 4 files changed, 220 insertions(+)

diff --git a/net/colo-base.c b/net/colo-base.c
index 7e263e8..9673661 100644
--- a/net/colo-base.c
+++ b/net/colo-base.c
@@ -146,6 +146,7 @@ Packet *packet_new(const void *data, int size)
 
     pkt->data = g_memdup(data, size);
     pkt->size = size;
+    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
 
     return pkt;
 }
diff --git a/net/colo-base.h b/net/colo-base.h
index 01c1a5d..8bb1043 100644
--- a/net/colo-base.h
+++ b/net/colo-base.h
@@ -18,6 +18,7 @@
 #include "slirp/slirp.h"
 #include "qemu/jhash.h"
 #include "qemu/rcu.h"
+#include "qemu/timer.h"
 
 #define HASHTABLE_MAX_SIZE 16384
 
@@ -47,6 +48,8 @@ typedef struct Packet {
     };
     uint8_t *transport_layer;
     int size;
+    /* Time of packet creation, in wall clock ms */
+    int64_t creation_ms;
 } Packet;
 
 typedef struct ConnectionKey {
diff --git a/net/colo-compare.c b/net/colo-compare.c
index 4231fe7..928d729 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -35,6 +35,8 @@
     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
 
 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
+/* TODO: Should be configurable */
+#define REGULAR_CHECK_MS 400
 
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
@@ -86,6 +88,11 @@ typedef struct CompareState {
     GQueue unprocessed_connections;
     /* proxy current hash size */
     uint32_t hashtable_size;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    int thread_status;
+    /* Timer used on the primary to find packets that are never matched */
+    QEMUTimer *timer;
 } CompareState;
 
 typedef struct CompareClass {
@@ -97,6 +104,15 @@ enum {
     SECONDARY_IN,
 };
 
+enum {
+    /* compare thread isn't started */
+    COMPARE_THREAD_NONE,
+    /* compare thread is running */
+    COMPARE_THREAD_RUNNING,
+    /* compare thread exit */
+    COMPARE_THREAD_EXIT,
+};
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size);
@@ -143,6 +159,98 @@ static int packet_enqueue(CompareState *s, int mode)
     return 0;
 }
 
+/*
+ * The IP packets sent by the primary and secondary
+ * are compared here.
+ * TODO: support IP fragmentation and out-of-order packets.
+ * return: 0 means the packets are the same;
+ *         nonzero (> 0 or < 0) means they differ.
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
+                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
+                               inet_ntoa(spkt->ip->ip_src),
+                               inet_ntoa(spkt->ip->ip_dst));
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+{
+    trace_colo_compare_main("compare all");
+    return colo_packet_compare(ppkt, spkt);
+}
+
+static void colo_old_packet_check(void *opaque_packet, void *opaque_found)
+{
+    int64_t now;
+    bool *found_old = (bool *)opaque_found;
+    Packet *ppkt = (Packet *)opaque_packet;
+
+    if (*found_old) {
+        /* Someone found an old packet earlier in the queue */
+        return;
+    }
+
+    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+    if ((ppkt->creation_ms < now) &&
+        ((now - ppkt->creation_ms) > REGULAR_CHECK_MS)) {
+        trace_colo_old_packet_check_found(ppkt->creation_ms);
+        *found_old = true;
+    }
+}
+
+/*
+ * called from the compare thread on the primary
+ * to compare the packets of one connection
+ */
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    CompareState *s = user_data;
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    bool found_old;
+    int ret;
+
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_tail(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare_all);
+
+        if (result) {
+            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            trace_colo_compare_main("packet same and release packet");
+            g_queue_remove(&conn->secondary_list, result->data);
+        } else {
+            trace_colo_compare_main("packet different");
+            g_queue_push_tail(&conn->primary_list, pkt);
+            /* TODO: colo_notify_checkpoint();*/
+            break;
+        }
+    }
+
+    /*
+     * Look for old packets that the secondary hasn't matched,
+     * if we have some then we have to checkpoint to wake
+     * the secondary up.
+     */
+    found_old = false;
+    g_queue_foreach(&conn->primary_list, colo_old_packet_check, &found_old);
+    if (found_old) {
+        /* TODO: colo_notify_checkpoint();*/
+    }
+}
+
 static int compare_chr_send(CharDriverState *out,
                             const uint8_t *buf,
                             uint32_t size)
@@ -170,6 +278,69 @@ err:
     return ret < 0 ? ret : -EIO;
 }
 
+static int compare_chr_can_read(void *opaque)
+{
+    return COMPARE_READ_LEN_MAX;
+}
+
+/*
+ * called from the compare thread on the primary for packets
+ * arriving over the socket from the primary.
+ */
+static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = net_fill_rstate(&s->pri_rs, buf, size);
+    if (ret == -1) {
+        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
+        error_report("colo-compare primary_in error");
+    }
+}
+
+/*
+ * called from the compare thread on the primary for packets
+ * arriving over the socket from the secondary.
+ */
+static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
+{
+    CompareState *s = COLO_COMPARE(opaque);
+    int ret;
+
+    ret = net_fill_rstate(&s->sec_rs, buf, size);
+    if (ret == -1) {
+        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
+        error_report("colo-compare secondary_in error");
+    }
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    GMainContext *worker_context;
+    GMainLoop *compare_loop;
+    CompareState *s = opaque;
+
+    worker_context = g_main_context_new();
+    g_assert(g_main_context_get_thread_default() == NULL);
+    g_main_context_push_thread_default(worker_context);
+    g_assert(g_main_context_get_thread_default() == worker_context);
+
+    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
+                          compare_pri_chr_in, NULL, s);
+    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
+                          compare_sec_chr_in, NULL, s);
+
+    compare_loop = g_main_loop_new(worker_context, FALSE);
+
+    g_main_loop_run(compare_loop);
+
+    g_main_loop_unref(compare_loop);
+    g_main_context_pop_thread_default(worker_context);
+    g_main_context_unref(worker_context);
+    return NULL;
+}
+
 static char *compare_get_pri_indev(Object *obj, Error **errp)
 {
     CompareState *s = COLO_COMPARE(obj);
@@ -222,6 +393,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
     if (packet_enqueue(s, PRIMARY_IN)) {
         trace_colo_compare_main("primary: unsupported packet in");
         compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
+    } else {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
     }
 }
 
@@ -231,16 +405,35 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
 
     if (packet_enqueue(s, SECONDARY_IN)) {
         trace_colo_compare_main("secondary: unsupported packet in");
+    } else {
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
     }
 }
 
 /*
+ * Prod the compare thread regularly so it can watch for any packets
+ * that the secondary hasn't produced equivalents of.
+ */
+static void colo_compare_regular(void *opaque)
+{
+    CompareState *s = opaque;
+
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                        REGULAR_CHECK_MS);
+    /* compare connection */
+    g_queue_foreach(&s->conn_list, colo_compare_connection, s);
+}
+
+/*
  * called from the main thread on the primary
  * to setup colo-compare.
  */
 static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -293,6 +486,19 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    s->thread_status = COMPARE_THREAD_RUNNING;
+    snprintf(thread_name, sizeof(thread_name), "compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
+    /* A regular timer to kick any packets that the secondary doesn't match */
+    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
+                            colo_compare_regular, s);
+    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
+                        REGULAR_CHECK_MS);
+
     return;
 }
 
@@ -338,6 +544,14 @@ static void colo_compare_finalize(Object *obj)
     qemu_mutex_destroy(&s->conn_list_lock);
     g_queue_free(&s->conn_list);
 
+    if (s->thread.thread) {
+        s->thread_status = COMPARE_THREAD_EXIT;
+        /* compare connection */
+        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
+        qemu_thread_join(&s->thread);
+    }
+    timer_del(s->timer);
+
     g_free(s->pri_indev);
     g_free(s->sec_indev);
     g_free(s->outdev);
diff --git a/trace-events b/trace-events
index 703de1a..1537e91 100644
--- a/trace-events
+++ b/trace-events
@@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
 
 # net/colo-compare.c
 colo_compare_main(const char *chr) ": %s"
+colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
+colo_old_packet_check_found(int64_t old_time) "%" PRId64
-- 
2.7.4
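The age test in colo_old_packet_check() can be isolated as a small standalone C sketch. find_old_packet() is a hypothetical helper, and the plain array of timestamps stands in for the creation_ms fields of the packets still on conn->primary_list. It returns the first packet strictly older than REGULAR_CHECK_MS, which is the condition under which the patch intends to call colo_notify_checkpoint() (still a TODO in this version).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Same threshold as REGULAR_CHECK_MS in the patch. */
#define REGULAR_CHECK_MS 400

/*
 * Hypothetical helper: return the index of the first primary packet whose
 * age at now_ms exceeds REGULAR_CHECK_MS, or -1 if there is none.
 * A timestamp that is not earlier than now_ms (e.g. the host clock stepped
 * backwards) never counts as old, matching the (creation_ms < now) guard
 * in colo_old_packet_check().
 */
static int find_old_packet(const int64_t *creation_ms, size_t n,
                           int64_t now_ms)
{
    assert(creation_ms != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        if (creation_ms[i] < now_ms &&
            now_ms - creation_ms[i] > REGULAR_CHECK_MS) {
            return (int)i;
        }
    }
    return -1;
}
```

Stopping at the first match plays the same role as the *found_old early return in the patch: one stale packet is enough to request a checkpoint.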


* [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison
  2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
                   ` (2 preceding siblings ...)
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread Zhang Chen
@ 2016-06-23 11:34 ` Zhang Chen
  2016-07-08  8:59   ` Jason Wang
  2016-07-07  7:47 ` [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
  4 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-06-23 11:34 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Zhang Chen, Li Zhijian, Wen Congyang, zhanghailiang,
	eddie . dong, Dr . David Alan Gilbert

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-compare.c | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 trace-events       |   6 ++
 2 files changed, 173 insertions(+), 4 deletions(-)

diff --git a/net/colo-compare.c b/net/colo-compare.c
index 928d729..addf704 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -18,6 +18,7 @@
 #include "qapi/qmp/qerror.h"
 #include "qapi/error.h"
 #include "net/net.h"
+#include "net/eth.h"
 #include "net/vhost_net.h"
 #include "qom/object_interfaces.h"
 #include "qemu/iov.h"
@@ -180,9 +181,155 @@ static int colo_packet_compare(Packet *ppkt, Packet *spkt)
     }
 }
 
-static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
+/*
+ * called from the compare thread on the primary
+ * to compare TCP packets
+ * compare_tcp copied from Dr. David Alan Gilbert's branch
+ */
+static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
+{
+    struct tcphdr *ptcp, *stcp;
+    int res;
+    char *sdebug, *ddebug;
+
+    trace_colo_compare_main("compare tcp");
+    if (ppkt->size != spkt->size) {
+        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+            trace_colo_compare_main("pkt size not same");
+        }
+        return -1;
+    }
+
+    ptcp = (struct tcphdr *)ppkt->transport_layer;
+    stcp = (struct tcphdr *)spkt->transport_layer;
+
+    if (ptcp->th_seq != stcp->th_seq) {
+        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+            trace_colo_compare_main("pkt tcp seq not same");
+        }
+        return -1;
+    }
+
+    /*
+     * The 'identification' field in the IP header is *very* random;
+     * it almost never matches.  Fudge this by ignoring differences in
+     * unfragmented packets; they'll normally sort themselves out if different
+     * anyway, and it should recover at the TCP level.
+     * An alternative would be to get both the primary and secondary to rewrite
+     * somehow, but that would need some sync traffic to sync the state.
+     */
+    if (ntohs(ppkt->ip->ip_off) & IP_DF) {
+        spkt->ip->ip_id = ppkt->ip->ip_id;
+        /* and the sum will be different if the IDs were different */
+        spkt->ip->ip_sum = ppkt->ip->ip_sum;
+    }
+
+    res = memcmp(ppkt->data + ETH_HLEN, spkt->data + ETH_HLEN,
+                (spkt->size - ETH_HLEN));
+
+    if (res != 0 && trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
+        sdebug = g_strdup(inet_ntoa(ppkt->ip->ip_src));
+        ddebug = g_strdup(inet_ntoa(ppkt->ip->ip_dst));
+        fprintf(stderr, "%s: src/dst: %s/%s p: seq/ack=%u/%u"
+        " s: seq/ack=%u/%u res=%d flags=%x/%x\n", __func__,
+                   sdebug, ddebug,
+                   ntohl(ptcp->th_seq), ntohl(ptcp->th_ack),
+                   ntohl(stcp->th_seq), ntohl(stcp->th_ack),
+                   res, ptcp->th_flags, stcp->th_flags);
+
+        trace_colo_compare_tcp_miscompare("Primary len", ppkt->size);
+        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
+        trace_colo_compare_tcp_miscompare("Secondary len", spkt->size);
+        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
+
+        g_free(sdebug);
+        g_free(ddebug);
+    }
+
+    return res;
+}
+
+/*
+ * called from the compare thread on the primary
+ * to compare UDP packets
+ */
+static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
+{
+    int ret;
+
+    trace_colo_compare_main("compare udp");
+    ret = colo_packet_compare(ppkt, spkt);
+
+    if (ret) {
+        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
+        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
+        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
+        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
+    }
+
+    return ret;
+}
+
+/*
+ * called from the compare thread on the primary
+ * to compare ICMP packets
+ */
+static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
+{
+    int network_length;
+    struct icmp *icmp_ppkt, *icmp_spkt;
+
+    trace_colo_compare_main("compare icmp");
+    network_length = ppkt->ip->ip_hl * 4;
+    if (ppkt->size != spkt->size ||
+        ppkt->size < network_length + ETH_HLEN) {
+        trace_colo_compare_icmp_miscompare_size(ppkt->size, spkt->size);
+        return -1;
+    }
+    icmp_ppkt = (struct icmp *)(ppkt->data + network_length + ETH_HLEN);
+    icmp_spkt = (struct icmp *)(spkt->data + network_length + ETH_HLEN);
+
+    if ((icmp_ppkt->icmp_type == icmp_spkt->icmp_type) &&
+        (icmp_ppkt->icmp_code == icmp_spkt->icmp_code)) {
+        if (icmp_ppkt->icmp_type == ICMP_REDIRECT) {
+            if (icmp_ppkt->icmp_gwaddr.s_addr !=
+                icmp_spkt->icmp_gwaddr.s_addr) {
+                trace_colo_compare_main("icmp_gwaddr.s_addr not same");
+                trace_colo_compare_icmp_miscompare_addr("ppkt s_addr",
+                        inet_ntoa(icmp_ppkt->icmp_gwaddr));
+                trace_colo_compare_icmp_miscompare_addr("spkt s_addr",
+                        inet_ntoa(icmp_spkt->icmp_gwaddr));
+                return -1;
+            }
+        } else if ((icmp_ppkt->icmp_type == ICMP_UNREACH) &&
+                   (icmp_ppkt->icmp_code == ICMP_UNREACH_NEEDFRAG)) {
+            if (icmp_ppkt->icmp_nextmtu != icmp_spkt->icmp_nextmtu) {
+                trace_colo_compare_main("icmp_nextmtu not same");
+                trace_colo_compare_icmp_miscompare_mtu("ppkt nextmtu",
+                                             icmp_ppkt->icmp_nextmtu);
+                trace_colo_compare_icmp_miscompare_mtu("spkt nextmtu",
+                                             icmp_spkt->icmp_nextmtu);
+                return -1;
+            }
+        }
+    } else {
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * called from the compare thread on the primary
+ * to compare other packets
+ */
+static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
 {
-    trace_colo_compare_main("compare all");
+    trace_colo_compare_main("compare other");
+    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
+                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
+                               inet_ntoa(spkt->ip->ip_src),
+                               inet_ntoa(spkt->ip->ip_dst));
     return colo_packet_compare(ppkt, spkt);
 }
 
@@ -221,8 +368,24 @@ static void colo_compare_connection(void *opaque, void *user_data)
     while (!g_queue_is_empty(&conn->primary_list) &&
            !g_queue_is_empty(&conn->secondary_list)) {
         pkt = g_queue_pop_tail(&conn->primary_list);
-        result = g_queue_find_custom(&conn->secondary_list,
-                              pkt, (GCompareFunc)colo_packet_compare_all);
+        switch (conn->ip_proto) {
+        case IPPROTO_TCP:
+            result = g_queue_find_custom(&conn->secondary_list,
+                     pkt, (GCompareFunc)colo_packet_compare_tcp);
+            break;
+        case IPPROTO_UDP:
+            result = g_queue_find_custom(&conn->secondary_list,
+                     pkt, (GCompareFunc)colo_packet_compare_udp);
+            break;
+        case IPPROTO_ICMP:
+            result = g_queue_find_custom(&conn->secondary_list,
+                     pkt, (GCompareFunc)colo_packet_compare_icmp);
+            break;
+        default:
+            result = g_queue_find_custom(&conn->secondary_list,
+                     pkt, (GCompareFunc)colo_packet_compare_other);
+            break;
+        }
 
         if (result) {
             ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
diff --git a/trace-events b/trace-events
index 1537e91..6686cdf 100644
--- a/trace-events
+++ b/trace-events
@@ -1919,5 +1919,11 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
 
 # net/colo-compare.c
 colo_compare_main(const char *chr) ": %s"
+colo_compare_tcp_miscompare(const char *sta, int size) ": %s = %d"
+colo_compare_udp_miscompare(const char *sta, int size) ": %s = %d"
+colo_compare_icmp_miscompare_size(int psize, int ssize) ":ppkt size = %d spkt size = %d"
+colo_compare_icmp_miscompare_addr(const char *sta, const char *stb) ": %s  %s"
+colo_compare_icmp_miscompare_mtu(const char *sta, int size) ": %s  %d"
 colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
 colo_old_packet_check_found(int64_t old_time) "%" PRId64
+colo_compare_miscompare(void) ""
-- 
2.7.4


* Re: [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare
  2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
                   ` (3 preceding siblings ...)
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison Zhang Chen
@ 2016-07-07  7:47 ` Zhang Chen
  2016-07-07  8:41   ` Jason Wang
  4 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-07-07  7:47 UTC (permalink / raw)
  To: qemu devel, Jason Wang
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert


Hi~~ Jason~

Do you have any comments?
By the way, this patch set depends on the patch
[RFC PATCH V2] qemu-char: Fix context for g_source_attach()
but no one has reviewed it yet. Any suggestions?


Thanks
Zhang Chen


On 06/23/2016 07:34 PM, Zhang Chen wrote:
> COLO-compare is a part of COLO project. It is used
> to compare the network package to help COLO decide
> whether to do checkpoint.
>
> the full version in this github:
> https://github.com/zhangckid/qemu/tree/colo-v2.7-proxy-mode-compare-with-colo-base-jun20
>
> v5:
>   p3:
>      - comments from Jason
>        we poll and handle chardev in compare thread,
>        Through this way, there's no need for extra
>        synchronization with main loop
>        this depend on another patch:
>        qemu-char: Fix context for g_source_attach()
>      - remove QemuEvent
>   p2:
>      - remove conn->list_lock
>   p1:
>      - move compare_pri/sec_chr_in to p3
>      - move compare_chr_send to p2
>
> v4:
>   p4:
>      - add some comments
>      - fix some trace-events
>      - fix tcp compare error
>   p3:
>      - add rcu_read_lock().
>      - fix trace name
>      - fix jason's other comments
>      - rebase some Dave's branch function
>   p2:
>      - colo_compare_connection() change g_queue_push_head() to
>      - g_queue_push_tail() match to sorted order.
>      - remove pkt->s
>      - move data structure to colo-base.h
>      - add colo-base.c reuse codes for filter-rewriter
>      - add some filter-rewriter needs struct
>      - depends on previous SocketReadState patch
>   p1:
>      - except move qemu_chr_add_handlers()
>        to colo thread
>      - remove class_finalize
>      - remove secondary arp codes
>      - depends on previous SocketReadState patch
>
> v3:
>    - rebase colo-compare to colo-frame v2.7
>    - fix most of Dave's comments
>      (except RCU)
>    - add TCP,UDP,ICMP and other packet comparison
>    - add trace-event
>    - add some comments
>    - other bug fix
>    - add RFC index
>    - add usage in patch 1/4
>
> v2:
>    - add jhash.h
>
> v1:
>    - initial patch
>
>
> Zhang Chen (4):
>    colo-compare: introduce colo compare initialization
>    colo-compare: track connection and enqueue packet
>    colo-compare: introduce packet comparison thread
>    colo-compare: add TCP,UDP,ICMP packet comparison
>
>   include/qemu/jhash.h |  61 +++++
>   net/Makefile.objs    |   2 +
>   net/colo-base.c      | 195 ++++++++++++++
>   net/colo-base.h      |  91 +++++++
>   net/colo-compare.c   | 742 +++++++++++++++++++++++++++++++++++++++++++++++++++
>   qemu-options.hx      |  34 +++
>   trace-events         |  11 +
>   vl.c                 |   3 +-
>   8 files changed, 1138 insertions(+), 1 deletion(-)
>   create mode 100644 include/qemu/jhash.h
>   create mode 100644 net/colo-base.c
>   create mode 100644 net/colo-base.h
>   create mode 100644 net/colo-compare.c
>

-- 
Thanks
zhangchen


* Re: [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare
  2016-07-07  7:47 ` [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
@ 2016-07-07  8:41   ` Jason Wang
  0 siblings, 0 replies; 21+ messages in thread
From: Jason Wang @ 2016-07-07  8:41 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 2016-07-07 15:47, Zhang Chen wrote:
>
> Hi~~ Jason~
>
> Have any comments?
> By the way this patch set depend on the patch

Will go through this tomorrow.

> [RFC PATCH V2] qemu-char: Fix context for g_source_attach()
> but no one review it....any suggestion?

I think you may want to ping the maintainer, and maybe you can cc Fam,
who is doing a lot of work on the main loop.

Thanks

>
>
> Thanks
> Zhang Chen 


* Re: [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization Zhang Chen
@ 2016-07-08  3:40   ` Jason Wang
  2016-07-08  8:21     ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-08  3:40 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 2016-06-23 19:34, Zhang Chen wrote:
> Packets coming from the primary char indev will be sent to outdev
> Packets coming from the secondary char dev will be dropped
>
> usage:
>
> primary:
> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>
> secondary:
> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
> -chardev socket,id=red0,host=3.3.3.3,port=9003
> -chardev socket,id=red1,host=3.3.3.3,port=9004
> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1

Considering that we eventually want a non-RFC patch, it would be better to
add some explanation of the above configuration, since it is not easy for
newcomers to understand at first glance. Maybe you can use either an ASCII
figure or a paragraph. The parameters of colo-compare also need to be
explained in detail.

>
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>   net/Makefile.objs  |   1 +
>   net/colo-compare.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   qemu-options.hx    |  34 ++++++++
>   vl.c               |   3 +-
>   4 files changed, 268 insertions(+), 1 deletion(-)
>   create mode 100644 net/colo-compare.c
>
> diff --git a/net/Makefile.objs b/net/Makefile.objs
> index b7c22fd..ba92f73 100644
> --- a/net/Makefile.objs
> +++ b/net/Makefile.objs
> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>   common-obj-y += filter.o
>   common-obj-y += filter-buffer.o
>   common-obj-y += filter-mirror.o
> +common-obj-y += colo-compare.o
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> new file mode 100644
> index 0000000..a3e1456
> --- /dev/null
> +++ b/net/colo-compare.c
> @@ -0,0 +1,231 @@
> +/*
> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
> + * (a.k.a. Fault Tolerance or Continuous Replication)
> + *
> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
> + * Copyright (c) 2016 FUJITSU LIMITED
> + * Copyright (c) 2016 Intel Corporation
> + *
> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu/error-report.h"
> +#include "qemu-common.h"
> +#include "qapi/qmp/qerror.h"
> +#include "qapi/error.h"
> +#include "net/net.h"
> +#include "net/vhost_net.h"
> +#include "qom/object_interfaces.h"
> +#include "qemu/iov.h"
> +#include "qom/object.h"
> +#include "qemu/typedefs.h"
> +#include "net/queue.h"
> +#include "sysemu/char.h"
> +#include "qemu/sockets.h"
> +#include "qapi-visit.h"
> +#include "trace.h"

It looks like trace is not really used in this patch; you can delay the 
inclusion until it is actually used.

> +
> +#define TYPE_COLO_COMPARE "colo-compare"
> +#define COLO_COMPARE(obj) \
> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
> +
> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
> +
> +static QTAILQ_HEAD(, CompareState) net_compares =
> +       QTAILQ_HEAD_INITIALIZER(net_compares);

What is this list used for? A comment would help.

> +
> +typedef struct CompareState {
> +    Object parent;
> +
> +    char *pri_indev;
> +    char *sec_indev;
> +    char *outdev;
> +    CharDriverState *chr_pri_in;
> +    CharDriverState *chr_sec_in;
> +    CharDriverState *chr_out;
> +    QTAILQ_ENTRY(CompareState) next;
> +    SocketReadState pri_rs;
> +    SocketReadState sec_rs;
> +} CompareState;
> +
> +typedef struct CompareClass {
> +    ObjectClass parent_class;
> +} CompareClass;
> +
> +static char *compare_get_pri_indev(Object *obj, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    return g_strdup(s->pri_indev);
> +}
> +
> +static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->pri_indev);
> +    s->pri_indev = g_strdup(value);

I think we need to do something more than this, e.g. release the 
original dev and get the new one? Or just forbid setting this property.

And it looks like we have similar issues for sec_indev and outdev.
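For the "forbid setting" option, a minimal standalone sketch, assuming a 
hypothetical `complete` flag that colo_compare_complete() would set once 
the chardevs are claimed. Plain C stands in for QEMU's Error API here; in 
QEMU the setter would report through error_setg(errp, ...) rather than 
fprintf().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for CompareState with just the fields needed here. */
typedef struct {
    char *pri_indev;
    bool complete; /* hypothetical: set at the end of the complete() hook */
} CompareStateSketch;

/* Duplicate a C string; returns NULL on allocation failure. */
static char *dup_str(const char *value)
{
    size_t len = strlen(value) + 1;
    char *copy = malloc(len);

    if (copy) {
        memcpy(copy, value, len);
    }
    return copy;
}

/*
 * Setter for "primary_in": allowed before the object is completed,
 * rejected afterwards because the already claimed chardev would not follow.
 * Returns 0 on success, -1 on error (the old value is kept).
 */
static int compare_set_pri_indev_sketch(CompareStateSketch *s,
                                        const char *value)
{
    char *copy;

    assert(s && value);
    if (s->complete) {
        fprintf(stderr, "colo-compare: 'primary_in' cannot be changed "
                "after the object is created\n");
        return -1;
    }
    copy = dup_str(value);
    if (!copy) {
        return -1;
    }
    free(s->pri_indev);
    s->pri_indev = copy;
    return 0;
}
```

The same check would apply to the secondary_in and outdev setters.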

> +}
> +
> +static char *compare_get_sec_indev(Object *obj, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    return g_strdup(s->sec_indev);
> +}
> +
> +static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->sec_indev);
> +    s->sec_indev = g_strdup(value);
> +}
> +
> +static char *compare_get_outdev(Object *obj, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    return g_strdup(s->outdev);
> +}
> +
> +static void compare_set_outdev(Object *obj, const char *value, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    g_free(s->outdev);
> +    s->outdev = g_strdup(value);
> +}
> +
> +static void compare_pri_rs_finalize(SocketReadState *pri_rs)
> +{
> +    /* if packet_enqueue pri pkt failed we will send unsupported packet */
> +}
> +
> +static void compare_sec_rs_finalize(SocketReadState *sec_rs)
> +{
> +    /* if packet_enqueue sec pkt failed we will notify trace */
> +}
> +
> +/*
> + * called from the main thread on the primary
> + * to setup colo-compare.
> + */
> +static void colo_compare_complete(UserCreatable *uc, Error **errp)
> +{
> +    CompareState *s = COLO_COMPARE(uc);
> +
> +    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
> +        error_setg(errp, "colo compare needs 'primary_in' ,"
> +                   "'secondary_in','outdev' property set");
> +        return;
> +    } else if (!strcmp(s->pri_indev, s->outdev) ||
> +               !strcmp(s->sec_indev, s->outdev) ||
> +               !strcmp(s->pri_indev, s->sec_indev)) {
> +        error_setg(errp, "'indev' and 'outdev' could not be same "
> +                   "for compare module");
> +        return;
> +    }
> +
> +    s->chr_pri_in = qemu_chr_find(s->pri_indev);
> +    if (s->chr_pri_in == NULL) {
> +        error_setg(errp, "Primary IN Device '%s' not found",
> +                   s->pri_indev);
> +        return;
> +    }
> +
> +    s->chr_sec_in = qemu_chr_find(s->sec_indev);
> +    if (s->chr_sec_in == NULL) {
> +        error_setg(errp, "Secondary IN Device '%s' not found",
> +                   s->sec_indev);
> +        return;
> +    }
> +
> +    s->chr_out = qemu_chr_find(s->outdev);
> +    if (s->chr_out == NULL) {
> +        error_setg(errp, "OUT Device '%s' not found", s->outdev);
> +        return;
> +    }
> +
> +    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
> +
> +    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
> +
> +    qemu_chr_fe_claim_no_fail(s->chr_out);
> +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
> +
> +    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
> +    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
> +
> +    return;
> +}
> +
> +static void colo_compare_class_init(ObjectClass *oc, void *data)
> +{
> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
> +
> +    ucc->complete = colo_compare_complete;
> +}
> +
> +static void colo_compare_init(Object *obj)
> +{
> +    object_property_add_str(obj, "primary_in",
> +                            compare_get_pri_indev, compare_set_pri_indev,
> +                            NULL);
> +    object_property_add_str(obj, "secondary_in",
> +                            compare_get_sec_indev, compare_set_sec_indev,
> +                            NULL);
> +    object_property_add_str(obj, "outdev",
> +                            compare_get_outdev, compare_set_outdev,
> +                            NULL);
> +}
> +
> +static void colo_compare_finalize(Object *obj)
> +{
> +    CompareState *s = COLO_COMPARE(obj);
> +
> +    if (s->chr_pri_in) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);

Why is this needed?

> +        qemu_chr_fe_release(s->chr_pri_in);
> +    }
> +    if (s->chr_sec_in) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +        qemu_chr_fe_release(s->chr_sec_in);
> +    }
> +    if (s->chr_out) {
> +        qemu_chr_fe_release(s->chr_out);
> +    }
> +
> +    if (!QTAILQ_EMPTY(&net_compares)) {
> +        QTAILQ_REMOVE(&net_compares, s, next);
> +    }
> +
> +    g_free(s->pri_indev);
> +    g_free(s->sec_indev);
> +    g_free(s->outdev);
> +}
> +
> +static const TypeInfo colo_compare_info = {
> +    .name = TYPE_COLO_COMPARE,
> +    .parent = TYPE_OBJECT,
> +    .instance_size = sizeof(CompareState),
> +    .instance_init = colo_compare_init,
> +    .instance_finalize = colo_compare_finalize,
> +    .class_size = sizeof(CompareClass),
> +    .class_init = colo_compare_class_init,
> +    .interfaces = (InterfaceInfo[]) {
> +        { TYPE_USER_CREATABLE },
> +        { }
> +    }
> +};
> +
> +static void register_types(void)
> +{
> +    type_register_static(&colo_compare_info);
> +}
> +
> +type_init(register_types);
> diff --git a/qemu-options.hx b/qemu-options.hx
> index 587de8f..14bade5 100644
> --- a/qemu-options.hx
> +++ b/qemu-options.hx
> @@ -3866,6 +3866,40 @@ Dump the network traffic on netdev @var{dev} to the file specified by
>   The file format is libpcap, so it can be analyzed with tools such as tcpdump
>   or Wireshark.
>   
> +@item -object colo-compare,id=@var{id},primary_in=@var{chardevid},secondary_in=@var{chardevid},
> +outdev=@var{chardevid}
> +
> +Colo-compare gets packet from primary_in@var{chardevid} and secondary_in@var{chardevid},
> +and outputs to outdev@var{chardevid}, we can use it with the help of filter-mirror and filter-redirector.

The above description needs better organization. E.g. it does not even 
mention any comparison.

> +
> +The simple usage:
> +
> +@example
> +
> +primary:
> +-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
> +-device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
> +-chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
> +-chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
> +-chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
> +-chardev socket,id=compare0-0,host=3.3.3.3,port=9001
> +-chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
> +-chardev socket,id=compare_out0,host=3.3.3.3,port=9005
> +-object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
> +-object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
> +-object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
> +-object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
> +
> +secondary:
> +-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
> +-device e1000,netdev=hn0,mac=52:a4:00:12:78:66
> +-chardev socket,id=red0,host=3.3.3.3,port=9003
> +-chardev socket,id=red1,host=3.3.3.3,port=9004
> +-object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
> +-object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
> +
> +@end example
> +
>   @item -object secret,id=@var{id},data=@var{string},format=@var{raw|base64}[,keyid=@var{secretid},iv=@var{string}]
>   @item -object secret,id=@var{id},file=@var{filename},format=@var{raw|base64}[,keyid=@var{secretid},iv=@var{string}]
>   
> diff --git a/vl.c b/vl.c
> index cbe51ac..c6b9a6f 100644
> --- a/vl.c
> +++ b/vl.c
> @@ -2865,7 +2865,8 @@ static bool object_create_initial(const char *type)
>       if (g_str_equal(type, "filter-buffer") ||
>           g_str_equal(type, "filter-dump") ||
>           g_str_equal(type, "filter-mirror") ||
> -        g_str_equal(type, "filter-redirector")) {
> +        g_str_equal(type, "filter-redirector") ||
> +        g_str_equal(type, "colo-compare")) {
>           return false;
>       }
>   


* Re: [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet Zhang Chen
@ 2016-07-08  4:07   ` Jason Wang
  2016-07-08  9:56     ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-08  4:07 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 2016-06-23 19:34, Zhang Chen wrote:
> In this patch we use kernel jhash table to track
> connection, and then enqueue net packet like this:
>
> + CompareState ++
> |               |
> +---------------+   +---------------+         +---------------+
> |conn list      +--->conn           +--------->conn           |
> +---------------+   +---------------+         +---------------+
> |               |     |           |             |          |
> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>                    |primary |  |secondary    |primary | |secondary
>                    |packet  |  |packet  +    |packet  | |packet  +
>                    +--------+  +--------+    +--------+ +--------+
>                        |           |             |          |
>                    +---v----+  +---v----+    +---v----+ +---v----+
>                    |primary |  |secondary    |primary | |secondary
>                    |packet  |  |packet  +    |packet  | |packet  +
>                    +--------+  +--------+    +--------+ +--------+
>                        |           |             |          |
>                    +---v----+  +---v----+    +---v----+ +---v----+
>                    |primary |  |secondary    |primary | |secondary
>                    |packet  |  |packet  +    |packet  | |packet  +
>                    +--------+  +--------+    +--------+ +--------+

A paragraph describing the above would be very welcome.

> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>   include/qemu/jhash.h |  61 ++++++++++++++++
>   net/Makefile.objs    |   1 +
>   net/colo-base.c      | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++
>   net/colo-base.h      |  88 +++++++++++++++++++++++
>   net/colo-compare.c   | 138 +++++++++++++++++++++++++++++++++++-
>   trace-events         |   3 +
>   6 files changed, 483 insertions(+), 2 deletions(-)
>   create mode 100644 include/qemu/jhash.h
>   create mode 100644 net/colo-base.c
>   create mode 100644 net/colo-base.h
>
> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
> new file mode 100644
> index 0000000..0fcd875
> --- /dev/null
> +++ b/include/qemu/jhash.h
> @@ -0,0 +1,61 @@
> +/* jhash.h: Jenkins hash support.
> +  *
> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
> +  *
> +  * http://burtleburtle.net/bob/hash/
> +  *
> +  * These are the credits from Bob's sources:
> +  *
> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
> +  *
> +  * These are functions for producing 32-bit hashes for hash table lookup.
> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
> +  * are externally useful functions.  Routines to test the hash are
> +  * included if SELF_TEST is defined.
> +  * You can use this free for any purpose.
> +  * It's in the public domain.
> +  * It has no warranty.
> +  *
> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
> +  *
> +  * I've modified Bob's hash to be useful in the Linux kernel, and
> +  * any bugs present are my fault.
> +  * Jozsef
> +  */
> +
> +#ifndef QEMU_JHASH_H__
> +#define QEMU_JHASH_H__
> +
> +#include "qemu/bitops.h"
> +
> +/*
> + * hashtable relation copy from linux kernel jhash
> + */
> +
> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
> +#define __jhash_mix(a, b, c)                \
> +{                                           \
> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
> +    a -= c;  a ^= rol32(c, 16); c += b;     \
> +    b -= a;  b ^= rol32(a, 19); a += c;     \
> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
> +}
> +
> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
> +#define __jhash_final(a, b, c)  \
> +{                               \
> +    c ^= b; c -= rol32(b, 14);  \
> +    a ^= c; a -= rol32(c, 11);  \
> +    b ^= a; b -= rol32(a, 25);  \
> +    c ^= b; c -= rol32(b, 16);  \
> +    a ^= c; a -= rol32(c, 4);   \
> +    b ^= a; b -= rol32(a, 14);  \
> +    c ^= b; c -= rol32(b, 24);  \
> +}
> +
> +/* An arbitrary initial parameter */
> +#define JHASH_INITVAL           0xdeadbeef
> +
> +#endif /* QEMU_JHASH_H__ */

Please split jhash into another patch.

> diff --git a/net/Makefile.objs b/net/Makefile.objs
> index ba92f73..119589f 100644
> --- a/net/Makefile.objs
> +++ b/net/Makefile.objs
> @@ -17,3 +17,4 @@ common-obj-y += filter.o
>   common-obj-y += filter-buffer.o
>   common-obj-y += filter-mirror.o
>   common-obj-y += colo-compare.o
> +common-obj-y += colo-base.o
> diff --git a/net/colo-base.c b/net/colo-base.c
> new file mode 100644
> index 0000000..7e263e8
> --- /dev/null
> +++ b/net/colo-base.c
> @@ -0,0 +1,194 @@
> +/*
> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
> + * (a.k.a. Fault Tolerance or Continuous Replication)
> + *
> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
> + * Copyright (c) 2016 FUJITSU LIMITED
> + * Copyright (c) 2016 Intel Corporation
> + *
> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu/error-report.h"
> +#include "net/colo-base.h"
> +
> +uint32_t connection_key_hash(const void *opaque)
> +{
> +    const ConnectionKey *key = opaque;
> +    uint32_t a, b, c;
> +
> +    /* Jenkins hash */
> +    a = b = c = JHASH_INITVAL + sizeof(*key);
> +    a += key->src.s_addr;
> +    b += key->dst.s_addr;
> +    c += (key->src_port | key->dst_port << 16);
> +    __jhash_mix(a, b, c);
> +
> +    a += key->ip_proto;
> +    __jhash_final(a, b, c);
> +
> +    return c;
> +}
> +
> +int connection_key_equal(const void *key1, const void *key2)
> +{
> +    return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
> +}
> +
> +int parse_packet_early(Packet *pkt)
> +{
> +    int network_length;
> +    uint8_t *data = pkt->data;
> +    uint16_t l3_proto;
> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
> +
> +    if (pkt->size < ETH_HLEN) {
> +        error_report("pkt->size < ETH_HLEN");
> +        return 1;
> +    }
> +    pkt->network_layer = data + ETH_HLEN;
> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
> +    if (l3_proto != ETH_P_IP) {
> +        return 1;
> +    }
> +
> +    network_length = pkt->ip->ip_hl * 4;
> +    if (pkt->size < ETH_HLEN + network_length) {
> +        error_report("pkt->size < network_layer + network_length");
> +        return 1;
> +    }
> +    pkt->transport_layer = pkt->network_layer + network_length;
> +    if (!pkt->transport_layer) {
> +        error_report("pkt->transport_layer is invalid");
> +        return 1;
> +    }
> +
> +    return 0;
> +}
> +
> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode)
> +{
> +    uint32_t tmp_ports;
> +
> +    key->ip_proto = pkt->ip->ip_p;
> +
> +    switch (key->ip_proto) {
> +    case IPPROTO_TCP:
> +    case IPPROTO_UDP:
> +    case IPPROTO_DCCP:
> +    case IPPROTO_ESP:
> +    case IPPROTO_SCTP:
> +    case IPPROTO_UDPLITE:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
> +        if (mode) {

It looks like mode is unnecessary here; you can compare and swap during 
hashing to avoid it.
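One way to read this suggestion: order the two endpoints before hashing, 
so both directions of a flow produce the same key and the same hash, and 
fill_connection_key() no longer needs a mode argument. A standalone 
sketch, assuming a simplified FlowKey (not the patch's packed 
ConnectionKey) and reusing the mix/final steps from the jhash.h above; 
the helper names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define JHASH_INITVAL 0xdeadbeef

static inline uint32_t rol32(uint32_t word, unsigned int shift)
{
    return (word << shift) | (word >> ((32 - shift) & 31));
}

/* Same mixing steps as __jhash_mix/__jhash_final in the patch's jhash.h. */
#define jhash_mix(a, b, c) do {                 \
    a -= c;  a ^= rol32(c, 4);  c += b;         \
    b -= a;  b ^= rol32(a, 6);  a += c;         \
    c -= b;  c ^= rol32(b, 8);  b += a;         \
    a -= c;  a ^= rol32(c, 16); c += b;         \
    b -= a;  b ^= rol32(a, 19); a += c;         \
    c -= b;  c ^= rol32(b, 4);  b += a;         \
} while (0)

#define jhash_final(a, b, c) do {               \
    c ^= b; c -= rol32(b, 14);                  \
    a ^= c; a -= rol32(c, 11);                  \
    b ^= a; b -= rol32(a, 25);                  \
    c ^= b; c -= rol32(b, 16);                  \
    a ^= c; a -= rol32(c, 4);                   \
    b ^= a; b -= rol32(a, 14);                  \
    c ^= b; c -= rol32(b, 24);                  \
} while (0)

/* Simplified, hypothetical flow key; addresses are opaque 32-bit values. */
typedef struct {
    uint32_t src, dst;
    uint16_t src_port, dst_port;
    uint8_t ip_proto;
} FlowKey;

/* Build a direction-independent key: the smaller (addr, port) pair is src. */
static FlowKey flow_key_make(uint32_t saddr, uint16_t sport,
                             uint32_t daddr, uint16_t dport, uint8_t proto)
{
    bool swap = saddr > daddr || (saddr == daddr && sport > dport);
    FlowKey k;

    memset(&k, 0, sizeof(k)); /* zero padding in case memcmp() is used later */
    k.src = swap ? daddr : saddr;
    k.dst = swap ? saddr : daddr;
    k.src_port = swap ? dport : sport;
    k.dst_port = swap ? sport : dport;
    k.ip_proto = proto;
    return k;
}

static bool flow_key_equal(const FlowKey *x, const FlowKey *y)
{
    return x->src == y->src && x->dst == y->dst &&
           x->src_port == y->src_port && x->dst_port == y->dst_port &&
           x->ip_proto == y->ip_proto;
}

static uint32_t flow_key_hash(const FlowKey *key)
{
    uint32_t a, b, c;

    a = b = c = JHASH_INITVAL + (uint32_t)sizeof(*key);
    a += key->src;
    b += key->dst;
    /* Cast before shifting: a uint16_t promoted to int could overflow. */
    c += (uint32_t)key->src_port | ((uint32_t)key->dst_port << 16);
    jhash_mix(a, b, c);
    a += key->ip_proto;
    jhash_final(a, b, c);
    return c;
}
```

Normalizing when the key is built keeps both the hash and the equality 
function trivially symmetric; swapping only inside the hash would also 
require a matching symmetric equality function.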

> +            key->src = pkt->ip->ip_src;
> +            key->dst = pkt->ip->ip_dst;
> +            key->src_port = ntohs(tmp_ports & 0xffff);
> +            key->dst_port = ntohs(tmp_ports >> 16);
> +        } else {
> +            key->dst = pkt->ip->ip_src;
> +            key->src = pkt->ip->ip_dst;
> +            key->dst_port = ntohs(tmp_ports & 0xffff);
> +            key->src_port = ntohs(tmp_ports >> 16);
> +        }
> +        break;
> +    case IPPROTO_AH:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
> +        if (mode) {
> +            key->src = pkt->ip->ip_src;
> +            key->dst = pkt->ip->ip_dst;
> +            key->src_port = ntohs(tmp_ports & 0xffff);
> +            key->dst_port = ntohs(tmp_ports >> 16);
> +        } else {
> +            key->dst = pkt->ip->ip_src;
> +            key->src = pkt->ip->ip_dst;
> +            key->dst_port = ntohs(tmp_ports & 0xffff);
> +            key->src_port = ntohs(tmp_ports >> 16);
> +        }
> +        break;
> +    default:
> +        key->src_port = 0;
> +        key->dst_port = 0;
> +        break;
> +    }
> +}

This seems reusable; please use an independent patch for the connection 
key code.

> +
> +Connection *connection_new(ConnectionKey *key)
> +{
> +    Connection *conn = g_slice_new(Connection);
> +
> +    conn->ip_proto = key->ip_proto;
> +    conn->processing = false;
> +    g_queue_init(&conn->primary_list);
> +    g_queue_init(&conn->secondary_list);
> +
> +    return conn;
> +}
> +
> +void connection_destroy(void *opaque)
> +{
> +    Connection *conn = opaque;
> +
> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->primary_list);
> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->secondary_list);
> +    g_slice_free(Connection, conn);
> +}
> +
> +Packet *packet_new(const void *data, int size)
> +{
> +    Packet *pkt = g_slice_new(Packet);
> +
> +    pkt->data = g_memdup(data, size);
> +    pkt->size = size;
> +
> +    return pkt;
> +}
> +
> +void packet_destroy(void *opaque, void *user_data)
> +{
> +    Packet *pkt = opaque;
> +
> +    g_free(pkt->data);
> +    g_slice_free(Packet, pkt);
> +}
> +
> +/*
> + * Clear hashtable, stop this hash growing really huge
> + */
> +void connection_hashtable_reset(GHashTable *connection_track_table)
> +{
> +    g_hash_table_remove_all(connection_track_table);
> +}
> +
> +/* if not found, create a new connection and add to hash table */
> +Connection *connection_get(GHashTable *connection_track_table,
> +                           ConnectionKey *key,
> +                           uint32_t *hashtable_size)
> +{
> +    /* FIXME: protect connection_track_table */

I fail to understand why protection is needed here.

> +    Connection *conn = g_hash_table_lookup(connection_track_table, key);
> +
> +    if (conn == NULL) {
> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
> +
> +        conn = connection_new(key);
> +
> +        (*hashtable_size) += 1;
> +        if (*hashtable_size > HASHTABLE_MAX_SIZE) {
> +            error_report("colo proxy connection hashtable full, clear it");

Is this a hint that we need synchronization?

> +            connection_hashtable_reset(connection_track_table);
> +            *hashtable_size = 0;
> +            /* TODO:clear conn_list */

If we don't clear conn_list, it looks like a bug, so this probably needs 
to be done in this patch.

> +        }
> +
> +        g_hash_table_insert(connection_track_table, new_key, conn);
> +    }
> +
> +    return conn;
> +}
> diff --git a/net/colo-base.h b/net/colo-base.h
> new file mode 100644
> index 0000000..01c1a5d
> --- /dev/null
> +++ b/net/colo-base.h
> @@ -0,0 +1,88 @@
> +/*
> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
> + * (a.k.a. Fault Tolerance or Continuous Replication)
> + *
> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
> + * Copyright (c) 2016 FUJITSU LIMITED
> + * Copyright (c) 2016 Intel Corporation
> + *
> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_COLO_BASE_H
> +#define QEMU_COLO_BASE_H
> +
> +#include "slirp/slirp.h"
> +#include "qemu/jhash.h"
> +#include "qemu/rcu.h"

I don't see any RCU usage in this patch.

> +
> +#define HASHTABLE_MAX_SIZE 16384
> +
> +typedef enum colo_conn_state {

This looks like it only takes care of TCP, so probably add "tcp" to its 
name.

> +     COLO_CONN_IDLE,
> +
> +    /* States on the primary: For incoming connection */
> +     COLO_CONN_PRI_IN_SYN,   /* Received Syn */
> +     COLO_CONN_PRI_IN_PSYNACK, /* Received syn/ack from primary, but not
> +                                yet from secondary */
> +     COLO_CONN_PRI_IN_SSYNACK, /* Received syn/ack from secondary, but
> +                                  not yet from primary */
> +     COLO_CONN_PRI_IN_SYNACK,  /* Received syn/ack from both */
> +     COLO_CONN_PRI_IN_ESTABLISHED, /* Got the ACK */
> +
> +    /* States on the secondary: For incoming connection */
> +     COLO_CONN_SEC_IN_SYNACK,      /* We sent a syn/ack */
> +     COLO_CONN_SEC_IN_ACK,         /* Saw the ack but didn't yet see our syn/ack */
> +     COLO_CONN_SEC_IN_ESTABLISHED, /* Got the ACK from the outside */

Should we care about the FIN states here too?
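If FIN handling is added, a connection could be dropped from the tracking 
table once both sides have closed. A rough sketch, assuming hypothetical 
state names and deliberately ignoring sequence numbers (a real tracker 
would check that the last ACK actually acknowledges the second FIN):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Flag bits of the TCP header flags byte (RFC 793 layout). */
#define TCP_FLAG_FIN 0x01
#define TCP_FLAG_RST 0x04
#define TCP_FLAG_ACK 0x10

typedef enum {
    COLO_TCP_ESTABLISHED,
    COLO_TCP_FIN_WAIT,  /* FIN seen from one side only */
    COLO_TCP_CLOSING,   /* FIN seen from both sides, waiting for the last ACK */
    COLO_TCP_CLOSED,    /* connection can be dropped from the tracking table */
} colo_tcp_teardown_state;

typedef struct {
    colo_tcp_teardown_state state;
    bool fin_from_client;
    bool fin_from_server;
} TcpTeardown;

/* Advance the teardown state for one segment seen in either direction. */
static void tcp_teardown_update(TcpTeardown *t, uint8_t flags,
                                bool from_client)
{
    assert(t);
    if (t->state == COLO_TCP_CLOSED) {
        return;
    }
    if (flags & TCP_FLAG_RST) {
        t->state = COLO_TCP_CLOSED;
        return;
    }
    if (flags & TCP_FLAG_FIN) {
        if (from_client) {
            t->fin_from_client = true;
        } else {
            t->fin_from_server = true;
        }
        t->state = (t->fin_from_client && t->fin_from_server) ?
                   COLO_TCP_CLOSING : COLO_TCP_FIN_WAIT;
    } else if (t->state == COLO_TCP_CLOSING && (flags & TCP_FLAG_ACK)) {
        /* Simplification: any pure ACK after both FINs closes the flow. */
        t->state = COLO_TCP_CLOSED;
    }
}
```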

> +} colo_conn_state;
> +
> +typedef struct Packet {
> +    void *data;
> +    union {
> +        uint8_t *network_layer;
> +        struct ip *ip;
> +    };
> +    uint8_t *transport_layer;
> +    int size;
> +} Packet;

We may want to start considering sharing code with e.g. hw/net/net_tx_pkt.c.

> +
> +typedef struct ConnectionKey {
> +    /* (src, dst) must be grouped, in the same way than in IP header */
> +    struct in_addr src;
> +    struct in_addr dst;
> +    uint16_t src_port;
> +    uint16_t dst_port;
> +    uint8_t ip_proto;
> +} QEMU_PACKED ConnectionKey;
> +
> +typedef struct Connection {
> +    /* connection primary send queue: element type: Packet */
> +    GQueue primary_list;
> +    /* connection secondary send queue: element type: Packet */
> +    GQueue secondary_list;
> +    /* flag to enqueue unprocessed_connections */
> +    bool processing;
> +    uint8_t ip_proto;
> +    /* be used by filter-rewriter */
> +    colo_conn_state state;
> +    tcp_seq  primary_seq;
> +    tcp_seq  secondary_seq;
> +} Connection;
> +
> +uint32_t connection_key_hash(const void *opaque);
> +int connection_key_equal(const void *opaque1, const void *opaque2);
> +int parse_packet_early(Packet *pkt);
> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode);
> +Connection *connection_new(ConnectionKey *key);
> +void connection_destroy(void *opaque);
> +Connection *connection_get(GHashTable *connection_track_table,
> +                           ConnectionKey *key,
> +                           uint32_t *hashtable_size);
> +void connection_hashtable_reset(GHashTable *connection_track_table);
> +Packet *packet_new(const void *data, int size);
> +void packet_destroy(void *opaque, void *user_data);
> +
> +#endif /* QEMU_COLO_BASE_H */
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index a3e1456..4231fe7 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -28,6 +28,7 @@
>   #include "qemu/sockets.h"
>   #include "qapi-visit.h"
>   #include "trace.h"
> +#include "net/colo-base.h"
>   
>   #define TYPE_COLO_COMPARE "colo-compare"
>   #define COLO_COMPARE(obj) \
> @@ -38,6 +39,28 @@
>   static QTAILQ_HEAD(, CompareState) net_compares =
>          QTAILQ_HEAD_INITIALIZER(net_compares);
>   
> +/*
> +  + CompareState ++
> +  |               |
> +  +---------------+   +---------------+         +---------------+
> +  |conn list      +--->conn           +--------->conn           |
> +  +---------------+   +---------------+         +---------------+
> +  |               |     |           |             |          |
> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +*/
>   typedef struct CompareState {
>       Object parent;
>   
> @@ -50,12 +73,103 @@ typedef struct CompareState {
>       QTAILQ_ENTRY(CompareState) next;
>       SocketReadState pri_rs;
>       SocketReadState sec_rs;
> +
> +    /* connection list: the connections belonged to this NIC could be found
> +     * in this list.
> +     * element type: Connection
> +     */
> +    GQueue conn_list;
> +    QemuMutex conn_list_lock; /* to protect conn_list */

Why is this mutex needed?

> +    /* hashtable to save connection */
> +    GHashTable *connection_track_table;
> +    /* to save unprocessed_connections */
> +    GQueue unprocessed_connections;
> +    /* proxy current hash size */
> +    uint32_t hashtable_size;
>   } CompareState;
>   
>   typedef struct CompareClass {
>       ObjectClass parent_class;
>   } CompareClass;
>   
> +enum {
> +    PRIMARY_IN = 0,
> +    SECONDARY_IN,
> +};
> +
> +static int compare_chr_send(CharDriverState *out,
> +                            const uint8_t *buf,
> +                            uint32_t size);
> +
> +/*
> + * Return 0 on success, if return -1 means the pkt
> + * is unsupported(arp and ipv6) and will be sent later
> + */
> +static int packet_enqueue(CompareState *s, int mode)
> +{
> +    ConnectionKey key = {{ 0 } };
> +    Packet *pkt = NULL;
> +    Connection *conn;
> +
> +    if (mode == PRIMARY_IN) {
> +        pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
> +    } else {
> +        pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
> +    }
> +
> +    if (parse_packet_early(pkt)) {
> +        packet_destroy(pkt, NULL);
> +        pkt = NULL;
> +        return -1;
> +    }
> +    fill_connection_key(pkt, &key, PRIMARY_IN);
> +
> +    conn = connection_get(s->connection_track_table,
> +                          &key,
> +                          &s->hashtable_size);
> +    if (!conn->processing) {
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_push_tail(&s->conn_list, conn);
> +        qemu_mutex_unlock(&s->conn_list_lock);
> +        conn->processing = true;
> +    }
> +
> +    if (mode == PRIMARY_IN) {
> +        g_queue_push_tail(&conn->primary_list, pkt);
> +    } else {
> +        g_queue_push_tail(&conn->secondary_list, pkt);
> +    }
> +
> +    return 0;
> +}
> +
> +static int compare_chr_send(CharDriverState *out,
> +                            const uint8_t *buf,
> +                            uint32_t size)
> +{
> +    int ret = 0;
> +    uint32_t len = htonl(size);
> +
> +    if (!size) {
> +        return 0;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
> +    if (ret != sizeof(len)) {
> +        goto err;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
> +    if (ret != size) {
> +        goto err;
> +    }
> +
> +    return 0;
> +
> +err:
> +    return ret < 0 ? ret : -EIO;
> +}
> +
>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(obj);
> @@ -103,12 +217,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>   
>   static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>   {
> -    /* if packet_enqueue pri pkt failed we will send unsupported packet */
> +    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> +
> +    if (packet_enqueue(s, PRIMARY_IN)) {
> +        trace_colo_compare_main("primary: unsupported packet in");
> +        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
> +    }

Do we have an upper limit on the number of packets that can be queued? 
If not, the guest may easily trigger OOM.
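A minimal sketch of a per-connection cap, assuming a hypothetical limit 
chosen by the caller. What to do when the queue is full (drop the packet, 
or force a checkpoint) is a policy decision this sketch leaves to the 
caller; the type and function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct QueuedPacket {
    struct QueuedPacket *next;
    size_t size;
} QueuedPacket;

typedef struct {
    QueuedPacket *head;
    QueuedPacket *tail;
    size_t len;
    size_t max_len; /* hypothetical per-connection limit */
} BoundedQueue;

static void bounded_queue_init(BoundedQueue *q, size_t max_len)
{
    q->head = q->tail = NULL;
    q->len = 0;
    q->max_len = max_len;
}

/*
 * Append pkt and return true, or return false without queueing it when
 * the limit is reached; the caller then still owns pkt.
 */
static bool bounded_queue_push(BoundedQueue *q, QueuedPacket *pkt)
{
    assert(q && pkt);
    if (q->len >= q->max_len) {
        return false;
    }
    pkt->next = NULL;
    if (q->tail) {
        q->tail->next = pkt;
    } else {
        q->head = pkt;
    }
    q->tail = pkt;
    q->len++;
    return true;
}
```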

>   }
>   
>   static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>   {
> -    /* if packet_enqueue sec pkt failed we will notify trace */
> +    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> +
> +    if (packet_enqueue(s, SECONDARY_IN)) {
> +        trace_colo_compare_main("secondary: unsupported packet in");
> +    }
>   }
>   
>   /*
> @@ -161,6 +284,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>       net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
>       net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
>   
> +    g_queue_init(&s->conn_list);
> +    qemu_mutex_init(&s->conn_list_lock);
> +    s->hashtable_size = 0;
> +
> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
> +                                                      connection_key_equal,
> +                                                      g_free,
> +                                                      connection_destroy);
> +
>       return;
>   }
>   
> @@ -203,6 +335,8 @@ static void colo_compare_finalize(Object *obj)
>       if (!QTAILQ_EMPTY(&net_compares)) {
>           QTAILQ_REMOVE(&net_compares, s, next);
>       }
> +    qemu_mutex_destroy(&s->conn_list_lock);
> +    g_queue_free(&s->conn_list);
>   
>       g_free(s->pri_indev);
>       g_free(s->sec_indev);
> diff --git a/trace-events b/trace-events
> index ca7211b..703de1a 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
>   aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
>   aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32
>   aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32
> +
> +# net/colo-compare.c
> +colo_compare_main(const char *chr) ": %s"


* Re: [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread Zhang Chen
@ 2016-07-08  4:23   ` Jason Wang
  2016-07-11  7:17     ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-08  4:23 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 2016年06月23日 19:34, Zhang Chen wrote:
> If the packets are the same, we send the primary packet and drop the
> secondary packet; otherwise we notify COLO to do a checkpoint.

Please be more verbose here, e.g. explain how each exceptional case is
handled (or document it in a comment in the code).
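The commit message describes a per-packet decision, and it helps to write that decision out explicitly. That includes the case where the secondary packet has not arrived yet. This is only a sketch: `CompareResult` and `compare_decide()` are hypothetical names. The real code searches the whole secondary queue rather than a single candidate.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical outcome of comparing one primary packet. */
typedef enum {
    COMPARE_SAME,        /* send the primary packet, drop the secondary one */
    COMPARE_DIFFERENT,   /* notify COLO to do a checkpoint */
    COMPARE_WAIT,        /* no secondary packet yet: keep the primary queued */
} CompareResult;

/*
 * Decide what to do with a primary packet given a candidate secondary
 * packet; spkt is NULL when the secondary has not produced one yet.
 */
static CompareResult compare_decide(const unsigned char *ppkt, size_t psize,
                                    const unsigned char *spkt, size_t ssize)
{
    if (spkt == NULL) {
        return COMPARE_WAIT;
    }
    if (psize != ssize || memcmp(ppkt, spkt, psize) != 0) {
        return COMPARE_DIFFERENT;
    }
    return COMPARE_SAME;
}
```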

>
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>   net/colo-base.c    |   1 +
>   net/colo-base.h    |   3 +
>   net/colo-compare.c | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   trace-events       |   2 +
>   4 files changed, 220 insertions(+)
>
> diff --git a/net/colo-base.c b/net/colo-base.c
> index 7e263e8..9673661 100644
> --- a/net/colo-base.c
> +++ b/net/colo-base.c
> @@ -146,6 +146,7 @@ Packet *packet_new(const void *data, int size)
>   
>       pkt->data = g_memdup(data, size);
>       pkt->size = size;
> +    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>   
>       return pkt;
>   }
> diff --git a/net/colo-base.h b/net/colo-base.h
> index 01c1a5d..8bb1043 100644
> --- a/net/colo-base.h
> +++ b/net/colo-base.h
> @@ -18,6 +18,7 @@
>   #include "slirp/slirp.h"
>   #include "qemu/jhash.h"
>   #include "qemu/rcu.h"
> +#include "qemu/timer.h"
>   
>   #define HASHTABLE_MAX_SIZE 16384
>   
> @@ -47,6 +48,8 @@ typedef struct Packet {
>       };
>       uint8_t *transport_layer;
>       int size;
> +    /* Time of packet creation, in wall clock ms */
> +    int64_t creation_ms;
>   } Packet;
>   
>   typedef struct ConnectionKey {
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 4231fe7..928d729 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -35,6 +35,8 @@
>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>   
>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
> +/* TODO: Should be configurable */
> +#define REGULAR_CHECK_MS 400

"REGULAR" seems too generic; it needs a better name.

>   
>   static QTAILQ_HEAD(, CompareState) net_compares =
>          QTAILQ_HEAD_INITIALIZER(net_compares);
> @@ -86,6 +88,11 @@ typedef struct CompareState {
>       GQueue unprocessed_connections;
>       /* proxy current hash size */
>       uint32_t hashtable_size;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    int thread_status;
> +    /* Timer used on the primary to find packets that are never matched */
> +    QEMUTimer *timer;
>   } CompareState;
>   
>   typedef struct CompareClass {
> @@ -97,6 +104,15 @@ enum {
>       SECONDARY_IN,
>   };
>   
> +enum {
> +    /* compare thread isn't started */
> +    COMPARE_THREAD_NONE,
> +    /* compare thread is running */
> +    COMPARE_THREAD_RUNNING,
> +    /* compare thread exit */
> +    COMPARE_THREAD_EXIT,
> +};
> +
>   static int compare_chr_send(CharDriverState *out,
>                               const uint8_t *buf,
>                               uint32_t size);
> @@ -143,6 +159,98 @@ static int packet_enqueue(CompareState *s, int mode)
>       return 0;
>   }
>   
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
> +                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
> +                               inet_ntoa(spkt->ip->ip_src),
> +                               inet_ntoa(spkt->ip->ip_dst));
> +
> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
> +{
> +    trace_colo_compare_main("compare all");
> +    return colo_packet_compare(ppkt, spkt);
> +}
> +
> +static void colo_old_packet_check(void *opaque_packet, void *opaque_found)
> +{
> +    int64_t now;
> +    bool *found_old = (bool *)opaque_found;
> +    Packet *ppkt = (Packet *)opaque_packet;
> +
> +    if (*found_old) {
> +        /* Someone found an old packet earlier in the queue */
> +        return;
> +    }
> +
> +    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> +    if ((ppkt->creation_ms < now) &&

Is there any case where ppkt->creation_ms >= now?
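If the guard exists only because the wall clock can step backwards, a monotonic time source makes it unnecessary. In QEMU terms, that would presumably mean using QEMU_CLOCK_REALTIME rather than QEMU_CLOCK_HOST. The standalone sketch below uses POSIX `clock_gettime(CLOCK_MONOTONIC)`. `now_ms()` and `packet_is_old()` are hypothetical helpers.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdbool.h>
#include <stdint.h>
#include <time.h>

/* Milliseconds from a clock that never jumps backwards. */
static int64_t now_ms(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/*
 * With a monotonic source, now >= creation_ms always holds, so a single
 * age check is enough and no "creation_ms < now" guard is needed.
 */
static bool packet_is_old(int64_t creation_ms, int64_t now, int64_t max_age_ms)
{
    return now - creation_ms > max_age_ms;
}
```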

> +        ((now - ppkt->creation_ms) > REGULAR_CHECK_MS)) {
> +        trace_colo_old_packet_check_found(ppkt->creation_ms);
> +        *found_old = true;
> +    }
> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * for compare connection
> + */
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    CompareState *s = user_data;
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    bool found_old;
> +    int ret;
> +
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_tail(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare_all);
> +
> +        if (result) {
> +            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            trace_colo_compare_main("packet same and release packet");
> +            g_queue_remove(&conn->secondary_list, result->data);
> +        } else {

I've forgotten the answer to this one, so let me ask again: what happens
if the secondary packet arrives late?

> +            trace_colo_compare_main("packet different");
> +            g_queue_push_tail(&conn->primary_list, pkt);
> +            /* TODO: colo_notify_checkpoint();*/
> +            break;
> +        }
> +    }
> +
> +    /*
> +     * Look for old packets that the secondary hasn't matched,
> +     * if we have some then we have to checkpoint to wake
> +     * the secondary up.
> +     */
> +    found_old = false;
> +    g_queue_foreach(&conn->primary_list, colo_old_packet_check, &found_old);
> +    if (found_old) {
> +        /* TODO: colo_notify_checkpoint();*/

Shouldn't we remove all the "old" packets here?
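If the old packets should be dropped, detection and removal can happen in a single pass. The sketch below uses a hypothetical singly linked `PktNode` list rather than GQueue. What to do with an expired packet (free it, send it, or keep it for the checkpoint) is an open assumption. Here it is simply freed.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical list node standing in for a queued primary packet. */
typedef struct PktNode {
    int64_t creation_ms;
    struct PktNode *next;
} PktNode;

/*
 * Unlink and free every packet older than max_age_ms.
 * Returns how many were removed (non-zero would mean "checkpoint").
 */
static int remove_old_packets(PktNode **head, int64_t now, int64_t max_age_ms)
{
    int removed = 0;
    PktNode **link = head;

    while (*link) {
        PktNode *node = *link;

        if (now - node->creation_ms > max_age_ms) {
            *link = node->next;   /* unlink the expired packet */
            free(node);
            removed++;
        } else {
            link = &node->next;
        }
    }
    return removed;
}
```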

> +    }
> +}
> +
>   static int compare_chr_send(CharDriverState *out,
>                               const uint8_t *buf,
>                               uint32_t size)
> @@ -170,6 +278,69 @@ err:
>       return ret < 0 ? ret : -EIO;
>   }
>   
> +static int compare_chr_can_read(void *opaque)
> +{
> +    return COMPARE_READ_LEN_MAX;
> +}
> +
> +/*
> + * called from the main thread on the primary for packets
> + * arriving over the socket from the primary.
> + */
> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = net_fill_rstate(&s->pri_rs, buf, size);
> +    if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> +        error_report("colo-compare primary_in error");
> +    }
> +}
> +
> +/*
> + * called from the main thread on the primary for packets
> + * arriving over the socket from the secondary.
> + */
> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = net_fill_rstate(&s->sec_rs, buf, size);
> +    if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +        error_report("colo-compare secondary_in error");
> +    }
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    GMainContext *worker_context;
> +    GMainLoop *compare_loop;
> +    CompareState *s = opaque;
> +
> +    worker_context = g_main_context_new();
> +    g_assert(g_main_context_get_thread_default() == NULL);
> +    g_main_context_push_thread_default(worker_context);
> +    g_assert(g_main_context_get_thread_default() == worker_context);
> +
> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
> +                          compare_pri_chr_in, NULL, s);
> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
> +                          compare_sec_chr_in, NULL, s);
> +
> +    compare_loop = g_main_loop_new(worker_context, FALSE);
> +
> +    g_main_loop_run(compare_loop);
> +
> +    g_main_loop_unref(compare_loop);
> +    g_main_context_pop_thread_default(worker_context);
> +    g_main_context_unref(worker_context);
> +    return NULL;
> +}
> +
>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(obj);
> @@ -222,6 +393,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>       if (packet_enqueue(s, PRIMARY_IN)) {
>           trace_colo_compare_main("primary: unsupported packet in");
>           compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
> +    } else {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>       }
>   }
>   
> @@ -231,16 +405,35 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>   
>       if (packet_enqueue(s, SECONDARY_IN)) {
>           trace_colo_compare_main("secondary: unsupported packet in");
> +    } else {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>       }
>   }
>   
>   /*
> + * Prod the compare thread regularly so it can watch for any packets
> + * that the secondary hasn't produced equivalents of.
> + */
> +static void colo_compare_regular(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
> +                        REGULAR_CHECK_MS);
> +    /* compare connection */
> +    g_queue_foreach(&s->conn_list, colo_compare_connection, s);
> +}

We need to make sure this function is called from the colo thread, but
it doesn't look like it is?
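One way to enforce this is to record the compare thread's ID when the thread starts, then check that ID wherever the connection list is touched. Another option would be to attach the timer to the compare thread's own GMainContext. The plain pthreads sketch below shows the first option. `CompareCtx` and both helpers are hypothetical.

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical context recording which thread owns the connection list. */
typedef struct CompareCtx {
    pthread_t owner;
    bool owner_set;
} CompareCtx;

/* Call once at the top of the compare thread's main function. */
static void compare_ctx_mark_owner(CompareCtx *ctx)
{
    ctx->owner = pthread_self();
    ctx->owner_set = true;
}

/* True only when called from the thread that marked itself as owner. */
static bool compare_ctx_in_owner_thread(const CompareCtx *ctx)
{
    return ctx->owner_set && pthread_equal(ctx->owner, pthread_self());
}
```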

> +
> +/*
>    * called from the main thread on the primary
>    * to setup colo-compare.
>    */
>   static void colo_compare_complete(UserCreatable *uc, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(uc);
> +    char thread_name[64];
> +    static int compare_id;
>   
>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>           error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -293,6 +486,19 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>                                                         g_free,
>                                                         connection_destroy);
>   
> +    s->thread_status = COMPARE_THREAD_RUNNING;
> +    sprintf(thread_name, "compare %d", compare_id);
> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
> +    /* A regular timer to kick any packets that the secondary doesn't match */
> +    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
> +                            colo_compare_regular, s);
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
> +                        REGULAR_CHECK_MS);
> +
>       return;
>   }
>   
> @@ -338,6 +544,14 @@ static void colo_compare_finalize(Object *obj)
>       qemu_mutex_destroy(&s->conn_list_lock);
>       g_queue_free(&s->conn_list);
>   
> +    if (s->thread.thread) {
> +        s->thread_status = COMPARE_THREAD_EXIT;

It looks like no code depends on the status, so why do we need
this?
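If the status is kept, it would normally serve as the thread's exit condition, which finalize sets before the join. The sketch below uses C11 atomics and pthreads, and the worker polls a hypothetical `exit_requested` flag. The compare thread in this patch blocks in g_main_loop_run(), so the closer equivalent there would presumably be calling g_main_loop_quit() on its loop.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical worker state; exit_requested plays the role of thread_status. */
typedef struct Worker {
    pthread_t thread;
    atomic_bool exit_requested;
    atomic_int iterations;   /* loop passes, only for observation */
} Worker;

static void *worker_main(void *opaque)
{
    Worker *w = opaque;
    struct timespec delay = { .tv_sec = 0, .tv_nsec = 1000000 }; /* 1 ms */

    while (!atomic_load(&w->exit_requested)) {
        atomic_fetch_add(&w->iterations, 1);
        nanosleep(&delay, NULL);   /* stand-in for "compare queued packets" */
    }
    return NULL;
}

static int worker_start(Worker *w)
{
    atomic_init(&w->exit_requested, false);
    atomic_init(&w->iterations, 0);
    return pthread_create(&w->thread, NULL, worker_main, w);
}

/* Finalize path: ask the thread to stop, then wait for it to exit. */
static int worker_stop(Worker *w)
{
    atomic_store(&w->exit_requested, true);
    return pthread_join(w->thread, NULL);
}
```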

> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
> +        qemu_thread_join(&s->thread);
> +    }
> +    timer_del(s->timer);
> +
>       g_free(s->pri_indev);
>       g_free(s->sec_indev);
>       g_free(s->outdev);
> diff --git a/trace-events b/trace-events
> index 703de1a..1537e91 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>   
>   # net/colo-compare.c
>   colo_compare_main(const char *chr) ": %s"
> +colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
> +colo_old_packet_check_found(int64_t old_time) "%" PRId64


* Re: [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization
  2016-07-08  3:40   ` Jason Wang
@ 2016-07-08  8:21     ` Zhang Chen
  2016-07-08  9:12       ` Jason Wang
  0 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-07-08  8:21 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 07/08/2016 11:40 AM, Jason Wang wrote:
>
>
> On 2016年06月23日 19:34, Zhang Chen wrote:
>> Packets coming from the primary char indev will be sent to outdev
>> Packets coming from the secondary char dev will be dropped
>>
>> usage:
>>
>> primary:
>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
>> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
>> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
>> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
>> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
>> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
>> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
>> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
>> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
>> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
>> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>>
>> secondary:
>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
>> -chardev socket,id=red0,host=3.3.3.3,port=9003
>> -chardev socket,id=red1,host=3.3.3.3,port=9004
>> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
>> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>
> Considering we finally want a non-RFC patch, it's better to have some
> explanation of the above configuration, since it is not easy for
> starters to follow at first glance. Maybe you can use either an ASCII
> figure or a paragraph. You also need to explain the parameters of
> colo-compare in detail.

Makes sense. I will add an ASCII figure and some comments to explain it.

>
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/Makefile.objs  |   1 +
>>   net/colo-compare.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   qemu-options.hx    |  34 ++++++++
>>   vl.c               |   3 +-
>>   4 files changed, 268 insertions(+), 1 deletion(-)
>>   create mode 100644 net/colo-compare.c
>>
>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>> index b7c22fd..ba92f73 100644
>> --- a/net/Makefile.objs
>> +++ b/net/Makefile.objs
>> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>>   common-obj-y += filter.o
>>   common-obj-y += filter-buffer.o
>>   common-obj-y += filter-mirror.o
>> +common-obj-y += colo-compare.o
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> new file mode 100644
>> index 0000000..a3e1456
>> --- /dev/null
>> +++ b/net/colo-compare.c
>> @@ -0,0 +1,231 @@
>> +/*
>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>> + *
>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2016 FUJITSU LIMITED
>> + * Copyright (c) 2016 Intel Corporation
>> + *
>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "qemu/osdep.h"
>> +#include "qemu/error-report.h"
>> +#include "qemu-common.h"
>> +#include "qapi/qmp/qerror.h"
>> +#include "qapi/error.h"
>> +#include "net/net.h"
>> +#include "net/vhost_net.h"
>> +#include "qom/object_interfaces.h"
>> +#include "qemu/iov.h"
>> +#include "qom/object.h"
>> +#include "qemu/typedefs.h"
>> +#include "net/queue.h"
>> +#include "sysemu/char.h"
>> +#include "qemu/sockets.h"
>> +#include "qapi-visit.h"
>> +#include "trace.h"
>
> It looks like trace is not really used in this patch; you can delay the
> inclusion until it is actually used.

OK~~~

>
>> +
>> +#define TYPE_COLO_COMPARE "colo-compare"
>> +#define COLO_COMPARE(obj) \
>> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>> +
>> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
>> +
>> +static QTAILQ_HEAD(, CompareState) net_compares =
>> +       QTAILQ_HEAD_INITIALIZER(net_compares);
>
> What's the usage of this? A comment would be better.

If we need to compare more than one netdev, we should use more than one
colo-compare. When working with colo-frame, a checkpoint must flush all
packets enqueued in every colo-compare, and we use this queue to find
all of the colo-compare instances.
So it looks unnecessary here; I will move it to a later patch.
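The registry described here is one global list of colo-compare instances, walked at checkpoint time to flush every queue. It can be sketched with a plain intrusive list. `CompareInstance`, `compare_register()` and `compare_flush_all()` are hypothetical stand-ins. They correspond to CompareState, the QTAILQ insertion into net_compares, and whatever function the colo-frame patch ends up calling. For brevity, the sketch inserts at the head rather than the tail.

```c
#include <stddef.h>

/* Hypothetical stand-in for CompareState. */
typedef struct CompareInstance {
    int queued;                      /* packets waiting in this instance */
    struct CompareInstance *next;    /* link in the global registry */
} CompareInstance;

/* Global registry of all instances, playing the role of net_compares. */
static CompareInstance *all_compares;

static void compare_register(CompareInstance *c)
{
    c->next = all_compares;
    all_compares = c;
}

/* At checkpoint time, flush the queued packets of every instance. */
static int compare_flush_all(void)
{
    int flushed = 0;

    for (CompareInstance *c = all_compares; c; c = c->next) {
        flushed += c->queued;
        c->queued = 0;   /* a real flush would send or drop the packets */
    }
    return flushed;
}
```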


>
>> +
>> +typedef struct CompareState {
>> +    Object parent;
>> +
>> +    char *pri_indev;
>> +    char *sec_indev;
>> +    char *outdev;
>> +    CharDriverState *chr_pri_in;
>> +    CharDriverState *chr_sec_in;
>> +    CharDriverState *chr_out;
>> +    QTAILQ_ENTRY(CompareState) next;
>> +    SocketReadState pri_rs;
>> +    SocketReadState sec_rs;
>> +} CompareState;
>> +
>> +typedef struct CompareClass {
>> +    ObjectClass parent_class;
>> +} CompareClass;
>> +
>> +static char *compare_get_pri_indev(Object *obj, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    return g_strdup(s->pri_indev);
>> +}
>> +
>> +static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->pri_indev);
>> +    s->pri_indev = g_strdup(value);
>
> I think we need to do something more than this, e.g. release the original
> dev and get the new one? Or just forbid setting this property.
>

Do you mean this:
qemu_chr_fe_release(s->chr_pri_in);

If so, here we only get/set char *pri_indev (the chardev's name).
We don't get or set the CharDriverState, so I think we needn't do more.
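For comparison, the "just forbid setting this property" option could work like this: the setter refuses changes once the object is complete. That is safe to enforce because the name is only resolved to a CharDriverState in colo_compare_complete(). In the sketch, `CompareProps`, its `completed` flag, `props_set_pri_indev()` and the error-string out-parameter are all hypothetical. The out-parameter stands in for QOM's `Error **errp`.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical subset of CompareState. */
typedef struct CompareProps {
    char *pri_indev;
    bool completed;   /* set once the chardev has been looked up */
} CompareProps;

/* Returns 0 on success, or -1 with *err set if the change is refused. */
static int props_set_pri_indev(CompareProps *s, const char *value,
                               const char **err)
{
    size_t len;
    char *copy;

    if (s->completed) {
        *err = "primary_in cannot be changed after creation";
        return -1;
    }
    len = strlen(value) + 1;
    copy = malloc(len);
    if (!copy) {
        *err = "out of memory";
        return -1;
    }
    memcpy(copy, value, len);
    free(s->pri_indev);   /* drop the previous name, as the patch does */
    s->pri_indev = copy;
    return 0;
}
```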


> And it looks like we have similar issues for sec_indev and outdev.
>
>> +}
>> +
>> +static char *compare_get_sec_indev(Object *obj, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    return g_strdup(s->sec_indev);
>> +}
>> +
>> +static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->sec_indev);
>> +    s->sec_indev = g_strdup(value);
>> +}
>> +
>> +static char *compare_get_outdev(Object *obj, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    return g_strdup(s->outdev);
>> +}
>> +
>> +static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    g_free(s->outdev);
>> +    s->outdev = g_strdup(value);
>> +}
>> +
>> +static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>> +{
>> +    /* if packet_enqueue pri pkt failed we will send unsupported packet */
>> +}
>> +
>> +static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>> +{
>> +    /* if packet_enqueue sec pkt failed we will notify trace */
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary
>> + * to setup colo-compare.
>> + */
>> +static void colo_compare_complete(UserCreatable *uc, Error **errp)
>> +{
>> +    CompareState *s = COLO_COMPARE(uc);
>> +
>> +    if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>> +        error_setg(errp, "colo compare needs 'primary_in' ,"
>> +                   "'secondary_in','outdev' property set");
>> +        return;
>> +    } else if (!strcmp(s->pri_indev, s->outdev) ||
>> +               !strcmp(s->sec_indev, s->outdev) ||
>> +               !strcmp(s->pri_indev, s->sec_indev)) {
>> +        error_setg(errp, "'indev' and 'outdev' could not be same "
>> +                   "for compare module");
>> +        return;
>> +    }
>> +
>> +    s->chr_pri_in = qemu_chr_find(s->pri_indev);
>> +    if (s->chr_pri_in == NULL) {
>> +        error_setg(errp, "Primary IN Device '%s' not found",
>> +                   s->pri_indev);
>> +        return;
>> +    }
>> +
>> +    s->chr_sec_in = qemu_chr_find(s->sec_indev);
>> +    if (s->chr_sec_in == NULL) {
>> +        error_setg(errp, "Secondary IN Device '%s' not found",
>> +                   s->sec_indev);
>> +        return;
>> +    }
>> +
>> +    s->chr_out = qemu_chr_find(s->outdev);
>> +    if (s->chr_out == NULL) {
>> +        error_setg(errp, "OUT Device '%s' not found", s->outdev);
>> +        return;
>> +    }
>> +
>> +    qemu_chr_fe_claim_no_fail(s->chr_pri_in);
>> +
>> +    qemu_chr_fe_claim_no_fail(s->chr_sec_in);
>> +
>> +    qemu_chr_fe_claim_no_fail(s->chr_out);
>> +    QTAILQ_INSERT_TAIL(&net_compares, s, next);
>> +
>> +    net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
>> +    net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
>> +
>> +    return;
>> +}
>> +
>> +static void colo_compare_class_init(ObjectClass *oc, void *data)
>> +{
>> +    UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
>> +
>> +    ucc->complete = colo_compare_complete;
>> +}
>> +
>> +static void colo_compare_init(Object *obj)
>> +{
>> +    object_property_add_str(obj, "primary_in",
>> +                            compare_get_pri_indev, compare_set_pri_indev,
>> +                            NULL);
>> +    object_property_add_str(obj, "secondary_in",
>> +                            compare_get_sec_indev, compare_set_sec_indev,
>> +                            NULL);
>> +    object_property_add_str(obj, "outdev",
>> +                            compare_get_outdev, compare_set_outdev,
>> +                            NULL);
>> +}
>> +
>> +static void colo_compare_finalize(Object *obj)
>> +{
>> +    CompareState *s = COLO_COMPARE(obj);
>> +
>> +    if (s->chr_pri_in) {
>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>
> Why do we need to do this?

For extra safety before calling qemu_chr_fe_release() on the chardev,
as backends/rng-egd.c does.

>
>> +        qemu_chr_fe_release(s->chr_pri_in);
>> +    }
>> +    if (s->chr_sec_in) {
>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> +        qemu_chr_fe_release(s->chr_sec_in);
>> +    }
>> +    if (s->chr_out) {
>> +        qemu_chr_fe_release(s->chr_out);
>> +    }
>> +
>> +    if (!QTAILQ_EMPTY(&net_compares)) {
>> +        QTAILQ_REMOVE(&net_compares, s, next);
>> +    }
>> +
>> +    g_free(s->pri_indev);
>> +    g_free(s->sec_indev);
>> +    g_free(s->outdev);
>> +}
>> +
>> +static const TypeInfo colo_compare_info = {
>> +    .name = TYPE_COLO_COMPARE,
>> +    .parent = TYPE_OBJECT,
>> +    .instance_size = sizeof(CompareState),
>> +    .instance_init = colo_compare_init,
>> +    .instance_finalize = colo_compare_finalize,
>> +    .class_size = sizeof(CompareClass),
>> +    .class_init = colo_compare_class_init,
>> +    .interfaces = (InterfaceInfo[]) {
>> +        { TYPE_USER_CREATABLE },
>> +        { }
>> +    }
>> +};
>> +
>> +static void register_types(void)
>> +{
>> +    type_register_static(&colo_compare_info);
>> +}
>> +
>> +type_init(register_types);
>> diff --git a/qemu-options.hx b/qemu-options.hx
>> index 587de8f..14bade5 100644
>> --- a/qemu-options.hx
>> +++ b/qemu-options.hx
>> @@ -3866,6 +3866,40 @@ Dump the network traffic on netdev @var{dev} to the file specified by
>>   The file format is libpcap, so it can be analyzed with tools such as tcpdump
>>   or Wireshark.
>>   +@item -object colo-compare,id=@var{id},primary_in=@var{chardevid},secondary_in=@var{chardevid},
>> +outdev=@var{chardevid}
>> +
>> +Colo-compare gets packet from primary_in@var{chardevid} and secondary_in@var{chardevid},
>> +and outputs to outdev@var{chardevid}, we can use it with the help of filter-mirror and filter-redirector.
>
> The above description needs better organization. E.g. it does not
> even mention the comparison.

OK~ I will fix this in the next version.

Thanks
Zhang Chen

>
>> +
>> +The simple usage:
>> +
>> +@example
>> +
>> +primary:
>> +-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>> +-device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
>> +-chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
>> +-chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
>> +-chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
>> +-chardev socket,id=compare0-0,host=3.3.3.3,port=9001
>> +-chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
>> +-chardev socket,id=compare_out0,host=3.3.3.3,port=9005
>> +-object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
>> +-object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
>> +-object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
>> +-object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>> +
>> +secondary:
>> +-netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>> +-device e1000,netdev=hn0,mac=52:a4:00:12:78:66
>> +-chardev socket,id=red0,host=3.3.3.3,port=9003
>> +-chardev socket,id=red1,host=3.3.3.3,port=9004
>> +-object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
>> +-object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>> +
>> +@end example
>> +
>>   @item -object secret,id=@var{id},data=@var{string},format=@var{raw|base64}[,keyid=@var{secretid},iv=@var{string}]
>>   @item -object secret,id=@var{id},file=@var{filename},format=@var{raw|base64}[,keyid=@var{secretid},iv=@var{string}]
>> diff --git a/vl.c b/vl.c
>> index cbe51ac..c6b9a6f 100644
>> --- a/vl.c
>> +++ b/vl.c
>> @@ -2865,7 +2865,8 @@ static bool object_create_initial(const char *type)
>>       if (g_str_equal(type, "filter-buffer") ||
>>           g_str_equal(type, "filter-dump") ||
>>           g_str_equal(type, "filter-mirror") ||
>> -        g_str_equal(type, "filter-redirector")) {
>> +        g_str_equal(type, "filter-redirector") ||
>> +        g_str_equal(type, "colo-compare")) {
>>           return false;
>>       }

-- 
Thanks
zhangchen


* Re: [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison
  2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison Zhang Chen
@ 2016-07-08  8:59   ` Jason Wang
  2016-07-11 10:02     ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-08  8:59 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, eddie . dong, Dr . David Alan Gilbert, zhanghailiang



On 2016年06月23日 19:34, Zhang Chen wrote:
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>   net/colo-compare.c | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++--
>   trace-events       |   6 ++
>   2 files changed, 173 insertions(+), 4 deletions(-)

Commit log please.

> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 928d729..addf704 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -18,6 +18,7 @@
>   #include "qapi/qmp/qerror.h"
>   #include "qapi/error.h"
>   #include "net/net.h"
> +#include "net/eth.h"
>   #include "net/vhost_net.h"
>   #include "qom/object_interfaces.h"
>   #include "qemu/iov.h"
> @@ -180,9 +181,155 @@ static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>       }
>   }
>   
> -static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
> +/*
> + * called from the compare thread on the primary
> + * for compare tcp packet
> + * compare_tcp copied from Dr. David Alan Gilbert's branch
> + */
> +static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
> +{
> +    struct tcphdr *ptcp, *stcp;
> +    int res;
> +    char *sdebug, *ddebug;
> +
> +    trace_colo_compare_main("compare tcp");
> +    if (ppkt->size != spkt->size) {
> +        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
> +            trace_colo_compare_main("pkt size not same");
> +        }
> +        return -1;
> +    }
> +
> +    ptcp = (struct tcphdr *)ppkt->transport_layer;
> +    stcp = (struct tcphdr *)spkt->transport_layer;
> +
> +    if (ptcp->th_seq != stcp->th_seq) {
> +        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
> +            trace_colo_compare_main("pkt tcp seq not same");
> +        }
> +        return -1;
> +    }
> +
> +    /*
> +     * The 'identification' field in the IP header is *very* random
> +     * it almost never matches.  Fudge this by ignoring differences in
> +     * unfragmented packets; they'll normally sort themselves out if different
> +     * anyway, and it should recover at the TCP level.
> +     * An alternative would be to get both the primary and secondary to rewrite
> +     * somehow; but that would need some sync traffic to sync the state
> +     */
> +    if (ntohs(ppkt->ip->ip_off) & IP_DF) {
> +        spkt->ip->ip_id = ppkt->ip->ip_id;
> +        /* and the sum will be different if the IDs were different */
> +        spkt->ip->ip_sum = ppkt->ip->ip_sum;
> +    }

I was wondering: can we do this trick in the rewriter instead of here?
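If the rewriter rewrote ip_id on one side, it would also have to patch ip_sum. Recomputing the whole header checksum is not necessary. The incremental update from RFC 1624 (HC' = ~(~HC + ~m + m')) is enough. In the standalone sketch below, `ip_csum()` and `ip_csum_update16()` are hypothetical helpers working on 16-bit words, not existing QEMU functions.

```c
#include <stddef.h>
#include <stdint.h>

/* Full one's-complement checksum over n 16-bit words (RFC 1071). */
static uint16_t ip_csum(const uint16_t *words, size_t n)
{
    uint32_t sum = 0;

    for (size_t i = 0; i < n; i++) {
        sum += words[i];
    }
    while (sum >> 16) {
        sum = (sum & 0xffff) + (sum >> 16);   /* fold carries back in */
    }
    return (uint16_t)~sum;
}

/*
 * Incremental update (RFC 1624, eqn. 3): HC' = ~(~HC + ~m + m'),
 * where one 16-bit field changes from m (old_val) to m' (new_val).
 */
static uint16_t ip_csum_update16(uint16_t hc, uint16_t old_val, uint16_t new_val)
{
    uint32_t sum = (uint16_t)~hc;

    sum += (uint16_t)~old_val;
    sum += new_val;
    while (sum >> 16) {
        sum = (sum & 0xffff) + (sum >> 16);
    }
    return (uint16_t)~sum;
}
```

For a rewritten ip_id, the result should match a full recomputation of the header checksum with the checksum field zeroed.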

> +
> +    res = memcmp(ppkt->data + ETH_HLEN, spkt->data + ETH_HLEN,
> +                (spkt->size - ETH_HLEN));
> +
> +    if (res != 0 && trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
> +        sdebug = g_strdup(inet_ntoa(ppkt->ip->ip_src));
> +        ddebug = g_strdup(inet_ntoa(ppkt->ip->ip_dst));
> +        fprintf(stderr, "%s: src/dst: %s/%s p: seq/ack=%u/%u"
> +        " s: seq/ack=%u/%u res=%d flags=%x/%x\n", __func__,
> +                   sdebug, ddebug,
> +                   ntohl(ptcp->th_seq), ntohl(ptcp->th_ack),
> +                   ntohl(stcp->th_seq), ntohl(stcp->th_ack),
> +                   res, ptcp->th_flags, stcp->th_flags);
> +
> +        trace_colo_compare_tcp_miscompare("Primary len", ppkt->size);
> +        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
> +        trace_colo_compare_tcp_miscompare("Secondary len", spkt->size);
> +        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
> +
> +        g_free(sdebug);
> +        g_free(ddebug);
> +    }
> +
> +    return res;
> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * to compare UDP packets
> + */
> +static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
> +{
> +    int ret;
> +
> +    trace_colo_compare_main("compare udp");
> +    ret = colo_packet_compare(ppkt, spkt);
> +
> +    if (ret) {
> +        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
> +        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
> +        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
> +        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
> +    }
> +
> +    return ret;
> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * to compare ICMP packets
> + */
> +static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
> +{
> +    int network_length;
> +    struct icmp *icmp_ppkt, *icmp_spkt;
> +
> +    trace_colo_compare_main("compare icmp");
> +    network_length = ppkt->ip->ip_hl * 4;
> +    if (ppkt->size != spkt->size ||
> +        ppkt->size < network_length + ETH_HLEN) {
> +        trace_colo_compare_icmp_miscompare_size(ppkt->size, spkt->size);
> +        return -1;
> +    }
> +    icmp_ppkt = (struct icmp *)(ppkt->data + network_length + ETH_HLEN);
> +    icmp_spkt = (struct icmp *)(spkt->data + network_length + ETH_HLEN);
> +
> +    if ((icmp_ppkt->icmp_type == icmp_spkt->icmp_type) &&
> +        (icmp_ppkt->icmp_code == icmp_spkt->icmp_code)) {
> +        if (icmp_ppkt->icmp_type == ICMP_REDIRECT) {
> +            if (icmp_ppkt->icmp_gwaddr.s_addr !=
> +                icmp_spkt->icmp_gwaddr.s_addr) {
> +                trace_colo_compare_main("icmp_gwaddr.s_addr not same");
> +                trace_colo_compare_icmp_miscompare_addr("ppkt s_addr",
> +                        inet_ntoa(icmp_ppkt->icmp_gwaddr));
> +                trace_colo_compare_icmp_miscompare_addr("spkt s_addr",
> +                        inet_ntoa(icmp_spkt->icmp_gwaddr));
> +                return -1;
> +            }
> +        } else if ((icmp_ppkt->icmp_type == ICMP_UNREACH) &&
> +                   (icmp_ppkt->icmp_code == ICMP_UNREACH_NEEDFRAG)) {
> +            if (icmp_ppkt->icmp_nextmtu != icmp_spkt->icmp_nextmtu) {
> +                trace_colo_compare_main("icmp_nextmtu not same");
> +                trace_colo_compare_icmp_miscompare_mtu("ppkt nextmtu",
> +                                             icmp_ppkt->icmp_nextmtu);
> +                trace_colo_compare_icmp_miscompare_mtu("spkt nextmtu",
> +                                             icmp_spkt->icmp_nextmtu);
> +                return -1;
> +            }
> +        }
> +    } else {
> +        return -1;
> +    }

Why compare only part of the ICMP packet?
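
For comparison, here is a sketch of the alternative the question implies. Once the frame sizes and IPv4 header lengths match, compare the whole ICMP message byte for byte: type, code, checksum and body. This is a standalone illustration, not QEMU code. It assumes an untagged Ethernet header (`ETH_HLEN_SKETCH`, 14 bytes) and a hypothetical `icmp_compare_full()` helper that works on raw frames.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ETH_HLEN_SKETCH     14 /* untagged Ethernet header (assumption) */
#define ICMP_HDR_LEN_SKETCH 8  /* type, code, checksum, 4-byte rest-of-header */

/*
 * Hypothetical full comparison of two raw frames carrying IPv4/ICMP.
 * Returns 0 if the complete ICMP messages are identical, -1 otherwise.
 */
static int icmp_compare_full(const uint8_t *pdata, size_t psize,
                             const uint8_t *sdata, size_t ssize)
{
    size_t p_iphl, s_iphl, icmp_off;

    assert(pdata != NULL && sdata != NULL);
    if (psize != ssize || psize < ETH_HLEN_SKETCH + 20) {
        return -1;
    }
    /* IHL: low nibble of the first IPv4 byte, counted in 32-bit words. */
    p_iphl = (size_t)(pdata[ETH_HLEN_SKETCH] & 0x0f) * 4;
    s_iphl = (size_t)(sdata[ETH_HLEN_SKETCH] & 0x0f) * 4;
    if (p_iphl != s_iphl || p_iphl < 20) {
        return -1;
    }
    icmp_off = ETH_HLEN_SKETCH + p_iphl;
    if (psize < icmp_off + ICMP_HDR_LEN_SKETCH) {
        return -1;
    }
    /* Type, code, checksum and the whole body take part in the comparison. */
    return memcmp(pdata + icmp_off, sdata + icmp_off,
                  psize - icmp_off) == 0 ? 0 : -1;
}
```

A byte-wise comparison treats any difference as a miscompare, including a differing echo identifier or sequence number. Whether that is stricter than desired depends on how closely the secondary is expected to reproduce the primary's ICMP output.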

> +
> +    return 0;
> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * to compare other packets
> + */
> +static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
>   {
> -    trace_colo_compare_main("compare all");
> +    trace_colo_compare_main("compare other");
> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
> +                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
> +                               inet_ntoa(spkt->ip->ip_src),
> +                               inet_ntoa(spkt->ip->ip_dst));
>       return colo_packet_compare(ppkt, spkt);
>   }
>   
> @@ -221,8 +368,24 @@ static void colo_compare_connection(void *opaque, void *user_data)
>       while (!g_queue_is_empty(&conn->primary_list) &&
>              !g_queue_is_empty(&conn->secondary_list)) {
>           pkt = g_queue_pop_tail(&conn->primary_list);
> -        result = g_queue_find_custom(&conn->secondary_list,
> -                              pkt, (GCompareFunc)colo_packet_compare_all);
> +        switch (conn->ip_proto) {
> +        case IPPROTO_TCP:
> +            result = g_queue_find_custom(&conn->secondary_list,
> +                     pkt, (GCompareFunc)colo_packet_compare_tcp);
> +            break;
> +        case IPPROTO_UDP:
> +            result = g_queue_find_custom(&conn->secondary_list,
> +                     pkt, (GCompareFunc)colo_packet_compare_udp);
> +            break;
> +        case IPPROTO_ICMP:
> +            result = g_queue_find_custom(&conn->secondary_list,
> +                     pkt, (GCompareFunc)colo_packet_compare_icmp);
> +            break;
> +        default:
> +            result = g_queue_find_custom(&conn->secondary_list,
> +                     pkt, (GCompareFunc)colo_packet_compare_other);
> +            break;
> +        }
>   
>           if (result) {
>               ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
> diff --git a/trace-events b/trace-events
> index 1537e91..6686cdf 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1919,5 +1919,11 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>   
>   # net/colo-compare.c
>   colo_compare_main(const char *chr) ": %s"
> +colo_compare_tcp_miscompare(const char *sta, int size) ": %s = %d"
> +colo_compare_udp_miscompare(const char *sta, int size) ": %s = %d"
> +colo_compare_icmp_miscompare_size(int psize, int ssize) ":ppkt size = %d spkt size = %d"
> +colo_compare_icmp_miscompare_addr(const char *sta, const char *stb) ": %s  %s"
> +colo_compare_icmp_miscompare_mtu(const char *sta, int size) ": %s  %d"
>   colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
>   colo_old_packet_check_found(int64_t old_time) "%" PRId64
> +colo_compare_miscompare(void) ""

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization
  2016-07-08  8:21     ` Zhang Chen
@ 2016-07-08  9:12       ` Jason Wang
  2016-07-11  5:14         ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-08  9:12 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Dr . David Alan Gilbert, eddie . dong, Li Zhijian, zhanghailiang



On 2016-07-08 16:21, Zhang Chen wrote:
>
>
> On 07/08/2016 11:40 AM, Jason Wang wrote:
>>
>>
>> On 2016-06-23 19:34, Zhang Chen wrote:
>>> Packets coming from the primary char indev will be sent to outdev
>>> Packets coming from the secondary char dev will be dropped
>>>
>>> usage:
>>>
>>> primary:
>>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>>> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
>>> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
>>> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
>>> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
>>> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
>>> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
>>> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
>>> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
>>> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
>>> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
>>> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>>>
>>> secondary:
>>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>>> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
>>> -chardev socket,id=red0,host=3.3.3.3,port=9003
>>> -chardev socket,id=red1,host=3.3.3.3,port=9004
>>> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
>>> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>>
>> Since we eventually want a non-RFC patch, it would be better to add some
>> explanation of the above configuration; it is not easy for newcomers to
>> follow at first glance. Maybe you can use either an ASCII figure or a
>> paragraph. The parameters of colo-compare also need to be explained in detail.
>
> Makes sense. I will add an ASCII figure and some comments to explain it.
>
>>
>>>
>>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>>> ---
>>>   net/Makefile.objs  |   1 +
>>>   net/colo-compare.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>   qemu-options.hx    |  34 ++++++++
>>>   vl.c               |   3 +-
>>>   4 files changed, 268 insertions(+), 1 deletion(-)
>>>   create mode 100644 net/colo-compare.c
>>>
>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>> index b7c22fd..ba92f73 100644
>>> --- a/net/Makefile.objs
>>> +++ b/net/Makefile.objs
>>> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>>>   common-obj-y += filter.o
>>>   common-obj-y += filter-buffer.o
>>>   common-obj-y += filter-mirror.o
>>> +common-obj-y += colo-compare.o
>>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>>> new file mode 100644
>>> index 0000000..a3e1456
>>> --- /dev/null
>>> +++ b/net/colo-compare.c
>>> @@ -0,0 +1,231 @@
>>> +/*
>>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>>> + *
>>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>> + * Copyright (c) 2016 Intel Corporation
>>> + *
>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> + *
>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>> + * later.  See the COPYING file in the top-level directory.
>>> + */
>>> +
>>> +#include "qemu/osdep.h"
>>> +#include "qemu/error-report.h"
>>> +#include "qemu-common.h"
>>> +#include "qapi/qmp/qerror.h"
>>> +#include "qapi/error.h"
>>> +#include "net/net.h"
>>> +#include "net/vhost_net.h"
>>> +#include "qom/object_interfaces.h"
>>> +#include "qemu/iov.h"
>>> +#include "qom/object.h"
>>> +#include "qemu/typedefs.h"
>>> +#include "net/queue.h"
>>> +#include "sysemu/char.h"
>>> +#include "qemu/sockets.h"
>>> +#include "qapi-visit.h"
>>> +#include "trace.h"
>>
>> Looks like trace is not really used in this patch; you can delay the
>> inclusion until it is really used.
>
> OK~~~
>
>>
>>> +
>>> +#define TYPE_COLO_COMPARE "colo-compare"
>>> +#define COLO_COMPARE(obj) \
>>> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>> +
>>> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
>>> +
>>> +static QTAILQ_HEAD(, CompareState) net_compares =
>>> +       QTAILQ_HEAD_INITIALIZER(net_compares);
>>
>> What is this used for? A comment would help.
>
> If we need to compare more than one netdev, we should use more than
> one colo-compare. When doing a checkpoint with the COLO framework, we
> should flush all enqueued packets in every colo-compare, and we use
> this queue to find all colo-compare instances.
> So it looks unnecessary here; I will move it to a later patch.

Yes, unless you want a single colo-compare thread to do the comparison for
all netdevs. (But I agree, it looks unnecessary.)

>
>
>>
>>> +
>>> +typedef struct CompareState {
>>> +    Object parent;
>>> +
>>> +    char *pri_indev;
>>> +    char *sec_indev;
>>> +    char *outdev;
>>> +    CharDriverState *chr_pri_in;
>>> +    CharDriverState *chr_sec_in;
>>> +    CharDriverState *chr_out;
>>> +    QTAILQ_ENTRY(CompareState) next;
>>> +    SocketReadState pri_rs;
>>> +    SocketReadState sec_rs;
>>> +} CompareState;
>>> +
>>> +typedef struct CompareClass {
>>> +    ObjectClass parent_class;
>>> +} CompareClass;
>>> +
>>> +static char *compare_get_pri_indev(Object *obj, Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    return g_strdup(s->pri_indev);
>>> +}
>>> +
>>> +static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    g_free(s->pri_indev);
>>> +    s->pri_indev = g_strdup(value);
>>
>> I think we need to do something more than this, e.g. release the original
>> dev and get the new one? Or just forbid setting this property.
>>
>
> Do you mean this:
> qemu_chr_fe_release(s->chr_pri_in);
>
> If so: here we only get/set char *pri_indev (the chardev's name).
> We don't get or set the CharDriverState, so I think we needn't do more.

Maybe I'm missing something, but is there any use in just changing the
chardev's name here?

>
>
>> And it looks like we have similar issues for sec_indev and outdev.
>>
>>> +}
>>> +
>>>

[...]

>>> +
>>> +static void colo_compare_finalize(Object *obj)
>>> +{
>>> +    CompareState *s = COLO_COMPARE(obj);
>>> +
>>> +    if (s->chr_pri_in) {
>>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>
>> Why do we need to do this?
>
> For more safety before calling qemu_chr_fe_release() on the chardev,
> as backends/rng-egd.c does.

Ok, but it looks like we don't set any handlers in this patch, so I don't
see why we need to clear them.

As for rng-egd, I think we need to fail if the chardev is not a socket
chardev. vhost-user does a similar check in net_vhost_chardev_opts(), which
may be useful here (we probably need this for mirror/redirector too).

[...]

^ permalink raw reply	[flat|nested] 21+ messages in thread

* Re: [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet
  2016-07-08  4:07   ` Jason Wang
@ 2016-07-08  9:56     ` Zhang Chen
  2016-07-11  5:41       ` Jason Wang
  0 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-07-08  9:56 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 07/08/2016 12:07 PM, Jason Wang wrote:
>
>
> On 2016-06-23 19:34, Zhang Chen wrote:
>> In this patch we use a hash table keyed with the kernel's jhash to track
>> connections, and then enqueue net packets like this:
>>
>> + CompareState ++
>> |               |
>> +---------------+   +---------------+         +---------------+
>> |conn list      +--->conn           +--------->conn           |
>> +---------------+   +---------------+         +---------------+
>> |               |     |           |             |          |
>> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet +
>>                    +--------+  +--------+    +--------+ +--------+
>>                        |           |             |          |
>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet +
>>                    +--------+  +--------+    +--------+ +--------+
>>                        |           |             |          |
>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>                    |primary |  |secondary    |primary | |secondary
>>                    |packet  |  |packet  +    |packet  | |packet +
>>                    +--------+  +--------+    +--------+ +--------+
>
> A paragraph describing the above would be more than welcome.

I will add some comments for it.

>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   include/qemu/jhash.h |  61 ++++++++++++++++
>>   net/Makefile.objs    |   1 +
>>   net/colo-base.c      | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>   net/colo-base.h      |  88 +++++++++++++++++++++++
>>   net/colo-compare.c   | 138 +++++++++++++++++++++++++++++++++++-
>>   trace-events         |   3 +
>>   6 files changed, 483 insertions(+), 2 deletions(-)
>>   create mode 100644 include/qemu/jhash.h
>>   create mode 100644 net/colo-base.c
>>   create mode 100644 net/colo-base.h
>>
>> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
>> new file mode 100644
>> index 0000000..0fcd875
>> --- /dev/null
>> +++ b/include/qemu/jhash.h
>> @@ -0,0 +1,61 @@
>> +/* jhash.h: Jenkins hash support.
>> +  *
>> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
>> +  *
>> +  * http://burtleburtle.net/bob/hash/
>> +  *
>> +  * These are the credits from Bob's sources:
>> +  *
>> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
>> +  *
>> +  * These are functions for producing 32-bit hashes for hash table lookup.
>> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
>> +  * are externally useful functions.  Routines to test the hash are included
>> +  * if SELF_TEST is defined.  You can use this free for any purpose.  It's in
>> +  * the public domain.  It has no warranty.
>> +  *
>> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
>> +  *
>> +  * I've modified Bob's hash to be useful in the Linux kernel, and
>> +  * any bugs present are my fault.
>> +  * Jozsef
>> +  */
>> +
>> +#ifndef QEMU_JHASH_H__
>> +#define QEMU_JHASH_H__
>> +
>> +#include "qemu/bitops.h"
>> +
>> +/*
>> + * hashtable-related code copied from the Linux kernel's jhash
>> + */
>> +
>> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
>> +#define __jhash_mix(a, b, c)                \
>> +{                                           \
>> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
>> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
>> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
>> +    a -= c;  a ^= rol32(c, 16); c += b;     \
>> +    b -= a;  b ^= rol32(a, 19); a += c;     \
>> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
>> +}
>> +
>> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
>> +#define __jhash_final(a, b, c)  \
>> +{                               \
>> +    c ^= b; c -= rol32(b, 14);  \
>> +    a ^= c; a -= rol32(c, 11);  \
>> +    b ^= a; b -= rol32(a, 25);  \
>> +    c ^= b; c -= rol32(b, 16);  \
>> +    a ^= c; a -= rol32(c, 4);   \
>> +    b ^= a; b -= rol32(a, 14);  \
>> +    c ^= b; c -= rol32(b, 24);  \
>> +}
>> +
>> +/* An arbitrary initial parameter */
>> +#define JHASH_INITVAL           0xdeadbeef
>> +
>> +#endif /* QEMU_JHASH_H__ */
>
> Please split jhash into another patch.

Should it be split into an independent patch within this patch set, or not?


>
>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>> index ba92f73..119589f 100644
>> --- a/net/Makefile.objs
>> +++ b/net/Makefile.objs
>> @@ -17,3 +17,4 @@ common-obj-y += filter.o
>>   common-obj-y += filter-buffer.o
>>   common-obj-y += filter-mirror.o
>>   common-obj-y += colo-compare.o
>> +common-obj-y += colo-base.o
>> diff --git a/net/colo-base.c b/net/colo-base.c
>> new file mode 100644
>> index 0000000..7e263e8
>> --- /dev/null
>> +++ b/net/colo-base.c
>> @@ -0,0 +1,194 @@
>> +/*
>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>> + *
>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2016 FUJITSU LIMITED
>> + * Copyright (c) 2016 Intel Corporation
>> + *
>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "qemu/osdep.h"
>> +#include "qemu/error-report.h"
>> +#include "net/colo-base.h"
>> +
>> +uint32_t connection_key_hash(const void *opaque)
>> +{
>> +    const ConnectionKey *key = opaque;
>> +    uint32_t a, b, c;
>> +
>> +    /* Jenkins hash */
>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>> +    a += key->src.s_addr;
>> +    b += key->dst.s_addr;
>> +    c += (key->src_port | key->dst_port << 16);
>> +    __jhash_mix(a, b, c);
>> +
>> +    a += key->ip_proto;
>> +    __jhash_final(a, b, c);
>> +
>> +    return c;
>> +}
>> +
>> +int connection_key_equal(const void *key1, const void *key2)
>> +{
>> +    return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
>> +}
>> +
>> +int parse_packet_early(Packet *pkt)
>> +{
>> +    int network_length;
>> +    uint8_t *data = pkt->data;
>> +    uint16_t l3_proto;
>> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
>> +
>> +    if (pkt->size < ETH_HLEN) {
>> +        error_report("pkt->size < ETH_HLEN");
>> +        return 1;
>> +    }
>> +    pkt->network_layer = data + ETH_HLEN;
>> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
>> +    if (l3_proto != ETH_P_IP) {
>> +        return 1;
>> +    }
>> +
>> +    network_length = pkt->ip->ip_hl * 4;
>> +    if (pkt->size < ETH_HLEN + network_length) {
>> +        error_report("pkt->size < network_layer + network_length");
>> +        return 1;
>> +    }
>> +    pkt->transport_layer = pkt->network_layer + network_length;
>> +    if (!pkt->transport_layer) {
>> +        error_report("pkt->transport_layer is invalid");
>> +        return 1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode)
>> +{
>> +    uint32_t tmp_ports;
>> +
>> +    key->ip_proto = pkt->ip->ip_p;
>> +
>> +    switch (key->ip_proto) {
>> +    case IPPROTO_TCP:
>> +    case IPPROTO_UDP:
>> +    case IPPROTO_DCCP:
>> +    case IPPROTO_ESP:
>> +    case IPPROTO_SCTP:
>> +    case IPPROTO_UDPLITE:
>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
>> +        if (mode) {
>
> Looks like mode is unnecessary here; you can actually compare and swap
> during hashing to avoid mode here.

I get your point.

>
>> +            key->src = pkt->ip->ip_src;
>> +            key->dst = pkt->ip->ip_dst;
>> +            key->src_port = ntohs(tmp_ports & 0xffff);
>> +            key->dst_port = ntohs(tmp_ports >> 16);
>> +        } else {
>> +            key->dst = pkt->ip->ip_src;
>> +            key->src = pkt->ip->ip_dst;
>> +            key->dst_port = ntohs(tmp_ports & 0xffff);
>> +            key->src_port = ntohs(tmp_ports >> 16);
>> +        }
>> +        break;
>> +    case IPPROTO_AH:
>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
>> +        if (mode) {
>> +            key->src = pkt->ip->ip_src;
>> +            key->dst = pkt->ip->ip_dst;
>> +            key->src_port = ntohs(tmp_ports & 0xffff);
>> +            key->dst_port = ntohs(tmp_ports >> 16);
>> +        } else {
>> +            key->dst = pkt->ip->ip_src;
>> +            key->src = pkt->ip->ip_dst;
>> +            key->dst_port = ntohs(tmp_ports & 0xffff);
>> +            key->src_port = ntohs(tmp_ports >> 16);
>> +        }
>> +        break;
>> +    default:
>> +        key->src_port = 0;
>> +        key->dst_port = 0;
>> +        break;
>> +    }
>> +}
>
> This seems reusable; please put the connection key stuff in an
> independent patch.

In this patch set or not?
If not, should we create a new .c and .h for this?

>
>> +
>> +Connection *connection_new(ConnectionKey *key)
>> +{
>> +    Connection *conn = g_slice_new(Connection);
>> +
>> +    conn->ip_proto = key->ip_proto;
>> +    conn->processing = false;
>> +    g_queue_init(&conn->primary_list);
>> +    g_queue_init(&conn->secondary_list);
>> +
>> +    return conn;
>> +}
>> +
>> +void connection_destroy(void *opaque)
>> +{
>> +    Connection *conn = opaque;
>> +
>> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
>> +    g_queue_free(&conn->primary_list);
>> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
>> +    g_queue_free(&conn->secondary_list);
>> +    g_slice_free(Connection, conn);
>> +}
>> +
>> +Packet *packet_new(const void *data, int size)
>> +{
>> +    Packet *pkt = g_slice_new(Packet);
>> +
>> +    pkt->data = g_memdup(data, size);
>> +    pkt->size = size;
>> +
>> +    return pkt;
>> +}
>> +
>> +void packet_destroy(void *opaque, void *user_data)
>> +{
>> +    Packet *pkt = opaque;
>> +
>> +    g_free(pkt->data);
>> +    g_slice_free(Packet, pkt);
>> +}
>> +
>> +/*
>> + * Clear the hashtable to stop it from growing really huge
>> + */
>> +void connection_hashtable_reset(GHashTable *connection_track_table)
>> +{
>> +    g_hash_table_remove_all(connection_track_table);
>> +}
>> +
>> +/* if not found, create a new connection and add to hash table */
>> +Connection *connection_get(GHashTable *connection_track_table,
>> +                           ConnectionKey *key,
>> +                           uint32_t *hashtable_size)
>> +{
>> +    /* FIXME: protect connection_track_table */
>
> I fail to understand why protection is needed here.

It's not needed... I will remove it.

>
>> +    Connection *conn = g_hash_table_lookup(connection_track_table, key);
>> +
>> +    if (conn == NULL) {
>> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
>> +
>> +        conn = connection_new(key);
>> +
>> +        (*hashtable_size) += 1;
>> +        if (*hashtable_size > HASHTABLE_MAX_SIZE) {
>> +            error_report("colo proxy connection hashtable full, clear it");
>
> Is this a hint that we need a synchronization?

No... we don't need it.

>
>> +            connection_hashtable_reset(connection_track_table);
>> +            *hashtable_size = 0;
>> +            /* TODO:clear conn_list */
>
> If we don't clear conn_list, it looks like a bug, so we probably need to
> do this in this patch.

OK~~
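
The invariant behind this can be stated as follows. Assume the hash table is created with connection_destroy() as its value destroy function, so that g_hash_table_remove_all() frees the Connections. Then conn_list only borrows pointers the table owns, and a reset of the table must also empty conn_list, or the list is left with dangling pointers. Below is a minimal sketch of that invariant, with fixed-size arrays standing in for GHashTable and GQueue. `SketchState`, `sketch_reset()` and `sketch_conn_new()` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define SKETCH_TABLE_MAX 4 /* stand-in for HASHTABLE_MAX_SIZE */

typedef struct SketchConn {
    int id;
} SketchConn;

typedef struct SketchState {
    SketchConn *table[SKETCH_TABLE_MAX];     /* owns the connections */
    size_t table_size;
    SketchConn *conn_list[SKETCH_TABLE_MAX]; /* borrowed pointers only */
    size_t list_len;
} SketchState;

/* Free every tracked connection and drop all borrowed references to them. */
static void sketch_reset(SketchState *s)
{
    for (size_t i = 0; i < s->table_size; i++) {
        free(s->table[i]);
        s->table[i] = NULL;
    }
    s->table_size = 0;
    /* Without this line conn_list would keep dangling pointers. */
    s->list_len = 0;
}

/* Track a new connection, resetting everything first if the table is full. */
static SketchConn *sketch_conn_new(SketchState *s, int id)
{
    SketchConn *conn;

    if (s->table_size == SKETCH_TABLE_MAX) {
        sketch_reset(s);
    }
    conn = malloc(sizeof(*conn));
    assert(conn != NULL); /* sketch only: no real allocation-failure path */
    conn->id = id;
    s->table[s->table_size++] = conn;
    s->conn_list[s->list_len++] = conn;
    return conn;
}
```

With GLib, the equivalent step would presumably be a g_queue_clear() on the list right after g_hash_table_remove_all(). That means connection_get(), or its caller, needs access to conn_list.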

>
>> +        }
>> +
>> +        g_hash_table_insert(connection_track_table, new_key, conn);
>> +    }
>> +
>> +    return conn;
>> +}
>> diff --git a/net/colo-base.h b/net/colo-base.h
>> new file mode 100644
>> index 0000000..01c1a5d
>> --- /dev/null
>> +++ b/net/colo-base.h
>> @@ -0,0 +1,88 @@
>> +/*
>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>> + *
>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>> + * Copyright (c) 2016 FUJITSU LIMITED
>> + * Copyright (c) 2016 Intel Corporation
>> + *
>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef QEMU_COLO_BASE_H
>> +#define QEMU_COLO_BASE_H
>> +
>> +#include "slirp/slirp.h"
>> +#include "qemu/jhash.h"
>> +#include "qemu/rcu.h"
>
> I don't see any RCU usage in this patch.

I will remove it.

>
>> +
>> +#define HASHTABLE_MAX_SIZE 16384
>> +
>> +typedef enum colo_conn_state {
>
> This looks like it can only handle TCP, so probably add "tcp" to
> its name.

yes.

>
>> +     COLO_CONN_IDLE,
>> +
>> +    /* States on the primary: For incoming connection */
>> +     COLO_CONN_PRI_IN_SYN,   /* Received Syn */
>> +     COLO_CONN_PRI_IN_PSYNACK, /* Received syn/ack from primary, but not
>> +                                yet from secondary */
>> +     COLO_CONN_PRI_IN_SSYNACK, /* Received syn/ack from secondary, but
>> +                                  not yet from primary */
>> +     COLO_CONN_PRI_IN_SYNACK,  /* Received syn/ack from both */
>> +     COLO_CONN_PRI_IN_ESTABLISHED, /* Got the ACK */
>> +
>> +    /* States on the secondary: For incoming connection */
>> +     COLO_CONN_SEC_IN_SYNACK,      /* We sent a syn/ack */
>> +     COLO_CONN_SEC_IN_ACK,         /* Saw the ack but didn't yet see our syn/ack */
>> +     COLO_CONN_SEC_IN_ESTABLISHED, /* Got the ACK from the outside */
>
> Should we care about any FIN state here?

Currently we don't care.

>
>> +} colo_conn_state;
>> +
>> +typedef struct Packet {
>> +    void *data;
>> +    union {
>> +        uint8_t *network_layer;
>> +        struct ip *ip;
>> +    };
>> +    uint8_t *transport_layer;
>> +    int size;
>> +} Packet;
>
> We may start to consider sharing code with e.g. hw/net/net_tx_pkt.c.

I read it. That file was added to QEMU a month ago and needs time to
stabilize; it may still change.
So I think this job should be done after colo-compare is merged...

>
>> +
>> +typedef struct ConnectionKey {
>> +    /* (src, dst) must be grouped, in the same way as in the IP header */
>> +    struct in_addr src;
>> +    struct in_addr dst;
>> +    uint16_t src_port;
>> +    uint16_t dst_port;
>> +    uint8_t ip_proto;
>> +} QEMU_PACKED ConnectionKey;
>> +
>> +typedef struct Connection {
>> +    /* connection primary send queue: element type: Packet */
>> +    GQueue primary_list;
>> +    /* connection secondary send queue: element type: Packet */
>> +    GQueue secondary_list;
>> +    /* flag to enqueue unprocessed_connections */
>> +    bool processing;
>> +    uint8_t ip_proto;
>> +    /* used by filter-rewriter */
>> +    colo_conn_state state;
>> +    tcp_seq  primary_seq;
>> +    tcp_seq  secondary_seq;
>> +} Connection;
>> +
>> +uint32_t connection_key_hash(const void *opaque);
>> +int connection_key_equal(const void *opaque1, const void *opaque2);
>> +int parse_packet_early(Packet *pkt);
>> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode);
>> +Connection *connection_new(ConnectionKey *key);
>> +void connection_destroy(void *opaque);
>> +Connection *connection_get(GHashTable *connection_track_table,
>> +                           ConnectionKey *key,
>> +                           uint32_t *hashtable_size);
>> +void connection_hashtable_reset(GHashTable *connection_track_table);
>> +Packet *packet_new(const void *data, int size);
>> +void packet_destroy(void *opaque, void *user_data);
>> +
>> +#endif /* QEMU_COLO_BASE_H */
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index a3e1456..4231fe7 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -28,6 +28,7 @@
>>   #include "qemu/sockets.h"
>>   #include "qapi-visit.h"
>>   #include "trace.h"
>> +#include "net/colo-base.h"
>>
>>   #define TYPE_COLO_COMPARE "colo-compare"
>>   #define COLO_COMPARE(obj) \
>> @@ -38,6 +39,28 @@
>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>>
>> +/*
>> +  + CompareState ++
>> +  |               |
>> +  +---------------+   +---------------+         +---------------+
>> +  |conn list      +--->conn           +--------->conn           |
>> +  +---------------+   +---------------+         +---------------+
>> +  |               |     |           |             |          |
>> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +                        |           |             |          |
>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +                        |           |             |          |
>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>> +                    |primary |  |secondary    |primary | |secondary
>> +                    |packet  |  |packet  +    |packet  | |packet  +
>> +                    +--------+  +--------+    +--------+ +--------+
>> +*/
>>   typedef struct CompareState {
>>       Object parent;
>>
>> @@ -50,12 +73,103 @@ typedef struct CompareState {
>>       QTAILQ_ENTRY(CompareState) next;
>>       SocketReadState pri_rs;
>>       SocketReadState sec_rs;
>> +
>> +    /* connection list: the connections belonged to this NIC could be found
>> +     * in this list.
>> +     * element type: Connection
>> +     */
>> +    GQueue conn_list;
>> +    QemuMutex conn_list_lock; /* to protect conn_list */
>
> Why do we need this mutex?

will remove it.

>
>> +    /* hashtable to save connection */
>> +    GHashTable *connection_track_table;
>> +    /* to save unprocessed_connections */
>> +    GQueue unprocessed_connections;
>> +    /* proxy current hash size */
>> +    uint32_t hashtable_size;
>>   } CompareState;
>>     typedef struct CompareClass {
>>       ObjectClass parent_class;
>>   } CompareClass;
>>   +enum {
>> +    PRIMARY_IN = 0,
>> +    SECONDARY_IN,
>> +};
>> +
>> +static int compare_chr_send(CharDriverState *out,
>> +                            const uint8_t *buf,
>> +                            uint32_t size);
>> +
>> +/*
>> + * Return 0 on success, if return -1 means the pkt
>> + * is unsupported(arp and ipv6) and will be sent later
>> + */
>> +static int packet_enqueue(CompareState *s, int mode)
>> +{
>> +    ConnectionKey key = {{ 0 } };
>> +    Packet *pkt = NULL;
>> +    Connection *conn;
>> +
>> +    if (mode == PRIMARY_IN) {
>> +        pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
>> +    } else {
>> +        pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
>> +    }
>> +
>> +    if (parse_packet_early(pkt)) {
>> +        packet_destroy(pkt, NULL);
>> +        pkt = NULL;
>> +        return -1;
>> +    }
>> +    fill_connection_key(pkt, &key, PRIMARY_IN);
>> +
>> +    conn = connection_get(s->connection_track_table,
>> +                          &key,
>> +                          &s->hashtable_size);
>> +    if (!conn->processing) {
>> +        qemu_mutex_lock(&s->conn_list_lock);
>> +        g_queue_push_tail(&s->conn_list, conn);
>> +        qemu_mutex_unlock(&s->conn_list_lock);
>> +        conn->processing = true;
>> +    }
>> +
>> +    if (mode == PRIMARY_IN) {
>> +        g_queue_push_tail(&conn->primary_list, pkt);
>> +    } else {
>> +        g_queue_push_tail(&conn->secondary_list, pkt);
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static int compare_chr_send(CharDriverState *out,
>> +                            const uint8_t *buf,
>> +                            uint32_t size)
>> +{
>> +    int ret = 0;
>> +    uint32_t len = htonl(size);
>> +
>> +    if (!size) {
>> +        return 0;
>> +    }
>> +
>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
>> +    if (ret != sizeof(len)) {
>> +        goto err;
>> +    }
>> +
>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
>> +    if (ret != size) {
>> +        goto err;
>> +    }
>> +
>> +    return 0;
>> +
>> +err:
>> +    return ret < 0 ? ret : -EIO;
>> +}
>> +
>>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(obj);
>> @@ -103,12 +217,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>>     static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>>   {
>> -    /* if packet_enqueue pri pkt failed we will send unsupported packet */
>> +    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
>> +
>> +    if (packet_enqueue(s, PRIMARY_IN)) {
>> +        trace_colo_compare_main("primary: unsupported packet in");
>> +        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
>> +    }
>
> Do we have a upper limit on the maximum numbers of packets could be 
> queued? If not, guest may easily trigger OOM.

Do we need a g_queue to do this job? If the count goes above the limit, do
we drop the packet?
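A minimal sketch of the cap Jason is asking about, assuming a fixed per-connection limit: with GLib the check could be as small as comparing g_queue_get_length() with the limit before g_queue_push_tail() in packet_enqueue(). The standalone C version below avoids the GLib dependency so it compiles on its own; MAX_QUEUE_SIZE, BoundedQueue and the bounded_queue_* helpers are hypothetical names, not part of the patch, and whether an over-limit packet is dropped or forwarded uncompared is a policy choice the thread has not settled.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical per-connection cap; the patch does not define one. */
#define MAX_QUEUE_SIZE 1024

typedef struct PktNode {
    struct PktNode *next;
    void *pkt;
} PktNode;

/* Singly linked FIFO that tracks its length so the cap check is O(1). */
typedef struct BoundedQueue {
    PktNode *head;
    PktNode *tail;
    size_t len;
} BoundedQueue;

/*
 * Append pkt at the tail. Returns false when the queue is already full
 * (or allocation fails); the caller then decides whether to drop the
 * packet or send it on without comparison.
 */
static bool bounded_queue_push(BoundedQueue *q, void *pkt)
{
    PktNode *n;

    if (q->len >= MAX_QUEUE_SIZE) {
        return false;
    }
    n = malloc(sizeof(*n));
    if (!n) {
        return false;
    }
    n->next = NULL;
    n->pkt = pkt;
    if (q->tail) {
        q->tail->next = n;
    } else {
        q->head = n;
    }
    q->tail = n;
    q->len++;
    return true;
}

/* Remove and return the oldest packet, or NULL if the queue is empty. */
static void *bounded_queue_pop(BoundedQueue *q)
{
    PktNode *n = q->head;
    void *pkt;

    if (!n) {
        return NULL;
    }
    pkt = n->pkt;
    q->head = n->next;
    if (!q->head) {
        q->tail = NULL;
    }
    free(n);
    q->len--;
    return pkt;
}
```

In packet_enqueue() a false return from the push would be handled like the existing parse_packet_early() failure path: free the packet (or send it on) instead of queueing it, so a misbehaving guest cannot grow the queues without bound.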

Thanks
Zhang Chen

>
>>   }
>>     static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>>   {
>> -    /* if packet_enqueue sec pkt failed we will notify trace */
>> +    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
>> +
>> +    if (packet_enqueue(s, SECONDARY_IN)) {
>> +        trace_colo_compare_main("secondary: unsupported packet in");
>> +    }
>>   }
>>     /*
>> @@ -161,6 +284,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>       net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
>>       net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
>>   +    g_queue_init(&s->conn_list);
>> +    qemu_mutex_init(&s->conn_list_lock);
>> +    s->hashtable_size = 0;
>> +
>> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
>> +                                                      connection_key_equal,
>> +                                                      g_free,
>> +                                                      connection_destroy);
>> +
>>       return;
>>   }
>>   @@ -203,6 +335,8 @@ static void colo_compare_finalize(Object *obj)
>>       if (!QTAILQ_EMPTY(&net_compares)) {
>>           QTAILQ_REMOVE(&net_compares, s, next);
>>       }
>> +    qemu_mutex_destroy(&s->conn_list_lock);
>> +    g_queue_free(&s->conn_list);
>>         g_free(s->pri_indev);
>>       g_free(s->sec_indev);
>> diff --git a/trace-events b/trace-events
>> index ca7211b..703de1a 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
>>   aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
>>   aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32
>>   aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32
>> +
>> +# net/colo-compare.c
>> +colo_compare_main(const char *chr) ": %s"
>
>
>
> .
>

-- 
Thanks
zhangchen
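compare_chr_send() in the patch quoted above frames each packet as a 4-byte length in network byte order (htonl()) followed by the payload, so the reading side (SocketReadState in the patch) must reassemble frames from an arbitrary byte stream. The standalone sketch below shows that framing in both directions under simple assumptions: frame_encode(), FrameReader and frame_reader_feed() are hypothetical names, not QEMU's net_socket_rs_init()/SocketReadState API, and MAX_FRAME_LEN is an assumed bound.

```c
#include <arpa/inet.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical upper bound on one frame's payload. */
#define MAX_FRAME_LEN 65536

/* Write [be32 length][payload] into out; returns bytes written, 0 if out is too small. */
static size_t frame_encode(uint8_t *out, size_t out_cap,
                           const uint8_t *buf, uint32_t size)
{
    uint32_t len = htonl(size);

    if (out_cap < sizeof(len) + size) {
        return 0;
    }
    memcpy(out, &len, sizeof(len));
    memcpy(out + sizeof(len), buf, size);
    return sizeof(len) + size;
}

typedef struct FrameReader {
    uint8_t hdr[4];      /* partially received length prefix */
    size_t hdr_got;
    uint32_t frame_len;  /* payload length once the prefix is complete */
    size_t data_got;
    uint8_t data[MAX_FRAME_LEN];
} FrameReader;

/*
 * Consume bytes until one frame completes or the input runs out.
 * Returns the number of bytes consumed, or -1 if a length prefix exceeds
 * MAX_FRAME_LEN. *done is set to 1 when r->data holds r->frame_len bytes.
 */
static ptrdiff_t frame_reader_feed(FrameReader *r, const uint8_t *buf,
                                   size_t size, int *done)
{
    size_t used = 0;

    *done = 0;
    while (used < size && r->hdr_got < sizeof(r->hdr)) {
        r->hdr[r->hdr_got++] = buf[used++];
        if (r->hdr_got == sizeof(r->hdr)) {
            uint32_t be;
            memcpy(&be, r->hdr, sizeof(be));
            r->frame_len = ntohl(be);
            r->data_got = 0;
            if (r->frame_len > MAX_FRAME_LEN) {
                return -1;
            }
        }
    }
    if (r->hdr_got < sizeof(r->hdr)) {
        return (ptrdiff_t)used;      /* length prefix still incomplete */
    }

    /* Copy as much of the payload as this chunk provides. */
    size_t need = r->frame_len - r->data_got;
    size_t n = (size - used) < need ? (size - used) : need;
    memcpy(r->data + r->data_got, buf + used, n);
    r->data_got += n;
    used += n;
    if (r->data_got == r->frame_len) {
        *done = 1;
        r->hdr_got = 0;              /* next call starts a new frame */
    }
    return (ptrdiff_t)used;
}
```

A caller loops over its input, advancing by the returned byte count, and consumes r.data/r.frame_len whenever done is set; rejecting oversized length prefixes keeps a corrupted stream from driving an unbounded copy.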


* Re: [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization
  2016-07-08  9:12       ` Jason Wang
@ 2016-07-11  5:14         ` Zhang Chen
  0 siblings, 0 replies; 21+ messages in thread
From: Zhang Chen @ 2016-07-11  5:14 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Dr . David Alan Gilbert, eddie . dong, Li Zhijian, zhanghailiang



On 07/08/2016 05:12 PM, Jason Wang wrote:
>
>
> On 2016年07月08日 16:21, Zhang Chen wrote:
>>
>>
>> On 07/08/2016 11:40 AM, Jason Wang wrote:
>>>
>>>
>>> On 2016年06月23日 19:34, Zhang Chen wrote:
>>>> Packets coming from the primary char indev will be sent to outdev
>>>> Packets coming from the secondary char dev will be dropped
>>>>
>>>> usage:
>>>>
>>>> primary:
>>>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>>>> -device e1000,id=e0,netdev=hn0,mac=52:a4:00:12:78:66
>>>> -chardev socket,id=mirror0,host=3.3.3.3,port=9003,server,nowait
>>>> -chardev socket,id=compare1,host=3.3.3.3,port=9004,server,nowait
>>>> -chardev socket,id=compare0,host=3.3.3.3,port=9001,server,nowait
>>>> -chardev socket,id=compare0-0,host=3.3.3.3,port=9001
>>>> -chardev socket,id=compare_out,host=3.3.3.3,port=9005,server,nowait
>>>> -chardev socket,id=compare_out0,host=3.3.3.3,port=9005
>>>> -object filter-mirror,id=m0,netdev=hn0,queue=tx,outdev=mirror0
>>>> -object filter-redirector,netdev=hn0,id=redire0,queue=rx,indev=compare_out
>>>> -object filter-redirector,netdev=hn0,id=redire1,queue=rx,outdev=compare0
>>>> -object colo-compare,id=comp0,primary_in=compare0-0,secondary_in=compare1,outdev=compare_out0
>>>>
>>>> secondary:
>>>> -netdev tap,id=hn0,vhost=off,script=/etc/qemu-ifup,downscript=/etc/qemu-ifdown
>>>> -device e1000,netdev=hn0,mac=52:a4:00:12:78:66
>>>> -chardev socket,id=red0,host=3.3.3.3,port=9003
>>>> -chardev socket,id=red1,host=3.3.3.3,port=9004
>>>> -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0
>>>> -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1
>>>
>>> Considering we finally want a non-RFC patch, it's better to have some
>>> explanation of the above configuration, since it is not easy for
>>> starters to follow at first glance. Maybe you can use either an ASCII
>>> figure or a paragraph. You also need to explain the parameters of
>>> colo-compare in detail.
>>
>> Makes sense, I will add an ASCII figure and some comments to explain it.
>>
>>>
>>>>
>>>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>>>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>>>> ---
>>>>   net/Makefile.objs  |   1 +
>>>>   net/colo-compare.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>>   qemu-options.hx    |  34 ++++++++
>>>>   vl.c               |   3 +-
>>>>   4 files changed, 268 insertions(+), 1 deletion(-)
>>>>   create mode 100644 net/colo-compare.c
>>>>
>>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>>> index b7c22fd..ba92f73 100644
>>>> --- a/net/Makefile.objs
>>>> +++ b/net/Makefile.objs
>>>> @@ -16,3 +16,4 @@ common-obj-$(CONFIG_NETMAP) += netmap.o
>>>>   common-obj-y += filter.o
>>>>   common-obj-y += filter-buffer.o
>>>>   common-obj-y += filter-mirror.o
>>>> +common-obj-y += colo-compare.o
>>>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>>>> new file mode 100644
>>>> index 0000000..a3e1456
>>>> --- /dev/null
>>>> +++ b/net/colo-compare.c
>>>> @@ -0,0 +1,231 @@
>>>> +/*
>>>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>>>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>>>> + *
>>>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>>> + * Copyright (c) 2016 Intel Corporation
>>>> + *
>>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>>> + *
>>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>>> + * later.  See the COPYING file in the top-level directory.
>>>> + */
>>>> +
>>>> +#include "qemu/osdep.h"
>>>> +#include "qemu/error-report.h"
>>>> +#include "qemu-common.h"
>>>> +#include "qapi/qmp/qerror.h"
>>>> +#include "qapi/error.h"
>>>> +#include "net/net.h"
>>>> +#include "net/vhost_net.h"
>>>> +#include "qom/object_interfaces.h"
>>>> +#include "qemu/iov.h"
>>>> +#include "qom/object.h"
>>>> +#include "qemu/typedefs.h"
>>>> +#include "net/queue.h"
>>>> +#include "sysemu/char.h"
>>>> +#include "qemu/sockets.h"
>>>> +#include "qapi-visit.h"
>>>> +#include "trace.h"
>>>
>>> Looks like trace is not really used in this patch; you can delay
>>> the inclusion until it is really used.
>>
>> OK~~~
>>
>>>
>>>> +
>>>> +#define TYPE_COLO_COMPARE "colo-compare"
>>>> +#define COLO_COMPARE(obj) \
>>>> +    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>>> +
>>>> +#define COMPARE_READ_LEN_MAX NET_BUFSIZE
>>>> +
>>>> +static QTAILQ_HEAD(, CompareState) net_compares =
>>>> +       QTAILQ_HEAD_INITIALIZER(net_compares);
>>>
>>> What's the usage of this? A comment would be better.
>>
>> If we need to compare more than one netdev, we should use
>> more than one colo-compare. When we do a checkpoint with colo-frame,
>> we should flush all enqueued packets in every colo-compare, and
>> we use this queue to find all colo-compare instances.
>> So it looks unnecessary here; I will move it to a later patch.
>
> Yes, unless you want a single colo-compare thread to do the comparing
> for all netdevs. (But I agree, it looks unnecessary.)
>
>>
>>
>>>
>>>> +
>>>> +typedef struct CompareState {
>>>> +    Object parent;
>>>> +
>>>> +    char *pri_indev;
>>>> +    char *sec_indev;
>>>> +    char *outdev;
>>>> +    CharDriverState *chr_pri_in;
>>>> +    CharDriverState *chr_sec_in;
>>>> +    CharDriverState *chr_out;
>>>> +    QTAILQ_ENTRY(CompareState) next;
>>>> +    SocketReadState pri_rs;
>>>> +    SocketReadState sec_rs;
>>>> +} CompareState;
>>>> +
>>>> +typedef struct CompareClass {
>>>> +    ObjectClass parent_class;
>>>> +} CompareClass;
>>>> +
>>>> +static char *compare_get_pri_indev(Object *obj, Error **errp)
>>>> +{
>>>> +    CompareState *s = COLO_COMPARE(obj);
>>>> +
>>>> +    return g_strdup(s->pri_indev);
>>>> +}
>>>> +
>>>> +static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
>>>> +{
>>>> +    CompareState *s = COLO_COMPARE(obj);
>>>> +
>>>> +    g_free(s->pri_indev);
>>>> +    s->pri_indev = g_strdup(value);
>>>
>>> I think we need to do something more than this, e.g. release the
>>> original dev and get the new one? Or just forbid setting this property.
>>>
>>
>> Do you mean that:
>> qemu_chr_fe_release(s->chr_pri_in);
>>
>> If yes: here we just get/set char *pri_indev (the chardev's name).
>> We don't get or set the CharDriverState, so I think we needn't do more.
>
> Maybe I'm missing something, but is there any use in just changing
> the chardev's name here?

Yes, colo-compare gets the name of the chardev and saves it,
like primary_in=xxx, secondary_in=xxxx, outdev=xxxxx.

>
>>
>>
>>> And looks like we have similar issues for sec_indev and outdev.
>>>
>>>> +}
>>>> +
>>>>
>
> [...]
>
>>>> +
>>>> +static void colo_compare_finalize(Object *obj)
>>>> +{
>>>> +    CompareState *s = COLO_COMPARE(obj);
>>>> +
>>>> +    if (s->chr_pri_in) {
>>>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>>>
>>> Why do we need to do this?
>>
>> For more safety before calling qemu_chr_fe_release() on the chardev,
>> like backends/rng-egd.c does.
>
> Ok, but it looks like we don't set any handlers in this patch, so I don't
> get why we need to clear them.
>
> As for rng-egd, I think we need to fail if it is not a socket
> chardev. vhost-user does a similar check in net_vhost_chardev_opts(), which
> may be useful here (we probably need this for mirror/redirector too).
>
> [...]
>
>
> .
>

-- 
Thanks
zhangchen


* Re: [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet
  2016-07-08  9:56     ` Zhang Chen
@ 2016-07-11  5:41       ` Jason Wang
  2016-07-12  5:42         ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-11  5:41 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 2016年07月08日 17:56, Zhang Chen wrote:
>
>
> On 07/08/2016 12:07 PM, Jason Wang wrote:
>>
>>
>> On 2016年06月23日 19:34, Zhang Chen wrote:
>>> In this patch we use kernel jhash table to track
>>> connection, and then enqueue net packet like this:
>>>
>>> + CompareState ++
>>> |               |
>>> +---------------+   +---------------+ +---------------+
>>> |conn list      +--->conn +--------->conn           |
>>> +---------------+   +---------------+ +---------------+
>>> |               |     |           |             |          |
>>> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>>                    |primary |  |secondary    |primary | |secondary
>>>                    |packet  |  |packet  +    |packet  | |packet +
>>>                    +--------+  +--------+    +--------+ +--------+
>>>                        |           |             |          |
>>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>>                    |primary |  |secondary    |primary | |secondary
>>>                    |packet  |  |packet  +    |packet  | |packet +
>>>                    +--------+  +--------+    +--------+ +--------+
>>>                        |           |             |          |
>>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>>                    |primary |  |secondary    |primary | |secondary
>>>                    |packet  |  |packet  +    |packet  | |packet +
>>>                    +--------+  +--------+    +--------+ +--------+
>>
>> A paragraph to describe the above would be more than welcome.
>
> I will add some comments for it.
>
>>
>>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>>> ---
>>>   include/qemu/jhash.h |  61 ++++++++++++++++
>>>   net/Makefile.objs    |   1 +
>>>   net/colo-base.c      | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>>   net/colo-base.h      |  88 +++++++++++++++++++++++
>>>   net/colo-compare.c   | 138 +++++++++++++++++++++++++++++++++++-
>>>   trace-events         |   3 +
>>>   6 files changed, 483 insertions(+), 2 deletions(-)
>>>   create mode 100644 include/qemu/jhash.h
>>>   create mode 100644 net/colo-base.c
>>>   create mode 100644 net/colo-base.h
>>>
>>> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
>>> new file mode 100644
>>> index 0000000..0fcd875
>>> --- /dev/null
>>> +++ b/include/qemu/jhash.h
>>> @@ -0,0 +1,61 @@
>>> +/* jhash.h: Jenkins hash support.
>>> +  *
>>> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
>>> +  *
>>> +  * http://burtleburtle.net/bob/hash/
>>> +  *
>>> +  * These are the credits from Bob's sources:
>>> +  *
>>> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
>>> +  *
>>> +  * These are functions for producing 32-bit hashes for hash table lookup.
>>> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
>>> +  * are externally useful functions.  Routines to test the hash are included
>>> +  * if SELF_TEST is defined.  You can use this free for any purpose.  It's in
>>> +  * the public domain.  It has no warranty.
>>> +  *
>>> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
>>> +  *
>>> +  * I've modified Bob's hash to be useful in the Linux kernel, and
>>> +  * any bugs present are my fault.
>>> +  * Jozsef
>>> +  */
>>> +
>>> +#ifndef QEMU_JHASH_H__
>>> +#define QEMU_JHASH_H__
>>> +
>>> +#include "qemu/bitops.h"
>>> +
>>> +/*
>>> + * hashtable relation copy from linux kernel jhash
>>> + */
>>> +
>>> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
>>> +#define __jhash_mix(a, b, c)                \
>>> +{                                           \
>>> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
>>> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
>>> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
>>> +    a -= c;  a ^= rol32(c, 16); c += b;     \
>>> +    b -= a;  b ^= rol32(a, 19); a += c;     \
>>> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
>>> +}
>>> +
>>> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
>>> +#define __jhash_final(a, b, c)  \
>>> +{                               \
>>> +    c ^= b; c -= rol32(b, 14);  \
>>> +    a ^= c; a -= rol32(c, 11);  \
>>> +    b ^= a; b -= rol32(a, 25);  \
>>> +    c ^= b; c -= rol32(b, 16);  \
>>> +    a ^= c; a -= rol32(c, 4);   \
>>> +    b ^= a; b -= rol32(a, 14);  \
>>> +    c ^= b; c -= rol32(b, 24);  \
>>> +}
>>> +
>>> +/* An arbitrary initial parameter */
>>> +#define JHASH_INITVAL           0xdeadbeef
>>> +
>>> +#endif /* QEMU_JHASH_H__ */
>>
>> Please split jhash into another patch.
>
> Split it into an independent patch in this patch set or not?

Better in this series, since it is the first user.

>
>
>>
>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>> index ba92f73..119589f 100644
>>> --- a/net/Makefile.objs
>>> +++ b/net/Makefile.objs
>>> @@ -17,3 +17,4 @@ common-obj-y += filter.o
>>>   common-obj-y += filter-buffer.o
>>>   common-obj-y += filter-mirror.o
>>>   common-obj-y += colo-compare.o
>>> +common-obj-y += colo-base.o
>>> diff --git a/net/colo-base.c b/net/colo-base.c
>>> new file mode 100644
>>> index 0000000..7e263e8
>>> --- /dev/null
>>> +++ b/net/colo-base.c
>>> @@ -0,0 +1,194 @@
>>> +/*
>>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>>> + *
>>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>> + * Copyright (c) 2016 Intel Corporation
>>> + *
>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> + *
>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>> + * later.  See the COPYING file in the top-level directory.
>>> + */
>>> +
>>> +#include "qemu/osdep.h"
>>> +#include "qemu/error-report.h"
>>> +#include "net/colo-base.h"
>>> +
>>> +uint32_t connection_key_hash(const void *opaque)
>>> +{
>>> +    const ConnectionKey *key = opaque;
>>> +    uint32_t a, b, c;
>>> +
>>> +    /* Jenkins hash */
>>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>>> +    a += key->src.s_addr;
>>> +    b += key->dst.s_addr;
>>> +    c += (key->src_port | key->dst_port << 16);
>>> +    __jhash_mix(a, b, c);
>>> +
>>> +    a += key->ip_proto;
>>> +    __jhash_final(a, b, c);
>>> +
>>> +    return c;
>>> +}
>>> +
>>> +int connection_key_equal(const void *key1, const void *key2)
>>> +{
>>> +    return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
>>> +}
>>> +
>>> +int parse_packet_early(Packet *pkt)
>>> +{
>>> +    int network_length;
>>> +    uint8_t *data = pkt->data;
>>> +    uint16_t l3_proto;
>>> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
>>> +
>>> +    if (pkt->size < ETH_HLEN) {
>>> +        error_report("pkt->size < ETH_HLEN");
>>> +        return 1;
>>> +    }
>>> +    pkt->network_layer = data + ETH_HLEN;
>>> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
>>> +    if (l3_proto != ETH_P_IP) {
>>> +        return 1;
>>> +    }
>>> +
>>> +    network_length = pkt->ip->ip_hl * 4;
>>> +    if (pkt->size < ETH_HLEN + network_length) {
>>> +        error_report("pkt->size < network_layer + network_length");
>>> +        return 1;
>>> +    }
>>> +    pkt->transport_layer = pkt->network_layer + network_length;
>>> +    if (!pkt->transport_layer) {
>>> +        error_report("pkt->transport_layer is valid");
>>> +        return 1;
>>> +    }
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode)
>>> +{
>>> +    uint32_t tmp_ports;
>>> +
>>> +    key->ip_proto = pkt->ip->ip_p;
>>> +
>>> +    switch (key->ip_proto) {
>>> +    case IPPROTO_TCP:
>>> +    case IPPROTO_UDP:
>>> +    case IPPROTO_DCCP:
>>> +    case IPPROTO_ESP:
>>> +    case IPPROTO_SCTP:
>>> +    case IPPROTO_UDPLITE:
>>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
>>> +        if (mode) {
>>
>> Looks like mode is unnecessary here; you can actually compare and
>> swap during hashing to avoid mode here.
>
> I get your point.
>
>>
>>> +            key->src = pkt->ip->ip_src;
>>> +            key->dst = pkt->ip->ip_dst;
>>> +            key->src_port = ntohs(tmp_ports & 0xffff);
>>> +            key->dst_port = ntohs(tmp_ports >> 16);
>>> +        } else {
>>> +            key->dst = pkt->ip->ip_src;
>>> +            key->src = pkt->ip->ip_dst;
>>> +            key->dst_port = ntohs(tmp_ports & 0xffff);
>>> +            key->src_port = ntohs(tmp_ports >> 16);
>>> +        }
>>> +        break;
>>> +    case IPPROTO_AH:
>>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
>>> +        if (mode) {
>>> +            key->src = pkt->ip->ip_src;
>>> +            key->dst = pkt->ip->ip_dst;
>>> +            key->src_port = ntohs(tmp_ports & 0xffff);
>>> +            key->dst_port = ntohs(tmp_ports >> 16);
>>> +        } else {
>>> +            key->dst = pkt->ip->ip_src;
>>> +            key->src = pkt->ip->ip_dst;
>>> +            key->dst_port = ntohs(tmp_ports & 0xffff);
>>> +            key->src_port = ntohs(tmp_ports >> 16);
>>> +        }
>>> +        break;
>>> +    default:
>>> +        key->src_port = 0;
>>> +        key->dst_port = 0;
>>> +        break;
>>> +    }
>>> +}
>>
>> This seems reusable; please use an independent patch for the
>> connection key stuff.
>
> In this patch set or not?
> If not, we make a new .c and .h for this?
>

Yes, this series please.
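One reading of Jason's suggestion to drop the mode argument from fill_connection_key(): order the two endpoints canonically (compare and swap) while building the key, so both directions of a flow yield the same key and therefore the same hash bucket. The sketch below assumes that interpretation. CanonKey, canon_key_fill(), canon_key_hash() and canon_key_equal() are hypothetical; addresses are plain host-order uint32_t values rather than struct in_addr, and the mixing steps are copied from the quoted jhash.h under ordinary names.

```c
#include <stdint.h>
#include <string.h>

#define JHASH_INITVAL 0xdeadbeef

static inline uint32_t rol32(uint32_t word, unsigned int shift)
{
    return (word << shift) | (word >> ((32 - shift) & 31));
}

/* Same steps as __jhash_mix/__jhash_final in the quoted jhash.h. */
#define jhash_mix(a, b, c) do {          \
    a -= c;  a ^= rol32(c, 4);  c += b;  \
    b -= a;  b ^= rol32(a, 6);  a += c;  \
    c -= b;  c ^= rol32(b, 8);  b += a;  \
    a -= c;  a ^= rol32(c, 16); c += b;  \
    b -= a;  b ^= rol32(a, 19); a += c;  \
    c -= b;  c ^= rol32(b, 4);  b += a;  \
} while (0)

#define jhash_final(a, b, c) do {        \
    c ^= b; c -= rol32(b, 14);           \
    a ^= c; a -= rol32(c, 11);           \
    b ^= a; b -= rol32(a, 25);           \
    c ^= b; c -= rol32(b, 16);           \
    a ^= c; a -= rol32(c, 4);            \
    b ^= a; b -= rol32(a, 14);           \
    c ^= b; c -= rol32(b, 24);           \
} while (0)

/* Hypothetical direction-independent key: "lo" is the smaller endpoint. */
typedef struct CanonKey {
    uint32_t addr_lo;
    uint32_t addr_hi;
    uint16_t port_lo;
    uint16_t port_hi;
    uint8_t ip_proto;
} CanonKey;

static void canon_key_fill(CanonKey *key, uint8_t proto,
                           uint32_t src, uint16_t sport,
                           uint32_t dst, uint16_t dport)
{
    memset(key, 0, sizeof(*key));   /* zero padding so memcmp() works */
    key->ip_proto = proto;
    /* Compare and swap: the smaller (addr, port) pair always goes first. */
    if (src < dst || (src == dst && sport <= dport)) {
        key->addr_lo = src; key->port_lo = sport;
        key->addr_hi = dst; key->port_hi = dport;
    } else {
        key->addr_lo = dst; key->port_lo = dport;
        key->addr_hi = src; key->port_hi = sport;
    }
}

static uint32_t canon_key_hash(const CanonKey *key)
{
    uint32_t a, b, c;

    a = b = c = JHASH_INITVAL + (uint32_t)sizeof(*key);
    a += key->addr_lo;
    b += key->addr_hi;
    /* Cast first: a uint16_t promotes to int, and 0xffff << 16 overflows int. */
    c += key->port_lo | ((uint32_t)key->port_hi << 16);
    jhash_mix(a, b, c);
    a += key->ip_proto;
    jhash_final(a, b, c);
    return c;
}

static int canon_key_equal(const CanonKey *k1, const CanonKey *k2)
{
    return memcmp(k1, k2, sizeof(*k1)) == 0;
}
```

Because the key is zeroed before it is filled, padding bytes are deterministic and memcmp()-based equality (as in connection_key_equal()) stays valid even though this struct is not QEMU_PACKED.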

>>
>>> +
>>> +Connection *connection_new(ConnectionKey *key)
>>> +{
>>> +    Connection *conn = g_slice_new(Connection);
>>> +
>>> +    conn->ip_proto = key->ip_proto;
>>> +    conn->processing = false;
>>> +    g_queue_init(&conn->primary_list);
>>> +    g_queue_init(&conn->secondary_list);
>>> +
>>> +    return conn;
>>> +}
>>> +
>>> +void connection_destroy(void *opaque)
>>> +{
>>> +    Connection *conn = opaque;
>>> +
>>> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
>>> +    g_queue_free(&conn->primary_list);
>>> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
>>> +    g_queue_free(&conn->secondary_list);
>>> +    g_slice_free(Connection, conn);
>>> +}
>>> +
>>> +Packet *packet_new(const void *data, int size)
>>> +{
>>> +    Packet *pkt = g_slice_new(Packet);
>>> +
>>> +    pkt->data = g_memdup(data, size);
>>> +    pkt->size = size;
>>> +
>>> +    return pkt;
>>> +}
>>> +
>>> +void packet_destroy(void *opaque, void *user_data)
>>> +{
>>> +    Packet *pkt = opaque;
>>> +
>>> +    g_free(pkt->data);
>>> +    g_slice_free(Packet, pkt);
>>> +}
>>> +
>>> +/*
>>> + * Clear hashtable, stop this hash growing really huge
>>> + */
>>> +void connection_hashtable_reset(GHashTable *connection_track_table)
>>> +{
>>> +    g_hash_table_remove_all(connection_track_table);
>>> +}
>>> +
>>> +/* if not found, create a new connection and add to hash table */
>>> +Connection *connection_get(GHashTable *connection_track_table,
>>> +                           ConnectionKey *key,
>>> +                           uint32_t *hashtable_size)
>>> +{
>>> +    /* FIXME: protect connection_track_table */
>>
>> I fail to understand why protection is needed here.
>
> Not needed... will remove it.
>
>>
>>> +    Connection *conn = g_hash_table_lookup(connection_track_table, key);
>>> +
>>> +    if (conn == NULL) {
>>> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
>>> +
>>> +        conn = connection_new(key);
>>> +
>>> +        (*hashtable_size) += 1;
>>> +        if (*hashtable_size > HASHTABLE_MAX_SIZE) {
>>> +            error_report("colo proxy connection hashtable full, clear it");
>>
>> Is this a hint that we need synchronization?
>
> No... we don't.
>

But you reset the hash table, which means we lose the state of packet
comparison?

>>
>>> +            connection_hashtable_reset(connection_track_table);
>>> +            *hashtable_size = 0;
>>> +            /* TODO:clear conn_list */
>>
>> If we don't clear conn_list, it looks like a bug, so we probably need to
>> do this in this patch.
>
> OK~~
>
>>
>>> +        }
>>> +
>>> +        g_hash_table_insert(connection_track_table, new_key, conn);
>>> +    }
>>> +
>>> +    return conn;
>>> +}
>>> diff --git a/net/colo-base.h b/net/colo-base.h
>>> new file mode 100644
>>> index 0000000..01c1a5d
>>> --- /dev/null
>>> +++ b/net/colo-base.h
>>> @@ -0,0 +1,88 @@
>>> +/*
>>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>>> + *
>>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>> + * Copyright (c) 2016 Intel Corporation
>>> + *
>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>> + *
>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>> + * later.  See the COPYING file in the top-level directory.
>>> + */
>>> +
>>> +#ifndef QEMU_COLO_BASE_H
>>> +#define QEMU_COLO_BASE_H
>>> +
>>> +#include "slirp/slirp.h"
>>> +#include "qemu/jhash.h"
>>> +#include "qemu/rcu.h"
>>
>> Don't see any rcu usage in this patch.
>
> will remove it.
>
>>
>>> +
>>> +#define HASHTABLE_MAX_SIZE 16384
>>> +
>>> +typedef enum colo_conn_state {
>>
>> This looks like it can only take care of TCP, so probably add "tcp" to
>> its name.
>
> yes.
>
>>
>>> +     COLO_CONN_IDLE,
>>> +
>>> +    /* States on the primary: For incoming connection */
>>> +     COLO_CONN_PRI_IN_SYN,   /* Received Syn */
>>> +     COLO_CONN_PRI_IN_PSYNACK, /* Received syn/ack from primary, but not
>>> +                                yet from secondary */
>>> +     COLO_CONN_PRI_IN_SSYNACK, /* Received syn/ack from secondary, but
>>> +                                  not yet from primary */
>>> +     COLO_CONN_PRI_IN_SYNACK,  /* Received syn/ack from both */
>>> +     COLO_CONN_PRI_IN_ESTABLISHED, /* Got the ACK */
>>> +
>>> +    /* States on the secondary: For incoming connection */
>>> +     COLO_CONN_SEC_IN_SYNACK,      /* We sent a syn/ack */
>>> +     COLO_CONN_SEC_IN_ACK,         /* Saw the ack but didn't yet see our syn/ack */
>>> +     COLO_CONN_SEC_IN_ESTABLISHED, /* Got the ACK from the outside */
>>
>> Should we care about any FIN state here?
>
> Currently we don't care.
>

Then a comment explaining why we only care about the states during
connection establishment would be better.
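As a reading aid for the comment Jason asks for, the sketch below spells out the primary-side transitions that the quoted colo_conn_state comments describe: a SYN, then SYN/ACKs from primary and secondary in either order, then the final ACK. It assumes the tcp-prefixed renaming suggested above; ColoTcpEvent and colo_tcp_next() are hypothetical, and FIN/RST are left out, matching the patch's current scope.

```c
/* Primary-side subset of colo_conn_state, renamed with a tcp prefix. */
typedef enum {
    COLO_TCP_IDLE,
    COLO_TCP_PRI_IN_SYN,          /* received SYN */
    COLO_TCP_PRI_IN_PSYNACK,      /* SYN/ACK from primary, not yet secondary */
    COLO_TCP_PRI_IN_SSYNACK,      /* SYN/ACK from secondary, not yet primary */
    COLO_TCP_PRI_IN_SYNACK,       /* SYN/ACK from both */
    COLO_TCP_PRI_IN_ESTABLISHED,  /* got the final ACK */
} ColoTcpState;

/* Hypothetical events as seen by the compare/rewriter code. */
typedef enum {
    EV_SYN,
    EV_PRI_SYNACK,
    EV_SEC_SYNACK,
    EV_ACK,
} ColoTcpEvent;

/*
 * Return the next state; an unexpected event leaves the state unchanged.
 * No FIN/RST handling: once established, the connection stays there.
 */
static ColoTcpState colo_tcp_next(ColoTcpState s, ColoTcpEvent ev)
{
    switch (s) {
    case COLO_TCP_IDLE:
        return ev == EV_SYN ? COLO_TCP_PRI_IN_SYN : s;
    case COLO_TCP_PRI_IN_SYN:
        if (ev == EV_PRI_SYNACK) {
            return COLO_TCP_PRI_IN_PSYNACK;
        }
        if (ev == EV_SEC_SYNACK) {
            return COLO_TCP_PRI_IN_SSYNACK;
        }
        return s;
    case COLO_TCP_PRI_IN_PSYNACK:
        return ev == EV_SEC_SYNACK ? COLO_TCP_PRI_IN_SYNACK : s;
    case COLO_TCP_PRI_IN_SSYNACK:
        return ev == EV_PRI_SYNACK ? COLO_TCP_PRI_IN_SYNACK : s;
    case COLO_TCP_PRI_IN_SYNACK:
        return ev == EV_ACK ? COLO_TCP_PRI_IN_ESTABLISHED : s;
    case COLO_TCP_PRI_IN_ESTABLISHED:
    default:
        return s;
    }
}
```

Modelling only these states is enough to know when both sides have agreed on a connection; tracking teardown would need extra FIN/RST states, which is the gap the requested comment should acknowledge.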

>>
>>> +} colo_conn_state;
>>> +
>>> +typedef struct Packet {
>>> +    void *data;
>>> +    union {
>>> +        uint8_t *network_layer;
>>> +        struct ip *ip;
>>> +    };
>>> +    uint8_t *transport_layer;
>>> +    int size;
>>> +} Packet;
>>
>> We may start to consider sharing code with e.g. hw/net/net_tx_pkt.c.
>
> I read it. The file was added to QEMU a month ago;
> it needs time to become stable and may still change.
> So I think this job should be done after colo-compare is merged...

Ok, but we need to avoid duplication as much as possible.

>
>>
>>> +
>>> +typedef struct ConnectionKey {
>>> +    /* (src, dst) must be grouped, in the same way than in IP header */
>>> +    struct in_addr src;
>>> +    struct in_addr dst;
>>> +    uint16_t src_port;
>>> +    uint16_t dst_port;
>>> +    uint8_t ip_proto;
>>> +} QEMU_PACKED ConnectionKey;
>>> +
>>> +typedef struct Connection {
>>> +    /* connection primary send queue: element type: Packet */
>>> +    GQueue primary_list;
>>> +    /* connection secondary send queue: element type: Packet */
>>> +    GQueue secondary_list;
>>> +    /* flag to enqueue unprocessed_connections */
>>> +    bool processing;
>>> +    uint8_t ip_proto;
>>> +    /* be used by filter-rewriter */
>>> +    colo_conn_state state;
>>> +    tcp_seq  primary_seq;
>>> +    tcp_seq  secondary_seq;
>>> +} Connection;
>>> +
>>> +uint32_t connection_key_hash(const void *opaque);
>>> +int connection_key_equal(const void *opaque1, const void *opaque2);
>>> +int parse_packet_early(Packet *pkt);
>>> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode);
>>> +Connection *connection_new(ConnectionKey *key);
>>> +void connection_destroy(void *opaque);
>>> +Connection *connection_get(GHashTable *connection_track_table,
>>> +                           ConnectionKey *key,
>>> +                           uint32_t *hashtable_size);
>>> +void connection_hashtable_reset(GHashTable *connection_track_table);
>>> +Packet *packet_new(const void *data, int size);
>>> +void packet_destroy(void *opaque, void *user_data);
>>> +
>>> +#endif /* QEMU_COLO_BASE_H */
>>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>>> index a3e1456..4231fe7 100644
>>> --- a/net/colo-compare.c
>>> +++ b/net/colo-compare.c
>>> @@ -28,6 +28,7 @@
>>>   #include "qemu/sockets.h"
>>>   #include "qapi-visit.h"
>>>   #include "trace.h"
>>> +#include "net/colo-base.h"
>>>     #define TYPE_COLO_COMPARE "colo-compare"
>>>   #define COLO_COMPARE(obj) \
>>> @@ -38,6 +39,28 @@
>>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>>>   +/*
>>> +  + CompareState ++
>>> +  |               |
>>> +  +---------------+   +---------------+ +---------------+
>>> +  |conn list      +--->conn +--------->conn           |
>>> +  +---------------+   +---------------+ +---------------+
>>> +  |               |     |           |             | |
>>> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>> +                    |primary |  |secondary    |primary | |secondary
>>> +                    |packet  |  |packet  +    |packet  | |packet  +
>>> +                    +--------+  +--------+    +--------+ +--------+
>>> +                        |           |             | |
>>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>>> +                    |primary |  |secondary    |primary | |secondary
>>> +                    |packet  |  |packet  +    |packet  | |packet  +
>>> +                    +--------+  +--------+    +--------+ +--------+
>>> +                        |           |             | |
>>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>>> +                    |primary |  |secondary    |primary | |secondary
>>> +                    |packet  |  |packet  +    |packet  | |packet  +
>>> +                    +--------+  +--------+    +--------+ +--------+
>>> +*/
>>>   typedef struct CompareState {
>>>       Object parent;
>>>   @@ -50,12 +73,103 @@ typedef struct CompareState {
>>>       QTAILQ_ENTRY(CompareState) next;
>>>       SocketReadState pri_rs;
>>>       SocketReadState sec_rs;
>>> +
>>> +    /* connection list: the connections belonged to this NIC could be found
>>> +     * in this list.
>>> +     * element type: Connection
>>> +     */
>>> +    GQueue conn_list;
>>> +    QemuMutex conn_list_lock; /* to protect conn_list */
>>
>> Why need this mutex?
>
> will remove it.
>
>>
>>> +    /* hashtable to save connection */
>>> +    GHashTable *connection_track_table;
>>> +    /* to save unprocessed_connections */
>>> +    GQueue unprocessed_connections;
>>> +    /* proxy current hash size */
>>> +    uint32_t hashtable_size;
>>>   } CompareState;
>>>     typedef struct CompareClass {
>>>       ObjectClass parent_class;
>>>   } CompareClass;
>>>   +enum {
>>> +    PRIMARY_IN = 0,
>>> +    SECONDARY_IN,
>>> +};
>>> +
>>> +static int compare_chr_send(CharDriverState *out,
>>> +                            const uint8_t *buf,
>>> +                            uint32_t size);
>>> +
>>> +/*
>>> + * Return 0 on success, if return -1 means the pkt
>>> + * is unsupported(arp and ipv6) and will be sent later
>>> + */
>>> +static int packet_enqueue(CompareState *s, int mode)
>>> +{
>>> +    ConnectionKey key = {{ 0 } };
>>> +    Packet *pkt = NULL;
>>> +    Connection *conn;
>>> +
>>> +    if (mode == PRIMARY_IN) {
>>> +        pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
>>> +    } else {
>>> +        pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
>>> +    }
>>> +
>>> +    if (parse_packet_early(pkt)) {
>>> +        packet_destroy(pkt, NULL);
>>> +        pkt = NULL;
>>> +        return -1;
>>> +    }
>>> +    fill_connection_key(pkt, &key, PRIMARY_IN);
>>> +
>>> +    conn = connection_get(s->connection_track_table,
>>> +                          &key,
>>> +                          &s->hashtable_size);
>>> +    if (!conn->processing) {
>>> +        qemu_mutex_lock(&s->conn_list_lock);
>>> +        g_queue_push_tail(&s->conn_list, conn);
>>> +        qemu_mutex_unlock(&s->conn_list_lock);
>>> +        conn->processing = true;
>>> +    }
>>> +
>>> +    if (mode == PRIMARY_IN) {
>>> +        g_queue_push_tail(&conn->primary_list, pkt);
>>> +    } else {
>>> +        g_queue_push_tail(&conn->secondary_list, pkt);
>>> +    }
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int compare_chr_send(CharDriverState *out,
>>> +                            const uint8_t *buf,
>>> +                            uint32_t size)
>>> +{
>>> +    int ret = 0;
>>> +    uint32_t len = htonl(size);
>>> +
>>> +    if (!size) {
>>> +        return 0;
>>> +    }
>>> +
>>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
>>> +    if (ret != sizeof(len)) {
>>> +        goto err;
>>> +    }
>>> +
>>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
>>> +    if (ret != size) {
>>> +        goto err;
>>> +    }
>>> +
>>> +    return 0;
>>> +
>>> +err:
>>> +    return ret < 0 ? ret : -EIO;
>>> +}
>>> +
>>>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>>>   {
>>>       CompareState *s = COLO_COMPARE(obj);
>>> @@ -103,12 +217,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>>>     static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>>>   {
>>> -    /* if packet_enqueue pri pkt failed we will send unsupported packet */
>>> +    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
>>> +
>>> +    if (packet_enqueue(s, PRIMARY_IN)) {
>>> +        trace_colo_compare_main("primary: unsupported packet in");
>>> +        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
>>> +    }
>>
>> Do we have an upper limit on the number of packets that can be
>> queued? If not, the guest may easily trigger an OOM.
>
> Do we need a g_queue to do this job?

Maybe.

> If it exceeds the limit, do we drop the packet?
>
> Thanks
> Zhang Chen

Needs more thought, but we could start by dropping packets.
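A minimal sketch of the "start by dropping packets" idea, assuming a fixed per-queue cap. MAX_QUEUE_LEN, PktQueue and bounded_enqueue() are hypothetical names, not part of the patch; in colo-compare the queues are GQueue objects, where the same test could be made with g_queue_get_length() before g_queue_push_tail().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical cap; the thread does not settle on a value. */
#define MAX_QUEUE_LEN 4

typedef struct PktNode {
    struct PktNode *next;
    int size;
} PktNode;

typedef struct PktQueue {
    PktNode *head, *tail;
    unsigned len;
} PktQueue;

/* Append a packet; refuse it (the caller drops it) once the cap is reached. */
static bool bounded_enqueue(PktQueue *q, int size)
{
    PktNode *n;

    assert(q != NULL);
    if (q->len >= MAX_QUEUE_LEN) {
        return false;              /* queue full: drop instead of growing */
    }
    n = malloc(sizeof(*n));
    if (!n) {
        return false;
    }
    n->next = NULL;
    n->size = size;
    if (q->tail) {
        q->tail->next = n;
    } else {
        q->head = n;
    }
    q->tail = n;
    q->len++;
    return true;
}

/* Free every queued node and reset the queue to empty. */
static void queue_clear(PktQueue *q)
{
    while (q->head) {
        PktNode *n = q->head;
        q->head = n->next;
        free(n);
    }
    q->tail = NULL;
    q->len = 0;
}
```

A caller would free (drop) the packet whenever bounded_enqueue() returns false. Whether dropping is the final policy, or a checkpoint should be forced instead, is left open in this thread.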

>
>>
>>>   }
>>>     static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>>>   {
>>> -    /* if packet_enqueue sec pkt failed we will notify trace */
>>> +    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
>>> +
>>> +    if (packet_enqueue(s, SECONDARY_IN)) {
>>> +        trace_colo_compare_main("secondary: unsupported packet in");
>>> +    }
>>>   }
>>>     /*
>>> @@ -161,6 +284,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>>       net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
>>>       net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
>>>   +    g_queue_init(&s->conn_list);
>>> +    qemu_mutex_init(&s->conn_list_lock);
>>> +    s->hashtable_size = 0;
>>> +
>>> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
>>> +                                                      connection_key_equal,
>>> +                                                      g_free,
>>> +                                                      connection_destroy);
>>> +
>>>       return;
>>>   }
>>>   @@ -203,6 +335,8 @@ static void colo_compare_finalize(Object *obj)
>>>       if (!QTAILQ_EMPTY(&net_compares)) {
>>>           QTAILQ_REMOVE(&net_compares, s, next);
>>>       }
>>> +    qemu_mutex_destroy(&s->conn_list_lock);
>>> +    g_queue_free(&s->conn_list);
>>>         g_free(s->pri_indev);
>>>       g_free(s->sec_indev);
>>> diff --git a/trace-events b/trace-events
>>> index ca7211b..703de1a 100644
>>> --- a/trace-events
>>> +++ b/trace-events
>>> @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
>>>   aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
>>>   aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32
>>>   aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32
>>> +
>>> +# net/colo-compare.c
>>> +colo_compare_main(const char *chr) ": %s"
>>
>


* Re: [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread
  2016-07-08  4:23   ` Jason Wang
@ 2016-07-11  7:17     ` Zhang Chen
  0 siblings, 0 replies; 21+ messages in thread
From: Zhang Chen @ 2016-07-11  7:17 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 07/08/2016 12:23 PM, Jason Wang wrote:
>
>
> On 2016-06-23 19:34, Zhang Chen wrote:
>> If the packets are the same, we send the primary packet and drop the
>> secondary packet; otherwise, we notify COLO to do a checkpoint.
>
> More detail please, e.g. how each exceptional case is handled (or
> maybe a comment in the code).
>

OK.
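As a hedged summary of how the cases could be spelled out in a comment or commit log, the sketch below models the decision for one primary packet. It assumes the behaviour described in this series: unsupported primary packets (ARP and IPv6, per the packet_enqueue() comment) are forwarded without comparison, a missing secondary packet is left for the periodic old-packet check, and a mismatch requests a checkpoint (still a TODO in this version). Only the generic size-plus-bytes comparison is modelled. CompareAction, SimplePacket and decide() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum {
    ACTION_PASS_THROUGH, /* unsupported primary packet: forward uncompared */
    ACTION_WAIT,         /* no secondary packet yet: leave it queued */
    ACTION_RELEASE,      /* identical: send primary, drop secondary */
    ACTION_CHECKPOINT,   /* different: request a COLO checkpoint */
} CompareAction;

typedef struct {
    const unsigned char *data;
    size_t size;
    bool supported;      /* false for e.g. ARP or IPv6 in this patch set */
} SimplePacket;

/* Decide what to do with one primary packet and its candidate secondary. */
static CompareAction decide(const SimplePacket *ppkt, const SimplePacket *spkt)
{
    assert(ppkt != NULL);
    if (!ppkt->supported) {
        return ACTION_PASS_THROUGH;
    }
    if (spkt == NULL) {
        return ACTION_WAIT;  /* the timer-driven old-packet check decides later */
    }
    if (ppkt->size == spkt->size &&
        memcmp(ppkt->data, spkt->data, ppkt->size) == 0) {
        return ACTION_RELEASE;
    }
    return ACTION_CHECKPOINT;
}
```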

>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-base.c    |   1 +
>>   net/colo-base.h    |   3 +
>>   net/colo-compare.c | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   trace-events       |   2 +
>>   4 files changed, 220 insertions(+)
>>
>> diff --git a/net/colo-base.c b/net/colo-base.c
>> index 7e263e8..9673661 100644
>> --- a/net/colo-base.c
>> +++ b/net/colo-base.c
>> @@ -146,6 +146,7 @@ Packet *packet_new(const void *data, int size)
>>         pkt->data = g_memdup(data, size);
>>       pkt->size = size;
>> +    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>>         return pkt;
>>   }
>> diff --git a/net/colo-base.h b/net/colo-base.h
>> index 01c1a5d..8bb1043 100644
>> --- a/net/colo-base.h
>> +++ b/net/colo-base.h
>> @@ -18,6 +18,7 @@
>>   #include "slirp/slirp.h"
>>   #include "qemu/jhash.h"
>>   #include "qemu/rcu.h"
>> +#include "qemu/timer.h"
>>     #define HASHTABLE_MAX_SIZE 16384
>>   @@ -47,6 +48,8 @@ typedef struct Packet {
>>       };
>>       uint8_t *transport_layer;
>>       int size;
>> +    /* Time of packet creation, in wall clock ms */
>> +    int64_t creation_ms;
>>   } Packet;
>>     typedef struct ConnectionKey {
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 4231fe7..928d729 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -35,6 +35,8 @@
>>       OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
>>     #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>> +/* TODO: Should be configurable */
>> +#define REGULAR_CHECK_MS 400
>
> "REGULAR" seems too generic; we need a better name.

Like "REGULAR_PACKET_CHECK_MS"?

>
>>     static QTAILQ_HEAD(, CompareState) net_compares =
>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>> @@ -86,6 +88,11 @@ typedef struct CompareState {
>>       GQueue unprocessed_connections;
>>       /* proxy current hash size */
>>       uint32_t hashtable_size;
>> +    /* compare thread, a thread for each NIC */
>> +    QemuThread thread;
>> +    int thread_status;
>> +    /* Timer used on the primary to find packets that are never matched */
>> +    QEMUTimer *timer;
>>   } CompareState;
>>     typedef struct CompareClass {
>> @@ -97,6 +104,15 @@ enum {
>>       SECONDARY_IN,
>>   };
>>   +enum {
>> +    /* compare thread isn't started */
>> +    COMPARE_THREAD_NONE,
>> +    /* compare thread is running */
>> +    COMPARE_THREAD_RUNNING,
>> +    /* compare thread exit */
>> +    COMPARE_THREAD_EXIT,
>> +};
>> +
>>   static int compare_chr_send(CharDriverState *out,
>>                               const uint8_t *buf,
>>                               uint32_t size);
>> @@ -143,6 +159,98 @@ static int packet_enqueue(CompareState *s, int mode)
>>       return 0;
>>   }
>>   +/*
>> + * The IP packets sent by primary and secondary
>> + * will be compared in here
>> + * TODO support ip fragment, Out-Of-Order
>> + * return:    0  means packet same
>> + *            > 0 || < 0 means packet different
>> + */
>> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>> +{
>> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
>> +                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
>> +                               inet_ntoa(spkt->ip->ip_src),
>> +                               inet_ntoa(spkt->ip->ip_dst));
>> +
>> +    if (ppkt->size == spkt->size) {
>> +        return memcmp(ppkt->data, spkt->data, spkt->size);
>> +    } else {
>> +        return -1;
>> +    }
>> +}
>> +
>> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
>> +{
>> +    trace_colo_compare_main("compare all");
>> +    return colo_packet_compare(ppkt, spkt);
>> +}
>> +
>> +static void colo_old_packet_check(void *opaque_packet, void *opaque_found)
>> +{
>> +    int64_t now;
>> +    bool *found_old = (bool *)opaque_found;
>> +    Packet *ppkt = (Packet *)opaque_packet;
>> +
>> +    if (*found_old) {
>> +        /* Someone found an old packet earlier in the queue */
>> +        return;
>> +    }
>> +
>> +    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>> +    if ((ppkt->creation_ms < now) &&
>
> Is there any case where ppkt->creation_ms >= now?

No, will remove it.

>
>> +        ((now - ppkt->creation_ms) > REGULAR_CHECK_MS)) {
>> +        trace_colo_old_packet_check_found(ppkt->creation_ms);
>> +        *found_old = true;
>> +    }
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare connection
>> + */
>> +static void colo_compare_connection(void *opaque, void *user_data)
>> +{
>> +    CompareState *s = user_data;
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +    GList *result = NULL;
>> +    bool found_old;
>> +    int ret;
>> +
>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>> +           !g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_tail(&conn->primary_list);
>> +        result = g_queue_find_custom(&conn->secondary_list,
>> +                              pkt, (GCompareFunc)colo_packet_compare_all);
>> +
>> +        if (result) {
>> +            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
>> +            if (ret < 0) {
>> +                error_report("colo_send_primary_packet failed");
>> +            }
>> +            trace_colo_compare_main("packet same and release packet");
>> +            g_queue_remove(&conn->secondary_list, result->data);
>> +        } else {
>
> I've forgotten the answer to this, so let me ask again: what if the
> secondary packet comes late?

If the secondary packet comes late, the primary queue will still hold some
primary packets. We use a timer to call colo_compare_connection() regularly;
there we walk conn->primary_list, and if we find an old primary packet (that
is, the secondary packet came late), we call colo_notify_checkpoint() to do a
checkpoint, which brings the primary and secondary back into the same state.
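The periodic check described above can be sketched as a scan over the creation times of queued primary packets. This is a minimal illustration, assuming millisecond timestamps taken from a single clock; has_old_packet() and OLD_PACKET_MS are hypothetical stand-ins for colo_old_packet_check() and REGULAR_CHECK_MS.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical; mirrors REGULAR_CHECK_MS, which is also a TODO in the patch. */
#define OLD_PACKET_MS 400

/*
 * Return true if any queued primary packet has waited longer than
 * OLD_PACKET_MS without a matching secondary packet.
 */
static bool has_old_packet(const int64_t *creation_ms, size_t n, int64_t now_ms)
{
    assert(creation_ms != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        if (now_ms - creation_ms[i] > OLD_PACKET_MS) {
            return true;   /* secondary is late: a checkpoint is needed */
        }
    }
    return false;
}
```

Passing now_ms in explicitly keeps the stamp and the comparison on the same clock. In the quoted patch, packets are stamped with QEMU_CLOCK_HOST while the timer is armed on QEMU_CLOCK_VIRTUAL, so the timer period and the age threshold are measured on different clocks.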


>
>> +            trace_colo_compare_main("packet different");
>> +            g_queue_push_tail(&conn->primary_list, pkt);
>> +            /* TODO: colo_notify_checkpoint();*/
>> +            break;
>> +        }
>> +    }
>> +
>> +    /*
>> +     * Look for old packets that the secondary hasn't matched,
>> +     * if we have some then we have to checkpoint to wake
>> +     * the secondary up.
>> +     */
>> +    found_old = false;
>> +    g_queue_foreach(&conn->primary_list, colo_old_packet_check, &found_old);
>> +    if (found_old) {
>> +        /* TODO: colo_notify_checkpoint();*/
>
> Shouldn't we remove all "old" packets here?

Yes, will add a function to remove them.
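One possible shape for that removal function, assuming old packets are simply discarded once a checkpoint has been requested: it compacts the queue in place and keeps the remaining packets in order. remove_old_packets() and OLD_PACKET_MS are hypothetical; a GQueue version would call g_queue_remove() and packet_destroy() on each old entry instead of overwriting array slots.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define OLD_PACKET_MS 400   /* hypothetical; mirrors REGULAR_CHECK_MS */

/*
 * Drop every entry older than OLD_PACKET_MS, keeping the remaining
 * entries in their original order. Returns the number removed.
 */
static size_t remove_old_packets(int64_t *creation_ms, size_t *n, int64_t now_ms)
{
    size_t kept = 0;

    assert(n != NULL);
    for (size_t i = 0; i < *n; i++) {
        if (now_ms - creation_ms[i] > OLD_PACKET_MS) {
            continue;       /* old: a real queue would free the packet here */
        }
        creation_ms[kept++] = creation_ms[i];
    }
    size_t removed = *n - kept;
    *n = kept;
    return removed;
}
```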

>
>> +    }
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out,
>>                               const uint8_t *buf,
>>                               uint32_t size)
>> @@ -170,6 +278,69 @@ err:
>>       return ret < 0 ? ret : -EIO;
>>   }
>>   +static int compare_chr_can_read(void *opaque)
>> +{
>> +    return COMPARE_READ_LEN_MAX;
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary for packets
>> + * arriving over the socket from the primary.
>> + */
>> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>> +{
>> +    CompareState *s = COLO_COMPARE(opaque);
>> +    int ret;
>> +
>> +    ret = net_fill_rstate(&s->pri_rs, buf, size);
>> +    if (ret == -1) {
>> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> +        error_report("colo-compare primary_in error");
>> +    }
>> +}
>> +
>> +/*
>> + * called from the main thread on the primary for packets
>> + * arriving over the socket from the secondary.
>> + */
>> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>> +{
>> +    CompareState *s = COLO_COMPARE(opaque);
>> +    int ret;
>> +
>> +    ret = net_fill_rstate(&s->sec_rs, buf, size);
>> +    if (ret == -1) {
>> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> +        error_report("colo-compare secondary_in error");
>> +    }
>> +}
>> +
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> +    GMainContext *worker_context;
>> +    GMainLoop *compare_loop;
>> +    CompareState *s = opaque;
>> +
>> +    worker_context = g_main_context_new();
>> +    g_assert(g_main_context_get_thread_default() == NULL);
>> +    g_main_context_push_thread_default(worker_context);
>> +    g_assert(g_main_context_get_thread_default() == worker_context);
>> +
>> +    qemu_chr_add_handlers(s->chr_pri_in, compare_chr_can_read,
>> +                          compare_pri_chr_in, NULL, s);
>> +    qemu_chr_add_handlers(s->chr_sec_in, compare_chr_can_read,
>> +                          compare_sec_chr_in, NULL, s);
>> +
>> +    compare_loop = g_main_loop_new(worker_context, FALSE);
>> +
>> +    g_main_loop_run(compare_loop);
>> +
>> +    g_main_loop_unref(compare_loop);
>> +    g_main_context_pop_thread_default(worker_context);
>> +    g_main_context_unref(worker_context);
>> +    return NULL;
>> +}
>> +
>>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(obj);
>> @@ -222,6 +393,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>>       if (packet_enqueue(s, PRIMARY_IN)) {
>>           trace_colo_compare_main("primary: unsupported packet in");
>>           compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
>> +    } else {
>> +        /* compare connection */
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>>       }
>>   }
>>   @@ -231,16 +405,35 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>>         if (packet_enqueue(s, SECONDARY_IN)) {
>>           trace_colo_compare_main("secondary: unsupported packet in");
>> +    } else {
>> +        /* compare connection */
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>>       }
>>   }
>>     /*
>> + * Prod the compare thread regularly so it can watch for any packets
>> + * that the secondary hasn't produced equivalents of.
>> + */
>> +static void colo_compare_regular(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>> +                        REGULAR_CHECK_MS);
>> +    /* compare connection */
>> +    g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> +}
>
> We need to make sure this function is called from the colo thread, but
> it looks like it isn't?

Yes. In the next version I will make the old_packet_check related code
independent of colo_compare_connection().


>
>> +
>> +/*
>>    * called from the main thread on the primary
>>    * to setup colo-compare.
>>    */
>>   static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>> +    char thread_name[64];
>> +    static int compare_id;
>>         if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -293,6 +486,19 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>                                                         g_free,
>>                                                        connection_destroy);
>>   +    s->thread_status = COMPARE_THREAD_RUNNING;
>> +    sprintf(thread_name, "compare %d", compare_id);
>> +    qemu_thread_create(&s->thread, thread_name,
>> +                       colo_compare_thread, s,
>> +                       QEMU_THREAD_JOINABLE);
>> +    compare_id++;
>> +
>> +    /* A regular timer to kick any packets that the secondary doesn't match */
>> +    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
>> +                            colo_compare_regular, s);
>> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
>> +                        REGULAR_CHECK_MS);
>> +
>>       return;
>>   }
>>   @@ -338,6 +544,14 @@ static void colo_compare_finalize(Object *obj)
>>       qemu_mutex_destroy(&s->conn_list_lock);
>>       g_queue_free(&s->conn_list);
>>   +    if (s->thread.thread) {
>> +        s->thread_status = COMPARE_THREAD_EXIT;
>
> It looks like no code depends on the status, so why do we need
> this?

We don't need this currently; I will move it to the "work with colo-frame"
related patch.

Thanks
Zhang Chen

>
>> +        /* compare connection */
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>> +        qemu_thread_join(&s->thread);
>> +    }
>> +    timer_del(s->timer);
>> +
>>       g_free(s->pri_indev);
>>       g_free(s->sec_indev);
>>       g_free(s->outdev);
>> diff --git a/trace-events b/trace-events
>> index 703de1a..1537e91 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>>     # net/colo-compare.c
>>   colo_compare_main(const char *chr) ": %s"
>> +colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
>> +colo_old_packet_check_found(int64_t old_time) "%" PRId64
>

-- 
Thanks
zhangchen


* Re: [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison
  2016-07-08  8:59   ` Jason Wang
@ 2016-07-11 10:02     ` Zhang Chen
  2016-07-13  2:54       ` Jason Wang
  0 siblings, 1 reply; 21+ messages in thread
From: Zhang Chen @ 2016-07-11 10:02 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Li Zhijian, eddie . dong, Dr . David Alan Gilbert, zhanghailiang



On 07/08/2016 04:59 PM, Jason Wang wrote:
>
>
> On 2016-06-23 19:34, Zhang Chen wrote:
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-compare.c | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++--
>>   trace-events       |   6 ++
>>   2 files changed, 173 insertions(+), 4 deletions(-)
>
> Commit log please.

OK.

>
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 928d729..addf704 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -18,6 +18,7 @@
>>   #include "qapi/qmp/qerror.h"
>>   #include "qapi/error.h"
>>   #include "net/net.h"
>> +#include "net/eth.h"
>>   #include "net/vhost_net.h"
>>   #include "qom/object_interfaces.h"
>>   #include "qemu/iov.h"
>> @@ -180,9 +181,155 @@ static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>>       }
>>   }
>>   -static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare tcp packet
>> + * compare_tcp copied from Dr. David Alan Gilbert's branch
>> + */
>> +static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt)
>> +{
>> +    struct tcphdr *ptcp, *stcp;
>> +    int res;
>> +    char *sdebug, *ddebug;
>> +
>> +    trace_colo_compare_main("compare tcp");
>> +    if (ppkt->size != spkt->size) {
>> +        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
>> +            trace_colo_compare_main("pkt size not same");
>> +        }
>> +        return -1;
>> +    }
>> +
>> +    ptcp = (struct tcphdr *)ppkt->transport_layer;
>> +    stcp = (struct tcphdr *)spkt->transport_layer;
>> +
>> +    if (ptcp->th_seq != stcp->th_seq) {
>> +        if (trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
>> +            trace_colo_compare_main("pkt tcp seq not same");
>> +        }
>> +        return -1;
>> +    }
>> +
>> +    /*
>> +     * The 'identification' field in the IP header is *very* random
>> +     * it almost never matches.  Fudge this by ignoring differences in
>> +     * unfragmented packets; they'll normally sort themselves out if different
>> +     * anyway, and it should recover at the TCP level.
>> +     * An alternative would be to get both the primary and secondary to rewrite
>> +     * somehow; but that would need some sync traffic to sync the state
>> +     */
>> +    if (ntohs(ppkt->ip->ip_off) & IP_DF) {
>> +        spkt->ip->ip_id = ppkt->ip->ip_id;
>> +        /* and the sum will be different if the IDs were different */
>> +        spkt->ip->ip_sum = ppkt->ip->ip_sum;
>> +    }
>
> I was wondering: can we do some trick in the rewriter instead of here?

On the secondary, we have no way to get the primary's ppkt->ip->ip_off.
This flag is only used for IP fragmentation, so it has no effect currently.
We will fix it after we support IP fragmentation.
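An alternative to copying ip_id and ip_sum from the primary into the secondary packet is to leave both packets untouched and skip those two fields in the comparison when DF is set. The sketch below assumes raw IPv4 header bytes in network order; ipv4_hdr_equal(), be16() and IPV4_DF are hypothetical names (IPV4_DF has the same value, 0x4000, as IP_DF in <netinet/ip.h>).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define IPV4_DF 0x4000  /* "don't fragment" bit of the flags/offset field */

/* Read a 16-bit big-endian value. */
static uint16_t be16(const uint8_t *p)
{
    return (uint16_t)((p[0] << 8) | p[1]);
}

/*
 * Compare two IPv4 headers of hdr_len bytes. When the primary header has
 * DF set, the identification (bytes 4-5) and header checksum (bytes 10-11)
 * are ignored, because the two guests choose IDs independently.
 */
static bool ipv4_hdr_equal(const uint8_t *p, const uint8_t *s, size_t hdr_len)
{
    assert(hdr_len >= 20);
    if (!(be16(p + 6) & IPV4_DF)) {
        return memcmp(p, s, hdr_len) == 0;
    }
    return memcmp(p, s, 4) == 0 &&                  /* ver/ihl, tos, length */
           memcmp(p + 6, s + 6, 4) == 0 &&          /* flags/off, ttl, proto */
           memcmp(p + 12, s + 12, hdr_len - 12) == 0; /* addresses, options */
}
```

Skipping the fields avoids mutating spkt, at the cost of a few more memcmp() calls.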

>
>> +
>> +    res = memcmp(ppkt->data + ETH_HLEN, spkt->data + ETH_HLEN,
>> +                (spkt->size - ETH_HLEN));
>> +
>> +    if (res != 0 && trace_event_get_state(TRACE_COLO_COMPARE_MISCOMPARE)) {
>> +        sdebug = strdup(inet_ntoa(ppkt->ip->ip_src));
>> +        ddebug = strdup(inet_ntoa(ppkt->ip->ip_dst));
>> +        fprintf(stderr, "%s: src/dst: %s/%s p: seq/ack=%u/%u"
>> +        " s: seq/ack=%u/%u res=%d flags=%x/%x\n", __func__,
>> +                   sdebug, ddebug,
>> +                   ntohl(ptcp->th_seq), ntohl(ptcp->th_ack),
>> +                   ntohl(stcp->th_seq), ntohl(stcp->th_ack),
>> +                   res, ptcp->th_flags, stcp->th_flags);
>> +
>> +        trace_colo_compare_tcp_miscompare("Primary len", ppkt->size);
>> +        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
>> +        trace_colo_compare_tcp_miscompare("Secondary len", spkt->size);
>> +        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
>> +
>> +        g_free(sdebug);
>> +        g_free(ddebug);
>> +    }
>> +
>> +    return res;
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare udp packet
>> + */
>> +static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
>> +{
>> +    int ret;
>> +
>> +    trace_colo_compare_main("compare udp");
>> +    ret = colo_packet_compare(ppkt, spkt);
>> +
>> +    if (ret) {
>> +        trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
>> +        qemu_hexdump((char *)ppkt->data, stderr, "colo-compare", ppkt->size);
>> +        trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
>> +        qemu_hexdump((char *)spkt->data, stderr, "colo-compare", spkt->size);
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare icmp packet
>> + */
>> +static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
>> +{
>> +    int network_length;
>> +    struct icmp *icmp_ppkt, *icmp_spkt;
>> +
>> +    trace_colo_compare_main("compare icmp");
>> +    network_length = ppkt->ip->ip_hl * 4;
>> +    if (ppkt->size != spkt->size ||
>> +        ppkt->size < network_length + ETH_HLEN) {
>> +        trace_colo_compare_icmp_miscompare_size(ppkt->size, spkt->size);
>> +        return -1;
>> +    }
>> +    icmp_ppkt = (struct icmp *)(ppkt->data + network_length + ETH_HLEN);
>> +    icmp_spkt = (struct icmp *)(spkt->data + network_length + ETH_HLEN);
>> +
>> +    if ((icmp_ppkt->icmp_type == icmp_spkt->icmp_type) &&
>> +        (icmp_ppkt->icmp_code == icmp_spkt->icmp_code)) {
>> +        if (icmp_ppkt->icmp_type == ICMP_REDIRECT) {
>> +            if (icmp_ppkt->icmp_gwaddr.s_addr !=
>> +                icmp_spkt->icmp_gwaddr.s_addr) {
>> +                trace_colo_compare_main("icmp_gwaddr.s_addr not same");
>> +                trace_colo_compare_icmp_miscompare_addr("ppkt s_addr",
>> +                        inet_ntoa(icmp_ppkt->icmp_gwaddr));
>> +                trace_colo_compare_icmp_miscompare_addr("spkt s_addr",
>> +                        inet_ntoa(icmp_spkt->icmp_gwaddr));
>> +                return -1;
>> +            }
>> +        } else if ((icmp_ppkt->icmp_type == ICMP_UNREACH) &&
>> +                   (icmp_ppkt->icmp_type == ICMP_UNREACH_NEEDFRAG)) {
>> +            if (icmp_ppkt->icmp_nextmtu != icmp_spkt->icmp_nextmtu) {
>> +                trace_colo_compare_main("icmp_nextmtu not same");
>> +                trace_colo_compare_icmp_miscompare_mtu("ppkt nextmtu",
>> +                        icmp_ppkt->icmp_nextmtu);
>> +                trace_colo_compare_icmp_miscompare_mtu("spkt nextmtu",
>> +                        icmp_spkt->icmp_nextmtu);
>> +                return -1;
>> +            }
>> +        }
>> +    } else {
>> +        return -1;
>> +    }
>
> Why compare only part of the ICMP packet?
>

That covers most situations; comparing every field of the ICMP packet
would reduce comparison efficiency.
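A sketch of the partial comparison, working on raw ICMP header bytes (type, code, checksum, then 4 bytes whose meaning depends on the type). Constants follow RFC 792 and RFC 1191; icmp_partial_equal() and the ICMP_TYPE_/ICMP_CODE_ macros are hypothetical names. Note that the quoted patch tests icmp_type against ICMP_UNREACH_NEEDFRAG, which is a code value (4) under type ICMP_UNREACH (3), so that branch can never be taken as written; the sketch checks the code byte instead.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* ICMP values from RFC 792 / RFC 1191. */
#define ICMP_TYPE_UNREACH   3
#define ICMP_CODE_NEEDFRAG  4
#define ICMP_TYPE_REDIRECT  5

/*
 * Partial ICMP comparison in the spirit of colo_packet_compare_icmp():
 * type and code must match; for redirects the gateway address (bytes 4-7)
 * must match; for "fragmentation needed" the next-hop MTU (bytes 6-7)
 * must match. Checksum and all other fields are ignored.
 */
static bool icmp_partial_equal(const uint8_t *p, const uint8_t *s, size_t len)
{
    assert(len >= 8);
    if (p[0] != s[0] || p[1] != s[1]) {
        return false;                               /* type or code differ */
    }
    if (p[0] == ICMP_TYPE_REDIRECT) {
        return memcmp(p + 4, s + 4, 4) == 0;        /* gateway address */
    }
    if (p[0] == ICMP_TYPE_UNREACH && p[1] == ICMP_CODE_NEEDFRAG) {
        return memcmp(p + 6, s + 6, 2) == 0;        /* next-hop MTU */
    }
    return true;
}
```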

Thanks
Zhang Chen

>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * called from the compare thread on the primary
>> + * for compare other packet
>> + */
>> +static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
>>   {
>> -    trace_colo_compare_main("compare all");
>> +    trace_colo_compare_main("compare other");
>> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
>> +                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
>> +                               inet_ntoa(spkt->ip->ip_src),
>> +                               inet_ntoa(spkt->ip->ip_dst));
>>       return colo_packet_compare(ppkt, spkt);
>>   }
>>   @@ -221,8 +368,24 @@ static void colo_compare_connection(void *opaque, void *user_data)
>>       while (!g_queue_is_empty(&conn->primary_list) &&
>>              !g_queue_is_empty(&conn->secondary_list)) {
>>           pkt = g_queue_pop_tail(&conn->primary_list);
>> -        result = g_queue_find_custom(&conn->secondary_list,
>> -                              pkt, (GCompareFunc)colo_packet_compare_all);
>> +        switch (conn->ip_proto) {
>> +        case IPPROTO_TCP:
>> +            result = g_queue_find_custom(&conn->secondary_list,
>> +                     pkt, (GCompareFunc)colo_packet_compare_tcp);
>> +            break;
>> +        case IPPROTO_UDP:
>> +            result = g_queue_find_custom(&conn->secondary_list,
>> +                     pkt, (GCompareFunc)colo_packet_compare_udp);
>> +            break;
>> +        case IPPROTO_ICMP:
>> +            result = g_queue_find_custom(&conn->secondary_list,
>> +                     pkt, (GCompareFunc)colo_packet_compare_icmp);
>> +            break;
>> +        default:
>> +            result = g_queue_find_custom(&conn->secondary_list,
>> +                     pkt, (GCompareFunc)colo_packet_compare_other);
>> +            break;
>> +        }
>>             if (result) {
>>               ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
>> diff --git a/trace-events b/trace-events
>> index 1537e91..6686cdf 100644
>> --- a/trace-events
>> +++ b/trace-events
>> @@ -1919,5 +1919,11 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>>     # net/colo-compare.c
>>   colo_compare_main(const char *chr) ": %s"
>> +colo_compare_tcp_miscompare(const char *sta, int size) ": %s = %d"
>> +colo_compare_udp_miscompare(const char *sta, int size) ": %s = %d"
>> +colo_compare_icmp_miscompare_size(int psize, int ssize) ":ppkt size = %d spkt size = %d"
>> +colo_compare_icmp_miscompare_addr(const char *sta, const char *stb) ": %s  %s"
>> +colo_compare_icmp_miscompare_mtu(const char *sta, int size) ": %s  %d"
>>   colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
>>   colo_old_packet_check_found(int64_t old_time) "%" PRId64
>> +colo_compare_miscompare(void) ""
>

-- 
Thanks
zhangchen


* Re: [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet
  2016-07-11  5:41       ` Jason Wang
@ 2016-07-12  5:42         ` Zhang Chen
  0 siblings, 0 replies; 21+ messages in thread
From: Zhang Chen @ 2016-07-12  5:42 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Li Zhijian, Wen Congyang, zhanghailiang, eddie . dong,
	Dr . David Alan Gilbert



On 07/11/2016 01:41 PM, Jason Wang wrote:
>
>
> On 2016-07-08 17:56, Zhang Chen wrote:
>>
>>
>> On 07/08/2016 12:07 PM, Jason Wang wrote:
>>>
>>>
>>> On 2016-06-23 19:34, Zhang Chen wrote:
>>>> In this patch we use the kernel's jhash in a hash table to track
>>>> connections, and then enqueue net packets like this:
>>>>
>>>> + CompareState ++
>>>> |               |
>>>> +---------------+   +---------------+ +---------------+
>>>> |conn list      +--->conn +--------->conn           |
>>>> +---------------+   +---------------+ +---------------+
>>>> |               |     |           |             |          |
>>>> +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>>>                    |primary |  |secondary    |primary | |secondary
>>>>                    |packet  |  |packet  +    |packet  | |packet +
>>>>                    +--------+  +--------+    +--------+ +--------+
>>>>                        |           |             | |
>>>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>>>                    |primary |  |secondary    |primary | |secondary
>>>>                    |packet  |  |packet  +    |packet  | |packet +
>>>>                    +--------+  +--------+    +--------+ +--------+
>>>>                        |           |             | |
>>>>                    +---v----+  +---v----+    +---v----+ +---v----+
>>>>                    |primary |  |secondary    |primary | |secondary
>>>>                    |packet  |  |packet  +    |packet  | |packet +
>>>>                    +--------+  +--------+    +--------+ +--------+
>>>
>>> A paragraph to describe the above would be more than welcome.
>>
>> I will add some comments for it.
>>
>>>
>>>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>>>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>>>> ---
>>>>   include/qemu/jhash.h |  61 ++++++++++++++++
>>>>   net/Makefile.objs    |   1 +
>>>>   net/colo-base.c      | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>>>   net/colo-base.h      |  88 +++++++++++++++++++++++
>>>>   net/colo-compare.c   | 138 +++++++++++++++++++++++++++++++++++-
>>>>   trace-events         |   3 +
>>>>   6 files changed, 483 insertions(+), 2 deletions(-)
>>>>   create mode 100644 include/qemu/jhash.h
>>>>   create mode 100644 net/colo-base.c
>>>>   create mode 100644 net/colo-base.h
>>>>
>>>> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
>>>> new file mode 100644
>>>> index 0000000..0fcd875
>>>> --- /dev/null
>>>> +++ b/include/qemu/jhash.h
>>>> @@ -0,0 +1,61 @@
>>>> +/* jhash.h: Jenkins hash support.
>>>> +  *
>>>> +  * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
>>>> +  *
>>>> +  * http://burtleburtle.net/bob/hash/
>>>> +  *
>>>> +  * These are the credits from Bob's sources:
>>>> +  *
>>>> +  * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
>>>> +  *
>>>> +  * These are functions for producing 32-bit hashes for hash table lookup.
>>>> +  * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
>>>> +  * are externally useful functions.  Routines to test the hash are included
>>>> +  * if SELF_TEST is defined.  You can use this free for any purpose.  It's in
>>>> +  * the public domain.  It has no warranty.
>>>> +  *
>>>> +  * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
>>>> +  *
>>>> +  * I've modified Bob's hash to be useful in the Linux kernel, and
>>>> +  * any bugs present are my fault.
>>>> +  * Jozsef
>>>> +  */
>>>> +
>>>> +#ifndef QEMU_JHASH_H__
>>>> +#define QEMU_JHASH_H__
>>>> +
>>>> +#include "qemu/bitops.h"
>>>> +
>>>> +/*
>>>> + * Hash table helpers copied from the Linux kernel jhash
>>>> + */
>>>> +
>>>> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
>>>> +#define __jhash_mix(a, b, c)                \
>>>> +{                                           \
>>>> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
>>>> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
>>>> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
>>>> +    a -= c;  a ^= rol32(c, 16); c += b;     \
>>>> +    b -= a;  b ^= rol32(a, 19); a += c;     \
>>>> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
>>>> +}
>>>> +
>>>> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
>>>> +#define __jhash_final(a, b, c)  \
>>>> +{                               \
>>>> +    c ^= b; c -= rol32(b, 14);  \
>>>> +    a ^= c; a -= rol32(c, 11);  \
>>>> +    b ^= a; b -= rol32(a, 25);  \
>>>> +    c ^= b; c -= rol32(b, 16);  \
>>>> +    a ^= c; a -= rol32(c, 4);   \
>>>> +    b ^= a; b -= rol32(a, 14);  \
>>>> +    c ^= b; c -= rol32(b, 24);  \
>>>> +}
>>>> +
>>>> +/* An arbitrary initial parameter */
>>>> +#define JHASH_INITVAL           0xdeadbeef
>>>> +
>>>> +#endif /* QEMU_JHASH_H__ */
>>>
>>> Please split jhash into another patch.
>>
>> Split it into an independent patch in this patch set or not?
>
> Better in this series, since it is the first user.
>
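For readers who want to exercise the hashing in isolation, here is a minimal, self-contained sketch of the quoted __jhash_mix/__jhash_final rounds and the connection_key_hash() input layout. Assumptions: rol32() is re-implemented locally (QEMU's copy lives in qemu/bitops.h), the macros are wrapped in do { } while (0) so each behaves as a single statement, and tuple_hash() is a hypothetical helper taking the 5-tuple fields directly instead of a ConnectionKey. The port word is built from uint32_t operands because shifting a uint16_t (promoted to int) left by 16 can overflow a signed int.

```c
#include <stdint.h>

#define JHASH_INITVAL  0xdeadbeef
#define SKETCH_KEY_LEN 13   /* packed ConnectionKey: 4 + 4 + 2 + 2 + 1 bytes */

/* Rotate left; masking keeps shift == 0 free of undefined behaviour. */
static inline uint32_t rol32(uint32_t word, unsigned int shift)
{
    return (word << (shift & 31)) | (word >> ((32 - shift) & 31));
}

/* Same rounds as the quoted __jhash_mix(), wrapped in do/while (0). */
#define jhash_mix(a, b, c) do {                 \
    a -= c;  a ^= rol32(c, 4);  c += b;         \
    b -= a;  b ^= rol32(a, 6);  a += c;         \
    c -= b;  c ^= rol32(b, 8);  b += a;         \
    a -= c;  a ^= rol32(c, 16); c += b;         \
    b -= a;  b ^= rol32(a, 19); a += c;         \
    c -= b;  c ^= rol32(b, 4);  b += a;         \
} while (0)

/* Same rounds as the quoted __jhash_final(); the result ends up in c. */
#define jhash_final(a, b, c) do {               \
    c ^= b; c -= rol32(b, 14);                  \
    a ^= c; a -= rol32(c, 11);                  \
    b ^= a; b -= rol32(a, 25);                  \
    c ^= b; c -= rol32(b, 16);                  \
    a ^= c; a -= rol32(c, 4);                   \
    b ^= a; b -= rol32(a, 14);                  \
    c ^= b; c -= rol32(b, 24);                  \
} while (0)

/* Hypothetical helper mirroring connection_key_hash() field by field. */
static uint32_t tuple_hash(uint32_t src, uint32_t dst, uint16_t src_port,
                           uint16_t dst_port, uint8_t ip_proto)
{
    uint32_t a, b, c;

    a = b = c = JHASH_INITVAL + SKETCH_KEY_LEN;
    a += src;
    b += dst;
    c += (uint32_t)src_port | ((uint32_t)dst_port << 16);
    jhash_mix(a, b, c);

    a += ip_proto;
    jhash_final(a, b, c);

    return c;
}
```

Keeping the rounds in a standalone header like this is what makes the split into a separate patch cheap: the only dependency is a 32-bit rotate.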
>>
>>
>>>
>>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>>> index ba92f73..119589f 100644
>>>> --- a/net/Makefile.objs
>>>> +++ b/net/Makefile.objs
>>>> @@ -17,3 +17,4 @@ common-obj-y += filter.o
>>>>   common-obj-y += filter-buffer.o
>>>>   common-obj-y += filter-mirror.o
>>>>   common-obj-y += colo-compare.o
>>>> +common-obj-y += colo-base.o
>>>> diff --git a/net/colo-base.c b/net/colo-base.c
>>>> new file mode 100644
>>>> index 0000000..7e263e8
>>>> --- /dev/null
>>>> +++ b/net/colo-base.c
>>>> @@ -0,0 +1,194 @@
>>>> +/*
>>>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>>>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>>>> + *
>>>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>>> + * Copyright (c) 2016 Intel Corporation
>>>> + *
>>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>>> + *
>>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>>> + * later.  See the COPYING file in the top-level directory.
>>>> + */
>>>> +
>>>> +#include "qemu/osdep.h"
>>>> +#include "qemu/error-report.h"
>>>> +#include "net/colo-base.h"
>>>> +
>>>> +uint32_t connection_key_hash(const void *opaque)
>>>> +{
>>>> +    const ConnectionKey *key = opaque;
>>>> +    uint32_t a, b, c;
>>>> +
>>>> +    /* Jenkins hash */
>>>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>>>> +    a += key->src.s_addr;
>>>> +    b += key->dst.s_addr;
>>>> +    c += (key->src_port | key->dst_port << 16);
>>>> +    __jhash_mix(a, b, c);
>>>> +
>>>> +    a += key->ip_proto;
>>>> +    __jhash_final(a, b, c);
>>>> +
>>>> +    return c;
>>>> +}
>>>> +
>>>> +int connection_key_equal(const void *key1, const void *key2)
>>>> +{
>>>> +    return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
>>>> +}
>>>> +
>>>> +int parse_packet_early(Packet *pkt)
>>>> +{
>>>> +    int network_length;
>>>> +    uint8_t *data = pkt->data;
>>>> +    uint16_t l3_proto;
>>>> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
>>>> +
>>>> +    if (pkt->size < ETH_HLEN) {
>>>> +        error_report("pkt->size < ETH_HLEN");
>>>> +        return 1;
>>>> +    }
>>>> +    pkt->network_layer = data + ETH_HLEN;
>>>> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
>>>> +    if (l3_proto != ETH_P_IP) {
>>>> +        return 1;
>>>> +    }
>>>> +
>>>> +    network_length = pkt->ip->ip_hl * 4;
>>>> +    if (pkt->size < ETH_HLEN + network_length) {
>>>> +        error_report("pkt->size < network_layer + network_length");
>>>> +        return 1;
>>>> +    }
>>>> +    pkt->transport_layer = pkt->network_layer + network_length;
>>>> +    if (!pkt->transport_layer) {
>>>> +        error_report("pkt->transport_layer is invalid");
>>>> +        return 1;
>>>> +    }
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode)
>>>> +{
>>>> +    uint32_t tmp_ports;
>>>> +
>>>> +    key->ip_proto = pkt->ip->ip_p;
>>>> +
>>>> +    switch (key->ip_proto) {
>>>> +    case IPPROTO_TCP:
>>>> +    case IPPROTO_UDP:
>>>> +    case IPPROTO_DCCP:
>>>> +    case IPPROTO_ESP:
>>>> +    case IPPROTO_SCTP:
>>>> +    case IPPROTO_UDPLITE:
>>>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
>>>> +        if (mode) {
>>>
>>> Looks like mode is unnecessary here; you can actually compare and
>>> swap during hashing to avoid mode here.
>>
>> I get your point.
>>
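One way to read the "compare and swap during hashing" suggestion: instead of passing mode, order the two endpoints canonically before hashing, so a packet and its reply map to the same key. The sketch below assumes a hypothetical ConnKeySketch type with host-order uint32_t addresses in place of struct in_addr. Equality is checked field by field because an unpacked struct may contain padding bytes that memcmp() would also compare.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for ConnectionKey (addresses in host byte order). */
typedef struct {
    uint32_t src, dst;
    uint16_t src_port, dst_port;
    uint8_t ip_proto;
} ConnKeySketch;

/* Put the numerically smaller (address, port) endpoint in src so that
 * both directions of a flow produce the same key. */
static void conn_key_normalize(ConnKeySketch *k)
{
    if (k->src > k->dst ||
        (k->src == k->dst && k->src_port > k->dst_port)) {
        uint32_t addr = k->src;
        uint16_t port = k->src_port;

        k->src = k->dst;
        k->src_port = k->dst_port;
        k->dst = addr;
        k->dst_port = port;
    }
}

/* Field-wise comparison; avoids comparing padding bytes. */
static bool conn_key_equal(const ConnKeySketch *x, const ConnKeySketch *y)
{
    return x->src == y->src && x->dst == y->dst &&
           x->src_port == y->src_port && x->dst_port == y->dst_port &&
           x->ip_proto == y->ip_proto;
}
```

Normalizing once when the key is filled keeps both the hash and the equality function direction-agnostic without any extra argument.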
>>>
>>>> +            key->src = pkt->ip->ip_src;
>>>> +            key->dst = pkt->ip->ip_dst;
>>>> +            key->src_port = ntohs(tmp_ports & 0xffff);
>>>> +            key->dst_port = ntohs(tmp_ports >> 16);
>>>> +        } else {
>>>> +            key->dst = pkt->ip->ip_src;
>>>> +            key->src = pkt->ip->ip_dst;
>>>> +            key->dst_port = ntohs(tmp_ports & 0xffff);
>>>> +            key->src_port = ntohs(tmp_ports >> 16);
>>>> +        }
>>>> +        break;
>>>> +    case IPPROTO_AH:
>>>> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
>>>> +        if (mode) {
>>>> +            key->src = pkt->ip->ip_src;
>>>> +            key->dst = pkt->ip->ip_dst;
>>>> +            key->src_port = ntohs(tmp_ports & 0xffff);
>>>> +            key->dst_port = ntohs(tmp_ports >> 16);
>>>> +        } else {
>>>> +            key->dst = pkt->ip->ip_src;
>>>> +            key->src = pkt->ip->ip_dst;
>>>> +            key->dst_port = ntohs(tmp_ports & 0xffff);
>>>> +            key->src_port = ntohs(tmp_ports >> 16);
>>>> +        }
>>>> +        break;
>>>> +    default:
>>>> +        key->src_port = 0;
>>>> +        key->dst_port = 0;
>>>> +        break;
>>>> +    }
>>>> +}
>>>
>>> This seems reusable; please use an independent patch for the
>>> connection key stuff.
>>
>> In this patch set or not?
>> If not, should we make a new .c and .h for this?
>>
>
> Yes, this series please.
>
>>>
>>>> +
>>>> +Connection *connection_new(ConnectionKey *key)
>>>> +{
>>>> +    Connection *conn = g_slice_new(Connection);
>>>> +
>>>> +    conn->ip_proto = key->ip_proto;
>>>> +    conn->processing = false;
>>>> +    g_queue_init(&conn->primary_list);
>>>> +    g_queue_init(&conn->secondary_list);
>>>> +
>>>> +    return conn;
>>>> +}
>>>> +
>>>> +void connection_destroy(void *opaque)
>>>> +{
>>>> +    Connection *conn = opaque;
>>>> +
>>>> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
>>>> +    g_queue_clear(&conn->primary_list);
>>>> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
>>>> +    g_queue_clear(&conn->secondary_list);
>>>> +    g_slice_free(Connection, conn);
>>>> +}
>>>> +
>>>> +Packet *packet_new(const void *data, int size)
>>>> +{
>>>> +    Packet *pkt = g_slice_new(Packet);
>>>> +
>>>> +    pkt->data = g_memdup(data, size);
>>>> +    pkt->size = size;
>>>> +
>>>> +    return pkt;
>>>> +}
>>>> +
>>>> +void packet_destroy(void *opaque, void *user_data)
>>>> +{
>>>> +    Packet *pkt = opaque;
>>>> +
>>>> +    g_free(pkt->data);
>>>> +    g_slice_free(Packet, pkt);
>>>> +}
>>>> +
>>>> +/*
>>>> + * Clear hashtable, stop this hash growing really huge
>>>> + */
>>>> +void connection_hashtable_reset(GHashTable *connection_track_table)
>>>> +{
>>>> +    g_hash_table_remove_all(connection_track_table);
>>>> +}
>>>> +
>>>> +/* if not found, create a new connection and add to hash table */
>>>> +Connection *connection_get(GHashTable *connection_track_table,
>>>> +                           ConnectionKey *key,
>>>> +                           uint32_t *hashtable_size)
>>>> +{
>>>> +    /* FIXME: protect connection_track_table */
>>>
>>> I fail to understand why protection is needed here.
>>
>> It isn't needed; I will remove it.
>>
>>>
>>>> +    Connection *conn = g_hash_table_lookup(connection_track_table, key);
>>>> +
>>>> +    if (conn == NULL) {
>>>> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
>>>> +
>>>> +        conn = connection_new(key);
>>>> +
>>>> +        (*hashtable_size) += 1;
>>>> +        if (*hashtable_size > HASHTABLE_MAX_SIZE) {
>>>> +            error_report("colo proxy connection hashtable full, clear it");
>>>
>>> Is this a hint that we need synchronization?
>>
>> No, we don't.
>>
>
> But you reset the hash table, which means we lose the packet
> comparison state?

Makes sense. I will fix it in the next version.

>
>>>
>>>> +            connection_hashtable_reset(connection_track_table);
>>>> +            *hashtable_size = 0;
>>>> +            /* TODO:clear conn_list */
>>>
>>> If we don't clear conn_list, it looks like a bug, so we probably need
>>> to do this in this patch.
>>
>> OK~~
>>
>>>
>>>> +        }
>>>> +
>>>> +        g_hash_table_insert(connection_track_table, new_key, conn);
>>>> +    }
>>>> +
>>>> +    return conn;
>>>> +}
>>>> diff --git a/net/colo-base.h b/net/colo-base.h
>>>> new file mode 100644
>>>> index 0000000..01c1a5d
>>>> --- /dev/null
>>>> +++ b/net/colo-base.h
>>>> @@ -0,0 +1,88 @@
>>>> +/*
>>>> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
>>>> + * (a.k.a. Fault Tolerance or Continuous Replication)
>>>> + *
>>>> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
>>>> + * Copyright (c) 2016 FUJITSU LIMITED
>>>> + * Copyright (c) 2016 Intel Corporation
>>>> + *
>>>> + * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>>>> + *
>>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>>> + * later.  See the COPYING file in the top-level directory.
>>>> + */
>>>> +
>>>> +#ifndef QEMU_COLO_BASE_H
>>>> +#define QEMU_COLO_BASE_H
>>>> +
>>>> +#include "slirp/slirp.h"
>>>> +#include "qemu/jhash.h"
>>>> +#include "qemu/rcu.h"
>>>
>>> I don't see any RCU usage in this patch.
>>
>> Will remove it.
>>
>>>
>>>> +
>>>> +#define HASHTABLE_MAX_SIZE 16384
>>>> +
>>>> +typedef enum colo_conn_state {
>>>
>>> This looks like it can only take care of TCP, so probably add "tcp" to
>>> its name.
>>
>> yes.
>>
>>>
>>>> +     COLO_CONN_IDLE,
>>>> +
>>>> +    /* States on the primary: For incoming connection */
>>>> +     COLO_CONN_PRI_IN_SYN,   /* Received Syn */
>>>> +     COLO_CONN_PRI_IN_PSYNACK, /* Received syn/ack from primary, but not
>>>> +                                yet from secondary */
>>>> +     COLO_CONN_PRI_IN_SSYNACK, /* Received syn/ack from secondary, but
>>>> +                                  not yet from primary */
>>>> +     COLO_CONN_PRI_IN_SYNACK,  /* Received syn/ack from both */
>>>> +     COLO_CONN_PRI_IN_ESTABLISHED, /* Got the ACK */
>>>> +
>>>> +    /* States on the secondary: For incoming connection */
>>>> +     COLO_CONN_SEC_IN_SYNACK,      /* We sent a syn/ack */
>>>> +     COLO_CONN_SEC_IN_ACK,         /* Saw the ack but didn't yet see our syn/ack */
>>>> +     COLO_CONN_SEC_IN_ESTABLISHED, /* Got the ACK from the outside */
>>>
>>> Should we care about any FIN state here?
>>
>> Currently we don't care.
>>
>
> Then a comment explaining why we only care about the states during
> connection establishment would be better.

OK

>
>>>
>>>> +} colo_conn_state;
>>>> +
>>>> +typedef struct Packet {
>>>> +    void *data;
>>>> +    union {
>>>> +        uint8_t *network_layer;
>>>> +        struct ip *ip;
>>>> +    };
>>>> +    uint8_t *transport_layer;
>>>> +    int size;
>>>> +} Packet;
>>>
>>> We may start to consider sharing code with e.g. hw/net/net_tx_pkt.c.
>>
>> I read it. The file was added to QEMU a month ago; it needs time to
>> stabilize and may still change. So I think this job should be done
>> after colo-compare is merged...
>
> OK, but we need to avoid duplication as much as possible.
>
>>
>>>
>>>> +
>>>> +typedef struct ConnectionKey {
>>>> +    /* (src, dst) must be grouped, in the same way as in the IP header */
>>>> +    struct in_addr src;
>>>> +    struct in_addr dst;
>>>> +    uint16_t src_port;
>>>> +    uint16_t dst_port;
>>>> +    uint8_t ip_proto;
>>>> +} QEMU_PACKED ConnectionKey;
>>>> +
>>>> +typedef struct Connection {
>>>> +    /* connection primary send queue: element type: Packet */
>>>> +    GQueue primary_list;
>>>> +    /* connection secondary send queue: element type: Packet */
>>>> +    GQueue secondary_list;
>>>> +    /* flag to enqueue unprocessed_connections */
>>>> +    bool processing;
>>>> +    uint8_t ip_proto;
>>>> +    /* used by filter-rewriter */
>>>> +    colo_conn_state state;
>>>> +    tcp_seq  primary_seq;
>>>> +    tcp_seq  secondary_seq;
>>>> +} Connection;
>>>> +
>>>> +uint32_t connection_key_hash(const void *opaque);
>>>> +int connection_key_equal(const void *opaque1, const void *opaque2);
>>>> +int parse_packet_early(Packet *pkt);
>>>> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode);
>>>> +Connection *connection_new(ConnectionKey *key);
>>>> +void connection_destroy(void *opaque);
>>>> +Connection *connection_get(GHashTable *connection_track_table,
>>>> +                           ConnectionKey *key,
>>>> +                           uint32_t *hashtable_size);
>>>> +void connection_hashtable_reset(GHashTable *connection_track_table);
>>>> +Packet *packet_new(const void *data, int size);
>>>> +void packet_destroy(void *opaque, void *user_data);
>>>> +
>>>> +#endif /* QEMU_COLO_BASE_H */
>>>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>>>> index a3e1456..4231fe7 100644
>>>> --- a/net/colo-compare.c
>>>> +++ b/net/colo-compare.c
>>>> @@ -28,6 +28,7 @@
>>>>   #include "qemu/sockets.h"
>>>>   #include "qapi-visit.h"
>>>>   #include "trace.h"
>>>> +#include "net/colo-base.h"
>>>>
>>>>   #define TYPE_COLO_COMPARE "colo-compare"
>>>>   #define COLO_COMPARE(obj) \
>>>> @@ -38,6 +39,28 @@
>>>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>>>>
>>>> +/*
>>>> +  + CompareState ++
>>>> +  |               |
>>>> +  +---------------+   +---------------+ +---------------+
>>>> +  |conn list      +--->conn +--------->conn |
>>>> +  +---------------+   +---------------+ +---------------+
>>>> +  |               |     |           |             | |
>>>> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>>>> +                    |primary |  |secondary    |primary | |secondary
>>>> +                    |packet  |  |packet  +    |packet  | |packet  +
>>>> +                    +--------+  +--------+    +--------+ +--------+
>>>> +                        |           |             | |
>>>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>>>> +                    |primary |  |secondary    |primary | |secondary
>>>> +                    |packet  |  |packet  +    |packet  | |packet  +
>>>> +                    +--------+  +--------+    +--------+ +--------+
>>>> +                        |           |             | |
>>>> +                    +---v----+  +---v----+    +---v----+ +---v----+
>>>> +                    |primary |  |secondary    |primary | |secondary
>>>> +                    |packet  |  |packet  +    |packet  | |packet  +
>>>> +                    +--------+  +--------+    +--------+ +--------+
>>>> +*/
>>>>   typedef struct CompareState {
>>>>       Object parent;
>>>>
>>>> @@ -50,12 +73,103 @@ typedef struct CompareState {
>>>>       QTAILQ_ENTRY(CompareState) next;
>>>>       SocketReadState pri_rs;
>>>>       SocketReadState sec_rs;
>>>> +
>>>> +    /* connection list: the connections belonging to this NIC can be found
>>>> +     * in this list.
>>>> +     * element type: Connection
>>>> +     */
>>>> +    GQueue conn_list;
>>>> +    QemuMutex conn_list_lock; /* to protect conn_list */
>>>
>>> Why need this mutex?
>>
>> Will remove it.
>>
>>>
>>>> +    /* hashtable to save connection */
>>>> +    GHashTable *connection_track_table;
>>>> +    /* to save unprocessed_connections */
>>>> +    GQueue unprocessed_connections;
>>>> +    /* proxy current hash size */
>>>> +    uint32_t hashtable_size;
>>>>   } CompareState;
>>>>
>>>>   typedef struct CompareClass {
>>>>       ObjectClass parent_class;
>>>>   } CompareClass;
>>>>
>>>> +enum {
>>>> +    PRIMARY_IN = 0,
>>>> +    SECONDARY_IN,
>>>> +};
>>>> +
>>>> +static int compare_chr_send(CharDriverState *out,
>>>> +                            const uint8_t *buf,
>>>> +                            uint32_t size);
>>>> +
>>>> +/*
>>>> + * Return 0 on success; -1 means the pkt is unsupported
>>>> + * (ARP or IPv6) and will be sent later
>>>> + */
>>>> +static int packet_enqueue(CompareState *s, int mode)
>>>> +{
>>>> +    ConnectionKey key = {{ 0 } };
>>>> +    Packet *pkt = NULL;
>>>> +    Connection *conn;
>>>> +
>>>> +    if (mode == PRIMARY_IN) {
>>>> +        pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
>>>> +    } else {
>>>> +        pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
>>>> +    }
>>>> +
>>>> +    if (parse_packet_early(pkt)) {
>>>> +        packet_destroy(pkt, NULL);
>>>> +        pkt = NULL;
>>>> +        return -1;
>>>> +    }
>>>> +    fill_connection_key(pkt, &key, PRIMARY_IN);
>>>> +
>>>> +    conn = connection_get(s->connection_track_table,
>>>> +                          &key,
>>>> +                          &s->hashtable_size);
>>>> +    if (!conn->processing) {
>>>> +        qemu_mutex_lock(&s->conn_list_lock);
>>>> +        g_queue_push_tail(&s->conn_list, conn);
>>>> +        qemu_mutex_unlock(&s->conn_list_lock);
>>>> +        conn->processing = true;
>>>> +    }
>>>> +
>>>> +    if (mode == PRIMARY_IN) {
>>>> +        g_queue_push_tail(&conn->primary_list, pkt);
>>>> +    } else {
>>>> +        g_queue_push_tail(&conn->secondary_list, pkt);
>>>> +    }
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static int compare_chr_send(CharDriverState *out,
>>>> +                            const uint8_t *buf,
>>>> +                            uint32_t size)
>>>> +{
>>>> +    int ret = 0;
>>>> +    uint32_t len = htonl(size);
>>>> +
>>>> +    if (!size) {
>>>> +        return 0;
>>>> +    }
>>>> +
>>>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
>>>> +    if (ret != sizeof(len)) {
>>>> +        goto err;
>>>> +    }
>>>> +
>>>> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
>>>> +    if (ret != size) {
>>>> +        goto err;
>>>> +    }
>>>> +
>>>> +    return 0;
>>>> +
>>>> +err:
>>>> +    return ret < 0 ? ret : -EIO;
>>>> +}
>>>> +
>>>>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>>>>   {
>>>>       CompareState *s = COLO_COMPARE(obj);
>>>> @@ -103,12 +217,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>>>>
>>>>   static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>>>>   {
>>>> -    /* if packet_enqueue pri pkt failed we will send unsupported packet */
>>>> +    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
>>>> +
>>>> +    if (packet_enqueue(s, PRIMARY_IN)) {
>>>> +        trace_colo_compare_main("primary: unsupported packet in");
>>>> +        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
>>>> +    }
>>>
>>> Do we have an upper limit on the maximum number of packets that can be
>>> queued? If not, the guest may easily trigger OOM.
>>
>> Do we need a g_queue to do this job?
>
> Maybe.
>
>> If it exceeds the limit, do we drop the packet?
>>
>> Thanks
>> Zhang Chen
>
> Needs more thought, but we could start from dropping packets.

OK.
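Starting from dropping packets, as suggested, could look like the sketch below: a fixed-capacity FIFO whose push fails, and counts a drop, once the cap is reached. BoundedQueue, bq_push(), bq_pop() and the cap of 4 are hypothetical; the patch itself uses unbounded GQueues, and a real limit would need tuning.

```c
#include <stddef.h>

#define SKETCH_MAX_QUEUE 4   /* hypothetical cap; a real value needs tuning */

/* Ring buffer of packet pointers with a drop counter. */
typedef struct {
    const void *items[SKETCH_MAX_QUEUE];
    size_t head;             /* index of the oldest item */
    size_t len;              /* number of queued items */
    unsigned long dropped;   /* packets refused because the queue was full */
} BoundedQueue;

/* Returns 0 when queued, -1 when the queue is full and the item is dropped. */
static int bq_push(BoundedQueue *q, const void *item)
{
    if (q->len == SKETCH_MAX_QUEUE) {
        q->dropped++;
        return -1;
    }
    q->items[(q->head + q->len) % SKETCH_MAX_QUEUE] = item;
    q->len++;
    return 0;
}

/* Returns the oldest item, or NULL when the queue is empty. */
static const void *bq_pop(BoundedQueue *q)
{
    const void *item;

    if (q->len == 0) {
        return NULL;
    }
    item = q->items[q->head];
    q->head = (q->head + 1) % SKETCH_MAX_QUEUE;
    q->len--;
    return item;
}
```

A caller that gets -1 from bq_push() would free the packet instead of enqueuing it, which bounds memory per connection regardless of guest behaviour.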

>
>>
>>>
>>>>   }
>>>>
>>>>   static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>>>>   {
>>>> -    /* if packet_enqueue sec pkt failed we will notify trace */
>>>> +    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
>>>> +
>>>> +    if (packet_enqueue(s, SECONDARY_IN)) {
>>>> +        trace_colo_compare_main("secondary: unsupported packet in");
>>>> +    }
>>>>   }
>>>>
>>>>   /*
>>>> @@ -161,6 +284,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>>>       net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
>>>>       net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
>>>>
>>>> +    g_queue_init(&s->conn_list);
>>>> +    qemu_mutex_init(&s->conn_list_lock);
>>>> +    s->hashtable_size = 0;
>>>> +
>>>> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
>>>> +                                                      connection_key_equal,
>>>> +                                                      g_free,
>>>> +                                                      connection_destroy);
>>>> +
>>>>       return;
>>>>   }
>>>>
>>>> @@ -203,6 +335,8 @@ static void colo_compare_finalize(Object *obj)
>>>>       if (!QTAILQ_EMPTY(&net_compares)) {
>>>>           QTAILQ_REMOVE(&net_compares, s, next);
>>>>       }
>>>> +    qemu_mutex_destroy(&s->conn_list_lock);
>>>> +    g_queue_clear(&s->conn_list);
>>>>
>>>>       g_free(s->pri_indev);
>>>>       g_free(s->sec_indev);
>>>> diff --git a/trace-events b/trace-events
>>>> index ca7211b..703de1a 100644
>>>> --- a/trace-events
>>>> +++ b/trace-events
>>>> @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
>>>>   aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
>>>>   aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32
>>>>   aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32
>>>> +
>>>> +# net/colo-compare.c
>>>> +colo_compare_main(const char *chr) ": %s"
>>>
>>>
>>>
>>> .
>>>
>>
>
>
>
> .
>

-- 
Thanks
zhangchen
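The parse_packet_early() helper quoted in this thread locates the IPv4 and transport headers inside a received frame. Below is a minimal sketch of the same job on a raw byte buffer, with a length check before every header read. Assumptions: at most one 802.1Q tag, IPv4 only, and sketch_parse_ipv4() plus the SKETCH_* constants are hypothetical names, not QEMU APIs.

```c
#include <stddef.h>
#include <stdint.h>

#define SKETCH_ETH_HLEN    14
#define SKETCH_VLAN_HLEN   4
#define SKETCH_ETH_P_IP    0x0800
#define SKETCH_ETH_P_VLAN  0x8100
#define SKETCH_IP_MIN_HLEN 20

/* On success stores the offsets of the IPv4 header and the transport
 * header and returns 0; returns -1 for short, non-IPv4 or malformed
 * frames. */
static int sketch_parse_ipv4(const uint8_t *data, size_t size,
                             size_t *l3_off, size_t *l4_off)
{
    size_t l2_len = SKETCH_ETH_HLEN;
    uint16_t proto;
    size_t ip_hlen;

    if (size < SKETCH_ETH_HLEN) {
        return -1;
    }
    proto = (uint16_t)(data[12] << 8 | data[13]);
    if (proto == SKETCH_ETH_P_VLAN) {          /* skip one 802.1Q tag */
        l2_len += SKETCH_VLAN_HLEN;
        if (size < l2_len) {
            return -1;
        }
        proto = (uint16_t)(data[16] << 8 | data[17]);
    }
    if (proto != SKETCH_ETH_P_IP || size < l2_len + SKETCH_IP_MIN_HLEN) {
        return -1;
    }
    ip_hlen = (size_t)(data[l2_len] & 0x0f) * 4;   /* IHL is in 32-bit words */
    if (ip_hlen < SKETCH_IP_MIN_HLEN || size < l2_len + ip_hlen) {
        return -1;
    }
    *l3_off = l2_len;
    *l4_off = l2_len + ip_hlen;
    return 0;
}
```

Deriving the L3 offset from the parsed L2 header length, rather than a fixed 14 bytes, is what lets the tagged case land on the right header.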


* Re: [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison
  2016-07-11 10:02     ` Zhang Chen
@ 2016-07-13  2:54       ` Jason Wang
  2016-07-13  5:10         ` Zhang Chen
  0 siblings, 1 reply; 21+ messages in thread
From: Jason Wang @ 2016-07-13  2:54 UTC (permalink / raw)
  To: Zhang Chen, qemu devel
  Cc: Li Zhijian, eddie . dong, Dr . David Alan Gilbert, zhanghailiang



On 2016-07-11 18:02, Zhang Chen wrote:
>>> +static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
>>> +{
>>> +    int network_length;
>>> +    struct icmp *icmp_ppkt, *icmp_spkt;
>>> +
>>> +    trace_colo_compare_main("compare icmp");
>>> +    network_length = ppkt->ip->ip_hl * 4;
>>> +    if (ppkt->size != spkt->size ||
>>> +        ppkt->size < network_length + ETH_HLEN) {
>>> +        trace_colo_compare_icmp_miscompare_size(ppkt->size, spkt->size);
>>> +        return -1;
>>> +    }
>>> +    icmp_ppkt = (struct icmp *)(ppkt->data + network_length + ETH_HLEN);
>>> +    icmp_spkt = (struct icmp *)(spkt->data + network_length + ETH_HLEN);
>>> +
>>> +    if ((icmp_ppkt->icmp_type == icmp_spkt->icmp_type) &&
>>> +        (icmp_ppkt->icmp_code == icmp_spkt->icmp_code)) {
>>> +        if (icmp_ppkt->icmp_type == ICMP_REDIRECT) {
>>> +            if (icmp_ppkt->icmp_gwaddr.s_addr !=
>>> +                icmp_spkt->icmp_gwaddr.s_addr) {
>>> +                trace_colo_compare_main("icmp_gwaddr.s_addr not same");
>>> +                trace_colo_compare_icmp_miscompare_addr("ppkt s_addr",
>>> +                        inet_ntoa(icmp_ppkt->icmp_gwaddr));
>>> +                trace_colo_compare_icmp_miscompare_addr("spkt s_addr",
>>> +                        inet_ntoa(icmp_spkt->icmp_gwaddr));
>>> +                return -1;
>>> +            }
>>> +        } else if ((icmp_ppkt->icmp_type == ICMP_UNREACH) &&
>>> +                   (icmp_ppkt->icmp_code == ICMP_UNREACH_NEEDFRAG)) {
>>> +            if (icmp_ppkt->icmp_nextmtu != icmp_spkt->icmp_nextmtu) {
>>> +                trace_colo_compare_main("icmp_nextmtu not same");
>>> +                trace_colo_compare_icmp_miscompare_mtu("ppkt nextmtu",
>>> +                        icmp_ppkt->icmp_nextmtu);
>>> +                trace_colo_compare_icmp_miscompare_mtu("spkt nextmtu",
>>> +                        icmp_spkt->icmp_nextmtu);
>>> +                return -1;
>>> +            }
>>> +        }
>>> +    } else {
>>> +        return -1;
>>> +    }
>>
>> Why compare only part of the ICMP packet?
>>
>
> That covers most situations; comparing every part of the ICMP packet
> would reduce comparison efficiency.
>
> Thanks
> Zhang Chen 

I believe we should cover all situations instead of "most". And ICMP
packets look small, so there's probably no need for special tricks
like this.
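Because ICMP packets are small, comparing the whole message is cheap. A hedged sketch of that approach: after the size checks, memcmp() everything from the ICMP header to the end of the frame. icmp_compare_full() and SKETCH_ETH_HLEN are hypothetical names; the sketch assumes an untagged Ethernet header, and ip_hlen is the IPv4 header length in bytes (ip_hl * 4).

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_ETH_HLEN 14   /* untagged Ethernet header */

/* Compare every byte after the Ethernet and IPv4 headers, i.e. the full
 * ICMP header plus payload. Returns 0 when the packets match, -1 otherwise. */
static int icmp_compare_full(const uint8_t *ppkt, size_t psize,
                             const uint8_t *spkt, size_t ssize,
                             size_t ip_hlen)
{
    size_t offset = SKETCH_ETH_HLEN + ip_hlen;

    if (psize != ssize || psize < offset) {
        return -1;
    }
    return memcmp(ppkt + offset, spkt + offset, psize - offset) == 0 ? 0 : -1;
}
```

This replaces the per-type field checks with one byte comparison, so every ICMP type and code is covered without special cases.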


* Re: [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison
  2016-07-13  2:54       ` Jason Wang
@ 2016-07-13  5:10         ` Zhang Chen
  0 siblings, 0 replies; 21+ messages in thread
From: Zhang Chen @ 2016-07-13  5:10 UTC (permalink / raw)
  To: Jason Wang, qemu devel
  Cc: Li Zhijian, eddie . dong, Dr . David Alan Gilbert, zhanghailiang



On 07/13/2016 10:54 AM, Jason Wang wrote:
>
>
> On 2016-07-11 18:02, Zhang Chen wrote:
>>>> +static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
>>>> +{
>>>> +    int network_length;
>>>> +    struct icmp *icmp_ppkt, *icmp_spkt;
>>>> +
>>>> +    trace_colo_compare_main("compare icmp");
>>>> +    network_length = ppkt->ip->ip_hl * 4;
>>>> +    if (ppkt->size != spkt->size ||
>>>> +        ppkt->size < network_length + ETH_HLEN) {
>>>> +        trace_colo_compare_icmp_miscompare_size(ppkt->size, spkt->size);
>>>> +        return -1;
>>>> +    }
>>>> +    icmp_ppkt = (struct icmp *)(ppkt->data + network_length + ETH_HLEN);
>>>> +    icmp_spkt = (struct icmp *)(spkt->data + network_length + ETH_HLEN);
>>>> +
>>>> +    if ((icmp_ppkt->icmp_type == icmp_spkt->icmp_type) &&
>>>> +        (icmp_ppkt->icmp_code == icmp_spkt->icmp_code)) {
>>>> +        if (icmp_ppkt->icmp_type == ICMP_REDIRECT) {
>>>> +            if (icmp_ppkt->icmp_gwaddr.s_addr !=
>>>> +                icmp_spkt->icmp_gwaddr.s_addr) {
>>>> +                trace_colo_compare_main("icmp_gwaddr.s_addr not same");
>>>> +                trace_colo_compare_icmp_miscompare_addr("ppkt s_addr",
>>>> +                        inet_ntoa(icmp_ppkt->icmp_gwaddr));
>>>> +                trace_colo_compare_icmp_miscompare_addr("spkt s_addr",
>>>> +                        inet_ntoa(icmp_spkt->icmp_gwaddr));
>>>> +                return -1;
>>>> +            }
>>>> +        } else if ((icmp_ppkt->icmp_type == ICMP_UNREACH) &&
>>>> +                   (icmp_ppkt->icmp_code == ICMP_UNREACH_NEEDFRAG)) {
>>>> +            if (icmp_ppkt->icmp_nextmtu != icmp_spkt->icmp_nextmtu) {
>>>> +                trace_colo_compare_main("icmp_nextmtu not same");
>>>> +                trace_colo_compare_icmp_miscompare_mtu("ppkt nextmtu",
>>>> +                        icmp_ppkt->icmp_nextmtu);
>>>> +                trace_colo_compare_icmp_miscompare_mtu("spkt nextmtu",
>>>> +                        icmp_spkt->icmp_nextmtu);
>>>> +                return -1;
>>>> +            }
>>>> +        }
>>>> +    } else {
>>>> +        return -1;
>>>> +    }
>>>
>>> Why compare only part of the ICMP packet?
>>>
>>
>> That covers most situations; comparing every part of the ICMP packet
>> would reduce comparison efficiency.
>>
>> Thanks
>> Zhang Chen 
>
> I believe we should cover all situations instead of "most". And ICMP
> packets look small, so there's probably no need for special tricks
> like this.
>
>

OK, I will fix this in the next version.

Thanks
Zhang Chen

>
> .
>

-- 
Thanks
zhangchen


end of thread

Thread overview: 21+ messages
2016-06-23 11:34 [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 1/4] colo-compare: introduce colo compare initialization Zhang Chen
2016-07-08  3:40   ` Jason Wang
2016-07-08  8:21     ` Zhang Chen
2016-07-08  9:12       ` Jason Wang
2016-07-11  5:14         ` Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet Zhang Chen
2016-07-08  4:07   ` Jason Wang
2016-07-08  9:56     ` Zhang Chen
2016-07-11  5:41       ` Jason Wang
2016-07-12  5:42         ` Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 3/4] colo-compare: introduce packet comparison thread Zhang Chen
2016-07-08  4:23   ` Jason Wang
2016-07-11  7:17     ` Zhang Chen
2016-06-23 11:34 ` [Qemu-devel] [RFC PATCH V5 4/4] colo-compare: add TCP, UDP, ICMP packet comparison Zhang Chen
2016-07-08  8:59   ` Jason Wang
2016-07-11 10:02     ` Zhang Chen
2016-07-13  2:54       ` Jason Wang
2016-07-13  5:10         ` Zhang Chen
2016-07-07  7:47 ` [Qemu-devel] [RFC PATCH V5 0/4] Introduce COLO-compare Zhang Chen
2016-07-07  8:41   ` Jason Wang
