All of lore.kernel.org
 help / color / mirror / Atom feed
From: Jason Wang <jasowang@redhat.com>
To: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>,
	qemu devel <qemu-devel@nongnu.org>
Cc: Li Zhijian <lizhijian@cn.fujitsu.com>,
	Wen Congyang <wency@cn.fujitsu.com>,
	zhanghailiang <zhang.zhanghailiang@huawei.com>,
	"eddie . dong" <eddie.dong@intel.com>,
	"Dr . David Alan Gilbert" <dgilbert@redhat.com>
Subject: Re: [Qemu-devel] [RFC PATCH V10 6/7] colo-compare: introduce packet comparison thread
Date: Tue, 2 Aug 2016 15:52:00 +0800	[thread overview]
Message-ID: <d1ddce68-db0f-c20f-b8a8-932f256599c0@redhat.com> (raw)
In-Reply-To: <1469497794-16976-7-git-send-email-zhangchen.fnst@cn.fujitsu.com>



On 2016年07月26日 09:49, Zhang Chen wrote:
> If primary packet is same with secondary packet,
> we will send primary packet and drop secondary
> packet, otherwise notify COLO frame to do checkpoint.
> If primary packet comes and secondary packet not,

s/and/but/  and /packet not/packet does not/

> after REGULAR_PACKET_CHECK_MS milliseconds we set
> the primary packet as old_packet,then do a checkpoint.
>
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>   net/colo-base.c    |   1 +
>   net/colo-base.h    |   3 +
>   net/colo-compare.c | 212 +++++++++++++++++++++++++++++++++++++++++++++++++++++
>   trace-events       |   2 +
>   4 files changed, 218 insertions(+)
>
> diff --git a/net/colo-base.c b/net/colo-base.c
> index 7e91dec..eb1b631 100644
> --- a/net/colo-base.c
> +++ b/net/colo-base.c
> @@ -132,6 +132,7 @@ Packet *packet_new(const void *data, int size)
>   
>       pkt->data = g_memdup(data, size);
>       pkt->size = size;
> +    pkt->creation_ms = qemu_clock_get_ms(QEMU_CLOCK_HOST);
>   
>       return pkt;
>   }
> diff --git a/net/colo-base.h b/net/colo-base.h
> index 0505608..06d6dca 100644
> --- a/net/colo-base.h
> +++ b/net/colo-base.h
> @@ -17,6 +17,7 @@
>   
>   #include "slirp/slirp.h"
>   #include "qemu/jhash.h"
> +#include "qemu/timer.h"
>   
>   #define HASHTABLE_MAX_SIZE 16384
>   
> @@ -28,6 +29,8 @@ typedef struct Packet {
>       };
>       uint8_t *transport_layer;
>       int size;
> +    /* Time of packet creation, in wall clock ms */
> +    int64_t creation_ms;
>   } Packet;
>   
>   typedef struct ConnectionKey {
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 5f87710..e020edc 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -36,6 +36,8 @@
>   
>   #define COMPARE_READ_LEN_MAX NET_BUFSIZE
>   #define MAX_QUEUE_SIZE 1024
> +/* TODO: Should be configurable */
> +#define REGULAR_PACKET_CHECK_MS 3000
>   
>   /*
>     + CompareState ++
> @@ -83,6 +85,10 @@ typedef struct CompareState {
>       GQueue unprocessed_connections;
>       /* proxy current hash size */
>       uint32_t hashtable_size;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    /* Timer used on the primary to find packets that are never matched */
> +    QEMUTimer *timer;
>   } CompareState;
>   
>   typedef struct CompareClass {
> @@ -170,6 +176,112 @@ static int packet_enqueue(CompareState *s, int mode)
>       return 0;
>   }
>   
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    trace_colo_compare_ip_info(ppkt->size, inet_ntoa(ppkt->ip->ip_src),
> +                               inet_ntoa(ppkt->ip->ip_dst), spkt->size,
> +                               inet_ntoa(spkt->ip->ip_src),
> +                               inet_ntoa(spkt->ip->ip_dst));
> +
> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static int colo_packet_compare_all(Packet *spkt, Packet *ppkt)
> +{
> +    trace_colo_compare_main("compare all");
> +    return colo_packet_compare(ppkt, spkt);
> +}
> +
> +static void colo_old_packet_check_one(void *opaque_packet,
> +                                      void *opaque_found)
> +{
> +    int64_t now;
> +    bool *found_old = (bool *)opaque_found;
> +    Packet *ppkt = (Packet *)opaque_packet;
> +
> +    if (*found_old) {
> +        /* Someone found an old packet earlier in the queue */
> +        return;
> +    }
> +
> +    now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
> +    if ((now - ppkt->creation_ms) > REGULAR_PACKET_CHECK_MS) {
> +        trace_colo_old_packet_check_found(ppkt->creation_ms);
> +        *found_old = true;
> +    }
> +}
> +
> +static void colo_old_packet_check_one_conn(void *opaque,
> +                                           void *user_data)
> +{

user_data is unused.

> +    bool found_old = false;
> +    Connection *conn = opaque;
> +
> +    g_queue_foreach(&conn->primary_list, colo_old_packet_check_one,
> +                    &found_old);

To avoid odd API for colo_old_packet_check_one, maybe you can try use 
use QTAILQ and QTAILQ_FOREACH() or g_queue_find_custom() for avoiding 
iterating each element?

> +    if (found_old) {
> +        /* do checkpoint will flush old packet */
> +        /* TODO: colo_notify_checkpoint();*/
> +    }
> +}
> +
> +/*
> + * Look for old packets that the secondary hasn't matched,
> + * if we have some then we have to checkpoint to wake
> + * the secondary up.
> + */
> +static void colo_old_packet_check(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    g_queue_foreach(&s->conn_list, colo_old_packet_check_one_conn, NULL);
> +}
> +
> +/*
> + * called from the compare thread on the primary
> + * for compare connection
> + */
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    CompareState *s = user_data;
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_tail(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare_all);
> +
> +        if (result) {
> +            ret = compare_chr_send(s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            trace_colo_compare_main("packet same and release packet");
> +            g_queue_remove(&conn->secondary_list, result->data);

Both pkt and result were leaked here?

> +        } else {
> +            trace_colo_compare_main("packet different");
> +            g_queue_push_tail(&conn->primary_list, pkt);
> +            /* TODO: colo_notify_checkpoint();*/

Shouldn't we wait for a while consider packet may arrive late?

> +            break;
> +        }
> +    }
> +}
> +
>   static int compare_chr_send(CharDriverState *out,
>                               const uint8_t *buf,
>                               uint32_t size)
> @@ -197,6 +309,65 @@ err:
>       return ret < 0 ? ret : -EIO;
>   }
>   
> +static int compare_chr_can_read(void *opaque)
> +{
> +    return COMPARE_READ_LEN_MAX;
> +}
> +
> +/*
> + * called from the main thread on the primary for packets
> + * arriving over the socket from the primary.
> + */
> +static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = net_fill_rstate(&s->pri_rs, buf, size);
> +    if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> +        error_report("colo-compare primary_in error");
> +    }
> +}
> +
> +/*
> + * called from the main thread on the primary for packets
> + * arriving over the socket from the secondary.
> + */
> +static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
> +{
> +    CompareState *s = COLO_COMPARE(opaque);
> +    int ret;
> +
> +    ret = net_fill_rstate(&s->sec_rs, buf, size);
> +    if (ret == -1) {
> +        qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> +        error_report("colo-compare secondary_in error");
> +    }
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    GMainContext *worker_context;
> +    GMainLoop *compare_loop;
> +    CompareState *s = opaque;
> +
> +    worker_context = g_main_context_new();
> +
> +    qemu_chr_add_handlers_full(s->chr_pri_in, compare_chr_can_read,
> +                          compare_pri_chr_in, NULL, s, worker_context);
> +    qemu_chr_add_handlers_full(s->chr_sec_in, compare_chr_can_read,
> +                          compare_sec_chr_in, NULL, s, worker_context);
> +
> +    compare_loop = g_main_loop_new(worker_context, FALSE);
> +
> +    g_main_loop_run(compare_loop);
> +
> +    g_main_loop_unref(compare_loop);
> +    g_main_context_unref(worker_context);
> +    return NULL;
> +}
> +
>   static char *compare_get_pri_indev(Object *obj, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(obj);
> @@ -249,6 +420,9 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
>       if (packet_enqueue(s, PRIMARY_IN)) {
>           trace_colo_compare_main("primary: unsupported packet in");
>           compare_chr_send(s->chr_out, pri_rs->buf, pri_rs->packet_len);
> +    } else {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>       }
>   }
>   
> @@ -258,16 +432,35 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs)
>   
>       if (packet_enqueue(s, SECONDARY_IN)) {
>           trace_colo_compare_main("secondary: unsupported packet in");
> +    } else {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
>       }
>   }
>   
>   /*
> + * Check old packet regularly so it can watch for any packets
> + * that the secondary hasn't produced equivalents of.
> + */
> +static void check_old_packet_regular(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
> +              REGULAR_PACKET_CHECK_MS);
> +    /* if have old packet we will notify checkpoint */
> +    colo_old_packet_check(s);
> +}
> +
> +/*
>    * called from the main thread on the primary
>    * to setup colo-compare.
>    */
>   static void colo_compare_complete(UserCreatable *uc, Error **errp)
>   {
>       CompareState *s = COLO_COMPARE(uc);
> +    char thread_name[64];
> +    static int compare_id;
>   
>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>           error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -319,6 +512,18 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>                                                         g_free,
>                                                         connection_destroy);
>   
> +    sprintf(thread_name, "compare %d", compare_id);

