* [LTP] [PATCH 0/6] Locally distributed work API
@ 2022-09-27 16:14 Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 1/6] api/epoll: Add safe epoll functions Richard Palethorpe via ltp
                   ` (5 more replies)
  0 siblings, 6 replies; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC
  To: ltp; +Cc: Richard Palethorpe

Hello,

Sorry for such a large, complicated patch set. This is essentially an
RFC, but I haven't marked it as such because some parts of it may be
ready to merge.

This is motivated by patterns I noticed both in the read_all test and
work we have been doing on the LTP parallel test executor
(LTX). Currently LTX exists outside the main LTP tree and doesn't use
the LTP library. However it would be nice to share code between some
types of test and LTX.

We have various ways of communicating and synchronising between
processes. This adds another which is particularly suited to
distributing work across multiple processes, including work that
completes unevenly and is likely to fail.

Currently these are local processes, but LTX runs as a remote process
on a SUT. It takes commands from and returns results to a test
scheduler on another machine. The interface should be relatively easy
to extend to support this scenario.

Still TODO:
* Documentation (other than commit messages)
* Integrate LTX into LTP API
* Implement variations on the read_all test (e.g. splice_all)

Richard Palethorpe (6):
  api/epoll: Add safe epoll functions
  api/evloop: Add helpers for creating an event loop
  api/state_machine: Add validating state machines
  api/channel: Add channel abstraction for message passing
  api/worker: Add library for distributing work over multiple procs
  read_all: Migrate to the worker lib

 include/tst_channel.h                   |  97 ++++
 include/tst_epoll.h                     |  36 ++
 include/tst_evloop.h                    |  32 ++
 include/tst_state_machine.h             |  50 ++
 include/tst_worker.h                    |  62 +++
 lib/tst_channel.c                       | 410 +++++++++++++++++
 lib/tst_epoll.c                         |  81 ++++
 lib/tst_evloop.c                        | 102 ++++
 lib/tst_state_machine.c                 |  98 ++++
 lib/tst_worker.c                        | 285 ++++++++++++
 testcases/kernel/fs/read_all/read_all.c | 588 +++++++-----------------
 11 files changed, 1424 insertions(+), 417 deletions(-)
 create mode 100644 include/tst_channel.h
 create mode 100644 include/tst_epoll.h
 create mode 100644 include/tst_evloop.h
 create mode 100644 include/tst_state_machine.h
 create mode 100644 include/tst_worker.h
 create mode 100644 lib/tst_channel.c
 create mode 100644 lib/tst_epoll.c
 create mode 100644 lib/tst_evloop.c
 create mode 100644 lib/tst_state_machine.c
 create mode 100644 lib/tst_worker.c

-- 
2.36.1


-- 
Mailing list info: https://lists.linux.it/listinfo/ltp


* [LTP] [PATCH 1/6] api/epoll: Add safe epoll functions
  2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
@ 2022-09-27 16:14 ` Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 2/6] api/evloop: Add helpers for creating an event loop Richard Palethorpe via ltp
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC
  To: ltp; +Cc: Richard Palethorpe

epoll is probably safe to use in place of (p)select/(p)poll now.
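
As a rough illustration of the "safe" wrapper convention applied to
epoll here, the sketch below wraps epoll_create1() so that a failure is
reported with the caller's file and line. It is not the LTP
implementation: demo_safe_epoll_create1, DEMO_SAFE_EPOLL_CREATE1 and
demo_create_and_close are hypothetical names, and it prints to stderr
and exits where the real wrappers call tst_brk_().

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>

/* Hypothetical wrapper: report the caller's file/line and exit on failure. */
static int demo_safe_epoll_create1(const char *file, int lineno, int flags)
{
	int ret = epoll_create1(flags);

	if (ret == -1) {
		fprintf(stderr, "%s:%d: epoll_create1(%d): %s\n",
			file, lineno, flags, strerror(errno));
		exit(EXIT_FAILURE);
	}

	return ret;
}

/* The macro captures the call site, so callers never pass file/line. */
#define DEMO_SAFE_EPOLL_CREATE1(flags) \
	demo_safe_epoll_create1(__FILE__, __LINE__, (flags))

/* Creates and closes an epoll instance; returns 0 if both steps worked. */
static int demo_create_and_close(void)
{
	int epfd = DEMO_SAFE_EPOLL_CREATE1(EPOLL_CLOEXEC);

	assert(epfd >= 0);
	return close(epfd);
}
```

The point of the pattern is that test code can use the return value
without an error branch at every call site.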

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
 include/tst_epoll.h | 36 ++++++++++++++++++++
 lib/tst_epoll.c     | 81 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+)
 create mode 100644 include/tst_epoll.h
 create mode 100644 lib/tst_epoll.c

diff --git a/include/tst_epoll.h b/include/tst_epoll.h
new file mode 100644
index 000000000..c5ffc07e3
--- /dev/null
+++ b/include/tst_epoll.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#include <sys/epoll.h>
+
+#ifndef TST_EPOLL_H
+#define TST_EPOLL_H
+
+typedef int (*tst_on_epoll_fn)(void *, uint32_t);
+struct tst_epoll_event_data {
+	tst_on_epoll_fn on_epoll;
+	void *self;
+};
+
+int safe_epoll_create1(const char *const file, const int lineno,
+		       const int flags);
+
+#define SAFE_EPOLL_CREATE1(flags) \
+	safe_epoll_create1(__FILE__, __LINE__, (flags))
+
+int safe_epoll_ctl(const char *const file, const int lineno,
+		   int epfd, int op, int fd, struct epoll_event *ev);
+
+#define SAFE_EPOLL_CTL(epfd, op, fd, ev) \
+	safe_epoll_ctl(__FILE__, __LINE__, epfd, op, fd, ev)
+
+int safe_epoll_wait(const char *const file, const int lineno,
+		    int epfd, struct epoll_event *events,
+		    int maxevents, int timeout);
+
+#define SAFE_EPOLL_WAIT(epfd, events, maxevents, timeout)\
+	safe_epoll_wait(__FILE__, __LINE__, epfd, events, maxevents, timeout)
+
+#endif
diff --git a/lib/tst_epoll.c b/lib/tst_epoll.c
new file mode 100644
index 000000000..556b3bdab
--- /dev/null
+++ b/lib/tst_epoll.c
@@ -0,0 +1,81 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+#define _GNU_SOURCE
+#define TST_NO_DEFAULT_MAIN
+
+#include "tst_test.h"
+#include "tst_epoll.h"
+
+int safe_epoll_create1(const char *const file, const int lineno,
+		       const int flags)
+{
+	const char *flags_str;
+	int ret = epoll_create1(flags);
+
+	switch (flags) {
+	case EPOLL_CLOEXEC:
+		flags_str = "EPOLL_CLOEXEC";
+		break;
+	case 0:
+		flags_str = "";
+		break;
+	default:
+		flags_str = "???";
+	}
+
+	if (ret == -1) {
+		tst_brk_(file, lineno,
+			 TBROK | TERRNO, "epoll_create1(%s)", flags_str);
+	}
+
+	return ret;
+}
+
+int safe_epoll_ctl(const char *const file, const int lineno,
+		   int epfd, int op, int fd, struct epoll_event *ev)
+{
+	const char *op_str;
+	int ret;
+
+	switch (op) {
+	case EPOLL_CTL_ADD:
+		op_str = "EPOLL_CTL_ADD";
+		break;
+	case EPOLL_CTL_DEL:
+		op_str = "EPOLL_CTL_DEL";
+		break;
+	case EPOLL_CTL_MOD:
+		op_str = "EPOLL_CTL_MOD";
+		break;
+	default:
+		op_str = "???";
+	}
+
+	ret = epoll_ctl(epfd, op, fd, ev);
+
+	if (ret == -1) {
+		tst_brk_(file, lineno,
+			 TBROK | TERRNO,
+			 "epoll_ctl(%d, %s, %d, ...)", epfd, op_str, fd);
+	}
+
+	return ret;
+}
+
+int safe_epoll_wait(const char *const file, const int lineno,
+		    int epfd, struct epoll_event *events,
+		    int maxevents, int timeout)
+{
+	int ret = epoll_wait(epfd, events, maxevents, timeout);
+
+	if (ret == -1) {
+		tst_brk_(file, lineno, TBROK | TERRNO,
+			 "epoll_wait(%d, ..., %d, %d)",
+			 epfd, maxevents, timeout);
+	}
+
+	return ret;
+}
+
-- 
2.36.1



* [LTP] [PATCH 2/6] api/evloop: Add helpers for creating an event loop
  2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 1/6] api/epoll: Add safe epoll functions Richard Palethorpe via ltp
@ 2022-09-27 16:14 ` Richard Palethorpe via ltp
  2022-09-30 13:36   ` Petr Vorel
  2022-09-27 16:14 ` [LTP] [PATCH 3/6] api/state_machine: Add validating state machines Richard Palethorpe via ltp
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC
  To: ltp; +Cc: Richard Palethorpe

Puts some of the boilerplate for creating an "event loop" into an
API. Useful for asynchronous or evented I/O.

This uses epoll and signalfd which are very widely supported on
Linux. I also think epoll is a better interface than ppoll and
pselect.

The tst_epoll_event_data struct (added in the previous commit) can be
used to add callbacks on particular FD events.

There is also a special callback for the signalfd (on_signal), and
on_cont, which is called at the end of each loop iteration. Returning 0
from these will cause the loop to exit.
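
A minimal sketch of the callback dispatch described above, using plain
epoll on a pipe. It is only an approximation under stated assumptions:
all demo_* names are hypothetical stand-ins (demo_event_data mirrors
tst_epoll_event_data), the signalfd part is left out, and errors are
returned rather than reported with tst_brk().

```c
#include <assert.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/epoll.h>

/* Hypothetical stand-in for struct tst_epoll_event_data. */
struct demo_event_data {
	int (*on_epoll)(void *self, uint32_t events);
	void *self;
};

struct demo_reader {
	int fd;
	char last;
	int calls;
};

struct demo_loop {
	int epfd;
	int timeout;
	void *priv;
	int (*on_cont)(struct demo_loop *self);
};

/* Per-FD callback: consume one byte each time the pipe is readable. */
static int demo_on_readable(void *self, uint32_t events)
{
	struct demo_reader *r = self;

	if ((events & EPOLLIN) && read(r->fd, &r->last, 1) == 1)
		r->calls++;

	return 1;
}

/* End-of-iteration callback: returning 0 stops the loop. */
static int demo_on_cont(struct demo_loop *self)
{
	struct demo_reader *r = self->priv;

	return r->last != 'q';
}

/* Returns 0 when on_cont asked to stop, -1 on error or timeout. */
static int demo_loop_run(struct demo_loop *self)
{
	struct epoll_event events[8];

	for (;;) {
		int n = epoll_wait(self->epfd, events, 8, self->timeout);

		if (n <= 0)
			return -1;

		for (int i = 0; i < n; i++) {
			struct demo_event_data *d = events[i].data.ptr;

			d->on_epoll(d->self, events[i].events);
		}

		if (!self->on_cont(self))
			return 0;
	}
}

/* Feeds "abq" through a pipe; returns how many times the callback ran. */
static int demo_event_loop(void)
{
	int pfd[2], ret;
	struct demo_reader reader = { 0 };
	struct demo_event_data evdata = { demo_on_readable, &reader };
	struct epoll_event ev = { .events = EPOLLIN, .data.ptr = &evdata };
	struct demo_loop loop = { -1, 1000, &reader, demo_on_cont };

	if (pipe(pfd))
		return -1;

	reader.fd = pfd[0];
	loop.epfd = epoll_create1(EPOLL_CLOEXEC);
	if (loop.epfd == -1)
		return -1;

	if (epoll_ctl(loop.epfd, EPOLL_CTL_ADD, pfd[0], &ev))
		return -1;

	if (write(pfd[1], "abq", 3) != 3)
		return -1;

	ret = demo_loop_run(&loop);

	close(loop.epfd);
	close(pfd[0]);
	close(pfd[1]);

	return ret ? -1 : reader.calls;
}
```

Because the pipe is registered level-triggered, epoll_wait() keeps
reporting it until all three bytes are consumed, so the callback runs
once per byte and on_cont ends the loop after 'q'.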

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
 include/tst_evloop.h |  32 ++++++++++++++
 lib/tst_evloop.c     | 102 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 134 insertions(+)
 create mode 100644 include/tst_evloop.h
 create mode 100644 lib/tst_evloop.c

diff --git a/include/tst_evloop.h b/include/tst_evloop.h
new file mode 100644
index 000000000..bdab2d6f7
--- /dev/null
+++ b/include/tst_evloop.h
@@ -0,0 +1,32 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#include <inttypes.h>
+#include <sys/signalfd.h>
+
+#include "tst_epoll.h"
+
+#ifndef TST_EVLOOP_H
+#define TST_EVLOOP_H
+
+struct tst_evloop {
+	int epollfd;
+	int signalfd;
+	struct tst_epoll_event_data signalfd_evdata;
+	int timeout;
+
+	void *priv;
+	int (*on_cont)(struct tst_evloop *self);
+	int (*on_signal)(struct tst_evloop *self, struct signalfd_siginfo *si);
+};
+
+void tst_evloop_setup(struct tst_evloop *self);
+void tst_evloop_run(struct tst_evloop *self);
+void tst_evloop_add(struct tst_evloop *self,
+		    struct tst_epoll_event_data *evdata,
+		    int fd, uint32_t events);
+void tst_evloop_cleanup(struct tst_evloop *self);
+
+#endif
diff --git a/lib/tst_evloop.c b/lib/tst_evloop.c
new file mode 100644
index 000000000..66d74ce58
--- /dev/null
+++ b/lib/tst_evloop.c
@@ -0,0 +1,102 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+#define _GNU_SOURCE
+#define TST_NO_DEFAULT_MAIN
+
+#include "tst_test.h"
+#include "tst_evloop.h"
+
+static void handle_epoll_event(struct epoll_event *event)
+{
+	struct tst_epoll_event_data *data = event->data.ptr;
+
+	data->on_epoll(data->self, event->events);
+}
+
+static int evloop_on_signal(struct tst_evloop *self, uint32_t events)
+{
+	int i, n;
+	struct signalfd_siginfo si[16];
+
+	if (events != EPOLLIN) {
+		tst_brk(TBROK, "Unexpected event on signalfd");
+		return 1;
+	}
+
+	n = SAFE_READ(0, self->signalfd, si, sizeof(si));
+
+	if (!n)
+		tst_brk(TBROK, "Got EPOLLIN on signalfd, but no signal read from fd");
+
+	for (i = 0; i < n/(int)sizeof(si[0]); i++) {
+		if (!self->on_signal(self, si + i))
+			return 0;
+	}
+
+	return 1;
+}
+
+void tst_evloop_add(struct tst_evloop *self,
+		       struct tst_epoll_event_data *evdata,
+		       int fd, uint32_t events)
+{
+	struct epoll_event ev = {
+		.events = events,
+		.data.ptr = evdata,
+	};
+
+	SAFE_EPOLL_CTL(self->epollfd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+void tst_evloop_setup(struct tst_evloop *self)
+{
+
+	sigset_t mask;
+
+	self->epollfd = SAFE_EPOLL_CREATE1(EPOLL_CLOEXEC);
+
+	sigfillset(&mask);
+	SAFE_SIGPROCMASK(SIG_BLOCK, &mask, NULL);
+	self->signalfd = signalfd(-1, &mask, SFD_CLOEXEC);
+
+	self->signalfd_evdata.self = self;
+	self->signalfd_evdata.on_epoll = (tst_on_epoll_fn)evloop_on_signal;
+
+	tst_evloop_add(self, &self->signalfd_evdata, self->signalfd, EPOLLIN);
+}
+
+void tst_evloop_run(struct tst_evloop *self)
+{
+	static int saturated_warn;
+	const int maxevents = 128;
+	struct epoll_event events[maxevents];
+
+	for (;;) {
+		const int ev_num = SAFE_EPOLL_WAIT(self->epollfd, events,
+						   maxevents, self->timeout);
+
+		for (int i = 0; i < ev_num; i++)
+			handle_epoll_event(events + i);
+
+		if (ev_num == maxevents) {
+			if (!saturated_warn)
+				tst_res(TINFO, "Event loop saturated");
+
+			saturated_warn = 1;
+			continue;
+		}
+
+		if (!self->on_cont(self))
+			break;
+	}
+}
+
+void tst_evloop_cleanup(struct tst_evloop *self)
+{
+	if (self->epollfd > 0)
+		SAFE_CLOSE(self->epollfd);
+	if (self->signalfd > 0)
+		SAFE_CLOSE(self->signalfd);
+}
-- 
2.36.1



* [LTP] [PATCH 3/6] api/state_machine: Add validating state machines
  2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 1/6] api/epoll: Add safe epoll functions Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 2/6] api/evloop: Add helpers for creating an event loop Richard Palethorpe via ltp
@ 2022-09-27 16:14 ` Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing Richard Palethorpe via ltp
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC
  To: ltp; +Cc: Richard Palethorpe

Allows creating state machines where the state transitions are
validated. Also one can assert which states a line of code expects to
be executed in.

This is useful for verifying implicit or explicit state machines used
to process I/O events or data.

When a state violation is found, a trace of previous state transitions
is printed.
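
To make the idea concrete, here is a small self-contained sketch of a
validated transition: each state has a bitmask of the states it may
move to, and every transition is checked against it and recorded in a
ring. All demo_* names and the three states are invented for the
example, and a violation returns -1 here instead of breaking the test
and printing the trace.

```c
#include <assert.h>
#include <stdint.h>

enum demo_state { DS_CLOSED, DS_READY, DS_BUSY, DS_COUNT };

/* demo_allowed[from] is a bitmask of the states reachable from `from`. */
static const uint64_t demo_allowed[DS_COUNT] = {
	[DS_CLOSED] = (UINT64_C(1) << DS_READY),
	[DS_READY]  = (UINT64_C(1) << DS_CLOSED) | (UINT64_C(1) << DS_BUSY),
	[DS_BUSY]   = (UINT64_C(1) << DS_READY),
};

struct demo_mach {
	unsigned cur;
	unsigned top;
	struct { unsigned from, to; } ring[8];
};

/* Returns 0 and records the transition if it is allowed, -1 otherwise. */
static int demo_state_set(struct demo_mach *m, unsigned to)
{
	if (to >= DS_COUNT)
		return -1;

	/* 64-bit shift so that state numbers up to 63 stay well defined. */
	if (!(demo_allowed[m->cur] & (UINT64_C(1) << to)))
		return -1;

	m->top = (m->top + 1) % 8;
	m->ring[m->top].from = m->cur;
	m->ring[m->top].to = to;
	m->cur = to;

	return 0;
}

/* Returns nonzero if the current state is one of the states in mask. */
static int demo_state_exp(const struct demo_mach *m, uint64_t mask)
{
	return (mask & (UINT64_C(1) << m->cur)) != 0;
}

/* Walks a few transitions and returns how many were rejected. */
static int demo_state_machine(void)
{
	struct demo_mach m = { .cur = DS_CLOSED };
	int rejected = 0;

	rejected += demo_state_set(&m, DS_BUSY) != 0;   /* closed -> busy: no */
	rejected += demo_state_set(&m, DS_READY) != 0;  /* closed -> ready */
	rejected += demo_state_set(&m, DS_BUSY) != 0;   /* ready -> busy */
	rejected += demo_state_set(&m, DS_CLOSED) != 0; /* busy -> closed: no */
	rejected += demo_state_set(&m, DS_READY) != 0;  /* busy -> ready */

	assert(demo_state_exp(&m, UINT64_C(1) << DS_READY));
	assert(m.ring[m.top].from == DS_BUSY);

	return rejected;
}
```

The ring only holds the last eight accepted transitions, which is
enough to show how the machine arrived at the state where a check
failed.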

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
 include/tst_state_machine.h | 50 +++++++++++++++++++
 lib/tst_state_machine.c     | 98 +++++++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+)
 create mode 100644 include/tst_state_machine.h
 create mode 100644 lib/tst_state_machine.c

diff --git a/include/tst_state_machine.h b/include/tst_state_machine.h
new file mode 100644
index 000000000..2e86535c6
--- /dev/null
+++ b/include/tst_state_machine.h
@@ -0,0 +1,50 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#include <inttypes.h>
+
+#ifndef TST_STATE_MACHINE_H
+#define TST_STATE_MACHINE_H
+
+#define TST_STATE_ANY (~(uint64_t)0)
+
+struct tst_state_matrix {
+	char *names[64];
+	uint64_t states[64];
+};
+
+struct tst_state_trace {
+	const char *file;
+	int line;
+	unsigned from;
+	unsigned to;
+};
+
+struct tst_state_mach {
+	const struct tst_state_matrix *mat;
+
+	unsigned top;
+	struct tst_state_trace ring[8];
+};
+
+#define TST_STATE_SET(mach, to) \
+	tst_state_set(__FILE__, __LINE__, mach, to)
+
+void tst_state_set(const char *const file, const int lineno,
+		  struct tst_state_mach *mach, unsigned to);
+
+#define TST_STATE_EXP(mach, mask) \
+	tst_state_exp(__FILE__, __LINE__, mach, mask)
+
+void tst_state_exp(const char *const file, const int lineno,
+		   struct tst_state_mach *mach, uint64_t mask);
+
+#define TST_STATE_GET(mach, mask) \
+	tst_state_get(__FILE__, __LINE__, mach, mask)
+
+unsigned tst_state_get(const char *const file, const int lineno,
+		       struct tst_state_mach *mach, uint64_t mask);
+
+#endif
diff --git a/lib/tst_state_machine.c b/lib/tst_state_machine.c
new file mode 100644
index 000000000..cb8ed79c4
--- /dev/null
+++ b/lib/tst_state_machine.c
@@ -0,0 +1,98 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#define _GNU_SOURCE
+#define TST_NO_DEFAULT_MAIN
+
+#include <unistd.h>
+#include <stdio.h>
+
+#include "tst_test.h"
+#include "tst_state_machine.h"
+
+static const char *state_trace(struct tst_state_mach *mach)
+{
+	static char buf[4096];
+	char *const *const names = mach->mat->names;
+	size_t off = 1;
+	unsigned c = 0, i;
+
+	buf[0] = '\n';
+
+	for (i = mach->top; c < 8; c++) {
+		const struct tst_state_trace *t = mach->ring + i;
+
+		if (!t->file)
+			break;
+
+		if (off >= sizeof(buf))
+			break;
+
+		off += snprintf(buf + off,
+				sizeof(buf) - off - 1,
+				"\t%s:%d %s (%u) -> %s (%u)\n",
+				t->file, t->line,
+				names[t->from], t->from,
+				names[t->to], t->to);
+
+		if (!i)
+			i = 7;
+		else
+			i--;
+	}
+
+	return buf;
+}
+
+static void state_trace_set(const char *const file, const int lineno,
+			    struct tst_state_trace *trace, unsigned from, unsigned to)
+{
+	trace->file = file;
+	trace->line = lineno;
+	trace->from = from;
+	trace->to = to;
+}
+
+void tst_state_set(const char *const file, const int lineno,
+		   struct tst_state_mach *mach, unsigned to)
+{
+	char *const *const names = mach->mat->names;
+	const unsigned cur = mach->ring[mach->top].to;
+
+	if (cur > 63)
+		tst_brk_(file, lineno, TBROK, "Attempting to transition from an invalid state: %u: %s", cur, state_trace(mach));
+
+	if (to > 63)
+		tst_brk_(file, lineno, TBROK, "Attempting to transition to invalid state: %u: %s", to, state_trace(mach));
+
+	if (!(mach->mat->states[cur] & (1ULL << to)))
+		tst_brk_(file, lineno, TBROK, "Invalid transition: %s (%u) -> %s (%u): %s", names[cur], cur, names[to], to, state_trace(mach));
+
+	if (++(mach->top) == 8)
+		mach->top = 0;
+
+	state_trace_set(file, lineno, &mach->ring[mach->top], cur, to);
+}
+
+unsigned tst_state_get(const char *const file, const int lineno,
+		       struct tst_state_mach *mach, uint64_t mask)
+{
+	char *const *const names = mach->mat->names;
+	const unsigned cur = mach->ring[mach->top].to;
+
+	if (mask & (1ULL << cur))
+		return cur;
+
+	tst_brk_(file, lineno, TBROK, "Should not reach here while in state: %s (%u): %s",
+		 names[cur], cur, state_trace(mach));
+
+	return cur;
+}
+
+void tst_state_exp(const char *const file, const int lineno,
+		   struct tst_state_mach *mach, uint64_t mask)
+{
+	tst_state_get(file, lineno, mach, mask);
+}
-- 
2.36.1



* [LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing
  2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
                   ` (2 preceding siblings ...)
  2022-09-27 16:14 ` [LTP] [PATCH 3/6] api/state_machine: Add validating state machines Richard Palethorpe via ltp
@ 2022-09-27 16:14 ` Richard Palethorpe via ltp
  2022-09-30  9:17   ` Petr Vorel
  2022-09-27 16:14 ` [LTP] [PATCH 5/6] api/worker: Add library for distributing work over multiple procs Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 6/6] read_all: Migrate to the worker lib Richard Palethorpe via ltp
  5 siblings, 1 reply; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC
  To: ltp; +Cc: Richard Palethorpe

Adds an API for the channel pattern and an implementation based on
pipes.

A channel supports synchronous and asynchronous modes. When a message
is sent synchronously it's guaranteed to have been sent and ack'ed by
the time send returns. The same applies to receiving.

In asynchronous mode, send returns immediately and a callback must be
registered to handle the response. This requires that the channel is
registered with an event loop. Again it's similar for receiving.

Messages are binary safe strings with a specified size.

The use of pipes instead of shared memory is motivated by the idea
that the same interface can be used for remote or isolated
processes.
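
The synchronous exchange can be sketched roughly as follows: a
fixed-size envelope (kind, length) precedes the payload, and the
receiver answers with an empty ack envelope. This is a simplified
illustration, not the code in this patch: the demo_* names are
hypothetical, one process plays both ends, and separate pipes carry
data and acks so that nothing blocks.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

enum demo_msg_kind { DEMO_ACK = 1, DEMO_DATA };

struct demo_envelope {
	unsigned int kind;
	unsigned int len;
};

/* Write exactly len bytes, retrying short writes; 0 on success. */
static int demo_write_all(int fd, const void *buf, size_t len)
{
	const char *p = buf;

	while (len) {
		ssize_t ret = write(fd, p, len);

		if (ret == -1 && errno == EINTR)
			continue;
		if (ret == -1)
			return -1;

		p += ret;
		len -= ret;
	}

	return 0;
}

/* Read exactly len bytes; EOF before that counts as an error. */
static int demo_read_all(int fd, void *buf, size_t len)
{
	char *p = buf;

	while (len) {
		ssize_t ret = read(fd, p, len);

		if (ret == -1 && errno == EINTR)
			continue;
		if (ret <= 0)
			return -1;

		p += ret;
		len -= ret;
	}

	return 0;
}

static int demo_send_data(int fd, const char *msg, unsigned int len)
{
	struct demo_envelope env = { DEMO_DATA, len };

	if (demo_write_all(fd, &env, sizeof(env)))
		return -1;

	return demo_write_all(fd, msg, len);
}

/* Receives one message into buf, acks it on ackfd, returns its length. */
static int demo_recv_data(int fd, int ackfd, char *buf, unsigned int cap)
{
	struct demo_envelope env, ack = { DEMO_ACK, 0 };

	if (demo_read_all(fd, &env, sizeof(env)))
		return -1;
	if (env.kind != DEMO_DATA || env.len > cap)
		return -1;
	if (demo_read_all(fd, buf, env.len))
		return -1;
	if (demo_write_all(ackfd, &ack, sizeof(ack)))
		return -1;

	return (int)env.len;
}

static int demo_wait_ack(int fd)
{
	struct demo_envelope env;

	if (demo_read_all(fd, &env, sizeof(env)))
		return -1;

	return (env.kind == DEMO_ACK && env.len == 0) ? 0 : -1;
}

/* Sends a 3-byte message containing a NUL and returns the received length. */
static int demo_channel(void)
{
	int data[2], acks[2], n;
	char buf[16];

	if (pipe(data) || pipe(acks))
		return -1;

	if (demo_send_data(data[1], "a\0b", 3))
		return -1;

	n = demo_recv_data(data[0], acks[1], buf, sizeof(buf));
	if (n != 3 || memcmp(buf, "a\0b", 3))
		return -1;

	if (demo_wait_ack(acks[0]))
		return -1;

	close(data[0]);
	close(data[1]);
	close(acks[0]);
	close(acks[1]);

	return n;
}
```

Carrying the length in the envelope is what makes the payload binary
safe: the embedded NUL byte is transferred like any other.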

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
 include/tst_channel.h |  97 ++++++++++
 lib/tst_channel.c     | 410 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 507 insertions(+)
 create mode 100644 include/tst_channel.h
 create mode 100644 lib/tst_channel.c

diff --git a/include/tst_channel.h b/include/tst_channel.h
new file mode 100644
index 000000000..c572d213b
--- /dev/null
+++ b/include/tst_channel.h
@@ -0,0 +1,97 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#include "tst_evloop.h"
+#include "tst_state_machine.h"
+
+#ifndef TST_CHANNEL_H
+#define TST_CHANNEL_H
+
+enum tst_chan_mode {
+	CHM_SYNC,
+	CHM_ASYNC,
+};
+
+enum tst_chan_state {
+	CHS_CLOSED,
+	CHS_READY,
+	CHS_RECV,
+	CHS_SEND
+};
+
+struct tst_chan_buf {
+	char *ptr;
+	size_t len;
+	size_t off;
+};
+
+struct tst_chan;
+struct tst_chan_ops {
+	void (*const close)(struct tst_chan *self);
+
+	void (*const send)(struct tst_chan *self);
+	void (*const recv)(struct tst_chan *self);
+
+	int (*const on_epoll)(struct tst_chan *self, uint32_t events);
+};
+
+struct tst_chan {
+	const struct tst_chan_ops *ops;
+	void *priv;
+
+	enum tst_chan_mode mode;
+	struct tst_state_mach mach;
+	long long last_seen;
+
+	struct tst_epoll_event_data evdata;
+
+	struct tst_chan_buf in;
+	struct tst_chan_buf out;
+
+	void *user_priv;
+	void (*on_send)(struct tst_chan *self, char *sent, size_t len);
+	void (*on_recv)(struct tst_chan *self, char *recv, size_t len);
+};
+
+enum tst_pchan_msg_kind {
+	PCMK_ACK = 1,
+	PCMK_DATA
+};
+
+struct tst_pchan_envelope {
+	unsigned int kind;
+	unsigned int len;
+} __attribute__((packed));
+
+enum tst_pchan_state {
+	PCS_IDLE,
+	PCS_RECV_DATA,
+	PCS_SEND_ACK,
+	PCS_SEND_DATA,
+	PCS_RECV_ACK,
+};
+
+struct tst_pchan {
+	int infd;
+	int outfd;
+
+	struct tst_pchan_envelope envelope;
+	struct tst_chan_buf envelope_buf;
+
+	unsigned int out_full:1;
+
+	struct tst_state_mach mach;
+};
+
+void tst_chan_send(struct tst_chan *self, char *msg, size_t len);
+void tst_chan_recv(struct tst_chan *self, char *msg, size_t len);
+void tst_chan_seen(struct tst_chan *self);
+long long tst_chan_elapsed(struct tst_chan *self);
+
+void tst_pchan_open(struct tst_chan *self, int infd, int outfd,
+		    struct tst_evloop *const evloop);
+void tst_pchan_close(struct tst_chan *self);
+
+#endif
diff --git a/lib/tst_channel.c b/lib/tst_channel.c
new file mode 100644
index 000000000..6d96cccab
--- /dev/null
+++ b/lib/tst_channel.c
@@ -0,0 +1,410 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+#define _GNU_SOURCE
+#define TST_NO_DEFAULT_MAIN
+
+#include "tst_timer.h"
+#include "tst_safe_clocks.h"
+#include "tst_channel.h"
+
+static struct tst_state_matrix chan_state_mat = {
+	.names = {
+		[CHS_CLOSED] = "closed",
+		[CHS_READY] = "ready",
+		[CHS_RECV] = "receiving",
+		[CHS_SEND] = "sending"
+	},
+	.states = {
+		[CHS_CLOSED] = 1 << CHS_READY,
+		[CHS_READY] = 1 << CHS_CLOSED | 1 << CHS_RECV | 1 << CHS_SEND,
+		[CHS_RECV] = 1 << CHS_CLOSED | 1 << CHS_READY,
+		[CHS_SEND] = 1 << CHS_CLOSED | 1 << CHS_READY,
+	}
+};
+
+static struct tst_state_matrix pchan_state_mat = {
+	.names = {
+		[PCS_IDLE] = "idle",
+		[PCS_RECV_DATA] = "receiving data",
+		[PCS_SEND_ACK] = "sending ack",
+		[PCS_SEND_DATA] = "sending data",
+		[PCS_RECV_ACK] = "receiving ack",
+	},
+	.states = {
+		[PCS_IDLE] = 1 << PCS_IDLE | 1 << PCS_RECV_DATA | 1 << PCS_SEND_DATA,
+		[PCS_RECV_DATA] = 1 << PCS_SEND_ACK | 1 << PCS_IDLE,
+		[PCS_SEND_ACK] = 1 << PCS_IDLE,
+		[PCS_SEND_DATA] = 1 << PCS_RECV_ACK | 1 << PCS_IDLE,
+		[PCS_RECV_ACK] = 1 << PCS_IDLE,
+	},
+};
+
+static size_t chan_buf_more(struct tst_chan_buf *self)
+{
+	return self->off < self->len;
+}
+
+void tst_chan_send(struct tst_chan *self, char *msg, size_t len)
+{
+	if (self->mode == CHM_SYNC)
+		goto send;
+
+	if (!self->on_send) {
+		tst_brk(TBROK, "In async mode, but on_send cb not set");
+		return;
+	}
+
+	if (!self->evdata.on_epoll) {
+		tst_brk(TBROK, "In async mode, but not added to epoll");
+		return;
+	}
+
+send:
+	self->out.ptr = msg;
+	self->out.len = len;
+	self->out.off = 0;
+
+	TST_STATE_SET(&self->mach, CHS_SEND);
+	self->ops->send(self);
+}
+
+void tst_chan_recv(struct tst_chan *self, char *msg, size_t len)
+{
+	if (self->mode == CHM_SYNC)
+		goto recv;
+
+	if (!self->on_recv) {
+		tst_brk(TBROK, "In async mode, but on_recv cb not set");
+		return;
+	}
+
+	if (!self->evdata.on_epoll) {
+		tst_brk(TBROK, "In async mode, but not added to epoll");
+		return;
+	}
+
+recv:
+	self->in.ptr = msg;
+	self->in.len = len;
+	self->in.off = 0;
+
+	TST_STATE_SET(&self->mach, CHS_RECV);
+	self->ops->recv(self);
+}
+
+void tst_chan_seen(struct tst_chan *self)
+{
+	struct timespec now;
+
+	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
+	self->last_seen = tst_timespec_to_us(now);
+}
+
+long long tst_chan_elapsed(struct tst_chan *self)
+{
+	struct timespec now;
+
+	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
+
+	return tst_timespec_to_us(now) - self->last_seen;
+}
+
+static void pipe_chan_write(struct tst_chan *self, struct tst_chan_buf *buf)
+{
+	ssize_t ret;
+	size_t written = buf->off;
+	struct tst_pchan *priv = self->priv;
+
+	while (written < buf->len) {
+		ret = write(priv->outfd,
+			    buf->ptr + written, buf->len - written);
+
+		if (ret == -1) {
+			if (self->mode == CHM_ASYNC && errno == EAGAIN) {
+				priv->out_full = 1;
+				break;
+			}
+
+			if (errno == EINTR)
+				continue;
+
+			tst_brk(TBROK | TERRNO, "write");
+		}
+
+		written += ret;
+	}
+
+	buf->off = written;
+}
+
+static void pipe_chan_read(struct tst_chan *self, struct tst_chan_buf *buf)
+{
+	ssize_t ret;
+	size_t recved = buf->off;
+	struct tst_pchan *priv = self->priv;
+
+	while (recved < buf->len) {
+		ret = read(priv->infd,
+			   buf->ptr + recved, buf->len - recved);
+
+		if (!ret)
+			tst_brk(TBROK, "PID %d: read(%d) = EOF", getpid(), priv->infd);
+
+		if (ret == -1) {
+			if (self->mode == CHM_ASYNC && errno == EAGAIN)
+				break;
+
+			if (errno == EINTR)
+				continue;
+
+			tst_brk(TBROK | TERRNO, "read");
+		}
+
+		recved += ret;
+	}
+
+	buf->off = recved;
+}
+
+static void pipe_chan_send(struct tst_chan *self)
+{
+	struct tst_pchan *priv = self->priv;
+	const enum tst_pchan_state state =
+		TST_STATE_GET(&priv->mach, 1 << PCS_IDLE | 1 << PCS_SEND_DATA | 1 << PCS_RECV_ACK);
+
+	TST_STATE_EXP(&self->mach, 1 << CHS_READY | 1 << CHS_SEND);
+
+	switch (state) {
+	case PCS_IDLE:
+		priv->envelope.kind = PCMK_DATA;
+		priv->envelope.len = self->out.len;
+		priv->envelope_buf.ptr = (char *)&priv->envelope;
+		priv->envelope_buf.len = sizeof(priv->envelope);
+		priv->envelope_buf.off = 0;
+
+		TST_STATE_SET(&priv->mach, PCS_SEND_DATA);
+		break;
+	case PCS_SEND_DATA:
+		break;
+	case PCS_RECV_ACK:
+		goto ack;
+	default:
+		break;
+	}
+
+	if (priv->out_full)
+		return;
+
+	pipe_chan_write(self, &priv->envelope_buf);
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	pipe_chan_write(self, &self->out);
+	if (chan_buf_more(&self->out))
+		return;
+
+	TST_STATE_SET(&priv->mach, PCS_RECV_ACK);
+	priv->envelope_buf.off = 0;
+
+	if (self->mode == CHM_ASYNC)
+		return;
+ack:
+	pipe_chan_read(self, &priv->envelope_buf);
+
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	if (priv->envelope.kind != PCMK_ACK || priv->envelope.len) {
+		tst_brk(TBROK, "Malformed ack");
+		return;
+	}
+
+	tst_chan_seen(self);
+	TST_STATE_SET(&priv->mach, PCS_IDLE);
+	TST_STATE_SET(&self->mach, CHS_READY);
+
+	if (self->on_send)
+		self->on_send(self, self->out.ptr, self->out.len);
+}
+
+static void pipe_chan_recv(struct tst_chan *self)
+{
+	struct tst_pchan *priv = self->priv;
+	const enum tst_pchan_state state =
+		TST_STATE_GET(&priv->mach, 1 << PCS_IDLE | 1 << PCS_RECV_DATA | 1 << PCS_SEND_ACK);
+
+	TST_STATE_EXP(&self->mach, 1 << CHS_READY | 1 << CHS_RECV);
+
+	switch (state) {
+	case PCS_IDLE:
+		priv->envelope_buf.off = 0;
+
+		TST_STATE_SET(&priv->mach, PCS_RECV_DATA);
+		break;
+	case PCS_RECV_DATA:
+		break;
+	case PCS_SEND_ACK:
+		goto ack;
+	default:
+		break;
+	}
+
+	pipe_chan_read(self, &priv->envelope_buf);
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	if (priv->envelope.kind != PCMK_DATA) {
+		tst_brk(TBROK, "Expected data message, but got: %u", priv->envelope.kind);
+		return;
+	}
+
+	if (priv->envelope.len > self->in.len) {
+		tst_brk(TBROK, "Incoming message too large: %u", priv->envelope.len);
+		return;
+	}
+
+	self->in.len = priv->envelope.len;
+	pipe_chan_read(self, &self->in);
+	if (chan_buf_more(&self->in))
+		return;
+
+	TST_STATE_SET(&priv->mach, PCS_SEND_ACK);
+	priv->envelope.kind = PCMK_ACK;
+	priv->envelope.len = 0;
+	priv->envelope_buf.off = 0;
+
+ack:
+	pipe_chan_write(self, &priv->envelope_buf);
+	if (chan_buf_more(&priv->envelope_buf))
+		return;
+
+	TST_STATE_SET(&priv->mach, PCS_IDLE);
+	TST_STATE_SET(&self->mach, CHS_READY);
+
+	if (self->on_recv)
+		self->on_recv(self, self->in.ptr, self->in.len);
+}
+
+static int pipe_chan_on_epoll(struct tst_chan *self, uint32_t events)
+{
+	struct tst_pchan *priv = self->priv;
+	enum tst_chan_state chs = TST_STATE_GET(&self->mach, TST_STATE_ANY);
+	enum tst_pchan_state phs = TST_STATE_GET(&priv->mach, TST_STATE_ANY);
+
+	if (events & EPOLLOUT) {
+		priv->out_full = 0;
+
+		switch (chs) {
+		case CHS_RECV:
+			if (phs == PCS_RECV_DATA)
+				break;
+
+			self->ops->recv(self);
+			break;
+		case CHS_SEND:
+			if (phs == PCS_RECV_ACK)
+				break;
+
+			self->ops->send(self);
+			break;
+		case CHS_READY:
+		case CHS_CLOSED:
+			TST_STATE_EXP(&priv->mach, 1 << PCS_IDLE);
+			break;
+		}
+	}
+
+	if (events & EPOLLIN) {
+		switch (chs) {
+		case CHS_RECV:
+			if (phs == PCS_SEND_ACK)
+				break;
+
+			self->ops->recv(self);
+			break;
+		case CHS_SEND:
+			if (phs == PCS_SEND_DATA)
+				break;
+
+			self->ops->send(self);
+			break;
+		case CHS_READY:
+		case CHS_CLOSED:
+			TST_STATE_EXP(&priv->mach, 1 << PCS_IDLE);
+			break;
+		}
+	}
+
+	if (events & EPOLLERR) {
+		switch (phs) {
+		case PCS_RECV_DATA:
+		case PCS_SEND_ACK:
+		case PCS_SEND_DATA:
+			tst_brk(TBROK, "Channel closed during operation");
+		default:
+			break;
+		}
+
+		if (chs != CHS_CLOSED)
+			self->ops->close(self);
+	}
+
+	return 1;
+}
+
+static struct tst_chan_ops pipe_chan_ops = {
+	.close = tst_pchan_close,
+	.send = pipe_chan_send,
+	.recv = pipe_chan_recv,
+	.on_epoll = pipe_chan_on_epoll,
+};
+
+void tst_pchan_open(struct tst_chan *self,
+		    int infd, int outfd,
+		    struct tst_evloop *const evloop)
+{
+	struct tst_pchan *priv = self->priv;
+
+	self->mach.mat = &chan_state_mat;
+	TST_STATE_EXP(&self->mach, 1 << CHS_CLOSED);
+
+	self->ops = &pipe_chan_ops;
+
+	if (!priv)
+		tst_brk(TBROK, "Channel should have pipe_chan priv preallocated");
+
+	priv->mach.mat = &pchan_state_mat;
+	TST_STATE_EXP(&priv->mach, 1 << PCS_IDLE);
+
+	priv->out_full = 0;
+	priv->infd = infd;
+	priv->outfd = outfd;
+	priv->envelope_buf.ptr = (char *)&priv->envelope;
+	priv->envelope_buf.len = sizeof(priv->envelope);
+
+	if (!evloop)
+		goto out;
+
+	self->mode = CHM_ASYNC;
+	self->evdata.on_epoll = (tst_on_epoll_fn)self->ops->on_epoll;
+	self->evdata.self = self;
+	tst_evloop_add(evloop, &self->evdata, infd, EPOLLIN);
+	tst_evloop_add(evloop, &self->evdata, outfd, EPOLLOUT | EPOLLET);
+
+out:
+	TST_STATE_SET(&self->mach, CHS_READY);
+	tst_chan_seen(self);
+}
+
+void tst_pchan_close(struct tst_chan *self)
+{
+	struct tst_pchan *priv = self->priv;
+
+	close(priv->infd);
+	close(priv->outfd);
+
+	TST_STATE_SET(&priv->mach, PCS_IDLE);
+	TST_STATE_SET(&self->mach, CHS_CLOSED);
+}
-- 
2.36.1



* [LTP] [PATCH 5/6] api/worker: Add library for distributing work over multiple procs
  2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
                   ` (3 preceding siblings ...)
  2022-09-27 16:14 ` [LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing Richard Palethorpe via ltp
@ 2022-09-27 16:14 ` Richard Palethorpe via ltp
  2022-09-27 16:14 ` [LTP] [PATCH 6/6] read_all: Migrate to the worker lib Richard Palethorpe via ltp
  5 siblings, 0 replies; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC
  To: ltp; +Cc: Richard Palethorpe

Builds on the channel, state machine and event loop APIs. Allows one
to create multiple processes (workers) and distribute work to them
using messages.

The main test process can create, send and receive messages from the
workers. The main process can communicate asynchronously or synchronously with
each worker over the worker's channel.

Initially workers are internally synchronous and do not communicate
directly with each other. However there is nothing preventing them
from creating an event loop or using channels between one another.

Presently workers are expected to be local processes. However various
types of isolation could be used. The main process just needs to share
a channel with them (pipe) and send and receive signals with them.
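
A rough sketch of that arrangement, assuming nothing beyond fork() and
pipes: the main process starts a few workers, hands each one an item
over its own pipe and collects the results. The demo_* names and the
"square an integer" job are made up for illustration, and the exchange
is fully synchronous, with no event loop, timeouts or signals.

```c
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Worker body: square each int received until the pipe reaches EOF. */
static void demo_worker_run(int infd, int outfd)
{
	int v;

	while (read(infd, &v, sizeof(v)) == (ssize_t)sizeof(v)) {
		v *= v;
		if (write(outfd, &v, sizeof(v)) != (ssize_t)sizeof(v))
			_exit(1);
	}

	_exit(0);
}

/* Distributes 2, 3 and 4 to three workers; returns the sum of squares. */
static int demo_workers(void)
{
	enum { N = 3 };
	int to[N][2], from[N][2];
	pid_t pids[N];
	int i, j, v, status, sum = 0;

	for (i = 0; i < N; i++) {
		if (pipe(to[i]) || pipe(from[i]))
			return -1;

		pids[i] = fork();
		if (pids[i] == -1)
			return -1;

		if (!pids[i]) {
			/* Drop inherited ends so earlier workers can see EOF. */
			for (j = 0; j < i; j++) {
				close(to[j][1]);
				close(from[j][0]);
			}
			close(to[i][1]);
			close(from[i][0]);
			demo_worker_run(to[i][0], from[i][1]);
		}

		close(to[i][0]);
		close(from[i][1]);
	}

	/* Hand out the work first so the workers can run concurrently. */
	for (i = 0; i < N; i++) {
		v = i + 2;
		if (write(to[i][1], &v, sizeof(v)) != (ssize_t)sizeof(v))
			return -1;
	}

	for (i = 0; i < N; i++) {
		if (read(from[i][0], &v, sizeof(v)) != (ssize_t)sizeof(v))
			return -1;
		sum += v;
	}

	/* Closing the write end is the stop request; then reap the worker. */
	for (i = 0; i < N; i++) {
		close(to[i][1]);
		close(from[i][0]);

		if (waitpid(pids[i], &status, 0) != pids[i])
			return -1;
		if (!WIFEXITED(status) || WEXITSTATUS(status))
			return -1;
	}

	return sum;
}
```

Reading the results in a fixed order is the main simplification: with
work that completes unevenly, polling the worker pipes from an event
loop lets the main process take results in whatever order they arrive.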

This is motivated by suggested additions to the read_all test and LTX.

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
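A minimal sketch of the transition check that worker_state_mat in
lib/tst_worker.c encodes, written without the tst_state_mach API from
patch 3/6. The demo_* and DS_* names are hypothetical and exist only
for this illustration; the table mirrors the .states initializer in
the diff below.

```c
/* Hypothetical names for illustration; not the tst_state_mach API. */
enum demo_state {
	DS_STOPPED,
	DS_RUNNING,
	DS_STOPPING,
	DS_KILL_SENT,
	DS_DIED,
	DS_COUNT
};

/* allowed[from] is a bitmask of the states reachable from "from". */
static const unsigned int allowed[DS_COUNT] = {
	[DS_STOPPED] = 1 << DS_RUNNING,
	[DS_RUNNING] = 1 << DS_STOPPING | 1 << DS_STOPPED | 1 << DS_KILL_SENT | 1 << DS_DIED,
	[DS_STOPPING] = 1 << DS_STOPPED | 1 << DS_KILL_SENT | 1 << DS_DIED,
	[DS_KILL_SENT] = 1 << DS_STOPPED | 1 << DS_DIED,
	[DS_DIED] = 1 << DS_RUNNING,
};

/* Returns 1 and updates *cur if the transition is permitted, else 0. */
static int demo_state_set(enum demo_state *cur, enum demo_state to)
{
	if (!(allowed[*cur] & (1u << to)))
		return 0;

	*cur = to;
	return 1;
}
```

With this table a worker in the "Kill sent" state cannot go straight
back to "Running"; it has to pass through "Stopped" or "Dead" first.
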
 include/tst_worker.h |  62 ++++++++++
 lib/tst_worker.c     | 285 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 347 insertions(+)
 create mode 100644 include/tst_worker.h
 create mode 100644 lib/tst_worker.c

diff --git a/include/tst_worker.h b/include/tst_worker.h
new file mode 100644
index 000000000..b5c51a66c
--- /dev/null
+++ b/include/tst_worker.h
@@ -0,0 +1,62 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#include "tst_channel.h"
+
+#ifndef TST_WORKER_H
+#define TST_WORKER_H
+
+enum tst_worker_mode {
+	TST_WORKER_SYNC,
+	TST_WORKER_ASYNC
+};
+
+enum tst_worker_state {
+	WS_STOPPED,
+	WS_RUNNING,
+	WS_STOPPING,
+	WS_KILL_SENT,
+	WS_DIED
+};
+
+struct tst_workers;
+struct tst_worker {
+	int i;
+	pid_t pid;
+	struct tst_chan chan;
+	struct tst_pchan pipe_chan;
+	struct tst_workers *group;
+	enum tst_worker_mode mode;
+	struct tst_state_mach mach;
+
+	char display_buf[128];
+	char *name;
+
+	void *priv;
+	int (*run)(struct tst_worker *self);
+	void (*on_stopped)(struct tst_worker *self);
+	void (*on_died)(struct tst_worker *self);
+	void (*on_timeout)(struct tst_worker *self);
+	void (*on_sent)(struct tst_worker *self, char *sent, size_t len);
+	void (*on_recved)(struct tst_worker *self, char *recv, size_t len);
+};
+
+struct tst_workers {
+	long long timeout;
+	struct tst_evloop evloop;
+
+	long count;
+	struct tst_worker *vec;
+};
+
+void tst_workers_setup(struct tst_workers *self);
+void tst_workers_cleanup(struct tst_workers *self);
+
+void tst_worker_start(struct tst_worker *self);
+int tst_worker_ttl(struct tst_worker *self);
+void tst_worker_kill(struct tst_worker *self);
+char *tst_worker_idstr(struct tst_worker *self);
+
+#endif
diff --git a/lib/tst_worker.c b/lib/tst_worker.c
new file mode 100644
index 000000000..4bb2398e1
--- /dev/null
+++ b/lib/tst_worker.c
@@ -0,0 +1,285 @@
+// SPDX-License-Identifier: GPL-2.0-or-later
+/*
+ * Copyright (c) 2022 SUSE LLC <rpalethorpe@suse.com>
+ */
+
+#define _GNU_SOURCE
+#define TST_NO_DEFAULT_MAIN
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/wait.h>
+
+#include "tst_test.h"
+#include "tst_worker.h"
+
+static struct tst_state_matrix worker_state_mat = {
+	.names = {
+		[WS_STOPPED] = "Stopped",
+		[WS_RUNNING] = "Running",
+		[WS_STOPPING] = "Stopping",
+		[WS_KILL_SENT] = "Kill sent",
+		[WS_DIED] = "Dead"
+	},
+	.states = {
+		[WS_STOPPED] = 1 << WS_RUNNING,
+		[WS_RUNNING] = 1 << WS_STOPPING | 1 << WS_STOPPED | 1 << WS_KILL_SENT | 1 << WS_DIED,
+		[WS_STOPPING] = 1 << WS_STOPPED | 1 << WS_KILL_SENT | 1 << WS_DIED,
+		[WS_KILL_SENT] = 1 << WS_STOPPED | 1 << WS_DIED,
+		[WS_DIED] = 1 << WS_RUNNING,
+	},
+};
+
+static char *idstr(struct tst_worker *self)
+{
+	if (self->display_buf[0] != '\0')
+		return self->display_buf;
+
+	snprintf(self->display_buf,
+		 sizeof(self->display_buf) - 1,
+		 "%s Worker %d (%d)", self->name, self->pid, self->i);
+
+	self->display_buf[sizeof(self->display_buf) - 1] = '\0';
+
+	return self->display_buf;
+}
+
+static void worker_chan_on_send(struct tst_chan *chan, char *sent, size_t len)
+{
+	struct tst_worker *self = chan->user_priv;
+
+	if (self->on_sent)
+		self->on_sent(self, sent, len);
+}
+
+static void worker_chan_on_recv(struct tst_chan *chan, char *recved, size_t len)
+{
+	struct tst_worker *self = chan->user_priv;
+
+	if (self->on_recved)
+		self->on_recved(self, recved, len);
+}
+
+char *tst_worker_idstr(struct tst_worker *self)
+{
+	return idstr(self);
+}
+
+void tst_worker_start(struct tst_worker *self)
+{
+	struct tst_evloop *evloop = NULL;
+	int infd[2];
+	int outfd[2];
+
+	SAFE_PIPE2(infd, O_CLOEXEC);
+	SAFE_PIPE2(outfd, O_CLOEXEC);
+
+	self->chan.user_priv = self;
+	self->chan.priv = &self->pipe_chan;
+	tst_chan_seen(&self->chan);
+	self->pid = SAFE_FORK();
+
+	if (!self->pid) {
+		self->pid = getpid();
+
+		close(infd[0]);
+		close(outfd[1]);
+		tst_pchan_open(&self->chan, outfd[0], infd[1], NULL);
+
+		if (!tst_worker_ttl(self))
+			tst_res(TWARN, "Worker timeout is too short; restarts take >%lldus", tst_chan_elapsed(&self->chan));
+
+		exit(self->run(self));
+	}
+
+	close(infd[1]);
+	close(outfd[0]);
+
+	self->chan.on_send = worker_chan_on_send;
+	self->chan.on_recv = worker_chan_on_recv;
+
+	if (self->mode == TST_WORKER_ASYNC)
+		evloop = &self->group->evloop;
+
+	tst_pchan_open(&self->chan, infd[0], outfd[1], evloop);
+
+	tst_res(TINFO, "%s: Started", idstr(self));
+	TST_STATE_SET(&self->mach, WS_RUNNING);
+}
+
+int tst_worker_ttl(struct tst_worker *self)
+{
+	long long t = self->group->timeout;
+
+	return MAX(0LL, t - tst_chan_elapsed(&self->chan));
+}
+
+void tst_worker_kill(struct tst_worker *w)
+{
+	const enum tst_worker_state ws =
+		TST_STATE_GET(&w->mach, 1 << WS_RUNNING | 1 << WS_STOPPING | 1 << WS_KILL_SENT);
+
+	if (ws != WS_KILL_SENT) {
+		if (TST_STATE_GET(&w->chan.mach, TST_STATE_ANY) != CHS_CLOSED)
+			w->chan.ops->close(&w->chan);
+
+		tst_chan_seen(&w->chan);
+
+		SAFE_KILL(w->pid, SIGKILL);
+		TST_STATE_SET(&w->mach, WS_KILL_SENT);
+		return;
+	}
+
+	tst_res(TWARN, "%s: Timed out again after KILL signal sent", idstr(w));
+
+	if (w->on_died)
+		w->on_died(w);
+}
+
+static int workers_waitpid(struct tst_workers *self)
+{
+	struct tst_worker *w = self->vec;
+	int i,  ws;
+	const pid_t pid = waitpid(-1, &ws, WNOHANG);
+
+	if (!pid || (pid == -1 && errno == ECHILD))
+		return 0;
+
+	if (pid == -1)
+		tst_brk(TBROK | TERRNO, "waitpid(-1, &ws, WNOHANG)");
+
+	for (i = 0; i < self->count; i++) {
+		if (w[i].pid == pid)
+			break;
+	}
+
+	if (i == self->count) {
+		tst_res(TWARN, "Received SIGCHLD for untracked PID: %d", pid);
+
+		if (WIFEXITED(ws))
+			tst_res(TINFO, "PID: %d: Exited: %d", pid, WEXITSTATUS(ws));
+		if (WIFSIGNALED(ws))
+			tst_res(TINFO, "PID: %d: Killed: %s%s", pid, tst_strsig(WTERMSIG(ws)),
+				WCOREDUMP(ws) ? ", core dumped" : "");
+		return 1;
+	}
+
+	w += i;
+
+	if (WIFSTOPPED(ws) || WIFCONTINUED(ws))
+		return 1;
+
+	if (TST_STATE_GET(&w->chan.mach, TST_STATE_ANY) != CHS_CLOSED)
+		w->chan.ops->close(&w->chan);
+
+	if (WIFEXITED(ws) && !WEXITSTATUS(ws)) {
+		TST_STATE_SET(&w->mach, WS_STOPPED);
+
+		if (w->on_stopped)
+			w->on_stopped(w);
+		else
+			tst_res(TINFO, "%s: Stopped", idstr(w));
+	} else {
+		TST_STATE_SET(&w->mach, WS_DIED);
+
+		if (w->on_died)
+			w->on_died(w);
+		else
+			tst_brk(TBROK, "%s: Died", idstr(w));
+	}
+
+	return 1;
+}
+
+static int workers_on_signal(struct tst_evloop *self,
+			     struct signalfd_siginfo *si)
+{
+	struct tst_workers *const workers = self->priv;
+
+	if (si->ssi_signo != SIGCHLD) {
+		tst_brk(TBROK,
+			"Don't know how to handle signal %s",
+			tst_strsig(si->ssi_signo));
+		return 0;
+	}
+
+	while (workers_waitpid(workers))
+		;
+
+	return 1;
+}
+
+static int workers_on_cont(struct tst_evloop *self)
+{
+	struct tst_workers *const workers = self->priv;
+	int i, stopped = 0;
+
+	for (i = 0; i < workers->count; i++) {
+		struct tst_worker *w = workers->vec + i;
+		const enum tst_worker_state ws = TST_STATE_GET(&w->mach, TST_STATE_ANY);
+
+		if (ws == WS_STOPPED || ws == WS_DIED) {
+			stopped++;
+			continue;
+		}
+
+		if (tst_worker_ttl(w))
+			continue;
+
+		if (w->on_timeout) {
+			w->on_timeout(w);
+		} else {
+			tst_res(TINFO, "%s: Timed out", idstr(w));
+			tst_worker_kill(w);
+		}
+	}
+
+	if (stopped == workers->count)
+		return 0;
+
+	return 1;
+}
+
+void tst_workers_setup(struct tst_workers *self)
+{
+	int i;
+
+	self->evloop.priv = self;
+	self->evloop.on_cont = workers_on_cont;
+	self->evloop.on_signal = workers_on_signal;
+
+	tst_evloop_setup(&self->evloop);
+
+	for (i = 0; i < self->count; i++) {
+		struct tst_worker *w = self->vec + i;
+
+		w->mach.mat = &worker_state_mat;
+		TST_STATE_EXP(&w->mach, 1 << WS_STOPPED);
+
+		w->i = i;
+		w->group = self;
+	}
+}
+
+void tst_workers_cleanup(struct tst_workers *self)
+{
+	int i;
+
+	for (i = 0; i < self->count; i++) {
+		struct tst_worker *w = self->vec + i;
+		const enum tst_worker_state ws = TST_STATE_GET(&w->mach, TST_STATE_ANY);
+
+		if (TST_STATE_GET(&w->chan.mach, TST_STATE_ANY) != CHS_CLOSED)
+			w->chan.ops->close(&w->chan);
+
+		if (ws != WS_STOPPED) {
+			if (ws != WS_KILL_SENT)
+				SAFE_KILL(w->pid, SIGKILL);
+
+			tst_res(TWARN, "%s: Still running", idstr(w));
+		}
+	}
+
+	tst_evloop_cleanup(&self->evloop);
+}
-- 
2.36.1


* [LTP] [PATCH 6/6] read_all: Migrate to the worker lib
  2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
                   ` (4 preceding siblings ...)
  2022-09-27 16:14 ` [LTP] [PATCH 5/6] api/worker: Add library for distributing work over multiple procs Richard Palethorpe via ltp
@ 2022-09-27 16:14 ` Richard Palethorpe via ltp
  5 siblings, 0 replies; 9+ messages in thread
From: Richard Palethorpe via ltp @ 2022-09-27 16:14 UTC (permalink / raw)
  To: ltp; +Cc: Richard Palethorpe

Use the worker API added in the previous commit.

Signed-off-by: Richard Palethorpe <rpalethorpe@suse.com>
---
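A sketch of the message framing the migrated test relies on: each path
is sent with its terminating NUL, and an empty message (a lone NUL) is
the stop code. It uses plain POSIX pipe(), fork() and waitpid() rather
than the tst_worker or tst_chan API, and demo_child() and
demo_send_paths() are hypothetical names used only for illustration.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Child: count NUL-terminated messages until an empty one (or EOF). */
static int demo_child(int fd)
{
	char c;
	int msgs = 0, len = 0;

	while (read(fd, &c, 1) == 1) {
		if (c != '\0') {
			len++;
			continue;
		}

		if (!len)
			break;	/* empty message: stop code */

		msgs++;
		len = 0;
	}

	return msgs;
}

/*
 * Parent: send each path including its NUL, then the stop code.
 * Returns the child's message count (only valid up to 255, because it
 * travels in the exit status), or -1 on error.
 */
static int demo_send_paths(const char *const *paths, int n)
{
	int fds[2], i, status;
	ssize_t ret;
	pid_t pid;

	if (pipe(fds))
		return -1;

	pid = fork();
	if (pid < 0)
		return -1;

	if (!pid) {
		close(fds[1]);
		_exit(demo_child(fds[0]));
	}

	close(fds[0]);

	for (i = 0; i < n; i++) {
		if (write(fds[1], paths[i], strlen(paths[i]) + 1) < 0)
			break;
	}

	/* If this write fails the child still stops on EOF after close(). */
	ret = write(fds[1], "", 1);
	(void)ret;
	close(fds[1]);

	if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status))
		return -1;

	return WEXITSTATUS(status);
}
```

Because the empty string doubles as the stop code, an empty path can
never be sent as work under this framing.
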
 testcases/kernel/fs/read_all/read_all.c | 588 +++++++-----------------
 1 file changed, 171 insertions(+), 417 deletions(-)

diff --git a/testcases/kernel/fs/read_all/read_all.c b/testcases/kernel/fs/read_all/read_all.c
index 266678ea7..6d3ff4594 100644
--- a/testcases/kernel/fs/read_all/read_all.c
+++ b/testcases/kernel/fs/read_all/read_all.c
@@ -27,9 +27,9 @@
  * overridden with the 'w' parameters.
  */
 #include <signal.h>
+#include <stdint.h>
 #include <sys/types.h>
 #include <sys/stat.h>
-#include <sys/wait.h>
 #include <fnmatch.h>
 #include <lapi/fnmatch.h>
 #include <stdlib.h>
@@ -40,7 +40,6 @@
 #include <unistd.h>
 #include <string.h>
 #include <limits.h>
-#include <semaphore.h>
 #include <ctype.h>
 #include <pwd.h>
 #include <grp.h>
@@ -49,28 +48,12 @@
 #include "tst_safe_clocks.h"
 #include "tst_test.h"
 #include "tst_timer.h"
+#include "tst_worker.h"
 
-#define QUEUE_SIZE 16384
 #define BUFFER_SIZE 1024
 #define MAX_PATH 4096
 #define MAX_DISPLAY 40
 
-struct queue {
-	sem_t sem;
-	int front;
-	int back;
-	char data[QUEUE_SIZE];
-	char popped[BUFFER_SIZE];
-};
-
-struct worker {
-	int i;
-	pid_t pid;
-	struct queue *q;
-	int last_seen;
-	unsigned int kill_sent:1;
-};
-
 enum dent_action {
 	DA_UNKNOWN,
 	DA_IGNORE,
@@ -78,19 +61,21 @@ enum dent_action {
 	DA_VISIT,
 };
 
+struct path_worker {
+	char cur[MAX_PATH];
+	char next[MAX_PATH];
+};
+
 static char *verbose;
 static char *quiet;
 static char *root_dir;
 static char *str_reads;
 static int reads = 1;
 static char *str_worker_count;
-static long worker_count;
 static char *str_max_workers;
 static long max_workers = 15;
-static struct worker *workers;
 static char *drop_privs;
 static char *str_worker_timeout;
-static int worker_timeout;
 static int timeout_warnings_left = 15;
 
 static char *blacklist[] = {
@@ -101,83 +86,9 @@ static char *blacklist[] = {
 	"/sys/*/cpu??*(?)/*",	/* cpu* entries with 2 or more digits */
 };
 
-static long long epoch;
-
-static int atomic_timestamp(void)
-{
-	struct timespec now;
-
-	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
-
-	return tst_timespec_to_us(now) - epoch;
-}
-
-static int queue_pop(struct queue *q)
-{
-	int i = q->front, j = 0;
-
-	sem_wait(&q->sem);
-
-	if (!q->data[i])
-		return 0;
-
-	while (q->data[i]) {
-		q->popped[j] = q->data[i];
-
-		if (++j >= BUFFER_SIZE - 1)
-			tst_brk(TBROK, "Buffer is too small for path");
-
-		 i = (i + 1) % QUEUE_SIZE;
-	}
-
-	q->popped[j] = '\0';
-	tst_atomic_store((i + 1) % QUEUE_SIZE, &q->front);
-
-	return 1;
-}
-
-static int queue_push(struct queue *q, const char *buf)
-{
-	int i = q->back, j = 0;
-	int front = tst_atomic_load(&q->front);
-
-	do {
-		q->data[i] = buf[j];
-
-		i = (i + 1) % QUEUE_SIZE;
-
-		if (i == front)
-			return 0;
-
-	} while (buf[j++]);
-
-	q->back = i;
-	sem_post(&q->sem);
-
-	return 1;
-}
-
-static struct queue *queue_init(void)
-{
-	struct queue *q = SAFE_MMAP(NULL, sizeof(*q),
-				    PROT_READ | PROT_WRITE,
-				    MAP_SHARED | MAP_ANONYMOUS,
-				    0, 0);
-
-	sem_init(&q->sem, 1, 0);
-	q->front = 0;
-	q->back = 0;
-
-	return q;
-}
-
-static void queue_destroy(struct queue *q, int is_worker)
-{
-	if (is_worker)
-		sem_destroy(&q->sem);
-
-	SAFE_MUNMAP(q, sizeof(*q));
-}
+static struct tst_workers workers;
+static struct path_worker *worker_privs;
+struct tst_evloop evloop;
 
 static void sanitize_str(char *buf, ssize_t count)
 {
@@ -208,63 +119,45 @@ static int is_blacklisted(const char *path)
 	return 0;
 }
 
-static void worker_heartbeat(const int worker)
-{
-	tst_atomic_store(atomic_timestamp(), &workers[worker].last_seen);
-}
-
-static int worker_elapsed(const int worker)
-{
-	struct worker *const w = workers + worker;
-
-	return atomic_timestamp() - tst_atomic_load(&w->last_seen);
-}
-
-static int worker_ttl(const int worker)
-{
-	return MAX(0, worker_timeout - worker_elapsed(worker));
-}
-
-static void read_test(const int worker, const char *const path)
+static void read_test(struct tst_worker *self, char *path)
 {
 	char buf[BUFFER_SIZE];
 	int fd;
 	ssize_t count;
-	const pid_t pid = workers[worker].pid;
-	int elapsed;
+	long long elapsed;
 
 	if (is_blacklisted(path))
 		return;
 
 	if (verbose)
-		tst_res(TINFO, "Worker %d: %s(%s)", pid, __func__, path);
+		tst_res(TINFO, "%s: %s(%s)", tst_worker_idstr(self), __func__, path);
 
 	fd = open(path, O_RDONLY | O_NONBLOCK);
 	if (fd < 0) {
 		if (!quiet) {
-			tst_res(TINFO | TERRNO, "Worker %d (%d): open(%s)",
-				pid, worker, path);
+			tst_res(TINFO | TERRNO, "%s: open(%s)",
+			        tst_worker_idstr(self), path);
 		}
 		return;
 	}
 
-	worker_heartbeat(worker);
+	elapsed = tst_chan_elapsed(&self->chan);
 	count = read(fd, buf, sizeof(buf) - 1);
-	elapsed = worker_elapsed(worker);
+	elapsed = tst_chan_elapsed(&self->chan) - elapsed;
 
 	if (count > 0 && verbose) {
 		sanitize_str(buf, count);
 		tst_res(TINFO,
-			"Worker %d (%d): read(%s, buf) = %zi, buf = %s, elapsed = %dus",
-			pid, worker, path, count, buf, elapsed);
+			"%s: read(%s, buf) = %zi, buf = %s, elapsed = %llus",
+			tst_worker_idstr(self), path, count, buf, elapsed);
 	} else if (!count && verbose) {
 		tst_res(TINFO,
-			"Worker %d (%d): read(%s) = EOF, elapsed = %dus",
-			pid, worker, path, elapsed);
+			"%s: read(%s) = EOF, elapsed = %llus",
+			tst_worker_idstr(self), path, elapsed);
 	} else if (count < 0 && !quiet) {
 		tst_res(TINFO | TERRNO,
-			"Worker %d (%d): read(%s), elapsed = %dus",
-			pid, worker, path, elapsed);
+			"%s: read(%s), elapsed = %llus",
+			tst_worker_idstr(self), path, elapsed);
 	}
 
 	SAFE_CLOSE(fd);
@@ -294,254 +187,213 @@ static void maybe_drop_privs(void)
 		tst_brk(TBROK | TTERRNO, "Failed to use nobody uid");
 }
 
-static int worker_run(int worker)
+static void visit_dir(struct tst_worker *self, const char *path)
 {
+	DIR *dir;
+	struct dirent *dent;
+	struct stat dent_st;
+	char dent_path[MAX_PATH];
+	enum dent_action act;
+
+	dir = opendir(path);
+	if (!dir) {
+		tst_res(TINFO | TERRNO, "opendir(%s)", path);
+		return;
+	}
+
+	while (1) {
+		errno = 0;
+		dent = readdir(dir);
+		if (!dent && errno) {
+			tst_res(TINFO | TERRNO, "readdir(%s)", path);
+			break;
+		} else if (!dent) {
+			break;
+		}
+
+		if (!strcmp(dent->d_name, ".") ||
+		    !strcmp(dent->d_name, ".."))
+			continue;
+
+		if (dent->d_type == DT_DIR)
+			act = DA_VISIT;
+		else if (dent->d_type == DT_LNK)
+			act = DA_IGNORE;
+		else if (dent->d_type == DT_UNKNOWN)
+			act = DA_UNKNOWN;
+		else
+			act = DA_READ;
+
+		snprintf(dent_path, MAX_PATH,
+			 "%s/%s", path, dent->d_name);
+
+		if (act == DA_UNKNOWN) {
+			if (lstat(dent_path, &dent_st))
+				tst_res(TINFO | TERRNO, "lstat(%s)", dent_path);
+			else if ((dent_st.st_mode & S_IFMT) == S_IFDIR)
+				act = DA_VISIT;
+			else if ((dent_st.st_mode & S_IFMT) == S_IFLNK)
+				act = DA_IGNORE;
+			else
+				act = DA_READ;
+		}
+
+		if (act == DA_VISIT)
+			visit_dir(self, dent_path);
+		else if (act == DA_READ)
+			tst_chan_send(&self->chan, dent_path, strlen(dent_path) + 1);
+	}
+
+	if (closedir(dir))
+		tst_res(TINFO | TERRNO, "closedir(%s)", path);
+}
+
+static int dir_worker_run(struct tst_worker *self)
+{
+	visit_dir(self, root_dir);
+
+	tst_res(TINFO, "Dir Worker %d (%d): fin.", self->pid, self->i);
+	tst_chan_send(&self->chan, "", 1);
+
+	return 0;
+}
+
+static int path_worker_run(struct tst_worker *self)
+{
+	char buf[BUFFER_SIZE];
 	struct sigaction term_sa = {
 		.sa_handler = SIG_IGN,
 		.sa_flags = 0,
 	};
-	struct worker *const self = workers + worker;
-	struct queue *q = self->q;
 
 	sigaction(SIGTTIN, &term_sa, NULL);
 	maybe_drop_privs();
-	self->pid = getpid();
-
-	if (!worker_ttl(self->i)) {
-		tst_brk(TBROK,
-			"Worker timeout is too short; restarts take >%dus",
-			worker_elapsed(self->i));
-	}
 
 	while (1) {
-		worker_heartbeat(worker);
+		tst_chan_recv(&self->chan, buf, sizeof(buf));
 
-		if (!queue_pop(q))
+		if (!buf[0])
 			break;
 
-		read_test(worker, q->popped);
+		read_test(self, buf);
 	}
 
-	queue_destroy(q, 1);
 	tst_flush();
 	return 0;
 }
 
-static void spawn_workers(void)
+static void path_worker_resend(struct tst_worker *const self)
 {
-	int i;
-	struct worker *wa = workers;
-
-	memset(workers, 0, worker_count * sizeof(*workers));
+	struct path_worker *pw = self->priv;
 
-	for (i = 0; i < worker_count; i++) {
-		wa[i].i = i;
-		wa[i].q = queue_init();
-		wa[i].last_seen = atomic_timestamp();
-		wa[i].pid = SAFE_FORK();
-		if (!wa[i].pid)
-			exit(worker_run(i));
-	}
+	tst_chan_send(&self->chan, pw->next, strlen(pw->next) + 1);
 }
 
-static void restart_worker(const int worker)
+static void do_next_path(struct tst_worker *path_worker)
 {
-	struct worker *const w = workers + worker;
-	int wstatus, ret, i, q_len;
+	size_t slen = 1;
+	struct tst_worker *const dir_worker = workers.vec;
+	struct path_worker *pw = path_worker->priv;
 
-	if (!w->kill_sent) {
-		SAFE_KILL(w->pid, SIGKILL);
-		w->kill_sent = 1;
-		worker_heartbeat(worker);
-	}
-
-	ret = waitpid(w->pid, &wstatus, WNOHANG);
+	pw->next[0] = '\0';
 
-	if (!ret) {
-		if (worker_ttl(worker) > 0)
-			return;
+	if (TST_STATE_GET(&dir_worker->mach, TST_STATE_ANY) != WS_RUNNING)
+		goto send;
 
-		if (!quiet || timeout_warnings_left) {
-			tst_res(TINFO,
-				"Worker %d (%d): Timeout waiting after kill",
-				w->pid, worker);
-		}
-	} else if (ret != w->pid) {
-		tst_brk(TBROK | TERRNO, "Worker %d (%d): waitpid = %d",
-			w->pid, worker, ret);
-	}
+	tst_chan_recv(&dir_worker->chan, pw->next, BUFFER_SIZE);
 
-	w->kill_sent = 0;
+	if (!pw->next[0])
+		TST_STATE_SET(&dir_worker->mach, WS_STOPPING);
 
-	if (!w->q->popped[0]) {
-		tst_brk(TBROK,
-			"Worker %d (%d): Timed out, but doesn't appear to be reading anything",
-			w->pid, worker);
-	}
+	slen = dir_worker->chan.in.len;
+send:
+	tst_chan_send(&path_worker->chan, pw->next, slen);
 
-	if (!quiet || timeout_warnings_left) {
-		tst_res(TINFO, "Worker %d (%d): Last popped '%s'",
-			w->pid, worker, w->q->popped);
-	}
+}
 
-	/* Make sure the queue length and semaphore match. Threre is a
-	 * race in qeue_pop where the semaphore can be decremented
-	 * then the worker killed before updating q->front
-	 */
-	q_len = 0;
-	i = w->q->front;
-	while (i != w->q->back) {
-		if (!w->q->data[i])
-			q_len++;
-
-		i = (i + 1) % QUEUE_SIZE;
-	}
+static void path_worker_sent(struct tst_worker *self, char *path, size_t len)
+{
+	struct path_worker *pw = self->priv;
 
-	ret = sem_destroy(&w->q->sem);
-	if (ret == -1)
-		tst_brk(TBROK | TERRNO, "sem_destroy");
-	ret = sem_init(&w->q->sem, 1, q_len);
-	if (ret == -1)
-		tst_brk(TBROK | TERRNO, "sem_init");
+	memcpy(pw->cur, path, len);
 
-	worker_heartbeat(worker);
-	w->pid = SAFE_FORK();
+	if (!path[0])
+		return;
 
-	if (!w->pid)
-		exit(worker_run(worker));
+	do_next_path(self);
 }
 
-static void check_timeout_warnings_limit(void)
+static int check_timeout_warnings_limit(void)
 {
 	if (!quiet)
-		return;
+		return 1;
 
 	timeout_warnings_left--;
 
 	if (timeout_warnings_left)
-		return;
+		return 1;
 
 	tst_res(TINFO,
 		"Silencing timeout warnings; consider increasing LTP_RUNTIME_MUL or removing -q");
-}
-
-static int try_push_work(const int worker, const char *buf)
-{
-	int ret = 0;
-	int elapsed;
-	struct worker *const w = workers + worker;
-
-	if (w->kill_sent) {
-		restart_worker(worker);
-		return 0;
-	}
-
-	ret = queue_push(w->q, buf);
-	if (ret)
-		return 1;
-
-	elapsed = worker_elapsed(worker);
-
-	if (elapsed > worker_timeout) {
-		if (!quiet || timeout_warnings_left) {
-			tst_res(TINFO,
-				"Worker %d (%d): Stuck for %dus, restarting it",
-				w->pid, worker, elapsed);
-			check_timeout_warnings_limit();
-		}
-		restart_worker(worker);
-	}
 
 	return 0;
 }
 
-static void push_work(const int worker, const char *buf)
+static void path_worker_died(struct tst_worker *self)
 {
-	int sleep_time = 1;
+	struct path_worker *pw = self->priv;
 
-	while (!try_push_work(worker, buf)) {
-		const int ttl = worker_ttl(worker);
-
-		sleep_time = MIN(2 * sleep_time, ttl);
-		usleep(sleep_time);
+	if (pw->cur[0] == '\0') {
+		tst_brk(TBROK,
+			"%s: Died, but doesn't appear to be reading anything",
+			tst_worker_idstr(self));
 	}
-}
-
-static void stop_workers(void)
-{
-	const char stop_code[1] = { '\0' };
-	int i;
 
-	if (!workers)
-		return;
+	if (check_timeout_warnings_limit())
+		tst_res(TINFO, "%s: Died; Last sent '%s'", tst_worker_idstr(self), pw->cur);
 
-	for (i = 0; i < worker_count; i++) {
-		if (workers[i].q)
-			push_work(i, stop_code);
-	}
+	tst_worker_start(self);
+	path_worker_resend(self);
 }
 
-static void destroy_workers(void)
+static void spawn_workers(void)
 {
 	int i;
+	long wcount = workers.count;
+	struct tst_worker *wa = workers.vec;
 
-	if (!workers)
-		return;
-
-	for (i = 0; i < worker_count; i++) {
-		if (workers[i].q) {
-			queue_destroy(workers[i].q, 0);
-			workers[i].q = 0;
-		}
-	}
-}
-
-static int sched_work(const int first_worker,
-		      const char *path, int repetitions)
-{
-	int i, j;
-	int min_ttl = worker_timeout, sleep_time = 1;
-	int pushed, workers_pushed = 0;
-
-	for (i = 0, j = first_worker; i < repetitions; j++) {
-		if (j >= worker_count)
-			j = 0;
-
-		if (j == first_worker && !workers_pushed) {
-			sleep_time = MIN(2 * sleep_time, min_ttl);
-			usleep(sleep_time);
-			min_ttl = worker_timeout;
-		}
+	wa[0].name = "Dir";
+	wa[0].run = dir_worker_run;
+	wa[0].mode = TST_WORKER_SYNC;
 
-		if (j == first_worker)
-			workers_pushed = 0;
+	tst_worker_start(wa);
 
-		pushed = try_push_work(j, path);
-		i += pushed;
-		workers_pushed += pushed;
+	for (i = 1; i < wcount; i++) {
+		wa[i].name = "Path";
+		wa[i].run = path_worker_run;
+		wa[i].on_sent = path_worker_sent;
+		wa[i].on_died = path_worker_died;
+		wa[i].mode = TST_WORKER_ASYNC;
+		wa[i].priv = worker_privs + i;
 
-		if (!pushed)
-			min_ttl = MIN(min_ttl, worker_ttl(j));
+		tst_worker_start(wa + i);
+		do_next_path(wa + i);
 	}
-
-	return j;
 }
 
 static void setup(void)
 {
-	struct timespec now;
-
 	if (tst_parse_int(str_reads, &reads, 1, INT_MAX))
 		tst_brk(TBROK,
 			"Invalid reads (-r) argument: '%s'", str_reads);
 
-	if (tst_parse_long(str_max_workers, &max_workers, 1, LONG_MAX)) {
+	if (tst_parse_long(str_max_workers, &max_workers, 2, LONG_MAX)) {
 		tst_brk(TBROK,
 			"Invalid max workers (-w) argument: '%s'",
 			str_max_workers);
 	}
 
-	if (tst_parse_long(str_worker_count, &worker_count, 1, LONG_MAX)) {
+	if (tst_parse_long(str_worker_count, &workers.count, 2, LONG_MAX)) {
 		tst_brk(TBROK,
 			"Invalid worker count (-W) argument: '%s'",
 			str_worker_count);
@@ -550,139 +402,41 @@ static void setup(void)
 	if (!root_dir)
 		tst_brk(TBROK, "The directory argument (-d) is required");
 
-	if (!worker_count)
-		worker_count = MIN(MAX(tst_ncpus() - 1, 1L), max_workers);
-	workers = SAFE_MALLOC(worker_count * sizeof(*workers));
+	if (!workers.count)
+		workers.count = MIN(MAX(tst_ncpus() - 1, 2L), max_workers);
 
-	if (tst_parse_int(str_worker_timeout, &worker_timeout, 1, INT_MAX)) {
+	workers.vec = SAFE_MALLOC(workers.count * sizeof(workers.vec[0]));
+	memset(workers.vec, 0, workers.count * sizeof(workers.vec[0]));
+	worker_privs = SAFE_MALLOC(workers.count * sizeof(struct path_worker));
+
+	if (tst_parse_int(str_worker_timeout, (int *)&workers.timeout, 1, INT_MAX)) {
 		tst_brk(TBROK,
 			"Invalid worker timeout (-t) argument: '%s'",
 			str_worker_count);
 	}
 
-	if (worker_timeout) {
-		tst_res(TINFO, "Worker timeout forcibly set to %dms",
-			worker_timeout);
+	if (workers.timeout) {
+		tst_res(TINFO, "Worker timeout forcibly set to %lldms",
+			workers.timeout);
 	} else {
-		worker_timeout = 10 * tst_remaining_runtime();
-		tst_res(TINFO, "Worker timeout set to 10%% of max_runtime: %dms",
-			worker_timeout);
+		workers.timeout = 10 * tst_remaining_runtime();
+		tst_res(TINFO, "Worker timeout set to 10%% of max_runtime: %lldms",
+			workers.timeout);
 	}
-	worker_timeout *= 1000;
+	workers.timeout *= 1000;
 
-	SAFE_CLOCK_GETTIME(CLOCK_MONOTONIC_RAW, &now);
-	epoch = tst_timespec_to_us(now);
-}
-
-static void reap_children(void)
-{
-	int status, bad_exit = 0;
-	pid_t pid;
-
-	for (;;) {
-		pid = wait(&status);
-
-		if (pid > 0) {
-			if (!WIFEXITED(status))
-				bad_exit = 1;
-
-			continue;
-		}
-
-		if (errno == ECHILD)
-			break;
-
-		if (errno == EINTR)
-			continue;
-
-		tst_brk(TBROK | TERRNO, "wait() failed");
-	}
-
-	if (!bad_exit)
-		return;
-
-	tst_res(TINFO,
-		"Zombie workers detected; consider increasing LTP_RUNTIME_MUL");
+	tst_workers_setup(&workers);
 }
 
 static void cleanup(void)
 {
-	stop_workers();
-	reap_children();
-	destroy_workers();
-	free(workers);
-}
-
-static void visit_dir(const char *path)
-{
-	DIR *dir;
-	struct dirent *dent;
-	struct stat dent_st;
-	char dent_path[MAX_PATH];
-	enum dent_action act;
-	int last_sched = 0;
-
-	dir = opendir(path);
-	if (!dir) {
-		tst_res(TINFO | TERRNO, "opendir(%s)", path);
-		return;
-	}
-
-	while (1) {
-		errno = 0;
-		dent = readdir(dir);
-		if (!dent && errno) {
-			tst_res(TINFO | TERRNO, "readdir(%s)", path);
-			break;
-		} else if (!dent) {
-			break;
-		}
-
-		if (!strcmp(dent->d_name, ".") ||
-		    !strcmp(dent->d_name, ".."))
-			continue;
-
-		if (dent->d_type == DT_DIR)
-			act = DA_VISIT;
-		else if (dent->d_type == DT_LNK)
-			act = DA_IGNORE;
-		else if (dent->d_type == DT_UNKNOWN)
-			act = DA_UNKNOWN;
-		else
-			act = DA_READ;
-
-		snprintf(dent_path, MAX_PATH,
-			 "%s/%s", path, dent->d_name);
-
-		if (act == DA_UNKNOWN) {
-			if (lstat(dent_path, &dent_st))
-				tst_res(TINFO | TERRNO, "lstat(%s)", path);
-			else if ((dent_st.st_mode & S_IFMT) == S_IFDIR)
-				act = DA_VISIT;
-			else if ((dent_st.st_mode & S_IFMT) == S_IFLNK)
-				act = DA_IGNORE;
-			else
-				act = DA_READ;
-		}
-
-		if (act == DA_VISIT)
-			visit_dir(dent_path);
-		else if (act == DA_READ)
-			last_sched = sched_work(last_sched, dent_path, reads);
-	}
-
-	if (closedir(dir))
-		tst_res(TINFO | TERRNO, "closedir(%s)", path);
+	tst_workers_cleanup(&workers);
 }
 
 static void run(void)
 {
 	spawn_workers();
-	visit_dir(root_dir);
-
-	stop_workers();
-	reap_children();
-	destroy_workers();
+	tst_evloop_run(&workers.evloop);
 
 	tst_res(TPASS, "Finished reading files");
 }
-- 
2.36.1


* Re: [LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing
  2022-09-27 16:14 ` [LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing Richard Palethorpe via ltp
@ 2022-09-30  9:17   ` Petr Vorel
  0 siblings, 0 replies; 9+ messages in thread
From: Petr Vorel @ 2022-09-30  9:17 UTC (permalink / raw)
  To: Richard Palethorpe; +Cc: ltp

Hi Richie,

...
> diff --git a/lib/tst_channel.c b/lib/tst_channel.c
...
> +static void pipe_chan_send(struct tst_chan *self)
> +{
> +	struct tst_pchan *priv = self->priv;
> +	const enum tst_pchan_state state =
> +		TST_STATE_GET(&priv->mach, 1 << PCS_IDLE | 1 << PCS_SEND_DATA | 1 << PCS_RECV_ACK);
> +
> + 	TST_STATE_EXP(&self->mach, 1 << CHS_READY | 1 << CHS_SEND);
nit: git am complains:

Applying: api/channel: Add channel abstraction for message passing
.git/rebase-apply/patch:293: space before tab in indent.
 	TST_STATE_EXP(&self->mach, 1 << CHS_READY | 1 << CHS_SEND);


Petr

* Re: [LTP] [PATCH 2/6] api/evloop: Add helpers for creating an event loop
  2022-09-27 16:14 ` [LTP] [PATCH 2/6] api/evloop: Add helpers for creating an event loop Richard Palethorpe via ltp
@ 2022-09-30 13:36   ` Petr Vorel
  0 siblings, 0 replies; 9+ messages in thread
From: Petr Vorel @ 2022-09-30 13:36 UTC (permalink / raw)
  To: Richard Palethorpe; +Cc: ltp

Hi Richie,

> +++ b/lib/tst_evloop.c
...
> +void tst_evloop_run(struct tst_evloop *self)
> +{
> +	static int saturated_warn;
> +	const int maxevents = 128;
> +	struct epoll_event events[maxevents];
> +
> +	for (;;) {
> +		const int ev_num = SAFE_EPOLL_WAIT(self->epollfd, events,
> +						   maxevents, self->timeout);
> +
> +		for (int i = 0; i < ev_num; i++)
I'm sorry, but we still cannot use C99 because CentOS 7 is still supported:
lib/tst_evloop.c:80:3: error: 'for' loop initial declarations are only allowed in C99 mode
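
One way to satisfy the older compiler mode is to declare the loop
counter at the top of the enclosing block instead of in the for
initializer. A minimal sketch of that pattern, where demo_sum() is a
hypothetical helper and not part of the patch:

```c
#include <stddef.h>

/* Hypothetical helper: sums an array with a C89-style loop counter. */
static long demo_sum(const int *vals, size_t n)
{
	size_t i;	/* declared at block start, not in the for initializer */
	long total = 0;

	for (i = 0; i < n; i++)
		total += vals[i];

	return total;
}
```

Applied to tst_evloop_run(), this would presumably mean declaring
"int i;" alongside the other locals in the enclosing block.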

Kind regards,
Petr

Thread overview: 9+ messages
2022-09-27 16:14 [LTP] [PATCH 0/6] Locally distributed work API Richard Palethorpe via ltp
2022-09-27 16:14 ` [LTP] [PATCH 1/6] api/epoll: Add safe epoll functions Richard Palethorpe via ltp
2022-09-27 16:14 ` [LTP] [PATCH 2/6] api/evloop: Add helpers for creating an event loop Richard Palethorpe via ltp
2022-09-30 13:36   ` Petr Vorel
2022-09-27 16:14 ` [LTP] [PATCH 3/6] api/state_machine: Add validating state machines Richard Palethorpe via ltp
2022-09-27 16:14 ` [LTP] [PATCH 4/6] api/channel: Add channel abstraction for message passing Richard Palethorpe via ltp
2022-09-30  9:17   ` Petr Vorel
2022-09-27 16:14 ` [LTP] [PATCH 5/6] api/worker: Add library for distributing work over multiple procs Richard Palethorpe via ltp
2022-09-27 16:14 ` [LTP] [PATCH 6/6] read_all: Migrate to the worker lib Richard Palethorpe via ltp
