From: Jason Wang
Date: Fri, 8 Jul 2016 12:07:50 +0800
Subject: Re: [Qemu-devel] [RFC PATCH V5 2/4] colo-compare: track connection and enqueue packet
Message-ID: <577F2716.1060909@redhat.com>
In-Reply-To: <1466681677-30487-3-git-send-email-zhangchen.fnst@cn.fujitsu.com>
References: <1466681677-30487-1-git-send-email-zhangchen.fnst@cn.fujitsu.com> <1466681677-30487-3-git-send-email-zhangchen.fnst@cn.fujitsu.com>
To: Zhang Chen, qemu devel
Cc: Li Zhijian, Wen Congyang, zhanghailiang, "eddie . dong", "Dr.
 David Alan Gilbert"

On 2016-06-23 19:34, Zhang Chen wrote:
> In this patch we use kernel jhash table to track
> connection, and then enqueue net packet like this:
>
>   + CompareState ++
>   |               |
>   +---------------+   +---------------+         +---------------+
>   |conn list      +--->conn           +--------->conn           |
>   +---------------+   +---------------+         +---------------+
>   |               |     |           |             |          |
>   +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
>                     |primary |  |secondary    |primary | |secondary
>                     |packet  |  |packet  +    |packet  | |packet  +
>                     +--------+  +--------+    +--------+ +--------+
>                         |           |             |          |
>                     +---v----+  +---v----+    +---v----+ +---v----+
>                     |primary |  |secondary    |primary | |secondary
>                     |packet  |  |packet  +    |packet  | |packet  +
>                     +--------+  +--------+    +--------+ +--------+
>                         |           |             |          |
>                     +---v----+  +---v----+    +---v----+ +---v----+
>                     |primary |  |secondary    |primary | |secondary
>                     |packet  |  |packet  +    |packet  | |packet  +
>                     +--------+  +--------+    +--------+ +--------+

A paragraph describing the above would be more than welcome.

> Signed-off-by: Zhang Chen
> Signed-off-by: Li Zhijian
> Signed-off-by: Wen Congyang
> ---
>  include/qemu/jhash.h |  61 ++++++++++++++++
>  net/Makefile.objs    |   1 +
>  net/colo-base.c      | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  net/colo-base.h      |  88 +++++++++++++++++++++++
>  net/colo-compare.c   | 138 +++++++++++++++++++++++++++++++++++-
>  trace-events         |   3 +
>  6 files changed, 483 insertions(+), 2 deletions(-)
>  create mode 100644 include/qemu/jhash.h
>  create mode 100644 net/colo-base.c
>  create mode 100644 net/colo-base.h
>
> diff --git a/include/qemu/jhash.h b/include/qemu/jhash.h
> new file mode 100644
> index 0000000..0fcd875
> --- /dev/null
> +++ b/include/qemu/jhash.h
> @@ -0,0 +1,61 @@
> +/* jhash.h: Jenkins hash support.
> + *
> + * Copyright (C) 2006. Bob Jenkins (bob_jenkins@burtleburtle.net)
> + *
> + * http://burtleburtle.net/bob/hash/
> + *
> + * These are the credits from Bob's sources:
> + *
> + * lookup3.c, by Bob Jenkins, May 2006, Public Domain.
> + *
> + * These are functions for producing 32-bit hashes for hash table lookup.
> + * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final()
> + * are externally useful functions. Routines to test the hash are
> +included
> + * if SELF_TEST is defined. You can use this free for any purpose.
> +It's in
> + * the public domain. It has no warranty.
> + *
> + * Copyright (C) 2009-2010 Jozsef Kadlecsik (kadlec@blackhole.kfki.hu)
> + *
> + * I've modified Bob's hash to be useful in the Linux kernel, and
> + * any bugs present are my fault.
> + * Jozsef
> + */
> +
> +#ifndef QEMU_JHASH_H__
> +#define QEMU_JHASH_H__
> +
> +#include "qemu/bitops.h"
> +
> +/*
> + * hashtable relation copy from linux kernel jhash
> + */
> +
> +/* __jhash_mix -- mix 3 32-bit values reversibly. */
> +#define __jhash_mix(a, b, c)                \
> +{                                           \
> +    a -= c;  a ^= rol32(c, 4);  c += b;     \
> +    b -= a;  b ^= rol32(a, 6);  a += c;     \
> +    c -= b;  c ^= rol32(b, 8);  b += a;     \
> +    a -= c;  a ^= rol32(c, 16); c += b;     \
> +    b -= a;  b ^= rol32(a, 19); a += c;     \
> +    c -= b;  c ^= rol32(b, 4);  b += a;     \
> +}
> +
> +/* __jhash_final - final mixing of 3 32-bit values (a,b,c) into c */
> +#define __jhash_final(a, b, c)  \
> +{                               \
> +    c ^= b; c -= rol32(b, 14);  \
> +    a ^= c; a -= rol32(c, 11);  \
> +    b ^= a; b -= rol32(a, 25);  \
> +    c ^= b; c -= rol32(b, 16);  \
> +    a ^= c; a -= rol32(c, 4);   \
> +    b ^= a; b -= rol32(a, 14);  \
> +    c ^= b; c -= rol32(b, 24);  \
> +}
> +
> +/* An arbitrary initial parameter */
> +#define JHASH_INITVAL 0xdeadbeef
> +
> +#endif /* QEMU_JHASH_H__ */

Please split jhash into a separate patch.

> diff --git a/net/Makefile.objs b/net/Makefile.objs
> index ba92f73..119589f 100644
> --- a/net/Makefile.objs
> +++ b/net/Makefile.objs
> @@ -17,3 +17,4 @@ common-obj-y += filter.o
>  common-obj-y += filter-buffer.o
>  common-obj-y += filter-mirror.o
>  common-obj-y += colo-compare.o
> +common-obj-y += colo-base.o
> diff --git a/net/colo-base.c b/net/colo-base.c
> new file mode 100644
> index 0000000..7e263e8
> --- /dev/null
> +++ b/net/colo-base.c
> @@ -0,0 +1,194 @@
> +/*
> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
> + * (a.k.a. Fault Tolerance or Continuous Replication)
> + *
> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
> + * Copyright (c) 2016 FUJITSU LIMITED
> + * Copyright (c) 2016 Intel Corporation
> + *
> + * Author: Zhang Chen
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later. See the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu/error-report.h"
> +#include "net/colo-base.h"
> +
> +uint32_t connection_key_hash(const void *opaque)
> +{
> +    const ConnectionKey *key = opaque;
> +    uint32_t a, b, c;
> +
> +    /* Jenkins hash */
> +    a = b = c = JHASH_INITVAL + sizeof(*key);
> +    a += key->src.s_addr;
> +    b += key->dst.s_addr;
> +    c += (key->src_port | key->dst_port << 16);
> +    __jhash_mix(a, b, c);
> +
> +    a += key->ip_proto;
> +    __jhash_final(a, b, c);
> +
> +    return c;
> +}
> +
> +int connection_key_equal(const void *key1, const void *key2)
> +{
> +    return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
> +}
> +
> +int parse_packet_early(Packet *pkt)
> +{
> +    int network_length;
> +    uint8_t *data = pkt->data;
> +    uint16_t l3_proto;
> +    ssize_t l2hdr_len = eth_get_l2_hdr_length(data);
> +
> +    if (pkt->size < ETH_HLEN) {
> +        error_report("pkt->size < ETH_HLEN");
> +        return 1;
> +    }
> +    pkt->network_layer = data + ETH_HLEN;
> +    l3_proto = eth_get_l3_proto(data, l2hdr_len);
> +    if (l3_proto != ETH_P_IP) {
> +        return 1;
> +    }
> +
> +    network_length = pkt->ip->ip_hl * 4;
> +    if (pkt->size < ETH_HLEN + network_length) {
> +        error_report("pkt->size < network_layer + network_length");
> +        return 1;
> +    }
> +    pkt->transport_layer = pkt->network_layer + network_length;
> +    if (!pkt->transport_layer) {
> +        error_report("pkt->transport_layer is valid");
> +        return 1;
> +    }
> +
> +    return 0;
> +}
> +
> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode)
> +{
> +    uint32_t tmp_ports;
> +
> +    key->ip_proto = pkt->ip->ip_p;
> +
> +    switch (key->ip_proto) {
> +    case IPPROTO_TCP:
> +    case IPPROTO_UDP:
> +    case IPPROTO_DCCP:
> +    case IPPROTO_ESP:
> +    case IPPROTO_SCTP:
> +    case IPPROTO_UDPLITE:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer);
> +        if (mode) {

Looks like mode is unnecessary here; you can compare and swap
during hashing to avoid it.

> +            key->src = pkt->ip->ip_src;
> +            key->dst = pkt->ip->ip_dst;
> +            key->src_port = ntohs(tmp_ports & 0xffff);
> +            key->dst_port = ntohs(tmp_ports >> 16);
> +        } else {
> +            key->dst = pkt->ip->ip_src;
> +            key->src = pkt->ip->ip_dst;
> +            key->dst_port = ntohs(tmp_ports & 0xffff);
> +            key->src_port = ntohs(tmp_ports >> 16);
> +        }
> +        break;
> +    case IPPROTO_AH:
> +        tmp_ports = *(uint32_t *)(pkt->transport_layer + 4);
> +        if (mode) {
> +            key->src = pkt->ip->ip_src;
> +            key->dst = pkt->ip->ip_dst;
> +            key->src_port = ntohs(tmp_ports & 0xffff);
> +            key->dst_port = ntohs(tmp_ports >> 16);
> +        } else {
> +            key->dst = pkt->ip->ip_src;
> +            key->src = pkt->ip->ip_dst;
> +            key->dst_port = ntohs(tmp_ports & 0xffff);
> +            key->src_port = ntohs(tmp_ports >> 16);
> +        }
> +        break;
> +    default:
> +        key->src_port = 0;
> +        key->dst_port = 0;
> +        break;
> +    }
> +}

This looks reusable; please put the connection key code in an
independent patch.

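To illustrate the compare-and-swap suggestion for fill_connection_key()
above, here is a minimal sketch that is not taken from the patch. It
assumes addresses and ports are already in host byte order, and FlowKey,
flow_key_normalize() and flow_key_equal() are hypothetical names used
only for this example. The idea is to order the two endpoints before
hashing, so a packet and its reply map to the same key without a mode
argument.

```c
#include <stdint.h>

/* Hypothetical stand-in for ConnectionKey, using plain integers. */
typedef struct FlowKey {
    uint32_t src;
    uint32_t dst;
    uint16_t src_port;
    uint16_t dst_port;
    uint8_t ip_proto;
} FlowKey;

/*
 * Order the two endpoints so that the smaller (address, port) pair always
 * ends up in src/src_port. Both directions of a flow then give one key.
 */
static void flow_key_normalize(FlowKey *key)
{
    int swap = key->src > key->dst ||
               (key->src == key->dst && key->src_port > key->dst_port);

    if (swap) {
        uint32_t addr = key->src;
        uint16_t port = key->src_port;

        key->src = key->dst;
        key->src_port = key->dst_port;
        key->dst = addr;
        key->dst_port = port;
    }
}

/* Field-wise comparison, so struct padding never affects the result. */
static int flow_key_equal(const FlowKey *a, const FlowKey *b)
{
    return a->src == b->src && a->dst == b->dst &&
           a->src_port == b->src_port && a->dst_port == b->dst_port &&
           a->ip_proto == b->ip_proto;
}
```

With something like this, the caller would fill the key straight from the
packet headers and normalize it once before the hash table lookup. The
trade-off is that the key no longer records which side sent the packet,
so any user that needs the direction has to keep it elsewhere.
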
> +
> +Connection *connection_new(ConnectionKey *key)
> +{
> +    Connection *conn = g_slice_new(Connection);
> +
> +    conn->ip_proto = key->ip_proto;
> +    conn->processing = false;
> +    g_queue_init(&conn->primary_list);
> +    g_queue_init(&conn->secondary_list);
> +
> +    return conn;
> +}
> +
> +void connection_destroy(void *opaque)
> +{
> +    Connection *conn = opaque;
> +
> +    g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->primary_list);
> +    g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
> +    g_queue_free(&conn->secondary_list);
> +    g_slice_free(Connection, conn);
> +}
> +
> +Packet *packet_new(const void *data, int size)
> +{
> +    Packet *pkt = g_slice_new(Packet);
> +
> +    pkt->data = g_memdup(data, size);
> +    pkt->size = size;
> +
> +    return pkt;
> +}
> +
> +void packet_destroy(void *opaque, void *user_data)
> +{
> +    Packet *pkt = opaque;
> +
> +    g_free(pkt->data);
> +    g_slice_free(Packet, pkt);
> +}
> +
> +/*
> + * Clear hashtable, stop this hash growing really huge
> + */
> +void connection_hashtable_reset(GHashTable *connection_track_table)
> +{
> +    g_hash_table_remove_all(connection_track_table);
> +}
> +
> +/* if not found, create a new connection and add to hash table */
> +Connection *connection_get(GHashTable *connection_track_table,
> +                           ConnectionKey *key,
> +                           uint32_t *hashtable_size)
> +{
> +    /* FIXME: protect connection_track_table */

I fail to understand why protection is needed here.

> +    Connection *conn = g_hash_table_lookup(connection_track_table, key);
> +
> +    if (conn == NULL) {
> +        ConnectionKey *new_key = g_memdup(key, sizeof(*key));
> +
> +        conn = connection_new(key);
> +
> +        (*hashtable_size) += 1;
> +        if (*hashtable_size > HASHTABLE_MAX_SIZE) {
> +            error_report("colo proxy connection hashtable full, clear it");

Is this a hint that we need synchronization?

> +            connection_hashtable_reset(connection_track_table);
> +            *hashtable_size = 0;
> +            /* TODO:clear conn_list */

If we don't clear conn_list, this looks like a bug, so it probably needs
to be done in this patch.

> +        }
> +
> +        g_hash_table_insert(connection_track_table, new_key, conn);
> +    }
> +
> +    return conn;
> +}
> diff --git a/net/colo-base.h b/net/colo-base.h
> new file mode 100644
> index 0000000..01c1a5d
> --- /dev/null
> +++ b/net/colo-base.h
> @@ -0,0 +1,88 @@
> +/*
> + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
> + * (a.k.a. Fault Tolerance or Continuous Replication)
> + *
> + * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
> + * Copyright (c) 2016 FUJITSU LIMITED
> + * Copyright (c) 2016 Intel Corporation
> + *
> + * Author: Zhang Chen
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later. See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_COLO_BASE_H
> +#define QEMU_COLO_BASE_H
> +
> +#include "slirp/slirp.h"
> +#include "qemu/jhash.h"
> +#include "qemu/rcu.h"

I don't see any RCU usage in this patch.

> +
> +#define HASHTABLE_MAX_SIZE 16384
> +
> +typedef enum colo_conn_state {

This looks like it can only handle TCP, so probably add "tcp" to its
name.

> +    COLO_CONN_IDLE,
> +
> +    /* States on the primary: For incoming connection */
> +    COLO_CONN_PRI_IN_SYN,          /* Received Syn */
> +    COLO_CONN_PRI_IN_PSYNACK,      /* Received syn/ack from primary, but not
> +                                      yet from secondary */
> +    COLO_CONN_PRI_IN_SSYNACK,      /* Received syn/ack from secondary, but
> +                                      not yet from primary */
> +    COLO_CONN_PRI_IN_SYNACK,       /* Received syn/ack from both */
> +    COLO_CONN_PRI_IN_ESTABLISHED,  /* Got the ACK */
> +
> +    /* States on the secondary: For incoming connection */
> +    COLO_CONN_SEC_IN_SYNACK,       /* We sent a syn/ack */
> +    COLO_CONN_SEC_IN_ACK,          /* Saw the ack but didn't yet see our syn/ack */
> +    COLO_CONN_SEC_IN_ESTABLISHED,  /* Got the ACK from the outside */

Should we care about any FIN state here?

> +} colo_conn_state;
> +
> +typedef struct Packet {
> +    void *data;
> +    union {
> +        uint8_t *network_layer;
> +        struct ip *ip;
> +    };
> +    uint8_t *transport_layer;
> +    int size;
> +} Packet;

We may start to consider sharing code with e.g. hw/net/net_tx_pkt.c.

> +
> +typedef struct ConnectionKey {
> +    /* (src, dst) must be grouped, in the same way than in IP header */
> +    struct in_addr src;
> +    struct in_addr dst;
> +    uint16_t src_port;
> +    uint16_t dst_port;
> +    uint8_t ip_proto;
> +} QEMU_PACKED ConnectionKey;
> +
> +typedef struct Connection {
> +    /* connection primary send queue: element type: Packet */
> +    GQueue primary_list;
> +    /* connection secondary send queue: element type: Packet */
> +    GQueue secondary_list;
> +    /* flag to enqueue unprocessed_connections */
> +    bool processing;
> +    uint8_t ip_proto;
> +    /* be used by filter-rewriter */
> +    colo_conn_state state;
> +    tcp_seq primary_seq;
> +    tcp_seq secondary_seq;
> +} Connection;
> +
> +uint32_t connection_key_hash(const void *opaque);
> +int connection_key_equal(const void *opaque1, const void *opaque2);
> +int parse_packet_early(Packet *pkt);
> +void fill_connection_key(Packet *pkt, ConnectionKey *key, int mode);
> +Connection *connection_new(ConnectionKey *key);
> +void connection_destroy(void *opaque);
> +Connection *connection_get(GHashTable *connection_track_table,
> +                           ConnectionKey *key,
> +                           uint32_t *hashtable_size);
> +void connection_hashtable_reset(GHashTable *connection_track_table);
> +Packet *packet_new(const void *data, int size);
> +void packet_destroy(void *opaque, void *user_data);
> +
> +#endif /* QEMU_COLO_BASE_H */
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index a3e1456..4231fe7 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -28,6 +28,7 @@
>  #include "qemu/sockets.h"
>  #include "qapi-visit.h"
>  #include "trace.h"
> +#include "net/colo-base.h"
>
>  #define TYPE_COLO_COMPARE "colo-compare"
>  #define COLO_COMPARE(obj) \
> @@ -38,6 +39,28 @@
>  static QTAILQ_HEAD(, CompareState) net_compares =
>         QTAILQ_HEAD_INITIALIZER(net_compares);
>
> +/*
> +  + CompareState ++
> +  |               |
> +  +---------------+   +---------------+         +---------------+
> +  |conn list      +--->conn           +--------->conn           |
> +  +---------------+   +---------------+         +---------------+
> +  |               |     |           |             |          |
> +  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +                        |           |             |          |
> +                    +---v----+  +---v----+    +---v----+ +---v----+
> +                    |primary |  |secondary    |primary | |secondary
> +                    |packet  |  |packet  +    |packet  | |packet  +
> +                    +--------+  +--------+    +--------+ +--------+
> +*/
>  typedef struct CompareState {
>      Object parent;
>
> @@ -50,12 +73,103 @@ typedef struct CompareState {
>      QTAILQ_ENTRY(CompareState) next;
>      SocketReadState pri_rs;
>      SocketReadState sec_rs;
> +
> +    /* connection list: the connections belonged to this NIC could be found
> +     * in this list.
> +     * element type: Connection
> +     */
> +    GQueue conn_list;
> +    QemuMutex conn_list_lock; /* to protect conn_list */

Why is this mutex needed?

> +    /* hashtable to save connection */
> +    GHashTable *connection_track_table;
> +    /* to save unprocessed_connections */
> +    GQueue unprocessed_connections;
> +    /* proxy current hash size */
> +    uint32_t hashtable_size;
>  } CompareState;
>
>  typedef struct CompareClass {
>      ObjectClass parent_class;
>  } CompareClass;
>
> +enum {
> +    PRIMARY_IN = 0,
> +    SECONDARY_IN,
> +};
> +
> +static int compare_chr_send(CharDriverState *out,
> +                            const uint8_t *buf,
> +                            uint32_t size);
> +
> +/*
> + * Return 0 on success, if return -1 means the pkt
> + * is unsupported(arp and ipv6) and will be sent later
> + */
> +static int packet_enqueue(CompareState *s, int mode)
> +{
> +    ConnectionKey key = {{ 0 } };
> +    Packet *pkt = NULL;
> +    Connection *conn;
> +
> +    if (mode == PRIMARY_IN) {
> +        pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
> +    } else {
> +        pkt = packet_new(s->sec_rs.buf, s->sec_rs.packet_len);
> +    }
> +
> +    if (parse_packet_early(pkt)) {
> +        packet_destroy(pkt, NULL);
> +        pkt = NULL;
> +        return -1;
> +    }
> +    fill_connection_key(pkt, &key, PRIMARY_IN);
> +
> +    conn = connection_get(s->connection_track_table,
> +                          &key,
> +                          &s->hashtable_size);
> +    if (!conn->processing) {
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_push_tail(&s->conn_list, conn);
> +        qemu_mutex_unlock(&s->conn_list_lock);
> +        conn->processing = true;
> +    }
> +
> +    if (mode == PRIMARY_IN) {
> +        g_queue_push_tail(&conn->primary_list, pkt);
> +    } else {
> +        g_queue_push_tail(&conn->secondary_list, pkt);
> +    }
> +
> +    return 0;
> +}
> +
> +static int compare_chr_send(CharDriverState *out,
> +                            const uint8_t *buf,
> +                            uint32_t size)
> +{
> +    int ret = 0;
> +    uint32_t len = htonl(size);
> +
> +    if (!size) {
> +        return 0;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)&len, sizeof(len));
> +    if (ret != sizeof(len)) {
> +        goto err;
> +    }
> +
> +    ret = qemu_chr_fe_write_all(out, (uint8_t *)buf, size);
> +    if (ret != size) {
> +        goto err;
> +    }
> +
> +    return 0;
> +
> +err:
> +    return ret < 0 ? ret : -EIO;
> +}
> +
>  static char *compare_get_pri_indev(Object *obj, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(obj);
> @@ -103,12 +217,21 @@ static void compare_set_outdev(Object *obj, const char *value, Error **errp)
>
>  static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>  {
> -    /* if packet_enqueue pri pkt failed we will send unsupported packet */
> +    CompareState *s = container_of(pri_rs, CompareState, pri_rs);
> +
> +    if (packet_enqueue(s, PRIMARY_IN)) {
> +        trace_colo_compare_main("primary: unsupported packet in");
> +        compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
> +    }

Do we have an upper limit on the number of packets that can be queued?
If not, the guest may easily trigger OOM.

>  }
>
>  static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>  {
> -    /* if packet_enqueue sec pkt failed we will notify trace */
> +    CompareState *s = container_of(sec_rs, CompareState, sec_rs);
> +
> +    if (packet_enqueue(s, SECONDARY_IN)) {
> +        trace_colo_compare_main("secondary: unsupported packet in");
> +    }
>  }
>
>  /*
> @@ -161,6 +284,15 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>      net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
>      net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);
>
> +    g_queue_init(&s->conn_list);
> +    qemu_mutex_init(&s->conn_list_lock);
> +    s->hashtable_size = 0;
> +
> +    s->connection_track_table = g_hash_table_new_full(connection_key_hash,
> +                                                      connection_key_equal,
> +                                                      g_free,
> +                                                      connection_destroy);
> +
>      return;
>  }
>
> @@ -203,6 +335,8 @@ static void colo_compare_finalize(Object *obj)
>      if (!QTAILQ_EMPTY(&net_compares)) {
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
> +    qemu_mutex_destroy(&s->conn_list_lock);
> +    g_queue_free(&s->conn_list);
>
>      g_free(s->pri_indev);
>      g_free(s->sec_indev);
> diff --git a/trace-events b/trace-events
> index ca7211b..703de1a 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1916,3 +1916,6 @@ aspeed_vic_update_fiq(int flags) "Raising FIQ: %d"
>  aspeed_vic_update_irq(int flags) "Raising IRQ: %d"
>  aspeed_vic_read(uint64_t offset, unsigned size, uint32_t value) "From 0x%" PRIx64 " of size %u: 0x%" PRIx32
>  aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64 " of size %u: 0x%" PRIx32
> +
> +# net/colo-compare.c
> +colo_compare_main(const char *chr) ": %s"
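
On the question of an upper limit for queued packets raised at
compare_pri_rs_finalize(): the following is only a rough sketch of what
a capped queue could look like, not a proposal for the actual code. It
uses a plain linked list instead of GQueue so that it stands alone, and
MAX_QUEUED_PACKETS, QueuedPkt, PktQueue, pkt_queue_push() and
pkt_queue_clear() are all hypothetical names. The cap value is
arbitrary.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical cap; kept tiny so the behaviour is easy to follow. */
#define MAX_QUEUED_PACKETS 4

typedef struct QueuedPkt {
    struct QueuedPkt *next;
    size_t size;
    unsigned char *data;
} QueuedPkt;

typedef struct PktQueue {
    QueuedPkt *head;
    QueuedPkt *tail;
    size_t length;   /* number of packets currently queued */
} PktQueue;

/* Copy the packet into the queue; refuse once the cap is reached. */
static bool pkt_queue_push(PktQueue *q, const void *data, size_t size)
{
    QueuedPkt *pkt;

    if (q->length >= MAX_QUEUED_PACKETS) {
        return false;   /* the caller decides whether to drop or flush */
    }

    pkt = malloc(sizeof(*pkt));
    if (!pkt) {
        return false;
    }
    pkt->data = malloc(size ? size : 1);
    if (!pkt->data) {
        free(pkt);
        return false;
    }
    memcpy(pkt->data, data, size);
    pkt->size = size;
    pkt->next = NULL;

    if (q->tail) {
        q->tail->next = pkt;
    } else {
        q->head = pkt;
    }
    q->tail = pkt;
    q->length++;
    return true;
}

/* Free every queued packet and reset the queue to empty. */
static void pkt_queue_clear(PktQueue *q)
{
    while (q->head) {
        QueuedPkt *pkt = q->head;

        q->head = pkt->next;
        free(pkt->data);
        free(pkt);
    }
    q->tail = NULL;
    q->length = 0;
}
```

Whether the limit should be per connection or per CompareState, and what
to do with a packet that is refused, is a policy question this sketch
leaves open.
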