All of lore.kernel.org
 help / color / mirror / Atom feed
* [Patch 5/7] tabled: Add replication daemon
@ 2009-11-14  6:36 Pete Zaitcev
  2009-11-14  8:39 ` Jeff Garzik
  2009-11-14 23:44 ` Jeff Garzik
  0 siblings, 2 replies; 5+ messages in thread
From: Pete Zaitcev @ 2009-11-14  6:36 UTC (permalink / raw)
  To: Jeff Garzik; +Cc: Project Hail List

This patch adds what amounts to a background process that maintains
redundancy for object data. It is far from the complete solution.
For one thing, it does not verify checksums. But it's a start.

There's no way to turn this off, by intention. The whole thing must
work very reliably, not steal too much from benchmarks (but if it
does, it's only honest to take the hit). It is indispensable.
However, there's a plan to add useful monitoring of jobs and other
state, such as available nodes.

Signed-off-by: Pete Zaitcev <zaitcev@redhat.com>

---
 server/Makefile.am |    4 
 server/replica.c   |  753 +++++++++++++++++++++++++++++++++++++++++++
 server/server.c    |   25 +
 server/tabled.h    |   12 
 4 files changed, 789 insertions(+), 5 deletions(-)

diff --git a/server/Makefile.am b/server/Makefile.am
index f994b36..8faa95a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,8 +4,8 @@ INCLUDES	= -I$(top_srcdir)/include @GLIB_CFLAGS@ @CHUNKDC_CFLAGS@ @CLDC_CFLAGS@
 sbin_PROGRAMS	= tabled tdbadm
 
 tabled_SOURCES	= tabled.h		\
-		  bucket.c object.c server.c storage.c storparse.c cldu.c \
-		  config.c util.c
+		  bucket.c object.c server.c storage.c storparse.c replica.c \
+		  cldu.c config.c util.c
 tabled_LDADD	= ../lib/libhttputil.a ../lib/libtdb.a		\
 		  @CHUNKDC_LIBS@ @CLDC_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
 		  @CRYPTO_LIBS@ @DB4_LIBS@ @EVENT_LIBS@ @ARGP_LIBS@ @SSL_LIBS@
diff --git a/server/replica.c b/server/replica.c
new file mode 100644
index 0000000..ff814da
--- /dev/null
+++ b/server/replica.c
@@ -0,0 +1,753 @@
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+#include <sys/types.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+#include <db.h>
+#include <elist.h>
+#include "tabled.h"
+
+/*
+ * Replication Job
+ */
+struct rep_job {
+	struct list_head jlink;
+
+	uint64_t oid;
+	uint64_t size;		/* all of the object */
+	time_t start_time;
+	/* cannot look up by oid, keep key */
+	size_t klen;
+	struct db_obj_key *key;
+
+	struct storage_node *src, *dst;
+	struct open_chunk in_ce, out_ce;
+	long in_len;		/* can MIN() take long long? */
+	char *buf;
+	char *bptr;		/* points into buf */
+	ssize_t bcnt;		/* currently in buf */
+};
+
+struct rep_jobs {
+	int njobs;
+	struct list_head jlist;
+};
+
+static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
+static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
+static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
+
+static void job_dispatch(void);
+
+/* should've called this job_alloc_and_fill actually */
+static struct rep_job *job_alloc(size_t klen, struct db_obj_key *key)
+{
+	struct rep_job *job;
+	size_t len;
+
+	len = sizeof(struct rep_job) + klen;
+	job = malloc(len);
+	if (job) {
+		memset(job, 0, sizeof(struct rep_job));
+		memcpy(job+1, key, klen);
+		job->klen = klen;
+		job->key = (struct db_obj_key *)(job+1);
+	}
+	return job;
+}
+
+static void job_free(struct rep_job *job)
+{
+	free(job->buf);
+	free(job);
+}
+
+/* N.B. the current calling convention is to wait for drain on the socket */
+static void job_done(struct rep_job *job)
+{
+/* P3 */ applog(LOG_INFO, "job done oid %llX", (long long) job->oid);
+	if (!stor_put_end(&job->out_ce)) {
+		applog(LOG_ERR, "Chunk sync failed on nid %u", job->dst->id);
+	}
+	stor_close(&job->out_ce);
+	stor_close(&job->in_ce);
+
+	list_del(&job->jlink);
+	--active.njobs;
+
+	list_add(&job->jlink, &done.jlist);
+}
+
+static void job_abend(struct rep_job *job)
+{
+/* P3 */ applog(LOG_INFO, "job abend from %u to %u oid %llX",
+  job->src->id, job->dst->id, (long long) job->oid);
+	stor_abort(&job->out_ce);
+	stor_close(&job->out_ce);
+	stor_close(&job->in_ce);
+
+	list_del(&job->jlink);
+	--active.njobs;
+	job_free(job);
+}
+
+static int job_submit_buf(struct rep_job *job, char *buf, ssize_t len)
+{
+	ssize_t bytes;
+
+	job->bptr = buf;
+	job->bcnt = len;
+
+	bytes = stor_put_buf(&job->out_ce, job->bptr, job->bcnt);
+	if (bytes < 0) {
+		job->bcnt = 0;
+		if (debugging)
+			applog(LOG_DEBUG, "stor_put_buf failed (%d)", bytes);
+		return bytes;
+	}
+	job->bptr += bytes;
+	job->bcnt -= bytes;
+	return 0;
+}
+
+static void job_get_poke(struct rep_job *job)
+{
+	ssize_t bytes;
+
+	for (;;) {
+		if (job->bcnt != 0)
+			break;
+		if (!job->in_len)
+			break;
+		bytes = stor_get_buf(&job->in_ce, job->buf,
+				     MIN(job->in_len, CLI_DATA_BUF_SZ));
+		if (bytes < 0) {
+			applog(LOG_ERR, "read failed oid %llX at nid %u",
+			       (unsigned long long) job->oid, job->src->id);
+			goto err_out;
+		}
+		if (bytes == 0)
+			break;
+		if (job_submit_buf(job, job->buf, bytes))
+			goto err_out;
+		job->in_len -= bytes;
+	}
+
+	if (job->bcnt == 0 && job->in_len == 0) {
+		job_done(job);
+		return;
+	}
+
+	/*
+	 * Since storage events automatically arm and disarm themselves,
+	 * we can just return to the main loop without a fear of looping.
+	 */
+	return;
+
+err_out:
+	job_abend(job);
+	return;
+}
+
+static void job_get_event(struct open_chunk *stp)
+{
+	job_get_poke(stp->cli);
+
+	job_dispatch();
+}
+
+static void job_put_poke(struct rep_job *job)
+{
+	ssize_t bytes;
+
+	bytes = stor_put_buf(&job->out_ce, job->bptr, job->bcnt);
+	if (bytes < 0) {
+		job->bcnt = 0;
+		applog(LOG_ERR, "write failed oid %llX at nid %u",
+		       (unsigned long long) job->oid, job->src->id);
+		job_abend(job);
+		return;
+	}
+	job->bptr += bytes;
+	job->bcnt -= bytes;
+
+	if (!job->bcnt)
+		job_get_poke(job);
+}
+
+static void job_put_event(struct open_chunk *stp)
+{
+	struct rep_job *job = stp->cli;
+
+	if (job->bcnt) {
+		job_put_poke(job);
+	} else {
+		job_get_poke(job);
+	}
+
+	job_dispatch();
+}
+
+/* well, not much scheduling for now, just throw to the tail of the queue. */
+static int job_schedule(struct rep_job *job)
+{
+
+	job->start_time = time(NULL);
+
+	/* P3 */ applog(LOG_INFO, "job oid %llX start %lu from %u to %u",
+	    job->oid, (long)job->start_time, job->src->id, job->dst->id);
+
+	list_add(&job->jlink, &queue.jlist);
+	queue.njobs++;
+	return 0;
+}
+
+/* FIXME needs to loop while active.njobs < max or something */
+static void job_dispatch()
+{
+	struct rep_job *job;
+	uint64_t objsize;	/* As reported by Chunk. Not used. */
+	int rc;
+
+	if (active.njobs >= 2)	/* FIXME: Bogus. Need to know current loads. */
+		return;
+
+	if (list_empty(&queue.jlist))
+		return;
+	job = list_entry(queue.jlist.next, struct rep_job, jlink);
+	list_del(&job->jlink);
+	--queue.njobs;
+
+	job->buf = malloc(CLI_DATA_BUF_SZ);
+	if (!job->buf)
+		goto err_malloc;
+
+	rc = stor_open(&job->in_ce, job->src);
+	if (rc) {
+		applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
+		       job->src->id, rc);
+		goto err_inopen;
+	}
+	job->in_ce.cli = job;
+
+	rc = stor_open(&job->out_ce, job->dst);
+	if (rc) {
+		applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
+		       job->dst->id, rc);
+		goto err_outopen;
+	}
+	job->out_ce.cli = job;
+
+	rc = stor_open_read(&job->in_ce, job_get_event, job->oid, &objsize);
+	if (rc) {
+		applog(LOG_ERR, "Cannot start nid %u for oid %llX (%d)",
+		       job->src->id, (unsigned long long) job->oid, rc);
+		goto err_read;
+	}
+	job->in_len = job->size;
+
+	rc = stor_put_start(&job->out_ce, job_put_event, job->oid, job->size);
+	if (rc) {
+		applog(LOG_ERR, "Cannot start putting, nid %u (%d)",
+		       job->dst->id, rc);
+		goto err_put;
+	}
+
+	list_add(&job->jlink, &active.jlist);
+	active.njobs++;
+
+	job_get_poke(job);	/* required to start */
+
+	return;
+
+err_put:
+err_read:
+	stor_close(&job->out_ce);
+err_outopen:
+	stor_close(&job->in_ce);
+err_inopen:
+	/* no free(buf) since job_free does it */
+err_malloc:
+	job_free(job);
+	return;
+}
+
+static struct storage_node *job_select_src(int nnum,
+					   struct storage_node *nvec[])
+{
+	if (nnum == 0)
+		return NULL;
+	return nvec[rand() % nnum];
+}
+
+/* FIXME Need to select by capacity and load. Ditto in initial selection. */
+static struct storage_node *job_select_dst(int nnum,
+					   struct storage_node *nvec[])
+{
+	enum { NRAND = 20 };
+	struct storage_node *tmp[NRAND];
+	int n, i;
+	struct storage_node *stn;
+	time_t t1;
+
+	t1 = time(NULL);
+	n = 0;
+	list_for_each_entry(stn, &tabled_srv.all_stor, all_link) {
+		if (!stn->up)
+			continue;
+		if (t1 > stn->last_up + CHUNK_REBOOT_TIME)
+			continue;
+
+		/* de-dup with source */
+		for (i = 0; i < nnum; i++) {
+			if (nvec[i] == stn)
+				break;
+		}
+		if (i < nnum)
+			continue;
+
+		tmp[n] = stn;
+		n++;
+	}
+	if (n == 0)
+		return NULL;
+	return tmp[rand() % n];
+}
+
+static struct rep_job *job_find_by_oid(uint64_t oid)
+{
+	struct rep_job *pos;
+
+	list_for_each_entry(pos, &queue.jlist, jlink) {
+		if (pos->oid == oid)
+			return pos;
+	}
+	list_for_each_entry(pos, &active.jlist, jlink) {
+		if (pos->oid == oid)
+			return pos;
+	}
+	return NULL;
+}
+
+/* start replicating the key somewhere */
+static void rep_job_start(size_t klen, struct db_obj_key *key,
+			  uint64_t oid, uint64_t objsize,
+			  int nnum, struct storage_node *nvec[])
+{
+	struct rep_job *job;
+
+	if (objsize == 0) {
+		static int cnt = 10;
+		if (cnt > 0) {	/* internal error; if it ever hits, it floods */
+			--cnt;
+			applog(LOG_ERR, "Submitting oid %llX with zero size",
+			       (long long) oid);
+		}
+		return;
+	}
+	if (job_find_by_oid(oid) != NULL)
+		return;
+	job = job_alloc(klen, key);
+	if (!job)
+		goto err_alloc;
+	job->oid = oid;
+	job->size = objsize;
+	job->src = job_select_src(nnum, nvec);
+	if (!job->src) {
+		/* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+		goto err_src;
+	}
+	job->dst = job_select_dst(nnum, nvec);
+	if (!job->dst) {
+		/* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+		goto err_dst;
+	}
+	if (job->src->id == job->dst->id) {
+		/* Is this bad enough to invoke exit(1) right here? */
+		applog(LOG_ERR, "Internal error, copy from/to nid %u",
+		       job->src->id);
+		return;
+	}
+	if (job_schedule(job) != 0)
+		goto err_sched;
+	job_dispatch();
+	return;
+
+err_sched:
+err_dst:
+err_src:
+	job_free(job);
+err_alloc:
+	return;
+}
+
+/*
+ * rep_scan() and friends
+ * Read the whole db of keys, replicate those below redundancy.
+ */
+
+struct cursor {		/* our own "soft" cursor, works across transactions */
+	size_t klen;	/* zero possible, means no key */
+	struct db_obj_key *key;
+	DB_ENV *db_env;
+	DB     *db_objs;
+	DB_TXN *db_txn;
+	DBC    *db_cur;
+};
+
+static int rep_scan_open(struct cursor *cp)
+{
+	int rc;
+
+	rc = cp->db_env->txn_begin(cp->db_env, NULL, &cp->db_txn, 0);
+	if (rc) {
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_begin");
+		goto err_none;
+	}
+
+	// DB_WRITECURSOR ?  DB_BULK ?
+	rc = cp->db_objs->cursor(cp->db_objs, cp->db_txn, &cp->db_cur, 0);
+	if (rc) {
+		cp->db_objs->err(cp->db_objs, rc, "objs->cursor");
+		goto err_out;
+	}
+
+	return 0;
+
+err_out:
+	rc = cp->db_txn->abort(cp->db_txn);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_abort");
+err_none:
+	return -1;
+}
+
+static void rep_scan_close(struct cursor *cp)
+{
+	int rc;
+
+	rc = cp->db_cur->close(cp->db_cur);
+	if (rc) {
+		cp->db_objs->err(cp->db_objs, rc, "objs->cursor close");
+		goto err_out;
+	}
+	cp->db_cur = NULL;
+
+	rc = cp->db_txn->commit(cp->db_txn, 0);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_commit");
+	cp->db_txn = NULL;
+	return;
+
+err_out:
+	rc = cp->db_txn->abort(cp->db_txn);
+	if (rc)
+		cp->db_env->err(cp->db_env, rc, "DB_ENV->txn_abort");
+	return;
+}
+
+/* get next */
+static int rep_scan_get(struct cursor *cp, struct db_obj_ent **pobj)
+{
+	unsigned int get_flags;
+	DBT pkey, pval;
+	int rc;
+
+	if (cp->db_cur) {
+		get_flags = DB_NEXT;
+	} else {
+		if (rep_scan_open(cp) != 0)
+			return -1;
+		get_flags = DB_SET_RANGE;
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = cp->key;
+	pkey.size = cp->klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.flags = DB_DBT_MALLOC;
+
+	rc = cp->db_cur->get(cp->db_cur, &pkey, &pval, get_flags);
+	if (rc) {
+		if (rc != DB_NOTFOUND)
+			cp->db_objs->err(cp->db_objs, rc, "cur->get for keys");
+		return -1;
+	}
+
+	*pobj = pval.data;
+	return 0;
+}
+
+/* parse object into cursor state */
+static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
+{
+	unsigned int obj_koff, obj_klen;
+	struct db_obj_key *okey;
+
+	obj_klen = GUINT16_FROM_LE(*(uint16_t *)(obj+1));
+	if (obj_klen >= 64*1024) {	/* byteswapped or corrupt */
+		applog(LOG_ERR, "bad key length %d", obj_klen);
+		return -1;
+	}
+	obj_koff = obj->n_str * sizeof(uint16_t);
+
+	okey = malloc(64 + obj_klen);
+
+	memcpy(okey->bucket, obj->bucket, 64);
+	memcpy(okey->key, (char *)(obj+1) + obj_koff, obj_klen);
+
+	free(cp->key);
+	cp->key = okey;
+	cp->klen = 64 + obj_klen;
+	return 0;
+}
+
+/* meat of scan - check if replication is needed on the key */
+static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+{
+	char bucket_name[65];
+	char object_name[1025];
+	uint64_t oid;
+	int i;
+	struct storage_node *redvec[MAXWAY];
+	int allcnt, redcnt;
+	uint32_t nid;
+	struct storage_node *stn;
+	time_t t1;
+
+	memcpy(bucket_name, cp->key->bucket, 64);
+	bucket_name[64] = 0;
+	memcpy(object_name, cp->key->key, cp->klen - 64);
+	object_name[cp->klen - 64] = 0;
+
+	t1 = time(NULL);
+
+	allcnt = 0;
+	redcnt = 0;
+	for (i = 0; i < MAXWAY; i++) {
+		nid = GUINT32_FROM_LE(obj->d.a.nidv[i]);
+		if (!nid)
+			continue;
+		stn = stor_node_by_nid(nid);
+		if (!stn)
+			continue;
+		allcnt++;
+		if (!stn->up)
+			continue;
+		if (t1 > stn->last_up + CHUNK_REBOOT_TIME)
+			continue;
+		/*
+		 * This is where we later ask chunks for checksums (TODO).
+		 */
+
+		redvec[redcnt] = stn;
+		redcnt++;
+	}
+
+	oid = GUINT64_FROM_LE(obj->d.a.oid);
+
+	applog(LOG_INFO, "bucket %s key %s oid %llX n(%u,%u,%u): all %d ok %d",
+	       bucket_name, object_name, (long long) oid,
+	       GUINT32_FROM_LE(obj->d.a.nidv[0]),
+	       GUINT32_FROM_LE(obj->d.a.nidv[1]),
+	       GUINT32_FROM_LE(obj->d.a.nidv[2]),
+	       allcnt, redcnt);
+
+	if (redcnt < MAXWAY) {		/* maybe have MINWAY too? */
+		rep_job_start(cp->klen, cp->key, oid,
+			      GUINT64_FROM_LE(obj->size),
+			      redcnt, redvec);
+	}
+}
+
+static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
+{
+	DB_ENV *db_env = tdb.env;
+	DB *db_objs = tdb.objs;
+	DB_TXN *db_txn;
+	DBT pkey, pval;
+	struct db_obj_ent *obj;
+	ssize_t oelen;
+	unsigned empty;
+	uint32_t n;
+	int i;
+	int rc;
+
+	rc = db_env->txn_begin(db_env, NULL, &db_txn, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "DB_ENV->txn_begin");
+		goto err_none;
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.flags = DB_DBT_MALLOC;
+
+	rc = db_objs->get(db_objs, db_txn, &pkey, &pval, DB_RMW);
+	if (rc) {
+		db_env->err(db_env, rc, "objs->get");
+		goto err_get;
+	}
+
+	obj = pval.data;
+	oelen = pval.size;
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	rc = db_objs->del(db_objs, db_txn, &pkey, 0);
+	if (rc) {
+		db_objs->err(db_objs, rc, "objs->del");
+		goto err_del;
+	}
+
+	empty = 0;
+	for (i = 0; i < MAXWAY; i++) {
+		n = GUINT32_FROM_LE(obj->d.a.nidv[i]);
+		if (n && n == nid) {
+			applog(LOG_WARNING,
+			       "object %llX already has nid %u",
+			       (long long) GUINT64_FROM_LE(obj->d.a.oid), nid);
+			goto err_check;
+		}
+		if (!n)
+			empty++;
+	}
+	if (!empty) {
+		applog(LOG_WARNING,
+		      "object %llX already fully redundant, dropping nid %u",
+		       (long long) GUINT64_FROM_LE(obj->d.a.oid), nid);
+		goto err_check;
+	}
+
+	for (i = 0; i < MAXWAY; i++) {
+		if (!obj->d.a.nidv[i]) {
+			obj->d.a.nidv[i] = GUINT32_TO_LE(nid);
+			break;
+		}
+	}
+
+	memset(&pkey, 0, sizeof(pkey));
+	pkey.data = key;
+	pkey.size = klen;
+
+	memset(&pval, 0, sizeof(pval));
+	pval.data = obj;
+	pval.size = oelen;
+
+	rc = db_objs->put(db_objs, db_txn, &pkey, &pval, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "objs->put");
+		goto err_put;
+	}
+
+	free(obj);
+
+	rc = db_txn->commit(db_txn, 0);
+	if (rc) {
+		db_env->err(db_env, rc, "DB_ENV->txn_commit");
+	}
+	return;
+
+err_put:
+err_check:
+err_del:
+	free(obj);
+err_get:
+	rc = db_txn->abort(db_txn);
+	if (rc)
+		db_env->err(db_env, rc, "DB_ENV->txn_abort");
+err_none:
+	return;
+}
+
+static void rep_retire(void)
+{
+	struct rep_job *job;
+
+	while (!list_empty(&done.jlist)) {
+		job = list_entry(done.jlist.next, struct rep_job, jlink);
+		list_del(&job->jlink);
+
+		rep_add_nid(job->klen, job->key, job->dst->id);
+		job_free(job);
+	}
+}
+
+void rep_scan(void)
+{
+	struct cursor cur;
+	struct db_obj_ent *obj;
+	unsigned long kcnt;
+	time_t start_time, t;
+
+	rep_retire();
+
+	start_time = time(NULL);
+	if (debugging)
+		applog(LOG_DEBUG, "key scan start time %lu", (long)start_time);
+
+	memset(&cur, 0, sizeof(struct cursor));	/* enough to construct */
+	cur.db_env = tdb.env;
+	cur.db_objs = tdb.objs;
+
+	kcnt = 0;
+	for (;;) {
+		if ((t = time(NULL)) >= start_time + 2) {
+			if (debugging)
+				applog(LOG_DEBUG,
+				       "db release at keys %lu seconds %lu",
+				       kcnt, (long)t);
+			rep_scan_close(&cur);
+		}
+
+		if (rep_scan_get(&cur, &obj) != 0)
+			break;
+
+		/* not needed for db4 with DB_NEXT, but eases our logic */
+		if (rep_scan_parse(&cur, obj) != 0) {
+			free(obj);
+			continue;
+		}
+
+		if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
+			rep_scan_verify(&cur, obj);
+
+		free(obj);
+		kcnt++;
+	}
+
+	rep_scan_close(&cur);
+	free(cur.key);
+	cur.key = NULL;
+
+	if (debugging)
+		applog(LOG_DEBUG, "key scan done keys %lu", kcnt);
+	return;
+}
+
diff --git a/server/server.c b/server/server.c
index 2a504e2..673c151 100644
--- a/server/server.c
+++ b/server/server.c
@@ -1318,6 +1318,20 @@ static void tdb_checkpoint(int fd, short events, void *userdata)
 	add_chkpt_timer();
 }
 
