All of lore.kernel.org
 help / color / mirror / Atom feed
From: Pradyumna Sampath <pradysam@gmail.com>
To: Carsten Emde <Carsten.Emde@osadl.org>
Cc: "M. Koehrer" <mathias_koehrer@arcor.de>,
	linux-rt-users@vger.kernel.org, rachana.rao@in.abb.com,
	Thomas Gleixner <tglx@linutronix.de>
Subject: Re: [PATCH] Re: mq_timedrecieve timeout accuracy
Date: Tue, 30 Mar 2010 09:41:57 +0200	[thread overview]
Message-ID: <m2u6d09081c1003300041h2bb7bdf6wb30773e513284f31@mail.gmail.com> (raw)
In-Reply-To: <4BB0C269.9050408@osadl.org>

[-- Attachment #1: Type: text/plain, Size: 827 bytes --]

Hi Carsten,

On Mon, Mar 29, 2010 at 5:08 PM, Carsten Emde <Carsten.Emde@osadl.org> wrote:
> Does the attached patch work for you?
>
> - Fixed error handling
> - Replaced schedule_timeout() with schedule_hrtimeout()
> - Let schedule_hrtimeout() handle expired timers

Well no, mq_timedreceive blocks forever when using my test program.
This behaviour would not really be evident in pmqtest, since the
test messages are sent at intervals shorter than the timeout
value.

So, I have modified the pmqtest (attached) to
- Not send test messages

This means that when the -T1 option is used it should wait in
mq_timedreceive for 1 second and then exit. On a normal kernel this
behaviour is demonstrated, but with the patch applied the timeout never
expires and hence the program does not exit.

regards
/prady

-- 
http://www.prady.in

[-- Attachment #2: pmqtest.c --]
[-- Type: text/plain, Size: 14381 bytes --]

/*
 * pmqtest.c
 *
 * Copyright (C) 2009 Carsten Emde <C.Emde@osadl.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307,
 * USA.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include <fcntl.h>
#include <getopt.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/mman.h>
#include <linux/unistd.h>
#include <utmpx.h>
#include <mqueue.h>
#include "rt-utils.h"
#include "rt-get_cpu.h"

#include <pthread.h>

/* Thread id of the calling thread, obtained through the raw gettid syscall. */
#define gettid() syscall(__NR_gettid)

#define USEC_PER_SEC 1000000

/*
 * printf-style templates for the per-pair message queue names; the %d is
 * filled with the thread pair index in main().
 */
#define SYNCMQ_NAME "/syncmsg%d"
#define TESTMQ_NAME "/testmsg%d"
/* Size in bytes of the receive buffers handed to mq_receive()/mq_timedreceive(). */
#define MSG_SIZE 8
#define MSEC_PER_SEC 1000
#define NSEC_PER_SEC 1000000000

/*
 * Payloads exchanged between the two threads of a pair. They are sent with
 * strlen() bytes (no terminating NUL) and compared with strcmp() on receipt.
 */
char *syncmsg = "Syncing";
char *testmsg = "Testing";

/* How thread CPU affinity was requested on the command line (see -a / -S). */
enum {
	AFFINITY_UNSPECIFIED,	/* no pinning: threads get cpu == -1 */
	AFFINITY_SPECIFIED,	/* all threads pinned to the CPU given with -a NUM */
	AFFINITY_USEALL		/* thread pair i pinned to CPU i % max_cpus */
};

/*
 * Per-thread state. main() allocates one instance for the receiver and one
 * for the sender of every thread pair; the sender's copy is created by
 * memcpy() from the receiver's and then adjusted.
 */
struct params {
	int num;		/* index of the thread pair */
	int cpu;		/* CPU to pin to, or -1 for no pinning (then updated from get_cpu()) */
	int priority;		/* priority passed to sched_setscheduler() with SCHED_FIFO */
	int affinity;		/* not referenced by the code in this file */
	int sender;		/* non-zero: thread sends test messages; zero: thread receives them */
	int samples;		/* number of loop iterations performed so far */
	int max_cycles;		/* stop after this many samples; 0 means no limit */
	int tracelimit;		/* if non-zero, stop tracing and shut down once maxdiff exceeds it */
	int tid;		/* result of gettid() in the thread */
	int shutdown;		/* set to 1 to make the thread leave its loop */
	int stopped;		/* set to 1 by the thread just before it returns */
	struct timespec delay;	/* receiver sleeps this long after each message */
	unsigned int mindiff, maxdiff;	/* smallest / largest diff.tv_usec seen */
	double sumdiff;		/* sum of all diff.tv_usec values, used for the average */
	struct timeval sent, received, diff;	/* send time, receive time, received - neighbor's sent */
	pthread_t threadid;	/* handle filled in by pthread_create() */
	int timeout;		/* > 0: use mq_timedreceive() with this many seconds */
	mqd_t syncmq, testmq;	/* queue descriptors for sync and test messages */
	char recvsyncmsg[MSG_SIZE];	/* receive buffer for sync messages */
	char recvtestmsg[MSG_SIZE];	/* receive buffer for test messages */
	struct params *neighbor;	/* the other thread of this pair */
	char error[MAX_PATH * 2];	/* pending error text; printed and cleared by main() */
};

/*
 * When 1, the receiver thread sends the sync message at the end of each
 * iteration; any other value suppresses that send.
 * NOTE(review): the name suggests the opposite meaning - confirm the intent.
 */
int no_send = 1;

/*
 * Store in *ts the absolute CLOCK_REALTIME time that lies timeout_sec
 * seconds from now. mq_timedreceive() takes an absolute deadline, not an
 * interval, so this has to be recomputed before every call.
 */
static void pmq_set_deadline(struct timespec *ts, int timeout_sec)
{
	clock_gettime(CLOCK_REALTIME, ts);
	ts->tv_sec += timeout_sec;
}

/*
 * Thread body shared by both threads of a sender/receiver pair.
 *
 * param points to the thread's struct params. The thread switches itself to
 * SCHED_FIFO at par->priority and, unless par->cpu is -1, pins itself to
 * par->cpu. It then loops until par->shutdown is set:
 *
 *  - sender (par->sender != 0): timestamps par->sent, sends the test message
 *    on testmq and waits for the sync message on syncmq;
 *  - receiver: waits for the test message on testmq, timestamps
 *    par->received, updates the min/max/sum statistics from the difference to
 *    the neighbor's sent time, sleeps par->delay and, if no_send is 1, sends
 *    the sync message on syncmq.
 *
 * With par->timeout > 0 the waits use mq_timedreceive() with a deadline of
 * par->timeout seconds from the moment of the call; otherwise mq_receive().
 * Any failed send/receive or message mismatch sets par->shutdown.
 *
 * Sets par->stopped to 1 before returning. Always returns NULL.
 */
void *pmqthread(void *param)
{
	int mustgetcpu = 0;
	struct params *par = param;
	cpu_set_t mask;
	int policy = SCHED_FIFO;
	struct sched_param schedp;
	struct timespec ts;

	memset(&schedp, 0, sizeof(schedp));
	schedp.sched_priority = par->priority;
	sched_setscheduler(0, policy, &schedp);

	if (par->cpu != -1) {
		CPU_ZERO(&mask);
		CPU_SET(par->cpu, &mask);
		if(sched_setaffinity(0, sizeof(mask), &mask) == -1)
			fprintf(stderr,	"WARNING: Could not set CPU affinity "
				"to CPU #%d\n", par->cpu);
	} else
		mustgetcpu = 1;

	par->tid = gettid();

	while (!par->shutdown) {
		if (par->sender) {
			/* Send message: Start of latency measurement ... */
			gettimeofday(&par->sent, NULL);
			if (mq_send(par->testmq, testmsg, strlen(testmsg), 1) != 0) {
				fprintf(stderr, "could not send test message\n");
				par->shutdown = 1;
			}
			par->samples++;
			if(par->max_cycles && par->samples >= par->max_cycles)
				par->shutdown = 1;
			if (mustgetcpu)
				par->cpu = get_cpu();
			/* Wait until receiver ready */
			if (par->timeout > 0) {
				/*
				 * Fresh deadline for every wait; a deadline computed
				 * once up front would be in the past after the first
				 * par->timeout seconds.
				 */
				pmq_set_deadline(&ts, par->timeout);
				if (mq_timedreceive(par->syncmq, par->recvsyncmsg, MSG_SIZE, NULL, &ts)
				    != strlen(syncmsg)) {
					fprintf(stderr, "could not receive sync message\n");
					par->shutdown = 1;
				}
			} else {
				if (mq_receive(par->syncmq, par->recvsyncmsg, MSG_SIZE, NULL) !=
				    strlen(syncmsg)) {
					fprintf(stderr, "could not receive sync message\n");
					par->shutdown = 1;
				}
			}
			if (strcmp(syncmsg, par->recvsyncmsg)) {
				fprintf(stderr, "ERROR: Sync message mismatch detected\n");
				fprintf(stderr, "  %s != %s\n", syncmsg, par->recvsyncmsg);
				par->shutdown = 1;
			}

		} else {
			/* Receiver */
			if (par->timeout > 0) {
				/* Deadline is relative to the start of this wait */
				pmq_set_deadline(&ts, par->timeout);
				if (mq_timedreceive(par->testmq, par->recvtestmsg, MSG_SIZE, NULL, &ts) !=
				    strlen(testmsg)) {
					fprintf(stderr, "could not receive test message\n");
					par->shutdown = 1;
				}
			} else {
				if (mq_receive(par->testmq, par->recvtestmsg, MSG_SIZE, NULL) !=
				    strlen(testmsg)) {
					fprintf(stderr, "could not receive test message\n");
					par->shutdown = 1;
				}
			}
			/* ... Received the message: End of latency measurement */
			gettimeofday(&par->received, NULL);

			if (strcmp(testmsg, par->recvtestmsg)) {
				fprintf(stderr, "ERROR: Test message mismatch detected\n");
				fprintf(stderr, "  %s != %s\n", testmsg, par->recvtestmsg);
				par->shutdown = 1;
			}
			par->samples++;
			timersub(&par->received, &par->neighbor->sent,
			    &par->diff);

			/* Statistics only look at the microsecond part of the difference */
			if (par->diff.tv_usec < par->mindiff)
				par->mindiff = par->diff.tv_usec;
			if (par->diff.tv_usec > par->maxdiff)
				par->maxdiff = par->diff.tv_usec;
			par->sumdiff += (double) par->diff.tv_usec;
			if (par->tracelimit && par->maxdiff > par->tracelimit) {
				char tracing_enabled_file[MAX_PATH];
				int tracing_enabled;

				/* Bounded formatting: cannot overrun the path buffer */
				snprintf(tracing_enabled_file,
				    sizeof(tracing_enabled_file),
				    "%stracing_enabled", get_debugfileprefix());
				tracing_enabled =
				    open(tracing_enabled_file, O_WRONLY);
				if (tracing_enabled >= 0) {
					/* Writing "0" switches tracing off */
					if (write(tracing_enabled, "0", 1) != 1)
						snprintf(par->error, sizeof(par->error),
						    "Could not write to %s\n",
						    tracing_enabled_file);
					close(tracing_enabled);
				} else
					snprintf(par->error, sizeof(par->error),
					    "Could not access %s\n",
					    tracing_enabled_file);
				par->shutdown = 1;
				par->neighbor->shutdown = 1;
			}

			if (par->max_cycles && par->samples >= par->max_cycles)
				par->shutdown = 1;
			if (mustgetcpu)
				par->cpu = get_cpu();
			nanosleep(&par->delay, NULL);

			/*
			 * Tell the sender that we are ready for the next
			 * measurement (only while the no_send flag is 1).
			 */
			if (no_send == 1)
			{
				if (mq_send(par->syncmq, syncmsg, strlen(syncmsg), 1) != 0) {
					fprintf(stderr, "could not send sync message\n");
					par->shutdown = 1;
				}
			}
		}
	}
	par->stopped = 1;
	return NULL;
}


/*
 * Print the program version and the usage summary to stdout, then terminate
 * the process with exit status 1. Does not return.
 */
static void display_help(void)
{
	static const char usage_text[] =
		"Usage: pmqtest <options>\n"
		"Function: test POSIX message queue latency\n"
		"Options:\n"
		"-a [NUM] --affinity        run thread #N on processor #N, if possible\n"
		"                           with NUM pin all threads to the processor NUM\n"
		"-b USEC  --breaktrace=USEC send break trace command when latency > USEC\n"
		"-d DIST  --distance=DIST   distance of thread intervals in us default=500\n"
		"-i INTV  --interval=INTV   base interval of thread in us default=1000\n"
		"-l LOOPS --loops=LOOPS     number of loops: default=0(endless)\n"
		"-p PRIO  --prio=PRIO       priority\n"
		"-S       --smp             SMP testing: options -a -t and same priority\n"
		"                           of all threads\n"
		"-t       --threads         one thread per available processor\n"
		"-t [NUM] --threads=NUM     number of threads:\n"
		"                           without NUM, threads = max_cpus\n"
		"                           without -t default = 1\n"
		"-T TO    --timeout=TO      use mq_timedreceive() instead of mq_receive()\n"
		"                           with timeout TO in seconds\n"
		"\n";

	printf("pmqtest V %1.2f\n", VERSION_STRING);
	/* One write for the whole text; the trailing blank line is part of it */
	fputs(usage_text, stdout);
	exit(1);
}


/*
 * Command line settings, filled in by process_options() and consumed by
 * main() when it sets up the thread pairs.
 */
static int setaffinity = AFFINITY_UNSPECIFIED;	/* affinity mode selected by -a / -S */
static int affinity;		/* CPU number given with -a NUM */
static int tracelimit;		/* -b: latency limit copied into each thread's tracelimit */
static int priority;		/* -p: thread priority, validated to 0..99 */
static int num_threads = 1;	/* -t / -S: number of thread pairs, validated to 1..255 */
static int max_cycles;		/* -l: number of loops per thread, 0 = endless */
static int interval = 1000;	/* -i: base interval in us; main() adds distance per pair */
static int distance = 500;	/* -d: interval increment in us between thread pairs */
static int smp;			/* set by -S; makes later -a and -t options ignored */
static int sameprio;		/* set when both a priority and -S were given */
static int timeout;		/* -T: timeout in seconds, must not be negative */

static void process_options (int argc, char *argv[])
{
	int error = 0;
	int max_cpus = sysconf(_SC_NPROCESSORS_CONF);

	for (;;) {
		int option_index = 0;
		/** Options for getopt */
		static struct option long_options[] = {
			{"affinity", optional_argument, NULL, 'a'},
			{"breaktrace", required_argument, NULL, 'b'},
			{"distance", required_argument, NULL, 'd'},
			{"interval", required_argument, NULL, 'i'},
			{"loops", required_argument, NULL, 'l'},
			{"priority", required_argument, NULL, 'p'},
			{"smp", no_argument, NULL, 'S'},
			{"threads", optional_argument, NULL, 't'},
			{"timeout", required_argument, NULL, 'T'},
			{"help", no_argument, NULL, '?'},
			{NULL, 0, NULL, 0}
		};
		int c = getopt_long (argc, argv, "a::b:d:i:l:p:St::T:",
			long_options, &option_index);
		if (c == -1)
			break;
		switch (c) {
		case 'a':
			if (smp) {
				warn("-a ignored due to --smp\n");
				break;
			}
			if (optarg != NULL) {
				affinity = atoi(optarg);
				setaffinity = AFFINITY_SPECIFIED;
			} else if (optind<argc && atoi(argv[optind])) {
				affinity = atoi(argv[optind]);
				setaffinity = AFFINITY_SPECIFIED;
			} else {
				setaffinity = AFFINITY_USEALL;
			}
			break;
		case 'b': tracelimit = atoi(optarg); break;
		case 'd': distance = atoi(optarg); break;
		case 'i': interval = atoi(optarg); break;
		case 'l': max_cycles = atoi(optarg); break;
		case 'p': priority = atoi(optarg); break;
		case 'S':
			smp = 1;
			num_threads = max_cpus;
			setaffinity = AFFINITY_USEALL;
			break;
		case 't':
			if (smp) {
				warn("-t ignored due to --smp\n");
				break;
			}
			if (optarg != NULL)
				num_threads = atoi(optarg);
			else if (optind<argc && atoi(argv[optind]))
				num_threads = atoi(argv[optind]);
			else
				num_threads = max_cpus;
			break;
		case 'T': timeout = atoi(optarg); break;
		case '?': error = 1; break;
		}
	}

	if (setaffinity == AFFINITY_SPECIFIED) {
		if (affinity < 0)
			error = 1;
		if (affinity >= max_cpus) {
			fprintf(stderr, "ERROR: CPU #%d not found, only %d CPUs available\n",
			    affinity, max_cpus);
			error = 1;
		}
	}

	if (num_threads < 0 || num_threads > 255)
		error = 1;

	if (priority < 0 || priority > 99)
		error = 1;

	if (num_threads < 1)
		error = 1;

	if (priority && smp)
		sameprio = 1;

	if (timeout < 0)
		error = 1;

	if (error)
		display_help ();
}


/*
 * Global stop request. Written by sighand() and polled by main()'s loop.
 * volatile sig_atomic_t is the object type the C standard allows a signal
 * handler to assign to.
 */
static volatile sig_atomic_t shutdown;

/*
 * Signal handler installed for SIGINT and SIGTERM: only records that a
 * shutdown was requested. The signal number itself is not needed.
 */
static void sighand(int sig)
{
	(void) sig;
	shutdown = 1;
}

int main(int argc, char *argv[])
{
	int i;
	int max_cpus = sysconf(_SC_NPROCESSORS_CONF);
	int oldsamples = 1;
	struct params *receiver = NULL;
	struct params *sender = NULL;
	sigset_t sigset;
	struct timespec maindelay;
	int oflag = O_CREAT|O_RDWR;
	struct mq_attr mqstat;

	memset(&mqstat, 0, sizeof(mqstat));
	mqstat.mq_maxmsg = 1;
	mqstat.mq_msgsize = 8;
	mqstat.mq_flags = 0;

	process_options(argc, argv);

	if (check_privs())
		return 1;

	if (mlockall(MCL_CURRENT|MCL_FUTURE) == -1) {
		perror("mlockall");
		return 1;
	}

	signal(SIGINT, sighand);
	signal(SIGTERM, sighand);

	receiver = calloc(num_threads, sizeof(struct params));
	sender = calloc(num_threads, sizeof(struct params));
	if (receiver == NULL || sender == NULL)
		goto nomem;

	for (i = 0; i < num_threads; i++) {
		char mqname[16];

		sprintf(mqname, SYNCMQ_NAME, i);
		receiver[i].syncmq = mq_open(mqname, oflag, 0777, &mqstat);
		if (receiver[i].syncmq == (mqd_t) -1) {
			fprintf(stderr, "could not open POSIX message queue #1\n");
			return 1;
		}
		sprintf(mqname, TESTMQ_NAME, i);
		receiver[i].testmq = mq_open(mqname, oflag, 0777, &mqstat);
		if (receiver[i].testmq == (mqd_t) -1) {
			fprintf(stderr, "could not open POSIX message queue #2\n");
			return 1;
		}

		receiver[i].mindiff = UINT_MAX;
		receiver[i].maxdiff = 0;
		receiver[i].sumdiff = 0.0;

		receiver[i].num = i;
		receiver[i].cpu = i;
		switch (setaffinity) {
		case AFFINITY_UNSPECIFIED: receiver[i].cpu = -1; break;
		case AFFINITY_SPECIFIED: receiver[i].cpu = affinity; break;
		case AFFINITY_USEALL: receiver[i].cpu = i % max_cpus; break;
		}
		receiver[i].priority = priority;
		receiver[i].tracelimit = tracelimit;
		if (priority > 1 && !sameprio)
			priority--;
		receiver[i].delay.tv_sec = interval / USEC_PER_SEC;
		receiver[i].delay.tv_nsec = (interval % USEC_PER_SEC) * 1000;
		interval += distance;
		receiver[i].max_cycles = max_cycles;
		receiver[i].sender = 0;
		receiver[i].neighbor = &sender[i];
		receiver[i].timeout = timeout;
		pthread_create(&receiver[i].threadid, NULL, pmqthread, &receiver[i]);
		memcpy(&sender[i], &receiver[i], sizeof(receiver[0]));
		sender[i].sender = 1;
		sender[i].neighbor = &receiver[i];
		pthread_create(&sender[i].threadid, NULL, pmqthread, &sender[i]);
	}

	maindelay.tv_sec = 0;
	maindelay.tv_nsec = 50000000; /* 50 ms */

	while (!shutdown) {
		int printed;
		int errorlines = 0;

		for (i = 0; i < num_threads; i++)
			shutdown |= receiver[i].shutdown | sender[i].shutdown;

		if (receiver[0].samples > oldsamples || shutdown) {
			for (i = 0; i < num_threads; i++) {
				printf("#%1d: ID%d, P%d, CPU%d, I%ld; #%1d: ID%d, P%d, CPU%d, Cycles %d\n",
				    i*2, receiver[i].tid, receiver[i].priority, receiver[i].cpu,
				    receiver[i].delay.tv_nsec / 1000,
				    i*2+1, sender[i].tid, sender[i].priority, sender[i].cpu,
				    sender[i].samples);
			}
			for (i = 0; i < num_threads; i++) {
				printf("#%d -> #%d, Min %4d, Cur %4d, Avg %4d, Max %4d\n",
					i*2+1, i*2,
					receiver[i].mindiff, (int) receiver[i].diff.tv_usec,
					(int) ((receiver[i].sumdiff / receiver[i].samples) + 0.5),
					receiver[i].maxdiff);
				if (receiver[i].error[0] != '\0') {
					printf(receiver[i].error);
					errorlines++;
					receiver[i].error[0] = '\0';
				}
				if (sender[i].error[0] != '\0') {
					printf(sender[i].error);
					errorlines++;
					receiver[i].error[0] = '\0';
				}
			}
			printed = 1;
		} else
			printed = 0;

		sigemptyset(&sigset);
		sigaddset(&sigset, SIGTERM);
		sigaddset(&sigset, SIGINT);
		pthread_sigmask(SIG_SETMASK, &sigset, NULL);

		nanosleep(&maindelay, NULL);

		sigemptyset(&sigset);
		pthread_sigmask(SIG_SETMASK, &sigset, NULL);

		if (printed && !shutdown)
			printf("\033[%dA", num_threads*2 + errorlines);
	}

	for (i = 0; i < num_threads; i++) {
		receiver[i].shutdown = 1;
		sender[i].shutdown = 1;
	}
	nanosleep(&receiver[0].delay, NULL);

	for (i = 0; i < num_threads; i++) {
		mq_close(receiver[i].syncmq);
		mq_close(receiver[i].testmq);
		if (!receiver[i].stopped)
			pthread_kill(receiver[i].threadid, SIGTERM);
		if (!sender[i].stopped)
			pthread_kill(sender[i].threadid, SIGTERM);
	}

	nomem:

	return 0;
}


      reply	other threads:[~2010-03-30  7:42 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2010-03-24 12:27 mq_timedrecieve timeout accuracy Pradyumna Sampath
2010-03-24 13:12 ` John Kacur
2010-03-24 13:21 ` Sujit K M
2010-03-24 13:22 ` M. Koehrer
2010-03-24 13:37   ` Pradyumna Sampath
2010-03-24 13:45     ` Sujit K M
2010-03-24 13:47       ` Pradyumna Sampath
2010-03-24 14:03     ` Pradyumna Sampath
2010-03-24 15:46       ` [PATCH] " Pradyumna Sampath
2010-03-29 15:08         ` [PATCH] " Carsten Emde
2010-03-30  7:41           ` Pradyumna Sampath [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=m2u6d09081c1003300041h2bb7bdf6wb30773e513284f31@mail.gmail.com \
    --to=pradysam@gmail.com \
    --cc=Carsten.Emde@osadl.org \
    --cc=linux-rt-users@vger.kernel.org \
    --cc=mathias_koehrer@arcor.de \
    --cc=rachana.rao@in.abb.com \
    --cc=tglx@linutronix.de \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.