From: Jeremy Sowden <jeremy@azazel.net>
To: Netfilter Devel <netfilter-devel@vger.kernel.org>
Subject: [PATCH ulogd2 v2 v2 22/34] db: refactor ring-buffer
Date: Tue, 29 Nov 2022 21:47:37 +0000 [thread overview]
Message-ID: <20221129214749.247878-23-jeremy@azazel.net> (raw)
In-Reply-To: <20221129214749.247878-1-jeremy@azazel.net>
* Rename some fields.
* Use `uint32_t` consistently for all sizes and indices.
* Move thread ID into the ring structure.
* Replace status flag with a count of in-use elements.
Signed-off-by: Jeremy Sowden <jeremy@azazel.net>
---
include/ulogd/db.h | 25 ++++++------
util/db.c | 100 +++++++++++++++++++++++++--------------------
2 files changed, 68 insertions(+), 57 deletions(-)
diff --git a/include/ulogd/db.h b/include/ulogd/db.h
index 7c0649583f1d..ebf4f42917c3 100644
--- a/include/ulogd/db.h
+++ b/include/ulogd/db.h
@@ -27,22 +27,22 @@ struct db_driver {
};
-enum {
- RING_NO_QUERY,
- RING_QUERY_READY,
-};
-
struct db_stmt_ring {
- /* Ring buffer: 1 status byte + string */
- char *ring; /* pointer to the ring */
- uint32_t size; /* size of ring buffer in element */
- int length; /* length of one ring buffer element */
- uint32_t wr_item; /* write item in ring buffer */
- uint32_t rd_item; /* read item in ring buffer */
- char *wr_place;
+
+ char *elems; /* Buffer containing `size` strings of `length` bytes */
+
+ uint32_t size; /* No. of elements in ring buffer */
+ uint32_t used; /* No. of elements in ring buffer in use */
+ uint32_t length; /* Length of one element in ring buffer */
+ uint32_t wr_idx; /* Index of next element to write in ring buffer */
+ uint32_t rd_idx; /* Index of next element to read in ring buffer */
+
+ pthread_t thread_id;
pthread_cond_t cond;
pthread_mutex_t mutex;
+
int full;
+
};
struct db_stmt {
@@ -60,7 +60,6 @@ struct db_instance {
struct db_driver *driver;
/* DB ring buffer */
struct db_stmt_ring ring;
- pthread_t db_thread_id;
/* Backlog system */
unsigned int backlog_memcap;
unsigned int backlog_memusage;
diff --git a/util/db.c b/util/db.c
index ee6dfb6b5a2a..8a870846332b 100644
--- a/util/db.c
+++ b/util/db.c
@@ -63,8 +63,10 @@ static int _process_backlog(struct ulogd_pluginstance *upi);
static int _configure_ring(struct ulogd_pluginstance *upi);
static int _start_ring(struct ulogd_pluginstance *upi);
-static int _add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di);
+static int _add_to_ring(struct ulogd_pluginstance *upi);
static void *_process_ring(void *arg);
+static char *_get_ring_elem(struct db_stmt_ring *r, uint32_t i);
+static void _incr_ring_used(struct db_stmt_ring *r, int incr);
int
ulogd_db_configure(struct ulogd_pluginstance *upi,
@@ -182,17 +184,16 @@ ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
case SIGTERM:
case SIGINT:
if (di->ring.size) {
- int s = pthread_cancel(di->db_thread_id);
+ int s = pthread_cancel(di->ring.thread_id);
if (s != 0) {
ulogd_log(ULOGD_ERROR,
- "Can't cancel injection thread\n");
+ "Can't cancel ring-processing thread\n");
break;
}
- s = pthread_join(di->db_thread_id, NULL);
+ s = pthread_join(di->ring.thread_id, NULL);
if (s != 0) {
ulogd_log(ULOGD_ERROR,
- "Error waiting for injection thread"
- "cancelation\n");
+ "Error waiting for ring-processing thread cancellation\n");
}
}
break;
@@ -293,7 +294,7 @@ _interp_db_main(struct ulogd_pluginstance *upi)
struct db_instance *di = (struct db_instance *) &upi->private;
if (di->ring.size) {
- if (_add_to_ring(upi, di) < 0)
+ if (_add_to_ring(upi) < 0)
return ULOGD_IRET_ERR;
return ULOGD_IRET_OK;
}
@@ -372,11 +373,11 @@ _stop_db(struct ulogd_pluginstance *upi)
di->stmt = NULL;
}
if (di->ring.size > 0) {
- pthread_cancel(di->db_thread_id);
- free(di->ring.ring);
+ pthread_cancel(di->ring.thread_id);
pthread_cond_destroy(&di->ring.cond);
pthread_mutex_destroy(&di->ring.mutex);
- di->ring.ring = NULL;
+ free(di->ring.elems);
+ di->ring.elems = NULL;
}
}
@@ -743,18 +744,17 @@ _start_ring(struct ulogd_pluginstance *upi)
return 0;
/* allocate */
- di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length);
- if (di->ring.ring == NULL)
+ di->ring.elems = calloc(di->ring.size, di->ring.length);
+ if (di->ring.elems == NULL)
return -1;
- di->ring.wr_place = di->ring.ring;
+ di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0;
ulogd_log(ULOGD_NOTICE,
- "Allocating %d elements of size %d for ring\n",
+ "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n",
di->ring.size, di->ring.length);
/* init start of query for each element */
for(i = 0; i < di->ring.size; i++)
- strcpy(di->ring.ring + di->ring.length * i + 1,
- di->stmt);
+ strcpy(_get_ring_elem(&di->ring, i), di->stmt);
/* init cond & mutex */
ret = pthread_cond_init(&di->ring.cond, NULL);
@@ -765,7 +765,7 @@ _start_ring(struct ulogd_pluginstance *upi)
goto cond_error;
/* create thread */
- ret = pthread_create(&di->db_thread_id, NULL, _process_ring, upi);
+ ret = pthread_create(&di->ring.thread_id, NULL, _process_ring, upi);
if (ret != 0)
goto mutex_error;
@@ -776,66 +776,78 @@ mutex_error:
cond_error:
pthread_cond_destroy(&di->ring.cond);
alloc_error:
- free(di->ring.ring);
+ free(di->ring.elems);
return -1;
}
static int
-_add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di)
+_add_to_ring(struct ulogd_pluginstance *upi)
{
- if (*di->ring.wr_place == RING_QUERY_READY) {
- if (di->ring.full == 0) {
+ struct db_instance *di = (struct db_instance *) &upi->private;
+
+ if (di->ring.used == di->ring.size) {
+ if (!di->ring.full) {
ulogd_log(ULOGD_ERROR, "No place left in ring\n");
di->ring.full = 1;
}
return ULOGD_IRET_OK;
- } else if (di->ring.full) {
+ }
+
+ if (di->ring.full) {
ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n");
di->ring.full = 0;
}
- _bind_sql_stmt(upi, di->ring.wr_place + 1);
- *di->ring.wr_place = RING_QUERY_READY;
+
+ _bind_sql_stmt(upi, _get_ring_elem(&di->ring, di->ring.wr_idx));
+ _incr_ring_used(&di->ring, 1);
+
pthread_cond_signal(&di->ring.cond);
- di->ring.wr_item ++;
- di->ring.wr_place += di->ring.length;
- if (di->ring.wr_item == di->ring.size) {
- di->ring.wr_item = 0;
- di->ring.wr_place = di->ring.ring;
- }
return ULOGD_IRET_OK;
}
static void *
-_process_ring(void *gdi)
+_process_ring(void *arg)
{
- struct ulogd_pluginstance *upi = gdi;
+ struct ulogd_pluginstance *upi = arg;
struct db_instance *di = (struct db_instance *) &upi->private;
- char *wr_place;
- wr_place = di->ring.ring;
pthread_mutex_lock(&di->ring.mutex);
while(1) {
/* wait cond */
pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
- while (*wr_place == RING_QUERY_READY) {
- if (di->driver->execute(upi, wr_place + 1,
- strlen(wr_place + 1)) < 0) {
+ while (di->ring.used > 0) {
+ char *stmt = _get_ring_elem(&di->ring, di->ring.rd_idx);
+
+ if (di->driver->execute(upi, stmt,
+ strlen(stmt)) < 0) {
+
di->driver->close_db(upi);
while (di->driver->open_db(upi) < 0)
sleep(1);
/* try to re run query */
continue;
}
- *wr_place = RING_NO_QUERY;
- di->ring.rd_item++;
- if (di->ring.rd_item == di->ring.size) {
- di->ring.rd_item = 0;
- wr_place = di->ring.ring;
- } else
- wr_place += di->ring.length;
+
+ _incr_ring_used(&di->ring, -1);
}
}
return NULL;
}
+
+static char *
+_get_ring_elem(struct db_stmt_ring *r, uint32_t i)
+{
+ return &r->elems[i * r->length];
+}
+
+static void
+_incr_ring_used(struct db_stmt_ring *r, int incr)
+{
+ uint32_t *idx = incr > 0 ? &r->wr_idx : &r->rd_idx;
+
+ *idx = (*idx + 1) % r->size;
+
+ r->used += incr;
+}
--
2.35.1
next prev parent reply other threads:[~2022-11-29 21:58 UTC|newest]
Thread overview: 40+ messages / expand[flat|nested] mbox.gz Atom feed top
2022-11-29 21:47 [PATCH ulogd2 v2 v2 00/34] Refactor of the DB output plug-ins Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 01/34] ulogd: fix parse-error check Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 02/34] filter: fix buffer sizes in filter plug-ins Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 03/34] output: JSON: remove incorrect config value check Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 04/34] db: fix back-log capacity checks Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 05/34] build: add checks to configure.ac Jeremy Sowden
2022-11-30 10:04 ` Jan Engelhardt
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 06/34] src: remove some trailing white space Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 07/34] src: remove zero-valued config-key fields Jeremy Sowden
2022-11-30 10:21 ` Jan Engelhardt
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 08/34] src: parenthesize config-entry macro arguments Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 09/34] src: define constructors and destructors consistently Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 10/34] src: remove `TIME_ERR` macro Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 11/34] src: remove superfluous casts Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 12/34] conffile: replace malloc+strcpy with strdup Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 13/34] output: remove zero-initialized `struct ulogd_plugin` members Jeremy Sowden
2022-11-30 10:26 ` Jan Engelhardt
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 14/34] output: de-duplicate allocation of input keys Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 15/34] db: reorganize source Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 16/34] db: use consistent integer return values to indicate errors Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 17/34] db: change return type of two functions to `void` Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 18/34] db: open-code `_loop_reconnect_db` Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 19/34] db: improve calculation of sql statement length Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 20/34] db: refactor configuration Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 21/34] db: refactor ring-buffer initialization Jeremy Sowden
2022-11-29 21:47 ` Jeremy Sowden [this message]
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 23/34] db: refactor backlog Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 24/34] db: use `struct db_stmt` objects more widely Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 25/34] db: synchronize access to ring-buffer Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 26/34] db: avoid cancelling ring-buffer thread Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 27/34] db, IP2BIN: defer formatting of raw strings Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 28/34] db: add prep & exec support Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 29/34] output: mysql: " Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 30/34] output: pgsql: remove a couple of struct members Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 31/34] output: pgsql: remove variable-length arrays Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 32/34] output: pgsql: tidy up `open_db_pgsql` and fix memory leak Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 33/34] output: pgsql: add prep & exec support Jeremy Sowden
2022-11-29 21:47 ` [PATCH ulogd2 v2 v2 34/34] output: sqlite3: reimplement using the common DB API Jeremy Sowden
2022-11-30 10:27 ` [PATCH ulogd2 v2 v2 00/34] Refactor of the DB output plug-ins Pablo Neira Ayuso
2022-11-30 16:03 ` Jeremy Sowden
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20221129214749.247878-23-jeremy@azazel.net \
--to=jeremy@azazel.net \
--cc=netfilter-devel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).