colo-compare?

> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
> +    /* A regular timer to kick any packets that the secondary doesn't match */
> +    s->timer = timer_new_ms(QEMU_CLOCK_VIRTUAL, /* Only when guest runs */
> +                            check_old_packet_regular, s);
> +    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
> +                        REGULAR_PACKET_CHECK_MS);

I believe we need make sure this timer were processed in colo comparing 
thread. But it looks not.

> +
>       return;
>   }
>   
> @@ -360,6 +565,13 @@ static void colo_compare_finalize(Object *obj)
>   
>       g_queue_free(&s->conn_list);
>   
> +    if (s->thread.thread) {
> +        /* compare connection */
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, s);
> +        qemu_thread_join(&s->thread);
> +    }
> +    timer_del(s->timer);
> +
>       g_free(s->pri_indev);
>       g_free(s->sec_indev);
>       g_free(s->outdev);
> diff --git a/trace-events b/trace-events
> index 703de1a..1537e91 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1919,3 +1919,5 @@ aspeed_vic_write(uint64_t offset, unsigned size, uint32_t data) "To 0x%" PRIx64
>   
>   # net/colo-compare.c
>   colo_compare_main(const char *chr) ": %s"
> +colo_compare_ip_info(int psize, const char *sta, const char *stb, int ssize, const char *stc, const char *std) "ppkt size = %d, ip_src = %s, ip_dst = %s, spkt size = %d, ip_src = %s, ip_dst = %s"
> +colo_old_packet_check_found(int64_t old_time) "%" PRId64

  reply	other threads:[~2016-08-02  7:52 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2016-07-26  1:49 [Qemu-devel] [RFC PATCH V10 0/7] Introduce COLO-compare Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 1/7] colo-compare: introduce colo compare initialization Zhang Chen
