netfilter-devel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Jeremy Sowden <jeremy@azazel.net>
To: Netfilter Devel <netfilter-devel@vger.kernel.org>
Subject: [PATCH ulogd2 v2 v2 25/34] db: synchronize access to ring-buffer
Date: Tue, 29 Nov 2022 21:47:40 +0000	[thread overview]
Message-ID: <20221129214749.247878-26-jeremy@azazel.net> (raw)
In-Reply-To: <20221129214749.247878-1-jeremy@azazel.net>

A mutex and a condition variable are associated with the ring-buffer.
However, they are not used to synchronize access to the buffer, only to
wake the thread that processes the buffer when new statements are added
to it.  Thus there is nothing to prevent concurrent modifications of the
buffer.

Instead, acquire the mutex before adding to the buffer, and, in the
processing thread, copy the statement we're about to execute out of the
buffer and release the mutex while processing it.

Signed-off-by: Jeremy Sowden <jeremy@azazel.net>
---
 include/ulogd/db.h |  2 ++
 util/db.c          | 63 +++++++++++++++++++++++++++++++++++++---------
 2 files changed, 53 insertions(+), 12 deletions(-)

diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index 6c2e3d07f463..bf4a19dea150 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -34,6 +34,8 @@ struct db_stmt_ring {
 	struct db_stmt *elems; /* Buffer containing `size` statements of
 				* `length` bytes */
 
+	struct db_stmt *stmt; /* Currently executing statement */
+
 	uint32_t size; /* No. of elements in ring buffer */
 	uint32_t used; /* No. of elements in ring buffer in use */
 	uint32_t length; /* Length of one element in ring buffer */
diff --git a/util/db.c b/util/db.c
index 487eaed26153..6cfbcbc16791 100644
--- a/util/db.c
+++ b/util/db.c
@@ -372,10 +372,15 @@ _stop_db(struct ulogd_pluginstance *upi)
 	}
 	if (di->ring.size > 0) {
 		pthread_cancel(di->ring.thread_id);
+
 		pthread_cond_destroy(&di->ring.cond);
 		pthread_mutex_destroy(&di->ring.mutex);
+
 		free(di->ring.elems);
+		free(di->ring.stmt);
+
 		di->ring.elems = NULL;
+		di->ring.stmt = NULL;
 	}
 }
 
@@ -737,14 +742,22 @@ _start_ring(struct ulogd_pluginstance *upi)
 		return 0;
 
 	/* allocate */
+
 	stmt_size = sizeof(*di->stmt) + di->stmt->size;
 	stmt_align = __alignof__(*di->stmt);
 	di->ring.length = stmt_size % stmt_align != 0
 		? (1 + stmt_size / stmt_align) * stmt_align
 		: stmt_size;
+
+	di->ring.stmt = malloc(di->ring.length);
+	if (di->ring.stmt == NULL)
+		return -1;
+
 	di->ring.elems = calloc(di->ring.size, di->ring.length);
-	if (di->ring.elems == NULL)
+	if (di->ring.elems == NULL) {
+		free(di->ring.stmt);
 		return -1;
+	}
 	di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0;
 	ulogd_log(ULOGD_NOTICE,
 		  "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n",
@@ -775,6 +788,7 @@ cond_error:
 	pthread_cond_destroy(&di->ring.cond);
 alloc_error:
 	free(di->ring.elems);
+	free(di->ring.stmt);
 
 	return -1;
 }