+static void add_kscan_timer(void)
+{
+	static const struct timeval tv = { TABLED_RESCAN_SEC, 0 };
+
+	if (evtimer_add(&tabled_srv.kscan_timer, &tv) < 0)
+		applog(LOG_WARNING, "unable to add key scan timer");
+}
+
+static void tdb_keyscan(int fd, short events, void *userdata)
+{
+	rep_scan();
+	add_kscan_timer();
+}
+
 static void tdb_state_cb(enum db_event event)
 {
 
@@ -1365,7 +1379,7 @@ static void tdb_state_cb(enum db_event event)
  *
  * We don't even bother with registering this callback, just call it by name. 
  *
- * The return value is used to re-arm rescan mechanism.
+ * The return value is used to re-arm storage rescan mechanism.
  */
 int stor_update_cb(void)
 {
@@ -1383,8 +1397,15 @@ int stor_update_cb(void)
 				applog(LOG_DEBUG, " NID %u is up", stn->id);
 			num_up++;
 			stn->up = true;
+			stn->last_up = time(NULL);
+		} else {
+			if (stn->last_up != 0 &&
+			    time(NULL) >= stn->last_up + CHUNK_REBOOT_TIME) {
+				applog(LOG_INFO, " NID %u went down", stn->id);
+			}
 		}
 	}
