* [PATCH RFC 0/11] ulogd2 db rework
@ 2013-05-10  6:48 Eric Leblond
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel


Hello,

Here's a patchset for ulogd that aims to improve the way databases
are handled. It has two parts:
 - a backlog system (patch 06) which allows queries to be stored in
 memory when the database is not available. Data loss is prevented
 in this case, up to the limit set by the configured memory cap (memcap).
 - a multithreaded ulogd db output (patch 09) with one thread reading
 messages from the kernel and another one dedicated to executing the
 SQL queries.

The backlog system prevents data loss and only costs memory when the
database is down. The multithreaded db output uses a ring buffer
(which costs memory) for communication between the two threads.

The idea behind the multithreaded db output is to keep the time
needed to read a kernel message almost constant. This should make
ulogd more resistant to bursts of kernel messages, as the
kernel-to-userspace buffer will no longer fill up because of the long
processing time needed to execute an SQL query.
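The decoupling described above can be illustrated with a minimal
producer/consumer sketch. It assumes a mutex and condition variable
protecting a fixed-size queue; `query_queue`, `queue_push()` and
`executor()` are hypothetical names, not ulogd code, and dropping a
query when the queue is full is an assumption made for illustration.
The reading side only copies a formatted query into a slot, so its
cost does not depend on how long the database takes to answer.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define QSLOTS 8   /* hypothetical queue depth */
#define QLEN   128 /* hypothetical maximum query length */

/* Fixed-size queue shared by the reader (producer) and the SQL thread. */
struct query_queue {
	char slots[QSLOTS][QLEN];
	unsigned int head, tail, count;
	bool stop;
	unsigned int executed; /* queries taken by the consumer */
	unsigned int dropped;  /* queries refused because the queue was full */
	pthread_mutex_t mutex;
	pthread_cond_t cond;
};

static void queue_init(struct query_queue *q)
{
	memset(q, 0, sizeof(*q));
	pthread_mutex_init(&q->mutex, NULL);
	pthread_cond_init(&q->cond, NULL);
}

/* Reader side: copies the query and returns; never waits for the database. */
static bool queue_push(struct query_queue *q, const char *stmt)
{
	bool ok = false;

	pthread_mutex_lock(&q->mutex);
	if (q->count < QSLOTS) {
		snprintf(q->slots[q->tail], QLEN, "%s", stmt);
		q->tail = (q->tail + 1) % QSLOTS;
		q->count++;
		assert(q->count <= QSLOTS);
		ok = true;
		pthread_cond_signal(&q->cond);
	} else {
		q->dropped++;
	}
	pthread_mutex_unlock(&q->mutex);
	return ok;
}

/* SQL thread: waits for work, then handles each query outside the lock. */
static void *executor(void *arg)
{
	struct query_queue *q = arg;
	char stmt[QLEN];

	pthread_mutex_lock(&q->mutex);
	for (;;) {
		while (q->count == 0 && !q->stop)
			pthread_cond_wait(&q->cond, &q->mutex);
		if (q->count == 0) /* stop requested and queue drained */
			break;
		memcpy(stmt, q->slots[q->head], QLEN);
		q->head = (q->head + 1) % QSLOTS;
		q->count--;
		pthread_mutex_unlock(&q->mutex);

		/* A real output would execute `stmt` against the database here. */
		(void)stmt;

		pthread_mutex_lock(&q->mutex);
		q->executed++;
	}
	pthread_mutex_unlock(&q->mutex);
	return NULL;
}

/* Ask the SQL thread to exit once the queue is empty. */
static void queue_stop(struct query_queue *q)
{
	pthread_mutex_lock(&q->mutex);
	q->stop = true;
	pthread_cond_signal(&q->cond);
	pthread_mutex_unlock(&q->mutex);
}
```

The slow part (the placeholder where a query would run) happens with
the mutex released, so `queue_push()` only contends with the short copy
in and out of a slot.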

Both features modify the generic db layer inside ulogd, so they are
available in the pgsql, mysql and dbi outputs. So far they have only
been tested with pgsql.

Comments, feedback and test results on both features are more than
welcome.

Patchset statistics:

 configure.ac                          |   32 ++-
 include/ulogd/db.h                    |   67 +++++-
 output/mysql/ulogd_output_MYSQL.c     |    4 +-
 output/pgsql/ulogd_output_PGSQL.c     |    4 +-
 output/sqlite3/ulogd_output_SQLITE3.c |    2 +
 src/Makefile.am                       |    2 +-
 src/ulogd.c                           |    2 +-
 ulogd.conf.in                         |   13 ++
 util/db.c                             |  371 +++++++++++++++++++++++++++------
 9 files changed, 417 insertions(+), 80 deletions(-)

BR,
--
Eric Leblond <eric@regit.org>


* [PATCH 01/11] ulogd: display stack during configuration
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond


Signed-off-by: Eric Leblond <eric@regit.org>
---
 src/ulogd.c |    2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ulogd.c b/src/ulogd.c
index b28d0f8..8f21cc2 100644
--- a/src/ulogd.c
+++ b/src/ulogd.c
@@ -830,7 +830,7 @@ static int create_stack(const char *option)
 	}
 	INIT_LLIST_HEAD(&stack->list);
 
-	ulogd_log(ULOGD_DEBUG, "building new pluginstance stack (%s):\n",
+	ulogd_log(ULOGD_NOTICE, "building new pluginstance stack: '%s'\n",
 		  option);
 
 	/* PASS 1: find and instanciate plugins of stack, link them together */
-- 
1.7.10.4



* [PATCH 02/11] Fix automagic support of dbi, pcap and sqlite3
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Ilya Tumaykin, Eric Leblond

From: Ilya Tumaykin <itumaykin@gmail.com>

ulogd currently has automagic dependencies for several output plugins,
namely dbi, pcap and sqlite3: these plugins are built whenever the
corresponding libraries are present on the user's system. This is fine
for binary distributions, but not for source-based ones such as Gentoo.

The problem arises when a program links against libraries the user did
not ask for and those libraries are later removed from the system,
which leaves the program in a broken state.

This patch modifies configure.ac; we already apply it in our package
and it fixes the issue described above. It adds 3 new configure
options: --without-{dbi,pcap,sqlite}. I would like to emphasize that
this patch doesn't change the default behaviour of the configure script
at all, so other distributions are not affected. We simply add options
to explicitly disable any attempt to detect the libraries for automagic
dependencies, which is enough to avoid unnecessary linkage.

Signed-off-by: Eric Leblond <eric@regit.org>
---
 configure.ac |   30 ++++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/configure.ac b/configure.ac
index c94704b..7e04531 100644
--- a/configure.ac
+++ b/configure.ac
@@ -20,14 +20,6 @@ AC_PROG_LIBTOOL
 dnl Checks for libraries.
 AC_SEARCH_LIBS([dlopen], [dl], [libdl_LIBS="$LIBS"; LIBS=""])
 AC_SUBST([libdl_LIBS])
-AC_SEARCH_LIBS([pcap_close], [pcap], [libpcap_LIBS="-lpcap"; LIBS=""])
-AC_SUBST([libpcap_LIBS])
-AM_CONDITIONAL([HAVE_PCAP], [test -n "$libpcap_LIBS"])
-if test "x$libpcap_LIBS" != "x"; then
-	enable_pcap="yes"
-else
-	enable_pcap="no"
-fi
 
 dnl Checks for header files.
 AC_HEADER_DIRENT
@@ -88,7 +80,10 @@ else
 	enable_mysql="no"
 fi
 
-PKG_CHECK_MODULES([libsqlite3], [sqlite3], [], [:])
+AC_ARG_WITH([sqlite], AS_HELP_STRING([--without-sqlite], [Build without SQLITE3 output plugin [default=test]]))
+AS_IF([test "x$with_sqlite" != "xno"], [
+    PKG_CHECK_MODULES([libsqlite3], [sqlite3], [], [:])
+])
 AM_CONDITIONAL([HAVE_SQLITE3], [test -n "$libsqlite3_LIBS"])
 if test "x$libsqlite3_LIBS" != "x"; then
 	enable_sqlite3="yes"
@@ -96,7 +91,10 @@ else
 	enable_sqlite3="no"
 fi
 
-CT_CHECK_DBI()
+AC_ARG_WITH([dbi], AS_HELP_STRING([--without-dbi], [Build without DBI output plugin [default=test]]))
+AS_IF([test "x$with_dbi" != "xno"], [
+    CT_CHECK_DBI()
+])
 AM_CONDITIONAL(HAVE_DBI, test "x$DBI_LIB" != "x")
 if test "x$DBI_LIB" != "x"; then
 	enable_dbi="yes"
@@ -104,6 +102,18 @@ else
 	enable_dbi="no"
 fi
 
+AC_ARG_WITH([pcap], AS_HELP_STRING([--without-pcap], [Build without PCAP output plugin [default=test]]))
+AS_IF([test "x$with_pcap" != "xno"], [
+    AC_SEARCH_LIBS([pcap_close], [pcap], [libpcap_LIBS="-lpcap"; LIBS=""])
+    AC_SUBST([libpcap_LIBS])
+])
+AM_CONDITIONAL([HAVE_PCAP], [test -n "$libpcap_LIBS"])
+if test "x$libpcap_LIBS" != "x"; then
+	enable_pcap="yes"
+else
+	enable_pcap="no"
+fi
+
 dnl AC_SUBST(DATABASE_DIR)
 dnl AC_SUBST(DATABASE_LIB)
 dnl AC_SUBST(DATABASE_LIB_DIR)
-- 
1.7.10.4



* [PATCH 03/11] postgresql: add sanity checking
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

Clean up the postgresql handle at deinit: only call PQfinish() on a
non-NULL handle, and reset the handle to NULL afterwards.
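The change follows a common idempotent-close pattern: check the handle
before releasing it, then clear it, so a later close (for instance from
a reconnect path or a second deinit) becomes a no-op. A minimal sketch,
with a hypothetical `fake_conn` type standing in for the real database
handle; no libpq call is made here.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical connection type standing in for PGconn, MYSQL or sqlite3. */
struct fake_conn {
	int fd;
};

static unsigned int close_calls; /* how many handles were actually released */

static void fake_conn_finish(struct fake_conn *c)
{
	assert(c != NULL); /* the release function is only given a live handle */
	close_calls++;
	free(c);
}

struct fake_instance {
	struct fake_conn *dbh;
};

/* Safe whether or not the connection was ever opened, and safe to repeat. */
static int close_db_fake(struct fake_instance *pi)
{
	if (pi->dbh)
		fake_conn_finish(pi->dbh);
	pi->dbh = NULL; /* a second call is now a no-op, not a double free */
	return 0;
}
```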

Signed-off-by: Eric Leblond <eric@regit.org>
---
 output/pgsql/ulogd_output_PGSQL.c |    4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/output/pgsql/ulogd_output_PGSQL.c b/output/pgsql/ulogd_output_PGSQL.c
index f246153..88fb765 100644
--- a/output/pgsql/ulogd_output_PGSQL.c
+++ b/output/pgsql/ulogd_output_PGSQL.c
@@ -214,7 +214,9 @@ static int close_db_pgsql(struct ulogd_pluginstance *upi)
 {
 	struct pgsql_instance *pi = (struct pgsql_instance *) upi->private;
 
-	PQfinish(pi->dbh);
+	if (pi->dbh)
+		PQfinish(pi->dbh);
+	pi->dbh = NULL;
 
 	return 0;
 }
-- 
1.7.10.4



* [PATCH 04/11] mysql: add sanity checking
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

Only call mysql_close() on a non-NULL handle at deinit, and reset the
mysql handle to NULL afterwards.

Signed-off-by: Eric Leblond <eric@regit.org>
---
 output/mysql/ulogd_output_MYSQL.c |    4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/output/mysql/ulogd_output_MYSQL.c b/output/mysql/ulogd_output_MYSQL.c
index 72c080e..0a1ebfc 100644
--- a/output/mysql/ulogd_output_MYSQL.c
+++ b/output/mysql/ulogd_output_MYSQL.c
@@ -162,7 +162,9 @@ static int get_columns_mysql(struct ulogd_pluginstance *upi)
 static int close_db_mysql(struct ulogd_pluginstance *upi)
 {
 	struct mysql_instance *mi = (struct mysql_instance *) upi->private;
-	mysql_close(mi->dbh);
+	if (mi->dbh)
+		mysql_close(mi->dbh);
+	mi->dbh = NULL;
 	return 0;
 }
 
-- 
1.7.10.4



* [PATCH 05/11] sqlite3: add sanity checking
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

Reset the sqlite3 handle to NULL at deinit.

Signed-off-by: Eric Leblond <eric@regit.org>
---
 output/sqlite3/ulogd_output_SQLITE3.c |    2 ++
 1 file changed, 2 insertions(+)

diff --git a/output/sqlite3/ulogd_output_SQLITE3.c b/output/sqlite3/ulogd_output_SQLITE3.c
index f9f2462..5c49055 100644
--- a/output/sqlite3/ulogd_output_SQLITE3.c
+++ b/output/sqlite3/ulogd_output_SQLITE3.c
@@ -431,6 +431,8 @@ sqlite3_stop(struct ulogd_pluginstance *pi)
 
 	sqlite3_close(priv->dbh);
 
+	priv->dbh = NULL;
+
 	return 0;
 }
 
-- 
1.7.10.4



* [PATCH 06/11] db: store data in memory during database downtime
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

This patch adds a mechanism to store queries in an in-memory backlog.
This makes it possible to keep events in memory while the database is
down and to perform the actual insertion when the database comes back.
A memory cap (backlog_memcap) prevents unbounded memory usage.
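A minimal sketch of a memory-capped backlog, assuming a plain singly
linked FIFO instead of ulogd's llist_head and a caller-supplied execute
callback in place of driver->execute(). `struct backlog`,
`backlog_add()`, `backlog_flush()` and `fake_execute()` are
hypothetical; the `oneshot` argument plays the role of
backlog_oneshot_requests by bounding how many stored queries are
replayed per call.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical backlog entry (ulogd uses struct db_stmt + llist_head). */
struct bl_entry {
	char *stmt;
	size_t len;
	struct bl_entry *next;
};

struct backlog {
	struct bl_entry *head, *tail;
	size_t memusage; /* bytes accounted to queued entries */
	size_t memcap;   /* 0 disables the backlog, as with backlog_memcap */
};

/* Queue a copy of stmt; returns 0 on success, -1 if disabled, full or OOM. */
static int backlog_add(struct backlog *b, const char *stmt)
{
	size_t len = strlen(stmt);
	size_t cost = len + sizeof(struct bl_entry);
	struct bl_entry *e;

	if (b->memcap == 0 || b->memusage + cost > b->memcap)
		return -1;
	e = malloc(sizeof(*e));
	if (!e)
		return -1;
	e->stmt = malloc(len + 1);
	if (!e->stmt) {
		free(e);
		return -1;
	}
	memcpy(e->stmt, stmt, len + 1);
	e->len = len;
	e->next = NULL;
	if (b->tail)
		b->tail->next = e;
	else
		b->head = e;
	b->tail = e;
	b->memusage += cost;
	return 0;
}

/* Replay at most `oneshot` statements in FIFO order. On the first failure
 * the statement stays queued and -1 is returned; otherwise returns the
 * number of statements executed. */
static int backlog_flush(struct backlog *b, unsigned int oneshot,
			 int (*execute)(const char *stmt, size_t len))
{
	unsigned int done = 0;

	while (b->head && done < oneshot) {
		struct bl_entry *e = b->head;

		if (execute(e->stmt, e->len) < 0)
			return -1;
		b->head = e->next;
		if (!b->head)
			b->tail = NULL;
		b->memusage -= e->len + sizeof(struct bl_entry);
		free(e->stmt);
		free(e);
		done++;
	}
	assert(b->head || b->memusage == 0); /* empty backlog uses no memory */
	return (int)done;
}

/* Stand-in for driver->execute(): always succeeds and counts calls. */
static unsigned int executed;
static int fake_execute(const char *stmt, size_t len)
{
	(void)stmt;
	(void)len;
	executed++;
	return 0;
}
```

Accounting the entry header as well as the string keeps the cap close
to the real allocation, and replaying a bounded number of entries per
call keeps each call short once the database is reachable again.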

Signed-off-by: Eric Leblond <eric@regit.org>
---
 include/ulogd/db.h |   34 +++++++++--
 ulogd.conf.in      |    9 +++
 util/db.c          |  170 +++++++++++++++++++++++++++++++++++++++++++---------
 3 files changed, 180 insertions(+), 33 deletions(-)

diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index 1c910ff..a533902 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -20,6 +20,12 @@ struct db_driver {
 			const char *stmt, unsigned int len);
 };
 
+struct db_stmt {
+	char *stmt;
+	int len;
+	struct llist_head list;
+};
+
 struct db_instance {
 	char *stmt; /* buffer for our insert statement */
 	char *stmt_val; /* pointer to the beginning of the "VALUES" part */
@@ -28,9 +34,15 @@ struct db_instance {
 	time_t reconnect;
 	int (*interp)(struct ulogd_pluginstance *upi);
 	struct db_driver *driver;
+	unsigned int backlog_memcap;
+	unsigned int backlog_memusage;
+	unsigned int backlog_oneshot;
+	unsigned char backlog_full;
+	struct llist_head backlog;
 };
 #define TIME_ERR		((time_t)-1)	/* Be paranoid */
 #define RECONNECT_DEFAULT	2
+#define MAX_ONESHOT_REQUEST	10
 
 #define DB_CES							\
 		{						\
@@ -51,13 +63,25 @@ struct db_instance {
 			.key = "procedure",			\
 			.type = CONFIG_TYPE_STRING,		\
 			.options = CONFIG_OPT_MANDATORY,	\
+		},						\
+		{						\
+			.key = "backlog_memcap",		\
+			.type = CONFIG_TYPE_INT,		\
+			.u.value = 0,				\
+		},						\
+		{						\
+			.key = "backlog_oneshot_requests",	\
+			.type = CONFIG_TYPE_INT,		\
+			.u.value = MAX_ONESHOT_REQUEST,		\
 		}
 
-#define DB_CE_NUM	4
-#define table_ce(x)	(x->ces[0])
-#define reconnect_ce(x)	(x->ces[1])
-#define timeout_ce(x)	(x->ces[2])
-#define procedure_ce(x)	(x->ces[3])
+#define DB_CE_NUM		6
+#define table_ce(x)		(x->ces[0])
+#define reconnect_ce(x)		(x->ces[1])
+#define timeout_ce(x)		(x->ces[2])
+#define procedure_ce(x)		(x->ces[3])
+#define backlog_memcap_ce(x)	(x->ces[4])
+#define backlog_oneshot_ce(x)	(x->ces[5])
 
 void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal);
 int ulogd_db_start(struct ulogd_pluginstance *upi);
diff --git a/ulogd.conf.in b/ulogd.conf.in
index f4f63d9..3e5e648 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -207,6 +207,13 @@ user="nupik"
 table="ulog"
 pass="changeme"
 procedure="INSERT_PACKET_FULL"
+# backlog configuration:
+# set backlog_memcap to the size of memory that will be
+# allocated to store events in memory if data is temporary down
+# and insert them when the database came back.
+#backlog_memcap=1000000
+# number of events to insert at once when backlog is not empty
+#backlog_oneshot_requests=10
 
 [mysql2]
 db="nulog"
@@ -224,6 +231,8 @@ table="ulog"
 #schema="public"
 pass="changeme"
 procedure="INSERT_PACKET_FULL"
+#backlog_memcap=1000000
+#backlog_oneshot_requests=10
 
 [pgsql2]
 db="nulog"
diff --git a/util/db.c b/util/db.c
index 0d8b9c1..d125e21 100644
--- a/util/db.c
+++ b/util/db.c
@@ -167,7 +167,22 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
 	 * but abort during input key resolving routines.  configure
 	 * doesn't have a destructor... */
 	di->driver->close_db(upi);
+
+	INIT_LLIST_HEAD(&di->backlog);
+	di->backlog_memusage = 0;
 	
+	di->backlog_memcap = backlog_memcap_ce(upi->config_kset).u.value;
+	if (di->backlog_memcap > 0) {
+		di->backlog_oneshot = backlog_oneshot_ce(upi->config_kset).u.value;
+		if (di->backlog_oneshot <= 2) {
+			ulogd_log(ULOGD_ERROR,
+				  "backlog_oneshot_requests must be > 2 to hope"
+				  " cleaning. Setting it to 3.\n");
+			di->backlog_oneshot = 3;
+		}
+		di->backlog_full = 0;
+	}
+
 	return ret;
 }
 
@@ -245,38 +260,15 @@ static int _init_reconnect(struct ulogd_pluginstance *upi)
 	return 0;
 }
 
-static int _init_db(struct ulogd_pluginstance *upi)
-{
-	struct db_instance *di = (struct db_instance *) upi->private;
-
-	if (di->reconnect && di->reconnect > time(NULL))
-		return 0;
-	
-	if (di->driver->open_db(upi)) {
-		ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
-		return _init_reconnect(upi);
-	}
-
-	/* enable 'real' logging */
-	di->interp = &__interp_db;
-
-	di->reconnect = 0;
-
-	/* call the interpreter function to actually write the
-	 * log line that we wanted to write */
-	return __interp_db(upi);
-}
-
-
-/* our main output function, called by ulogd */
-static int __interp_db(struct ulogd_pluginstance *upi)
+static void __format_query_db(struct ulogd_pluginstance *upi)
 {
 	struct db_instance *di = (struct db_instance *) &upi->private;
+
 	unsigned int i;
 
 	di->stmt_ins = di->stmt_val;
 
-	for (i = 0; i < upi->input.num_keys; i++) { 
+	for (i = 0; i < upi->input.num_keys; i++) {
 		struct ulogd_key *res = upi->input.keys[i].u.source;
 
 		if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE)
@@ -325,8 +317,8 @@ static int __interp_db(struct ulogd_pluginstance *upi)
 		case ULOGD_RET_STRING:
 			*(di->stmt_ins++) = '\'';
 			if (res->u.value.ptr) {
-				di->stmt_ins += 
-				di->driver->escape_string(upi, di->stmt_ins, 
+				di->stmt_ins +=
+				di->driver->escape_string(upi, di->stmt_ins,
 							  res->u.value.ptr,
 							strlen(res->u.value.ptr));
 			}
@@ -347,10 +339,132 @@ static int __interp_db(struct ulogd_pluginstance *upi)
 		di->stmt_ins = di->stmt + strlen(di->stmt);
 	}
 	*(di->stmt_ins - 1) = ')';
+}
+
+static int __add_to_backlog(struct ulogd_pluginstance *upi, const char *stmt, unsigned int len)
+{
+	struct db_instance *di = (struct db_instance *) &upi->private;
+	struct db_stmt *query;
 
+	/* check if we are using backlog */
+	if (di->backlog_memcap == 0)
+		return 0;
+
+	/* check len against backlog */
+	if (len + di->backlog_memusage > di->backlog_memcap) {
+		if (di->backlog_full == 0)
+			ulogd_log(ULOGD_ERROR,
+				  "Backlog is full starting to reject events.\n");
+		di->backlog_full = 1;
+		return -1;
+	}
+
+	query = malloc(sizeof(struct db_stmt));
+	if (query == NULL)
+		return -1;
+
+	query->stmt = strndup(stmt, len);
+	query->len = len;
+
+	if (query->stmt == NULL) {
+		free(query);
+		return -1;
+	}
+
+	di->backlog_memusage += len + sizeof(struct db_stmt);
+	di->backlog_full = 0;
+
+	llist_add_tail(&query->list, &di->backlog);
+
+	return 0;
+}
+
+static int _init_db(struct ulogd_pluginstance *upi)
+{
+	struct db_instance *di = (struct db_instance *) upi->private;
+
+	if (di->reconnect && di->reconnect > time(NULL)) {
+		/* store entry to backlog if it is active */
+		if (di->backlog_memcap && !di->backlog_full) {
+			__format_query_db(upi);
+			__add_to_backlog(upi, di->stmt,
+						strlen(di->stmt));
+		}
+		return 0;
+	}
+
+	if (di->driver->open_db(upi)) {
+		ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
+		if (di->backlog_memcap && !di->backlog_full) {
+			__format_query_db(upi);
+			__add_to_backlog(upi, di->stmt, strlen(di->stmt));
+		}
+		return _init_reconnect(upi);
+	}
+
+	/* enable 'real' logging */
+	di->interp = &__interp_db;
+
+	di->reconnect = 0;
+
+	/* call the interpreter function to actually write the
+	 * log line that we wanted to write */
+	return __interp_db(upi);
+}
+
+static int __treat_backlog(struct ulogd_pluginstance *upi)
+{
+	struct db_instance *di = (struct db_instance *) &upi->private;
+	int i = di->backlog_oneshot;
+	struct db_stmt *query;
+	struct db_stmt *nquery;
+
+	/* Don't try reconnect before timeout */
+	if (di->reconnect && di->reconnect > time(NULL))
+		return 0;
+
+	llist_for_each_entry_safe(query, nquery, &di->backlog, list) {
+		if (di->driver->execute(upi, query->stmt, query->len) < 0) {
+			/* error occur, database connexion need to be closed */
+			di->driver->close_db(upi);
+			return _init_reconnect(upi);
+		} else {
+			di->backlog_memusage -= query->len + sizeof(struct db_stmt);
+			llist_del(&query->list);
+			free(query->stmt);
+			free(query);
+		}
+		if (--i < 0)
+			break;
+	}
+	return 0;
+}
+
+/* our main output function, called by ulogd */
+static int __interp_db(struct ulogd_pluginstance *upi)
+{
+	struct db_instance *di = (struct db_instance *) &upi->private;
+
+
+	__format_query_db(upi);
 	/* now we have created our statement, insert it */
 
+	/* if backup log is not empty we add current query to it */
+	if (! llist_empty(&di->backlog)) {
+		int ret = __add_to_backlog(upi, di->stmt, strlen(di->stmt));
+		if (ret == 0)
+			return __treat_backlog(upi);
+		else {
+			ret = __treat_backlog(upi);
+			if (ret)
+				return ret;
+			/* try adding once the data to backlog */
+			return __add_to_backlog(upi, di->stmt, strlen(di->stmt));
+		}
+	}
+
 	if (di->driver->execute(upi, di->stmt, strlen(di->stmt)) < 0) {
+		__add_to_backlog(upi, di->stmt, strlen(di->stmt));
 		/* error occur, database connexion need to be closed */
 		di->driver->close_db(upi);
 		return _init_reconnect(upi);
-- 
1.7.10.4



* [PATCH 07/11] db: suppress field in db structure
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

The stmt_ins field is only used in a single function, as a string
pointer, so it can be removed from the db instance structure and
replaced by a local variable.
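As an illustration of using a local insert cursor instead of a struct
field, the sketch below appends a list of values after a statement
prefix and closes it with ')'. `struct val` and `format_values()` are
hypothetical; unlike the sprintf() calls in the patch, it uses
snprintf() against an explicit end pointer so truncation is detected.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical input value: either NULL or an integer. */
struct val {
	int is_null;
	int num;
};

/* Append "v1,v2,...,vn)" starting at `start`, never writing past `end`.
 * The insert position is a local variable, as after this patch.
 * Returns the new end of the written text, or NULL if it did not fit. */
static char *format_values(char *start, char *end, const struct val *vals,
			   size_t n)
{
	char *ins = start; /* current insert position */
	size_t i;

	assert(start < end);
	for (i = 0; i < n; i++) {
		int w;

		if (vals[i].is_null)
			w = snprintf(ins, end - ins, "NULL,");
		else
			w = snprintf(ins, end - ins, "%d,", vals[i].num);
		if (w < 0 || w >= end - ins)
			return NULL; /* output was truncated */
		ins += w;
	}
	if (n > 0)
		*(ins - 1) = ')'; /* turn the trailing comma into ')' */
	return ins;
}
```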

Signed-off-by: Eric Leblond <eric@regit.org>
---
 include/ulogd/db.h |    2 +-
 util/db.c          |   36 ++++++++++++++++++------------------
 2 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index a533902..a02afb5 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -1,6 +1,7 @@
 /* DB handling functions
  *
  * (C) 2000-2005 by Harald Welte <laforge@gnumonks.org>
+ * (C) 2013 by Eric Leblond <eric@regit.org>
  *
  * This code is distributed under the terms of GNU GPL version 2 */
 
@@ -29,7 +30,6 @@ struct db_stmt {
 struct db_instance {
 	char *stmt; /* buffer for our insert statement */
 	char *stmt_val; /* pointer to the beginning of the "VALUES" part */
-	char *stmt_ins; /* pointer to current inser position in statement */
 	char *schema;
 	time_t reconnect;
 	int (*interp)(struct ulogd_pluginstance *upi);
diff --git a/util/db.c b/util/db.c
index d125e21..ec0c045 100644
--- a/util/db.c
+++ b/util/db.c
@@ -266,7 +266,7 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
 
 	unsigned int i;
 
-	di->stmt_ins = di->stmt_val;
+	char * stmt_ins = di->stmt_val;
 
 	for (i = 0; i < upi->input.num_keys; i++) {
 		struct ulogd_key *res = upi->input.keys[i].u.source;
@@ -280,52 +280,52 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
 			
 		if (!res || !IS_VALID(*res)) {
 			/* no result, we have to fake something */
-			di->stmt_ins += sprintf(di->stmt_ins, "NULL,");
+			stmt_ins += sprintf(stmt_ins, "NULL,");
 			continue;
 		}
 		
 		switch (res->type) {
 		case ULOGD_RET_INT8:
-			sprintf(di->stmt_ins, "%d,", res->u.value.i8);
+			sprintf(stmt_ins, "%d,", res->u.value.i8);
 			break;
 		case ULOGD_RET_INT16:
-			sprintf(di->stmt_ins, "%d,", res->u.value.i16);
+			sprintf(stmt_ins, "%d,", res->u.value.i16);
 			break;
 		case ULOGD_RET_INT32:
-			sprintf(di->stmt_ins, "%d,", res->u.value.i32);
+			sprintf(stmt_ins, "%d,", res->u.value.i32);
 			break;
 		case ULOGD_RET_INT64:
-			sprintf(di->stmt_ins, "%" PRId64 ",", res->u.value.i64);
+			sprintf(stmt_ins, "%" PRId64 ",", res->u.value.i64);
 			break;
 		case ULOGD_RET_UINT8:
-			sprintf(di->stmt_ins, "%u,", res->u.value.ui8);
+			sprintf(stmt_ins, "%u,", res->u.value.ui8);
 			break;
 		case ULOGD_RET_UINT16:
-			sprintf(di->stmt_ins, "%u,", res->u.value.ui16);
+			sprintf(stmt_ins, "%u,", res->u.value.ui16);
 			break;
 		case ULOGD_RET_IPADDR:
 			/* fallthrough when logging IP as u_int32_t */
 		case ULOGD_RET_UINT32:
-			sprintf(di->stmt_ins, "%u,", res->u.value.ui32);
+			sprintf(stmt_ins, "%u,", res->u.value.ui32);
 			break;
 		case ULOGD_RET_UINT64:
-			sprintf(di->stmt_ins, "%" PRIu64 ",", res->u.value.ui64);
+			sprintf(stmt_ins, "%" PRIu64 ",", res->u.value.ui64);
 			break;
 		case ULOGD_RET_BOOL:
-			sprintf(di->stmt_ins, "'%d',", res->u.value.b);
+			sprintf(stmt_ins, "'%d',", res->u.value.b);
 			break;
 		case ULOGD_RET_STRING:
-			*(di->stmt_ins++) = '\'';
+			*(stmt_ins++) = '\'';
 			if (res->u.value.ptr) {
-				di->stmt_ins +=
-				di->driver->escape_string(upi, di->stmt_ins,
+				stmt_ins +=
+				di->driver->escape_string(upi, stmt_ins,
 							  res->u.value.ptr,
 							strlen(res->u.value.ptr));
 			}
-			sprintf(di->stmt_ins, "',");
+			sprintf(stmt_ins, "',");
 			break;
 		case ULOGD_RET_RAWSTR:
-			sprintf(di->stmt_ins, "%s,", (char *) res->u.value.ptr);
+			sprintf(stmt_ins, "%s,", (char *) res->u.value.ptr);
 			break;
 		case ULOGD_RET_RAW:
 			ulogd_log(ULOGD_NOTICE,
@@ -336,9 +336,9 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
 				res->type, upi->input.keys[i].name);
 			break;
 		}
-		di->stmt_ins = di->stmt + strlen(di->stmt);
+		stmt_ins = di->stmt + strlen(di->stmt);
 	}
-	*(di->stmt_ins - 1) = ')';
+	*(stmt_ins - 1) = ')';
 }
 
 static int __add_to_backlog(struct ulogd_pluginstance *upi, const char *stmt, unsigned int len)
-- 
1.7.10.4



* [PATCH 08/11] db: use offset instead of direct pointer.
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

Use an offset instead of a pointer to locate the start of the area
where the values are printed. This is more generic and will be used
in an upcoming patch.
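One reason an offset is more generic than a pointer: the offset stays
valid for any buffer holding a copy of the statement prefix (presumably
what a per-slot ring buffer needs), while a pointer always refers to one
specific buffer. A hypothetical sketch; `struct stmt_tmpl`,
`tmpl_init()` and `tmpl_format()` are illustration names, not ulogd
code.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical statement template: the fixed prefix is built once and the
 * position where values start is kept as an offset, like stmt_offset. */
struct stmt_tmpl {
	char prefix[64];
	size_t offset;
};

static void tmpl_init(struct stmt_tmpl *t, const char *table)
{
	snprintf(t->prefix, sizeof(t->prefix), "INSERT INTO %s (a) values (",
		 table);
	t->offset = strlen(t->prefix);
}

/* Build a full statement in any destination buffer (e.g. a ring slot).
 * The same offset is valid for every copy of the prefix. */
static int tmpl_format(const struct stmt_tmpl *t, char *dst, size_t size,
		       int value)
{
	int w;

	assert(t->offset == strlen(t->prefix));
	if (size <= t->offset)
		return -1;
	memcpy(dst, t->prefix, t->offset);
	w = snprintf(dst + t->offset, size - t->offset, "%d)", value);
	if (w < 0 || (size_t)w >= size - t->offset)
		return -1;
	return 0;
}
```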

Signed-off-by: Eric Leblond <eric@regit.org>
---
 include/ulogd/db.h |    2 +-
 util/db.c          |   16 +++++++++-------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index a02afb5..82f37b9 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -29,7 +29,7 @@ struct db_stmt {
 
 struct db_instance {
 	char *stmt; /* buffer for our insert statement */
-	char *stmt_val; /* pointer to the beginning of the "VALUES" part */
+	int stmt_offset; /* offset to the beginning of the "VALUES" part */
 	char *schema;
 	time_t reconnect;
 	int (*interp)(struct ulogd_pluginstance *upi);
diff --git a/util/db.c b/util/db.c
index ec0c045..1a11173 100644
--- a/util/db.c
+++ b/util/db.c
@@ -66,6 +66,7 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
 	unsigned int i;
 	char *table = table_ce(upi->config_kset).u.string;
 	char *procedure = procedure_ce(upi->config_kset).u.string;
+	char *stmt_val = NULL;
 
 	if (mi->stmt)
 		free(mi->stmt);
@@ -106,7 +107,7 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
 		else
 			sprintf(mi->stmt, "%s (", procedure);
 
-		mi->stmt_val = mi->stmt + strlen(mi->stmt);
+		stmt_val = mi->stmt + strlen(mi->stmt);
 
 		for (i = 0; i < upi->input.num_keys; i++) {
 			if (upi->input.keys[i].flags & ULOGD_KEYF_INACTIVE)
@@ -115,19 +116,20 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
 			strncpy(buf, upi->input.keys[i].name, ULOGD_MAX_KEYLEN);	
 			while ((underscore = strchr(buf, '.')))
 				*underscore = '_';
-			sprintf(mi->stmt_val, "%s,", buf);
-			mi->stmt_val = mi->stmt + strlen(mi->stmt);
+			sprintf(stmt_val, "%s,", buf);
+			stmt_val = mi->stmt + strlen(mi->stmt);
 		}
-		*(mi->stmt_val - 1) = ')';
+		*(stmt_val - 1) = ')';
 
-		sprintf(mi->stmt_val, " values (");
+		sprintf(stmt_val, " values (");
 	} else if (strncasecmp(procedure,"CALL", strlen("CALL")) == 0) {
 		sprintf(mi->stmt, "CALL %s(", procedure);
 	} else {
 		sprintf(mi->stmt, "SELECT %s(", procedure);
 
 	}
-	mi->stmt_val = mi->stmt + strlen(mi->stmt);
+
+	mi->stmt_offset = strlen(mi->stmt);
 
 	ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", mi->stmt);
 
@@ -266,7 +268,7 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
 
 	unsigned int i;
 
-	char * stmt_ins = di->stmt_val;
+	char * stmt_ins = di->stmt + di->stmt_offset;
 
 	for (i = 0; i < upi->input.num_keys; i++) {
 		struct ulogd_key *res = upi->input.keys[i].u.source;
-- 
1.7.10.4



* [PATCH 09/11] db: add ring buffer for DB query
  2013-05-10  6:48 [PATCH RFC 0/11] ulogd2 db rework Eric Leblond
                   ` (7 preceding siblings ...)
  2013-05-10  6:48 ` [PATCH 08/11] db: use offset instead of direct pointer Eric Leblond
@ 2013-05-10  6:48 ` Eric Leblond
  2013-05-11 19:29   ` Pablo Neira Ayuso
  2013-05-10  6:48 ` [PATCH 10/11] db: disable SIGHUP if ring buffer is used Eric Leblond
  2013-05-10  6:48 ` [PATCH 11/11] db: db ring has precedence over backlog Eric Leblond
  10 siblings, 1 reply; 15+ messages in thread
From: Eric Leblond @ 2013-05-10  6:48 UTC
  To: netfilter-devel; +Cc: Eric Leblond

This patch adds an optional ring buffer which modifies the way
database queries are made. The main thread only handles reading kernel
messages and formatting queries; the SQL requests are executed in a
separate, dedicated thread.
The idea is to avoid buffer overruns by minimizing the time needed to
process a kernel message. Executing SQL requests synchronously, as was
done before, introduced a delay which could cause some messages to be
lost during a burst on the kernel side.
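The struct db_stmt_ring comment in this patch describes each element as
one status byte followed by the query string. A single-threaded sketch
of that layout, reusing the RING_NO_QUERY/RING_QUERY_READY names from
the patch but otherwise hypothetical (`struct ring`, `ring_init()`,
`ring_put()`, `ring_get()`); it deliberately omits the mutex and
condition variable the patch uses to hand slots between threads.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

enum { RING_NO_QUERY, RING_QUERY_READY }; /* same names as in the patch */

/* `size` elements of `length` bytes: byte 0 is the status, then the query. */
struct ring {
	char *buf;
	uint32_t size;
	size_t length;
	uint32_t wr_item, rd_item;
};

/* stmt_max is the largest statement size, including its NUL. */
static int ring_init(struct ring *r, uint32_t size, size_t stmt_max)
{
	assert(size > 0 && stmt_max > 0);
	r->size = size;
	r->length = stmt_max + 1;         /* +1 for the status byte */
	r->wr_item = r->rd_item = 0;
	r->buf = calloc(size, r->length); /* every slot starts as RING_NO_QUERY */
	return r->buf ? 0 : -1;
}

static char *ring_slot(struct ring *r, uint32_t item)
{
	return r->buf + (size_t)item * r->length;
}

/* Producer: returns -1 if the next slot has not been consumed yet. */
static int ring_put(struct ring *r, const char *stmt)
{
	char *slot = ring_slot(r, r->wr_item);
	size_t len = strlen(stmt);

	if (slot[0] != RING_NO_QUERY || len + 1 > r->length - 1)
		return -1;
	memcpy(slot + 1, stmt, len + 1);
	/* Mark ready last; a threaded version needs the lock around this. */
	slot[0] = RING_QUERY_READY;
	r->wr_item = (r->wr_item + 1) % r->size;
	return 0;
}

/* Consumer: copies the next ready query into out, then frees the slot. */
static int ring_get(struct ring *r, char *out, size_t out_size)
{
	char *slot = ring_slot(r, r->rd_item);
	size_t len;

	if (slot[0] != RING_QUERY_READY)
		return -1; /* nothing to do yet */
	len = strlen(slot + 1);
	if (len >= out_size)
		return -1;
	memcpy(out, slot + 1, len + 1);
	slot[0] = RING_NO_QUERY;
	r->rd_item = (r->rd_item + 1) % r->size;
	return 0;
}
```

The producer refuses to overwrite a slot that is still marked ready, so
a full ring is detected from the status byte alone rather than by
comparing the read and write indices.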

Signed-off-by: Eric Leblond <eric@regit.org>
---
 configure.ac       |    2 +
 include/ulogd/db.h |   31 ++++++++++-
 src/Makefile.am    |    2 +-
 ulogd.conf.in      |    4 ++
 util/db.c          |  152 ++++++++++++++++++++++++++++++++++++++++++++++++----
 5 files changed, 178 insertions(+), 13 deletions(-)

diff --git a/configure.ac b/configure.ac
index 7e04531..7351749 100644
--- a/configure.ac
+++ b/configure.ac
@@ -39,6 +39,8 @@ AC_CHECK_FUNCS(socket strerror)
 regular_CFLAGS="-Wall -Wextra -Wno-unused-parameter"
 AC_SUBST([regular_CFLAGS])
 
+AC_CHECK_LIB(pthread, pthread_create)
+
 dnl Check for the right nfnetlink version
 PKG_CHECK_MODULES([LIBNFNETLINK], [libnfnetlink >= 1.0.1])
 PKG_CHECK_MODULES([LIBMNL], [libmnl >= 1.0.3])
diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index 82f37b9..823f462 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -21,6 +21,24 @@ struct db_driver {
 			const char *stmt, unsigned int len);
 };
 
+enum {
+	RING_NO_QUERY,
+	RING_QUERY_READY,
+};
+
+struct db_stmt_ring {
+	/* Ring buffer: 1 status byte + string */
+	char* ring; /* pointer to the ring */
+	uint32_t size; /* size of ring buffer in element */
+	int length; /* length of one ring buffer element */
+	uint32_t wr_item; /* write item in ring buffer */
+	uint32_t rd_item; /* read item in ring buffer */
+	char *wr_place;
+	pthread_cond_t cond;
+	pthread_mutex_t mutex;
+	int full;
+};
+
 struct db_stmt {
 	char *stmt;
 	int len;
@@ -34,6 +52,10 @@ struct db_instance {
 	time_t reconnect;
 	int (*interp)(struct ulogd_pluginstance *upi);
 	struct db_driver *driver;
+	/* DB ring buffer */
+	struct db_stmt_ring ring;
+	pthread_t db_thread_id;
+	/* Backlog system */
 	unsigned int backlog_memcap;
 	unsigned int backlog_memusage;
 	unsigned int backlog_oneshot;
@@ -43,6 +65,7 @@ struct db_instance {
 #define TIME_ERR		((time_t)-1)	/* Be paranoid */
 #define RECONNECT_DEFAULT	2
 #define MAX_ONESHOT_REQUEST	10
+#define RING_BUFFER_DEFAULT_SIZE	10
 
 #define DB_CES							\
 		{						\
@@ -73,15 +96,21 @@ struct db_instance {
 			.key = "backlog_oneshot_requests",	\
 			.type = CONFIG_TYPE_INT,		\
 			.u.value = MAX_ONESHOT_REQUEST,		\
+		},						\
+		{						\
+			.key = "ring_buffer_size",		\
+			.type = CONFIG_TYPE_INT,		\
+			.u.value = RING_BUFFER_DEFAULT_SIZE,	\
 		}
 
-#define DB_CE_NUM		6
+#define DB_CE_NUM		7
 #define table_ce(x)		(x->ces[0])
 #define reconnect_ce(x)		(x->ces[1])
 #define timeout_ce(x)		(x->ces[2])
 #define procedure_ce(x)		(x->ces[3])
 #define backlog_memcap_ce(x)	(x->ces[4])
 #define backlog_oneshot_ce(x)	(x->ces[5])
+#define ringsize_ce(x)		(x->ces[6])
 
 void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal);
 int ulogd_db_start(struct ulogd_pluginstance *upi);
diff --git a/src/Makefile.am b/src/Makefile.am
index e462cb2..1097468 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -8,4 +8,4 @@ sbin_PROGRAMS = ulogd
 
 ulogd_SOURCES = ulogd.c select.c timer.c rbtree.c conffile.c hash.c addr.c
 ulogd_LDADD   = ${libdl_LIBS}
-ulogd_LDFLAGS = -export-dynamic
+ulogd_LDFLAGS = -export-dynamic -lpthread
diff --git a/ulogd.conf.in b/ulogd.conf.in
index 3e5e648..11a56d6 100644
--- a/ulogd.conf.in
+++ b/ulogd.conf.in
@@ -233,6 +233,10 @@ pass="changeme"
 procedure="INSERT_PACKET_FULL"
 #backlog_memcap=1000000
 #backlog_oneshot_requests=10
+# If greater than 0, a thread dedicated to SQL request execution
+# is created. The value sets the number of SQL requests to keep
+# in the ring buffer.
+#ring_buffer_size=1000
 
 [pgsql2]
 db="nulog"
diff --git a/util/db.c b/util/db.c
index 1a11173..4050f0f 100644
--- a/util/db.c
+++ b/util/db.c
@@ -8,6 +8,7 @@
  *           (C) 2005 Sven Schuster <schuster.sven@gmx.de>,
  *           (C) 2005 Jozsef Kadlecsik <kadlec@blackhole.kfki.hu>
  *           (C) 2008 Eric Leblond <eric@inl.fr>
+ *           (C) 2013 Eric Leblond <eric@regit.org>
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License version 2 
@@ -32,6 +33,7 @@
 #include <arpa/inet.h>
 #include <time.h>
 #include <inttypes.h>
+#include <pthread.h>
 
 #include <ulogd/ulogd.h>
 #include <ulogd/db.h>
@@ -90,6 +92,7 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
 		ulogd_log(ULOGD_ERROR, "OOM!\n");
 		return -ENOMEM;
 	}
+	mi->ring.length = size + 1;
 
 	if (strncasecmp(procedure,"INSERT", strlen("INSERT")) == 0 &&
 	    (procedure[strlen("INSERT")] == '\0' ||
@@ -138,6 +141,8 @@ static int sql_createstmt(struct ulogd_pluginstance *upi)
 
 static int _init_db(struct ulogd_pluginstance *upi);
 
+static void *__inject_thread(void *gdi);
+
 int ulogd_db_configure(struct ulogd_pluginstance *upi,
 			struct ulogd_pluginstance_stack *stack)
 {
@@ -185,6 +190,9 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
 		di->backlog_full = 0;
 	}
 
+	/* check ring option */
+	di->ring.size = ringsize_ce(upi->config_kset).u.value;
+
 	return ret;
 }
 
@@ -192,6 +200,7 @@ int ulogd_db_start(struct ulogd_pluginstance *upi)
 {
 	struct db_instance *di = (struct db_instance *) upi->private;
 	int ret;
+	unsigned int i;
 
 	ulogd_log(ULOGD_NOTICE, "starting\n");
 
@@ -201,11 +210,51 @@ int ulogd_db_start(struct ulogd_pluginstance *upi)
 
 	ret = sql_createstmt(upi);
 	if (ret < 0)
-		di->driver->close_db(upi);
+		goto db_error;
+
+	if (di->ring.size > 0) {
+		/* allocate */
+		di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length);
+		if (di->ring.ring == NULL) {
+			ret = -1;
+			goto db_error;
+		}
+		di->ring.wr_place = di->ring.ring;
+		ulogd_log(ULOGD_NOTICE,
+			  "Allocating %d elements of size %d for ring\n",
+			  di->ring.size, di->ring.length);
+		/* init start of query for each element */
+		for(i = 0; i < di->ring.size; i++) {
+			strncpy(di->ring.ring + di->ring.length * i + 1,
+				di->stmt,
+				strlen(di->stmt));
+		}
+		/* init cond & mutex */
+		ret = pthread_cond_init(&di->ring.cond, NULL);
+		if (ret != 0)
+			goto alloc_error;
+		ret = pthread_mutex_init(&di->ring.mutex, NULL);
+		if (ret != 0)
+			goto cond_error;
+		/* create thread */
+		ret = pthread_create(&di->db_thread_id, NULL, __inject_thread, upi);
+		if (ret != 0)
+			goto mutex_error;
+	}
 
 	di->interp = &_init_db;
 
 	return ret;
+
+mutex_error:
+	pthread_mutex_destroy(&di->ring.mutex);
+cond_error:
+	pthread_cond_destroy(&di->ring.cond);
+alloc_error:
+	free(di->ring.ring);
+db_error:
+	di->driver->close_db(upi);
+	return ret;
 }
 
 static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
@@ -219,7 +268,13 @@ static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
 		free(di->stmt);
 		di->stmt = NULL;
 	}
-
+	if (di->ring.size > 0) {
+		pthread_cancel(di->db_thread_id);
+		free(di->ring.ring);
+		pthread_cond_destroy(&di->ring.cond);
+		pthread_mutex_destroy(&di->ring.mutex);
+		di->ring.ring = NULL;
+	}
 	return 0;
 }
 
@@ -262,13 +317,13 @@ static int _init_reconnect(struct ulogd_pluginstance *upi)
 	return 0;
 }
 
-static void __format_query_db(struct ulogd_pluginstance *upi)
+static void __format_query_db(struct ulogd_pluginstance *upi, char *start)
 {
 	struct db_instance *di = (struct db_instance *) &upi->private;
 
 	unsigned int i;
 
-	char * stmt_ins = di->stmt + di->stmt_offset;
+	char * stmt_ins = start + di->stmt_offset;
 
 	for (i = 0; i < upi->input.num_keys; i++) {
 		struct ulogd_key *res = upi->input.keys[i].u.source;
@@ -279,13 +334,13 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
 		if (!res)
 			ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n",
 				  upi->input.keys[i].name);
-			
+
 		if (!res || !IS_VALID(*res)) {
 			/* no result, we have to fake something */
 			stmt_ins += sprintf(stmt_ins, "NULL,");
 			continue;
 		}
-		
+
 		switch (res->type) {
 		case ULOGD_RET_INT8:
 			sprintf(stmt_ins, "%d,", res->u.value.i8);
@@ -338,7 +393,7 @@ static void __format_query_db(struct ulogd_pluginstance *upi)
 				res->type, upi->input.keys[i].name);
 			break;
 		}
-		stmt_ins = di->stmt + strlen(di->stmt);
+		stmt_ins = start + strlen(start);
 	}
 	*(stmt_ins - 1) = ')';
 }
@@ -388,7 +443,7 @@ static int _init_db(struct ulogd_pluginstance *upi)
 	if (di->reconnect && di->reconnect > time(NULL)) {
 		/* store entry to backlog if it is active */
 		if (di->backlog_memcap && !di->backlog_full) {
-			__format_query_db(upi);
+			__format_query_db(upi, di->stmt);
 			__add_to_backlog(upi, di->stmt,
 						strlen(di->stmt));
 		}
@@ -398,7 +453,7 @@ static int _init_db(struct ulogd_pluginstance *upi)
 	if (di->driver->open_db(upi)) {
 		ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
 		if (di->backlog_memcap && !di->backlog_full) {
-			__format_query_db(upi);
+			__format_query_db(upi, di->stmt);
 			__add_to_backlog(upi, di->stmt, strlen(di->stmt));
 		}
 		return _init_reconnect(upi);
@@ -442,14 +497,39 @@ static int __treat_backlog(struct ulogd_pluginstance *upi)
 	return 0;
 }
 
+static int __add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di)
+{
+	if (*di->ring.wr_place == RING_QUERY_READY) {
+		if (di->ring.full == 0) {
+			ulogd_log(ULOGD_ERROR, "No place left in ring\n");
+			di->ring.full = 1;
+		}
+		return ULOGD_IRET_OK;
+	} else if (di->ring.full) {
+		ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n");
+		di->ring.full = 0;
+	}
+	__format_query_db(upi, di->ring.wr_place + 1);
+	*di->ring.wr_place = RING_QUERY_READY;
+	pthread_cond_signal(&di->ring.cond);
+	di->ring.wr_item ++;
+	di->ring.wr_place += di->ring.length;
+	if (di->ring.wr_item == di->ring.size) {
+		di->ring.wr_item = 0;
+		di->ring.wr_place = di->ring.ring;
+	}
+	return ULOGD_IRET_OK;
+}
+
 /* our main output function, called by ulogd */
 static int __interp_db(struct ulogd_pluginstance *upi)
 {
 	struct db_instance *di = (struct db_instance *) &upi->private;
 
+	if (di->ring.size)
+		return __add_to_ring(upi, di);
 
-	__format_query_db(upi);
-	/* now we have created our statement, insert it */
+	__format_query_db(upi, di->stmt);
 
 	/* if backup log is not empty we add current query to it */
 	if (! llist_empty(&di->backlog)) {
@@ -475,6 +555,56 @@ static int __interp_db(struct ulogd_pluginstance *upi)
 	return 0;
 }
 
+static int __loop_reconnect_db(struct ulogd_pluginstance * upi) {
+	struct db_instance *di = (struct db_instance *) &upi->private;
+
+	di->driver->close_db(upi);
+	while (1) {
+		if (di->driver->open_db(upi)) {
+			sleep(1);
+		} else {
+			return 0;
+		}
+	}
+	return 0;
+}
+
+static void *__inject_thread(void *gdi)
+{
+	struct ulogd_pluginstance *upi = (struct ulogd_pluginstance *) gdi;
+	struct db_instance *di = (struct db_instance *) &upi->private;
+	char *wr_place;
+
+	wr_place = di->ring.ring;
+	pthread_mutex_lock(&di->ring.mutex);
+	while(1) {
+		/* wait cond */
+		pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
+		while (*wr_place == RING_QUERY_READY) {
+			if (di->driver->execute(upi, wr_place + 1,
+						strlen(wr_place + 1)) < 0) {
+				if (__loop_reconnect_db(upi) != 0) {
+					/* loop has failed on unrecoverable error */
+					ulogd_log(ULOGD_ERROR,
+						  "permanently disabling plugin\n");
+					di->interp = &disabled_interp_db;
+					return NULL;
+				}
+			}
+			*wr_place = RING_NO_QUERY;
+			di->ring.rd_item++;
+			if (di->ring.rd_item == di->ring.size) {
+				di->ring.rd_item = 0;
+				wr_place = di->ring.ring;
+			} else
+				wr_place += di->ring.length;
+		}
+	}
+
+	return NULL;
+}
+
+
 void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
 {
 	switch (signal) {
-- 
1.7.10.4
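About the synchronization in patch 09: `__add_to_ring()` marks a slot ready and calls `pthread_cond_signal()` without holding `ring.mutex`. `__inject_thread()` calls `pthread_cond_wait()` without first re-checking the slot status. So a signal sent just as the worker finishes draining can be missed, and that query then stays in the ring until the next signal arrives. The sketch below shows the usual predicate-under-mutex pattern for the same slot layout (1 status byte + query string). It is a minimal, self-contained illustration and not ulogd code: `struct stmt_ring` and the `ring_*` helpers are hypothetical names. The consumer copies the query out so that the SQL call can run without holding the lock.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical ring: `size` slots of `length` bytes each. Byte 0 of a
 * slot is its status, the rest holds a NUL-terminated query. */
enum { SLOT_EMPTY = 0, SLOT_READY = 1 };

struct stmt_ring {
	char *buf;
	unsigned int size;      /* number of slots */
	size_t length;          /* bytes per slot, status byte included */
	unsigned int wr, rd;    /* next slot to write / to read */
	pthread_mutex_t mutex;  /* protects the fields above and the slots */
	pthread_cond_t cond;    /* signalled when a slot becomes ready */
};

static char *ring_slot(struct stmt_ring *r, unsigned int i)
{
	return r->buf + (size_t)i * r->length;
}

static int ring_init(struct stmt_ring *r, unsigned int size, size_t length)
{
	assert(size > 0 && length > 1);
	r->buf = calloc(size, length);  /* all status bytes start as SLOT_EMPTY */
	if (r->buf == NULL)
		return -1;
	r->size = size;
	r->length = length;
	r->wr = r->rd = 0;
	pthread_mutex_init(&r->mutex, NULL);
	pthread_cond_init(&r->cond, NULL);
	return 0;
}

/* Producer side: 0 on success, -1 if the ring is full, -2 if too long. */
static int ring_push(struct stmt_ring *r, const char *query)
{
	size_t n = strlen(query);
	int ret = -1;

	if (n + 2 > r->length)          /* status byte + query + NUL */
		return -2;
	pthread_mutex_lock(&r->mutex);
	char *s = ring_slot(r, r->wr);
	if (s[0] == SLOT_EMPTY) {
		memcpy(s + 1, query, n + 1);
		s[0] = SLOT_READY;
		r->wr = (r->wr + 1) % r->size;
		pthread_cond_signal(&r->cond);  /* sent while holding the mutex */
		ret = 0;
	}
	pthread_mutex_unlock(&r->mutex);
	return ret;
}

/* Consumer side: blocks until the next slot is ready, copies the query
 * into `out` and releases the slot, so the SQL call can run unlocked. */
static void ring_pop(struct stmt_ring *r, char *out, size_t outlen)
{
	pthread_mutex_lock(&r->mutex);
	while (ring_slot(r, r->rd)[0] != SLOT_READY)  /* re-check after wake-up */
		pthread_cond_wait(&r->cond, &r->mutex);
	char *s = ring_slot(r, r->rd);
	snprintf(out, outlen, "%s", s + 1);
	s[0] = SLOT_EMPTY;
	r->rd = (r->rd + 1) % r->size;
	pthread_mutex_unlock(&r->mutex);
}

static void ring_destroy(struct stmt_ring *r)
{
	pthread_cond_destroy(&r->cond);
	pthread_mutex_destroy(&r->mutex);
	free(r->buf);
	r->buf = NULL;
}
```

A worker thread would loop on `ring_pop()` followed by the driver's execute call. Unlike in the patch, the mutex is only held for the copy, not while the query runs.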


* [PATCH 10/11] db: disable SIGHUP if ring buffer is used.
  2013-05-10  6:48 [PATCH RFC 0/11] ulogd2 db rework Eric Leblond
                   ` (8 preceding siblings ...)
  2013-05-10  6:48 ` [PATCH 09/11] db: add ring buffer for DB query Eric Leblond
@ 2013-05-10  6:48 ` Eric Leblond
  2013-05-10  6:48 ` [PATCH 11/11] db: db ring has precedence over backlog Eric Leblond
  10 siblings, 0 replies; 15+ messages in thread
From: Eric Leblond @ 2013-05-10  6:48 UTC (permalink / raw)
  To: netfilter-devel; +Cc: Eric Leblond

Signal handling is complicated when threads are involved. When the
ring buffer is used for queries, a reload would require ulogd to take
some sort of mutex. It is therefore simpler, and better performance-wise,
to disable reload via SIGHUP when the ring buffer is used.

Signed-off-by: Eric Leblond <eric@regit.org>
---
 util/db.c |   11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/util/db.c b/util/db.c
index 4050f0f..c23c362 100644
--- a/util/db.c
+++ b/util/db.c
@@ -607,11 +607,16 @@ static void *__inject_thread(void *gdi)
 
 void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
 {
+	struct db_instance *di = (struct db_instance *) &upi->private;
 	switch (signal) {
 	case SIGHUP:
-		/* reopen database connection */
-		ulogd_db_instance_stop(upi);
-		ulogd_db_start(upi);
+		if (!di->ring.size) {
+			/* reopen database connection */
+			ulogd_db_instance_stop(upi);
+			ulogd_db_start(upi);
+		} else
+			ulogd_log(ULOGD_ERROR,
+				  "No SIGHUP handling if ring buffer is used\n");
 		break;
 	default:
 		break;
-- 
1.7.10.4
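Thread lifetime is part of what makes a reload awkward here. In patch 09, `ulogd_db_instance_stop()` calls `pthread_cancel()` and then immediately frees the ring and destroys the mutex and condition variable. However, cancellation is only acted upon at a cancellation point, and nothing waits for the thread to exit. Also, a thread cancelled inside `pthread_cond_wait()` re-acquires the mutex before it terminates. The sketch below shows one alternative, cooperative shutdown with a stop flag and `pthread_join()`, which is not what the patch does. All names are hypothetical, and a plain counter stands in for the ready ring slots.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical worker: `pending` stands in for ready ring slots. */
struct worker {
	pthread_t tid;
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	bool stop;              /* set by worker_stop(), read by the worker */
	unsigned int pending;   /* items waiting to be processed */
	unsigned int done;      /* items the worker has processed */
};

static void *worker_main(void *arg)
{
	struct worker *w = arg;

	pthread_mutex_lock(&w->mutex);
	for (;;) {
		while (w->pending == 0 && !w->stop)
			pthread_cond_wait(&w->cond, &w->mutex);
		if (w->pending == 0)    /* stop requested and queue drained */
			break;
		w->pending--;
		pthread_mutex_unlock(&w->mutex);
		/* ... execute one SQL query here, without the lock ... */
		pthread_mutex_lock(&w->mutex);
		w->done++;
	}
	pthread_mutex_unlock(&w->mutex);
	return NULL;
}

static int worker_start(struct worker *w)
{
	w->stop = false;
	w->pending = 0;
	w->done = 0;
	pthread_mutex_init(&w->mutex, NULL);
	pthread_cond_init(&w->cond, NULL);
	return pthread_create(&w->tid, NULL, worker_main, w);
}

static void worker_submit(struct worker *w)
{
	pthread_mutex_lock(&w->mutex);
	assert(!w->stop);               /* submitting after stop is a bug */
	w->pending++;
	pthread_cond_signal(&w->cond);
	pthread_mutex_unlock(&w->mutex);
}

/* Ask the worker to finish its queue, wait for it, then free resources. */
static void worker_stop(struct worker *w)
{
	pthread_mutex_lock(&w->mutex);
	w->stop = true;
	pthread_cond_broadcast(&w->cond);
	pthread_mutex_unlock(&w->mutex);
	pthread_join(w->tid, NULL);     /* the worker has exited after this */
	pthread_cond_destroy(&w->cond);
	pthread_mutex_destroy(&w->mutex);
}
```

Because `worker_stop()` only returns after the worker has exited, the caller can then safely free or reinitialize the shared state.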


* [PATCH 11/11] db: db ring has precedence over backlog.
  2013-05-10  6:48 [PATCH RFC 0/11] ulogd2 db rework Eric Leblond
                   ` (9 preceding siblings ...)
  2013-05-10  6:48 ` [PATCH 10/11] db: disable SIGHUP if ring buffer is used Eric Leblond
@ 2013-05-10  6:48 ` Eric Leblond
  10 siblings, 0 replies; 15+ messages in thread
From: Eric Leblond @ 2013-05-10  6:48 UTC (permalink / raw)
  To: netfilter-devel; +Cc: Eric Leblond


Signed-off-by: Eric Leblond <eric@regit.org>
---
 util/db.c |   12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/util/db.c b/util/db.c
index c23c362..6be06e6 100644
--- a/util/db.c
+++ b/util/db.c
@@ -177,9 +177,14 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
 
 	INIT_LLIST_HEAD(&di->backlog);
 	di->backlog_memusage = 0;
-	
+
+	di->ring.size = ringsize_ce(upi->config_kset).u.value;
 	di->backlog_memcap = backlog_memcap_ce(upi->config_kset).u.value;
-	if (di->backlog_memcap > 0) {
+
+	if (di->ring.size && di->backlog_memcap) {
+		ulogd_log(ULOGD_ERROR, "Ring buffer has precedence over backlog\n");
+		di->backlog_memcap = 0;
+	} else if (di->backlog_memcap > 0) {
 		di->backlog_oneshot = backlog_oneshot_ce(upi->config_kset).u.value;
 		if (di->backlog_oneshot <= 2) {
 			ulogd_log(ULOGD_ERROR,
@@ -190,9 +195,6 @@ int ulogd_db_configure(struct ulogd_pluginstance *upi,
 		di->backlog_full = 0;
 	}
 
-	/* check ring option */
-	di->ring.size = ringsize_ce(upi->config_kset).u.value;
-
 	return ret;
 }
 
-- 
1.7.10.4


* Re: [PATCH 09/11] db: add ring buffer for DB query
  2013-05-10  6:48 ` [PATCH 09/11] db: add ring buffer for DB query Eric Leblond
@ 2013-05-11 19:29   ` Pablo Neira Ayuso
  2013-05-12 10:29     ` Eric Leblond
  0 siblings, 1 reply; 15+ messages in thread
From: Pablo Neira Ayuso @ 2013-05-11 19:29 UTC (permalink / raw)
  To: Eric Leblond; +Cc: netfilter-devel

Hi Eric,

On Fri, May 10, 2013 at 08:48:56AM +0200, Eric Leblond wrote:
> This patch adds an optional ring buffer which modifies
> the way database queries are made. The main thread only handles
> kernel message reading and query formatting. The SQL request is made
> in a separate, dedicated thread.
> The idea is to avoid buffer overruns by minimizing the time
> needed to process a kernel message. Doing synchronous SQL requests, as
> was done before, caused a delay which could lead to some messages
> being lost in case of a burst from the kernel side.

Would it be feasible to make asynchronous SQL requests instead, so you
can avoid using pthreads?

Regards.

* Re: [PATCH 09/11] db: add ring buffer for DB query
  2013-05-11 19:29   ` Pablo Neira Ayuso
@ 2013-05-12 10:29     ` Eric Leblond
  2013-05-21 13:38       ` Pablo Neira Ayuso
  0 siblings, 1 reply; 15+ messages in thread
From: Eric Leblond @ 2013-05-12 10:29 UTC (permalink / raw)
  To: Pablo Neira Ayuso; +Cc: netfilter-devel

Hi,

On Saturday, May 11, 2013 at 21:29 +0200, Pablo Neira Ayuso wrote:
> Hi Eric,
> 
> On Fri, May 10, 2013 at 08:48:56AM +0200, Eric Leblond wrote:
> > This patch adds an optional ring buffer which modifies
> > the way database queries are made. The main thread only handles
> > kernel message reading and query formatting. The SQL request is made
> > in a separate, dedicated thread.
> > The idea is to avoid buffer overruns by minimizing the time
> > needed to process a kernel message. Doing synchronous SQL requests, as
> > was done before, caused a delay which could lead to some messages
> > being lost in case of a burst from the kernel side.
> 
> Would it be feasible to make asynchronous SQL requests instead, so you
> can avoid using pthreads?

That is an excellent question :)

Here is a list of points, from the smallest to the biggest.

From a performance point of view, ulogd would still be slowed down by the
request time, due to the network communication with the db. I think the
effect is negligible in most cases, but it may have an impact with a
remote db.

From a lazy point of view, this would require updating all the database
backends instead of simply updating the db abstraction layer, as is
done in the proposed patch.

Regarding asynchronous requests, I have always given up on using them
after reading the documentation, so I have no practical experience here;
please correct me if I'm wrong. What I have understood is that there is
no "take my query and just do your work" function: the application has
to fetch the results of the query and free them.

For example, let's look at the PostgreSQL case. The documentation of the
asynchronous API is here:
http://www.postgresql.org/docs/6.4/static/libpq-chapter17044.htm

Here's an interesting part of the documentation:
        
        PQsendQuery: Submit a query to Postgres without waiting for the
        result(s). TRUE is returned if the query was successfully
        dispatched, FALSE if not (in which case, use PQerrorMessage to
        get more information about the failure).
        After successfully calling PQsendQuery, call PQgetResult one or
        more times to obtain the query results. PQsendQuery may not be
        called again (on the same connection) until PQgetResult has
        returned NULL, indicating that the query is done.

Another interesting point, regarding PQgetResult:

        Don't forget to free each result object with PQclear when done
        with it.

So the modification would involve sending queries with PQsendQuery and
repeatedly calling PQgetResult to fetch the results, so that they can be
freed with PQclear.

I think this system does not really fit ulogd's task, which consists of
many short queries. It is far better suited to long queries and to GUIs,
whereas ulogd would continuously be calling PQgetResult just to free
results it will never use. So it is complicated and does not seem well
adapted.

These are the points that make me consider using threads instead of
asynchronous calls.

BR,
--
Eric


* Re: [PATCH 09/11] db: add ring buffer for DB query
  2013-05-12 10:29     ` Eric Leblond
@ 2013-05-21 13:38       ` Pablo Neira Ayuso
  0 siblings, 0 replies; 15+ messages in thread
From: Pablo Neira Ayuso @ 2013-05-21 13:38 UTC (permalink / raw)
  To: Eric Leblond; +Cc: netfilter-devel

Hi Eric,

On Sun, May 12, 2013 at 12:29:29PM +0200, Eric Leblond wrote:
> On Saturday, May 11, 2013 at 21:29 +0200, Pablo Neira Ayuso wrote:
> > On Fri, May 10, 2013 at 08:48:56AM +0200, Eric Leblond wrote:
> > > This patch adds an optional ring buffer which modifies
> > > the way database queries are made. The main thread only handles
> > > kernel message reading and query formatting. The SQL request is made
> > > in a separate, dedicated thread.
> > > The idea is to avoid buffer overruns by minimizing the time
> > > needed to process a kernel message. Doing synchronous SQL requests, as
> > > was done before, caused a delay which could lead to some messages
> > > being lost in case of a burst from the kernel side.
> > 
> > Would it be feasible to make asynchronous SQL requests instead, so you
> > can avoid using pthreads?
[...]
> From a lazy point of view, this would require updating all the database
> backends instead of simply updating the db abstraction layer, as is
> done in the proposed patch.

Fair enough: it provides a generic way to support this, and the main
thread should really be taking care of the netlink messages coming from
the kernel to avoid the overrun.

If the target is to absorb small bursts, this should help relieve
those peaks for some time, but at some point the intermediate queue
(the ring) will also fill up if the situation lasts long.
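This point can be quantified with a simple constant-rate model, which is an illustration and not a measurement. Suppose messages arrive faster than the SQL thread executes them. Then a ring that starts empty fills after roughly the time below, and its memory cost is fixed at allocation:

```latex
T_{\text{full}} \approx \frac{N}{\lambda_{\text{in}} - \lambda_{\text{out}}}
\quad (\lambda_{\text{in}} > \lambda_{\text{out}}),
\qquad
M_{\text{ring}} = N \, (L + 1) \ \text{bytes}
```

Here N is `ring_buffer_size`, lambda_in is the message arrival rate, lambda_out is the rate at which the SQL thread completes queries, and L is the statement length computed in `sql_createstmt()`. Patch 09 sets `ring.length = size + 1` and callocs `ring.size` elements of that length. With the sample `ring_buffer_size=1000` and hypothetical rates of 5000 and 4000 messages/s, T_full is about 1000 / 1000 = 1 s. After that point, `__add_to_ring()` logs "No place left in ring" and does not store the query.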

Regards.

Thread overview: 15+ messages
2013-05-10  6:48 [PATCH RFC 0/11] ulogd2 db rework Eric Leblond
2013-05-10  6:48 ` [PATCH 01/11] ulogd: display stack during configuration Eric Leblond
2013-05-10  6:48 ` [PATCH 02/11] Fix automagic support of dbi, pcap and sqlite3 Eric Leblond
2013-05-10  6:48 ` [PATCH 03/11] postgresql: add sanity checking Eric Leblond
2013-05-10  6:48 ` [PATCH 04/11] mysql: " Eric Leblond
2013-05-10  6:48 ` [PATCH 05/11] sqlite3: " Eric Leblond
2013-05-10  6:48 ` [PATCH 06/11] db: store data in memory during database downtime Eric Leblond
2013-05-10  6:48 ` [PATCH 07/11] db: suppress field in db structure Eric Leblond
2013-05-10  6:48 ` [PATCH 08/11] db: use offset instead of direct pointer Eric Leblond
2013-05-10  6:48 ` [PATCH 09/11] db: add ring buffer for DB query Eric Leblond
2013-05-11 19:29   ` Pablo Neira Ayuso
2013-05-12 10:29     ` Eric Leblond
2013-05-21 13:38       ` Pablo Neira Ayuso
2013-05-10  6:48 ` [PATCH 10/11] db: disable SIGHUP if ring buffer is used Eric Leblond
2013-05-10  6:48 ` [PATCH 11/11] db: db ring has precedence over backlog Eric Leblond