@@ -784,12 +798,14 @@ _add_to_ring(struct ulogd_pluginstance *upi)
 {
 	struct db_instance *di = (struct db_instance *) &upi->private;
 
+	pthread_mutex_lock(&di->ring.mutex);
+
 	if (di->ring.used == di->ring.size) {
 		if (!di->ring.full) {
 			ulogd_log(ULOGD_ERROR, "No place left in ring\n");
 			di->ring.full = 1;
 		}
-		return ULOGD_IRET_OK;
+		goto unlock_mutex;
 	}
 
 	if (di->ring.full) {
@@ -801,6 +817,9 @@ _add_to_ring(struct ulogd_pluginstance *upi)
 	_incr_ring_used(&di->ring, 1);
 
 	pthread_cond_signal(&di->ring.cond);
+unlock_mutex:
+	pthread_mutex_unlock(&di->ring.mutex);
+
 	return ULOGD_IRET_OK;
 }
 
@@ -809,27 +828,47 @@ _process_ring(void *arg)
 {
 	struct ulogd_pluginstance *upi = arg;
 	struct db_instance *di = (struct db_instance *) &upi->private;
+	struct db_stmt *stmt = di->ring.stmt;
 
 	pthread_mutex_lock(&di->ring.mutex);
+
 	while(1) {
-		/* wait cond */
+
 		pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
+
 		while (di->ring.used > 0) {
-			struct db_stmt *stmt = _get_ring_elem(&di->ring,
-							      di->ring.rd_idx);
-
-			if (di->driver->execute(upi, stmt) < 0) {
-				di->driver->close_db(upi);
-				while (di->driver->open_db(upi) < 0)
-					sleep(1);
-				/* try to re-run statement */
+
+			memcpy(stmt, _get_ring_elem(&di->ring, di->ring.rd_idx),
+			       di->ring.length);
+
+			pthread_mutex_unlock(&di->ring.mutex);
+
+exec_stmt:
+			if (di->driver->execute(upi, stmt) == 0) {
+
+				pthread_mutex_lock(&di->ring.mutex);
+
+				_incr_ring_used(&di->ring, -1);
+
 				continue;
+
 			}
 
-			_incr_ring_used(&di->ring, -1);
+			/* If the exec fails, close the DB connexion and try to
+			 * open it again.  Once the connexion is made, retry the
+			 * statement.
+			 */
+			di->driver->close_db(upi);
+			while (di->driver->open_db(upi) < 0)
+				sleep(1);
+			goto exec_stmt;
+
 		}
+
 	}
 
+	pthread_mutex_unlock(&di->ring.mutex);
+
 	return NULL;
 }
 
-- 
2.35.1


  parent reply	other threads:[~2022-11-29 21:58 UTC|newest]

Thread overview: 40+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-11-29 21:47 [PATCH ulogd2 v2 v2 00/34] Refactor of the DB output plug-ins Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 01/34] ulogd: fix parse-error check Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 02/34] filter: fix buffer sizes in filter plug-ins Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 03/34] output: JSON: remove incorrect config value check Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 04/34] db: fix back-log capacity checks Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 05/34] build: add checks to configure.ac Jeremy Sowden
2022-11-30 10:04   ` Jan Engelhardt
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 06/34] src: remove some trailing white space Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 07/34] src: remove zero-valued config-key fields Jeremy Sowden
2022-11-30 10:21   ` Jan Engelhardt
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 08/34] src: parenthesize config-entry macro arguments Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 09/34] src: define constructors and destructors consistently Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 10/34] src: remove `TIME_ERR` macro Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 11/34] src: remove superfluous casts Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 12/34] conffile: replace malloc+strcpy with strdup Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 13/34] output: remove zero-initialized `struct ulogd_plugin` members Jeremy Sowden
2022-11-30 10:26   ` Jan Engelhardt
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 14/34] output: de-duplicate allocation of input keys Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 15/34] db: reorganize source Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 16/34] db: use consistent integer return values to indicate errors Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 17/34] db: change return type of two functions to `void` Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 18/34] db: open-code `_loop_reconnect_db` Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 19/34] db: improve calculation of sql statement length Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 20/34] db: refactor configuration Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 21/34] db: refactor ring-buffer initialization Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 22/34] db: refactor ring-buffer Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 23/34] db: refactor backlog Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 24/34] db: use `struct db_stmt` objects more widely Jeremy Sowden
2022-11-29 21:47 ` Jeremy Sowden [this message]
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 26/34] db: avoid cancelling ring-buffer thread Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 27/34] db, IP2BIN: defer formatting of raw strings Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 28/34] db: add prep & exec support Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 29/34] output: mysql: " Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 30/34] output: pgsql: remove a couple of struct members Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 31/34] output: pgsql: remove variable-length arrays Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 32/34] output: pgsql: tidy up `open_db_pgsql` and fix memory leak Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 33/34] output: pgsql: add prep & exec support Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 34/34] output: sqlite3: reimplement using the common DB API Jeremy Sowden
2022-11-30 10:27 ` [PATCH ulogd2 v2 v2 00/34] Refactor of the DB output plug-ins Pablo Neira Ayuso
2022-11-30 16:03   ` Jeremy Sowden

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20221129214749.247878-26-jeremy@azazel.net \
    --to=jeremy@azazel.net \
    --cc=netfilter-devel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).