+
 	if (num_up < 1) {
 		applog(LOG_INFO, "No active storage node(s), waiting");
 		return num_up;
@@ -1695,6 +1716,7 @@ static void tdb_state_process(enum st_tdb new_state)
 			return;
 		}
 		add_chkpt_timer();
+		add_kscan_timer();
 		net_listen();
 	}
 }
@@ -1772,6 +1794,7 @@ int main (int argc, char *argv[])
 
 	event_init();
 	evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
+	evtimer_set(&tabled_srv.kscan_timer, tdb_keyscan, NULL);
 
 	/* set up server networking */
 	rc = net_open();
diff --git a/server/tabled.h b/server/tabled.h
index fd6142e..31cead2 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -19,9 +19,9 @@
  *
  */
 
-
 #include <stdbool.h>
 #include <stdlib.h>
+#include <time.h>
 #include <netinet/in.h>
 #include <openssl/md5.h>
 #include <glib.h>
@@ -43,6 +43,9 @@ enum {
 	TABLED_PGSZ_LOCK	= 4096,
 
 	TABLED_CHKPT_SEC	= 60 * 5,	/* secs between db4 chkpt */
+	TABLED_RESCAN_SEC	= 60*3 + 7,	/* secs btw key rescans */
+
+	CHUNK_REBOOT_TIME	= 3*60,		/* secs to declare chunk dead */
 
 	CLI_REQ_BUF_SZ		= 8192,		/* buffer for req + hdrs */
 	CLI_DATA_BUF_SZ		= 8192,
@@ -89,6 +92,7 @@ struct storage_node {
 	struct list_head	all_link;
 	uint32_t		id;
 	bool			up;
+	time_t			last_up;
 
 	unsigned		alen;
 	int			addr_af;
@@ -115,7 +119,7 @@ struct open_chunk {
 	struct st_client	*stc;
 	struct storage_node	*node;
 	struct list_head	link;
-	struct client		*cli;
+	void			*cli;	/* usually struct client * */
 
 	uint64_t		wtogo;
 	uint64_t		wkey;
@@ -241,6 +245,7 @@ struct server {
 	enum st_net		state_net;
 
 	struct event		chkpt_timer;	/* db4 checkpoint timer */
+	struct event		kscan_timer;	/* db4 key rescan timer */
 
 	struct server_stats	stats;		/* global statistics */
 };
@@ -340,4 +345,7 @@ extern int stor_node_check(struct storage_node *stn);
 /* storparse.c */
 extern void stor_parse(char *fname, const char *text, size_t len);
 
+/* replica.c */
+extern void rep_scan(void);
+
 #endif /* __TABLED_H__ */

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* Re: [Patch 5/7] tabled: Add replication daemon
  2009-11-14  6:36 [Patch 5/7] tabled: Add replication daemon Pete Zaitcev
@ 2009-11-14  8:39 ` Jeff Garzik
  2009-11-14 23:44 ` Jeff Garzik
  1 sibling, 0 replies; 5+ messages in thread
From: Jeff Garzik @ 2009-11-14  8:39 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: Project Hail List

On 11/14/2009 01:36 AM, Pete Zaitcev wrote:
> This patch adds what amounts to a background process that maintains
> redundancy for object data. It is far from the complete solution.
> For one thing, it does not verify checksums. But it's a start.
>
> There's no way to turn this off, by intention. The whole thing must
> work very reliably, not steal too much from benchmarks (but if it
> does, it's only honest to take the hit). It is indispensible.
> However, there's a plan to add useful monitoring of jobs and other
> state, such as available nodes.
>
> Signed-off-by: Pete Zaitcev<zaitcev@redhat.com>

Just so you're not left hanging, regarding this patch, I'm holding off 
in-depth review.  The other patches were easy, quick to apply without 
much post-3am thinking... ;)

	Jeff



^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [Patch 5/7] tabled: Add replication daemon
  2009-11-14  6:36 [Patch 5/7] tabled: Add replication daemon Pete Zaitcev
  2009-11-14  8:39 ` Jeff Garzik
@ 2009-11-14 23:44 ` Jeff Garzik
  2009-11-15  4:53   ` Pete Zaitcev
  1 sibling, 1 reply; 5+ messages in thread
From: Jeff Garzik @ 2009-11-14 23:44 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: Project Hail List

On 11/14/2009 01:36 AM, Pete Zaitcev wrote:
> +void rep_scan(void)
> +{
> +	struct cursor cur;
> +	struct db_obj_ent *obj;
> +	unsigned long kcnt;
> +	time_t start_time, t;
> +
> +	rep_retire();
> +
> +	start_time = time(NULL);
> +	if (debugging)
> +		applog(LOG_DEBUG, "key scan start time %lu", (long)start_time);
> +
> +	memset(&cur, 0, sizeof(struct cursor));	/* enough to construct */
> +	cur.db_env = tdb.env;
> +	cur.db_objs = tdb.objs;
> +
> +	kcnt = 0;
> +	for (;;) {
> +		if ((t = time(NULL))>= start_time + 2) {
> +			if (debugging)
> +				applog(LOG_DEBUG,
> +				       "db release at keys %lu seconds %lu",
> +				       kcnt, (long)t);
> +			rep_scan_close(&cur);
> +		}
> +
> +		if (rep_scan_get(&cur,&obj) != 0)
> +			break;
> +
> +		/* not needed for db4 with DB_NEXT, but eases our logic */
> +		if (rep_scan_parse(&cur, obj) != 0) {
> +			free(obj);
> +			continue;
> +		}
> +
> +		if (!GUINT32_FROM_LE(obj->flags)&  DB_OBJ_INLINE)
> +			rep_scan_verify(&cur, obj);
> +
> +		free(obj);
> +		kcnt++;
> +	}
> +
> +	rep_scan_close(&cur);
> +	free(cur.key);
> +	cur.key = NULL;
> +
> +	if (debugging)
> +		applog(LOG_DEBUG, "key scan done keys %lu", kcnt);
> +	return;

Major comments:

1) What is the point of db->del() in rep_add_nid() ?  You are in the 
middle of a transaction, and you immediately overwrite that record in 
the same transaction.  That is clearly unnecessary work, when db->put() 
will simply overwrite an existing record, if requested.


2) rep_scan():  I would rather not make the entire daemon non-responsive 
for multiple seconds.

The database is already in multi-threaded mode, and db4 is 
free-threaded, so it would seem to make a lot more sense to simply 
g_thread_create() a thread to do this work.

Also, there should be no need to scan a chunkd's keys at all, as long as 
the chunkd instance is still communicating with us.

	Jeff


^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [Patch 5/7] tabled: Add replication daemon
  2009-11-14 23:44 ` Jeff Garzik
@ 2009-11-15  4:53   ` Pete Zaitcev
  2009-11-15 10:47     ` Jeff Garzik
  0 siblings, 1 reply; 5+ messages in thread
From: Pete Zaitcev @ 2009-11-15  4:53 UTC (permalink / raw)
  To: Jeff Garzik; +Cc: Project Hail List

On Sat, 14 Nov 2009 18:44:26 -0500, Jeff Garzik <jeff@garzik.org> wrote:

> 1) What is the point of db->del() in rep_add_nid() ?  You are in the 
> middle of a transaction, and you immediately overwrite that record in 
> the same transaction.  That is clearly unnecessary work, when db->put() 
> will simply overwrite an existing record, if requested.

That didn't work last I tested, but I forgot what the issue was.
The code is copied from object.c. I'll review it.

> 2) rep_scan():  I would rather not make the entire daemon non-responsive 
> for multiple seconds.
> 
> The database is already in multi-threaded mode, and db4 is 
> free-threaded, so it would seem to make a lot more sense to simply 
> g_thread_create() a thread to do this work.

True, but multi-threading in tabled is a much bigger undertaking than
just "simply" creating a thread. There's a lot of common state.
I think the 2s problem is less urgent than processing massive
replication more efficiently (from a Chunk down).

> Also, there should be no need to scan a chunkd's keys at all, as long as 
> the chunkd instance is still communicating with us.

No, there's a need, although there's no code for it: the scan should
probe keys by requesting their metadata, which fortuitously includes
checksums. This is how the results of Chunk's self-test will be
communicated to tabled.

-- Pete

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [Patch 5/7] tabled: Add replication daemon
  2009-11-15  4:53   ` Pete Zaitcev
@ 2009-11-15 10:47     ` Jeff Garzik
  0 siblings, 0 replies; 5+ messages in thread
From: Jeff Garzik @ 2009-11-15 10:47 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: Project Hail List

On 11/14/2009 11:53 PM, Pete Zaitcev wrote:
> On Sat, 14 Nov 2009 18:44:26 -0500, Jeff Garzik<jeff@garzik.org>  wrote:
>
>> 1) What is the point of db->del() in rep_add_nid() ?  You are in the
>> middle of a transaction, and you immediately overwrite that record in
>> the same transaction.  That is clearly unnecessary work, when db->put()
>> will simply overwrite an existing record, if requested.
>
> That didn't work last I tested, but I forgot what the issue was.
> The code is copied from object.c. I'll review it.