2016-08-02  6:26   ` Jason Wang
2016-08-03  2:51     ` Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 2/7] colo-base: add colo-base to define and handle packet Zhang Chen
2016-08-02  6:38   ` Jason Wang
2016-08-03  6:34     ` Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 3/7] Jhash: add linux kernel jhashtable in qemu Zhang Chen
2016-08-02  6:40   ` Jason Wang
2016-08-03  6:36     ` Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 4/7] colo-compare: track connection and enqueue packet Zhang Chen
2016-08-02  7:14   ` Jason Wang
2016-08-04  6:49     ` Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 5/7] qemu-char: Add qemu_chr_add_handlers_full() for GMaincontext Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 6/7] colo-compare: introduce packet comparison thread Zhang Chen
2016-08-02  7:52   ` Jason Wang [this message]
2016-08-05  7:45     ` Zhang Chen
2016-07-26  1:49 ` [Qemu-devel] [RFC PATCH V10 7/7] colo-compare: add TCP, UDP, ICMP packet comparison Zhang Chen
2016-08-02  8:04   ` Jason Wang
2016-08-05  8:55     ` Zhang Chen

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=d1ddce68-db0f-c20f-b8a8-932f256599c0@redhat.com \
    --to=jasowang@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=eddie.dong@intel.com \
    --cc=lizhijian@cn.fujitsu.com \
    --cc=qemu-devel@nongnu.org \
    --cc=wency@cn.fujitsu.com \
    --cc=zhang.zhanghailiang@huawei.com \
    --cc=zhangchen.fnst@cn.fujitsu.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.