The only code matching that pattern is object_put_end(), which also 
performs unnecessary work of needlessly calling db->del().

This should be recognized as nothing more than programmer laziness, 
enabled by the sharing of __object_del() between DEL and PUT, and the 
necessity of deleting associated records (ACLs) in conjunction with the 
main [deletion | update].

That does not apply to rep_add_nid(), which has neither object data nor 
related records to delete.

(patches accepted to remove ->del from PUT path, too, if you wish)


>> 2) rep_scan():  I would rather not make the entire daemon non-responsive
>> for multiple seconds.
>>
>> The database is already in multi-threaded mode, and db4 is
>> free-threaded, so it would seem to make a lot more sense to simply
>> g_thread_create() a thread to do this work.
>
> True, but multi-threading in tabled is a much bigger undertaking than
> just "simply" creating a thread. There's a lot of common state.
> I think the 2s problem is less urgent than processing massive
> replication more efficiently (from a Chunk down).

I only see one global variable reference in the whole nicely-designed, 
nicely self-contained replica.c file.  "a lot of common state" seems 
like quite an exaggeration.

This patch resuscitates the worst application model from the late 
1980's, cooperative multi-tasking.  I do not want to walk down that 
road:  such programs have weird pauses at weird times, and behave in a 
very non-deterministic manner.

Furthermore, I think you will find that threading gives you a lot more 
freedom to work in the background.


>> Also, there should be no need to scan a chunkd's keys at all, as long as
>> the chunkd instance is still communicating with us.
>
> No, there's a need, although there's no code for it: the scan should
> probe keys by requesting their metadata, which forturously includes
> checksums. This is how the results of Chunk's self-test will be
> communicated to tabled.

What is the need?  This description provides "what" and "how" but does 
not answer "why?"

	Jeff


^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2009-11-15 10:47 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2009-11-14  6:36 [Patch 5/7] tabled: Add replication daemon Pete Zaitcev
2009-11-14  8:39 ` Jeff Garzik
2009-11-14 23:44 ` Jeff Garzik
2009-11-15  4:53   ` Pete Zaitcev
2009-11-15 10:47     ` Jeff Garzik

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.