* [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
@ 2011-06-22 21:55 Gilles Chanteperdrix
  2011-06-23  7:38 ` Jan Kiszka
  0 siblings, 1 reply; 10+ messages in thread
From: Gilles Chanteperdrix @ 2011-06-22 21:55 UTC (permalink / raw)
  To: Xenomai core


Hi,

I would like to better integrate rtdk and the posix skin by forcibly
wrapping the calls to malloc/free, and also by wrapping printf to call
rt_printf.

However, rt_printf currently cannot really be used as a drop-in
replacement for printf:
- it is necessary to call rt_printf_auto_init; we can do that in the
library init code;
- the first rt_printf causes a memory allocation, hence a potential
switch to secondary mode, which is what using rt_printf was trying to
avoid in the first place.

To solve this second issue, and to avoid forcibly creating a buffer for
each thread (which would be wasteful), here is a patch which, following
an idea of Philippe's, creates a pool of pre-allocated buffers. The pool
is handled in a lockless fashion, using xnarch_atomic_cmpxchg (so it
will only work with CONFIG_XENO_FASTSYNCH).

diff --git a/src/rtdk/rt_print.c b/src/rtdk/rt_print.c
index 93b711a..2ed2209 100644
--- a/src/rtdk/rt_print.c
+++ b/src/rtdk/rt_print.c
@@ -30,6 +30,8 @@
 #include <rtdk.h>
 #include <asm/xenomai/system.h>
 #include <asm-generic/stack.h>
+#include <asm/xenomai/atomic.h>
+#include <asm-generic/bits/current.h>
 
 #define RT_PRINT_BUFFER_ENV		"RT_PRINT_BUFFER"
 #define RT_PRINT_DEFAULT_BUFFER		16*1024
@@ -37,6 +39,9 @@
 #define RT_PRINT_PERIOD_ENV		"RT_PRINT_PERIOD"
 #define RT_PRINT_DEFAULT_PERIOD		100 /* ms */
 
+#define RT_PRINT_BUFFERS_COUNT_ENV      "RT_PRINT_BUFFERS_COUNT"
+#define RT_PRINT_DEFAULT_BUFFERS_COUNT  4
+
 #define RT_PRINT_LINE_BREAK		256
 
 #define RT_PRINT_SYSLOG_STREAM		NULL
@@ -63,6 +68,9 @@ struct print_buffer {
 	 * caching on SMP.
 	 */
 	off_t read_pos;
+#ifdef CONFIG_XENO_FASTSYNCH
+	struct print_buffer *pool_next;
+#endif /* CONFIG_XENO_FASTSYNCH */
 };
 
 static struct print_buffer *first_buffer;
@@ -75,6 +83,17 @@ static pthread_mutex_t buffer_lock;
 static pthread_cond_t printer_wakeup;
 static pthread_key_t buffer_key;
 static pthread_t printer_thread;
+#ifdef CONFIG_XENO_FASTSYNCH
+static xnarch_atomic_t buffer_pool;
+static unsigned pool_capacity;
+static xnarch_atomic_t pool_count;
+
+#define buf_pool_set(buf) xnarch_atomic_set(&buffer_pool, (unsigned long)buf)
+#define buf_pool_get() (struct print_buffer *)xnarch_atomic_get(&buffer_pool)
+#define buf_pool_cmpxchg(oldval, newval) \
+	((struct print_buffer *)xnarch_atomic_cmpxchg			\
+	 (&buffer_pool, (unsigned long)oldval, (unsigned long)newval))
+#endif /* CONFIG_XENO_FASTSYNCH */
 
 static void cleanup_buffer(struct print_buffer *buffer);
 static void print_buffers(void);
@@ -243,43 +262,28 @@ static void set_buffer_name(struct print_buffer *buffer, const char *name)
 	}
 }
 
-int rt_print_init(size_t buffer_size, const char *buffer_name)
+static struct print_buffer *rt_print_init_inner(size_t size)
 {
-	struct print_buffer *buffer = pthread_getspecific(buffer_key);
-	size_t size = buffer_size;
-
-	if (!size)
-		size = default_buffer_size;
-	else if (size < RT_PRINT_LINE_BREAK)
-		return EINVAL;
+	struct print_buffer *buffer;
 
-	if (buffer) {
-		/* Only set name if buffer size is unchanged or default */
-		if (size == buffer->size || !buffer_size) {
-			set_buffer_name(buffer, buffer_name);
-			return 0;
-		}
-		cleanup_buffer(buffer);
-	}
+	assert_nrt();
 
 	buffer = malloc(sizeof(*buffer));
 	if (!buffer)
-		return ENOMEM;
+		return NULL;
 
 	buffer->ring = malloc(size);
 	if (!buffer->ring) {
 		free(buffer);
-		return ENOMEM;
+		return NULL;
 	}
+	buffer->size = size;
+
 	memset(buffer->ring, 0, size);
 
 	buffer->read_pos  = 0;
 	buffer->write_pos = 0;
 
-	buffer->size = size;
-
-	set_buffer_name(buffer, buffer_name);
-
 	buffer->prev = NULL;
 
 	pthread_mutex_lock(&buffer_lock);
@@ -294,6 +298,52 @@ int rt_print_init(size_t buffer_size, const char *buffer_name)
 
 	pthread_mutex_unlock(&buffer_lock);
 
+	return buffer;
+}
+
+int rt_print_init(size_t buffer_size, const char *buffer_name)
+{
+	struct print_buffer *buffer = pthread_getspecific(buffer_key);
+	size_t size = buffer_size;
+
+	if (!size)
+		size = default_buffer_size;
+	else if (size < RT_PRINT_LINE_BREAK)
+		return EINVAL;
+
+	if (buffer) {
+		/* Only set name if buffer size is unchanged or default */
+		if (size == buffer->size || !buffer_size) {
+			set_buffer_name(buffer, buffer_name);
+			return 0;
+		}
+		cleanup_buffer(buffer);
+		buffer = NULL;
+	}
+
+#ifdef CONFIG_XENO_FASTSYNCH
+	if (xeno_get_current() != XN_NO_HANDLE &&
+	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
+		struct print_buffer *old;
+
+		old = buf_pool_get();
+		while (old != buffer) {
+			buffer = old;
+			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
+		}
+		if (buffer)
+			xnarch_atomic_dec(&pool_count);
+	}
+#endif /* CONFIG_XENO_FASTSYNCH */
+
+	if (!buffer)
+		buffer = rt_print_init_inner(size);
+
+	if (!buffer)
+		return ENOMEM;
+
+	set_buffer_name(buffer, buffer_name);
+
 	pthread_setspecific(buffer_key, buffer);
 
 	return 0;
@@ -346,12 +396,36 @@ static void cleanup_buffer(struct print_buffer *buffer)
 {
 	struct print_buffer *prev, *next;
 
+	assert_nrt();
+
 	pthread_setspecific(buffer_key, NULL);
 
 	pthread_mutex_lock(&buffer_lock);
 
 	print_buffers();
 
+	pthread_mutex_unlock(&buffer_lock);
+
+#ifdef CONFIG_XENO_FASTSYNCH
+	{
+		struct print_buffer *old;
+
+		old = buf_pool_get();
+		buffer->pool_next = buffer;
+		while (old != buffer->next
+		       && xnarch_atomic_get(&pool_count) < pool_capacity) {
+			buffer->next = old;
+			old = buf_pool_cmpxchg(buffer->pool_next, buffer);
+		}
+		if (old == buffer->pool_next) {
+			xnarch_atomic_inc(&pool_count);
+			return;
+		}
+	}
+#endif /* CONFIG_XENO_FASTSYNCH */
+
+	pthread_mutex_lock(&buffer_lock);
+
 	prev = buffer->prev;
 	next = buffer->next;
 
@@ -523,6 +597,39 @@ void __rt_print_init(void)
 	print_period.tv_sec  = period / 1000;
 	print_period.tv_nsec = (period % 1000) * 1000000;
 
+#ifdef CONFIG_XENO_FASTSYNCH
+	{
+		unsigned i;
+
+		pool_capacity = RT_PRINT_DEFAULT_BUFFERS_COUNT;
+
+		value_str = getenv(RT_PRINT_BUFFERS_COUNT_ENV);
+		if (value_str) {
+			errno = 0;
+			pool_capacity = strtoul(value_str, NULL, 0);
+			if (errno) {
+				fprintf(stderr, "Invalid %s\n",
+					RT_PRINT_BUFFERS_COUNT_ENV);
+				exit(1);
+			}
+		}
+		for (i = 0; i < pool_capacity; i++) {
+			struct print_buffer *buffer;
+
+			buffer = rt_print_init_inner(default_buffer_size);
+			if (!buffer) {
+				fprintf(stderr, "Error allocating rt_printf "
+					"buffer\n");
+				exit(1);
+			}
+
+			buffer->pool_next = buf_pool_get();
+			buf_pool_set(buffer);
+		}
+		xnarch_atomic_set(&pool_count, pool_capacity);
+	}
+#endif /* CONFIG_XENO_FASTSYNCH */
+
 	pthread_mutex_init(&buffer_lock, NULL);
 	pthread_key_create(&buffer_key, (void (*)(void*))cleanup_buffer);
 


-- 
                                                                Gilles.



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-22 21:55 [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf Gilles Chanteperdrix
@ 2011-06-23  7:38 ` Jan Kiszka
  2011-06-23  9:05   ` Gilles Chanteperdrix
  0 siblings, 1 reply; 10+ messages in thread
From: Jan Kiszka @ 2011-06-23  7:38 UTC (permalink / raw)
  To: Gilles Chanteperdrix; +Cc: Xenomai core


On 2011-06-22 23:55, Gilles Chanteperdrix wrote:
> 
> Hi,
> 
> I would like to better integrate rtdk and the posix skin by forcibly 
> wrapping the calls to malloc/free and also wrap printf to call 
> rt_printf.
> 
> However, currently, rt_printf can not really be used as a drop-in 
> replacement of printf:
> - it is necessary to call rt_printf_auto_init, we can do it in the 
> library init code;
> - the first rt_printf causes a memory allocation, so a potential switch 
> to secondary mode, which is what using rt_printf was trying to avoid in
> the first place.
> 
> In order to solve this second issue, and to avoid forcibly creating a 
> buffer for each thread, which would be wasteful, here is a patch, which, 
> following an idea of Philippe, creates a pool of pre-allocated buffers. 
> The pool is handled in a lockless fashion, using xnarch_atomic_cmpxchg 
> (so, will only work with CONFIG_XENO_FASTSYNCH).

Makes sense.

> 
> diff --git a/src/rtdk/rt_print.c b/src/rtdk/rt_print.c
> index 93b711a..2ed2209 100644
> --- a/src/rtdk/rt_print.c
> +++ b/src/rtdk/rt_print.c
> @@ -30,6 +30,8 @@
>  #include <rtdk.h>
>  #include <asm/xenomai/system.h>
>  #include <asm-generic/stack.h>
> +#include <asm/xenomai/atomic.h>
> +#include <asm-generic/bits/current.h>
>  
>  #define RT_PRINT_BUFFER_ENV		"RT_PRINT_BUFFER"
>  #define RT_PRINT_DEFAULT_BUFFER		16*1024
> @@ -37,6 +39,9 @@
>  #define RT_PRINT_PERIOD_ENV		"RT_PRINT_PERIOD"
>  #define RT_PRINT_DEFAULT_PERIOD		100 /* ms */
>  
> +#define RT_PRINT_BUFFERS_COUNT_ENV      "RT_PRINT_BUFFERS_COUNT"
> +#define RT_PRINT_DEFAULT_BUFFERS_COUNT  4
> +
>  #define RT_PRINT_LINE_BREAK		256
>  
>  #define RT_PRINT_SYSLOG_STREAM		NULL
> @@ -63,6 +68,9 @@ struct print_buffer {
>  	 * caching on SMP.
>  	 */
>  	off_t read_pos;
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	struct print_buffer *pool_next;
> +#endif /* CONFIG_XENO_FASTSYNCH */
>  };
>  
>  static struct print_buffer *first_buffer;
> @@ -75,6 +83,17 @@ static pthread_mutex_t buffer_lock;
>  static pthread_cond_t printer_wakeup;
>  static pthread_key_t buffer_key;
>  static pthread_t printer_thread;
> +#ifdef CONFIG_XENO_FASTSYNCH
> +static xnarch_atomic_t buffer_pool;
> +static unsigned pool_capacity;
> +static xnarch_atomic_t pool_count;
> +
> +#define buf_pool_set(buf) xnarch_atomic_set(&buffer_pool, (unsigned long)buf)
> +#define buf_pool_get() (struct print_buffer *)xnarch_atomic_get(&buffer_pool)
> +#define buf_pool_cmpxchg(oldval, newval) \
> +	((struct print_buffer *)xnarch_atomic_cmpxchg			\
> +	 (&buffer_pool, (unsigned long)oldval, (unsigned long)newval))

static inlines should work as well and document input/output types a bit
better.
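
For illustration, a minimal sketch of what the typed inline form could
look like (same xnarch_atomic_* calls as the macros in the patch, shown
only as an example):

static inline void buf_pool_set(struct print_buffer *buf)
{
	xnarch_atomic_set(&buffer_pool, (unsigned long)buf);
}

static inline struct print_buffer *buf_pool_get(void)
{
	return (struct print_buffer *)xnarch_atomic_get(&buffer_pool);
}

static inline struct print_buffer *
buf_pool_cmpxchg(struct print_buffer *oldval, struct print_buffer *newval)
{
	return (struct print_buffer *)
		xnarch_atomic_cmpxchg(&buffer_pool,
				      (unsigned long)oldval,
				      (unsigned long)newval);
}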

> +#endif /* CONFIG_XENO_FASTSYNCH */
>  
>  static void cleanup_buffer(struct print_buffer *buffer);
>  static void print_buffers(void);
> @@ -243,43 +262,28 @@ static void set_buffer_name(struct print_buffer *buffer, const char *name)
>  	}
>  }
>  
> -int rt_print_init(size_t buffer_size, const char *buffer_name)
> +static struct print_buffer *rt_print_init_inner(size_t size)
>  {
> -	struct print_buffer *buffer = pthread_getspecific(buffer_key);
> -	size_t size = buffer_size;
> -
> -	if (!size)
> -		size = default_buffer_size;
> -	else if (size < RT_PRINT_LINE_BREAK)
> -		return EINVAL;
> +	struct print_buffer *buffer;
>  
> -	if (buffer) {
> -		/* Only set name if buffer size is unchanged or default */
> -		if (size == buffer->size || !buffer_size) {
> -			set_buffer_name(buffer, buffer_name);
> -			return 0;
> -		}
> -		cleanup_buffer(buffer);
> -	}
> +	assert_nrt();
>  
>  	buffer = malloc(sizeof(*buffer));
>  	if (!buffer)
> -		return ENOMEM;
> +		return NULL;
>  
>  	buffer->ring = malloc(size);
>  	if (!buffer->ring) {
>  		free(buffer);
> -		return ENOMEM;
> +		return NULL;
>  	}
> +	buffer->size = size;
> +
>  	memset(buffer->ring, 0, size);
>  
>  	buffer->read_pos  = 0;
>  	buffer->write_pos = 0;
>  
> -	buffer->size = size;
> -
> -	set_buffer_name(buffer, buffer_name);
> -
>  	buffer->prev = NULL;
>  
>  	pthread_mutex_lock(&buffer_lock);
> @@ -294,6 +298,52 @@ int rt_print_init(size_t buffer_size, const char *buffer_name)
>  
>  	pthread_mutex_unlock(&buffer_lock);
>  
> +	return buffer;
> +}
> +
> +int rt_print_init(size_t buffer_size, const char *buffer_name)
> +{
> +	struct print_buffer *buffer = pthread_getspecific(buffer_key);
> +	size_t size = buffer_size;
> +
> +	if (!size)
> +		size = default_buffer_size;
> +	else if (size < RT_PRINT_LINE_BREAK)
> +		return EINVAL;
> +
> +	if (buffer) {
> +		/* Only set name if buffer size is unchanged or default */
> +		if (size == buffer->size || !buffer_size) {
> +			set_buffer_name(buffer, buffer_name);
> +			return 0;
> +		}
> +		cleanup_buffer(buffer);
> +		buffer = NULL;
> +	}
> +
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	if (xeno_get_current() != XN_NO_HANDLE &&
> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
> +		struct print_buffer *old;
> +
> +		old = buf_pool_get();
> +		while (old != buffer) {
> +			buffer = old;
> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);

Though unlikely, it's still possible: The buffer obtained in the last
round may have been dequeued meanwhile and then freed (in case a third
buffer was released to the pool, filling it up to pool_capacity).

A simple solution would be not to free buffers even if the pool grew
larger than its initial capacity. Algorithms to solve that race are more
complicated, e.g.: exchange the head with a dummy element, then read
next and update the head. Other readers need to spin while they find the
dummy.
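
A rough sketch of that dummy-element variant, with hypothetical names
(pool_pop, pool_dummy) and the extra assumption that pushers spin as
well while the dummy is installed:

static struct print_buffer pool_dummy;

static struct print_buffer *pool_pop(void)
{
	struct print_buffer *head;

	for (;;) {
		head = buf_pool_get();
		if (!head)
			return NULL;
		if (head == &pool_dummy)
			continue;	/* another pop in progress, spin */
		/* Park the dummy so no other pop/push touches the head. */
		if (buf_pool_cmpxchg(head, &pool_dummy) != head)
			continue;	/* head changed under us, retry */
		/* 'head' now belongs to us and cannot be freed by anyone
		   else, so dereferencing it here is safe. */
		buf_pool_set(head->pool_next);
		return head;
	}
}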

> +		}
> +		if (buffer)
> +			xnarch_atomic_dec(&pool_count);
> +	}
> +#endif /* CONFIG_XENO_FASTSYNCH */
> +
> +	if (!buffer)
> +		buffer = rt_print_init_inner(size);
> +
> +	if (!buffer)
> +		return ENOMEM;
> +
> +	set_buffer_name(buffer, buffer_name);
> +
>  	pthread_setspecific(buffer_key, buffer);
>  
>  	return 0;
> @@ -346,12 +396,36 @@ static void cleanup_buffer(struct print_buffer *buffer)
>  {
>  	struct print_buffer *prev, *next;
>  
> +	assert_nrt();
> +
>  	pthread_setspecific(buffer_key, NULL);
>  
>  	pthread_mutex_lock(&buffer_lock);
>  
>  	print_buffers();

You also need to unhook the buffer from the active list before returning
it to the pool.

>  
> +	pthread_mutex_unlock(&buffer_lock);
> +
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	{
> +		struct print_buffer *old;
> +
> +		old = buf_pool_get();
> +		buffer->pool_next = buffer;
> +		while (old != buffer->next
> +		       && xnarch_atomic_get(&pool_count) < pool_capacity) {
> +			buffer->next = old;
> +			old = buf_pool_cmpxchg(buffer->pool_next, buffer);

This looks strange:

	buffer->pool_next = buffer;
	...
	buf_pool_cmpxchg(buffer->pool_next, buffer);

Don't you want

	buffer->pool_next = old;
	buf_pool_cmpxchg(old, buffer);

?

> +		}
> +		if (old == buffer->pool_next) {
> +			xnarch_atomic_inc(&pool_count);
> +			return;
> +		}
> +	}
> +#endif /* CONFIG_XENO_FASTSYNCH */
> +
> +	pthread_mutex_lock(&buffer_lock);
> +
>  	prev = buffer->prev;
>  	next = buffer->next;
>  
> @@ -523,6 +597,39 @@ void __rt_print_init(void)
>  	print_period.tv_sec  = period / 1000;
>  	print_period.tv_nsec = (period % 1000) * 1000000;
>  
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	{
> +		unsigned i;
> +
> +		pool_capacity = RT_PRINT_DEFAULT_BUFFERS_COUNT;
> +
> +		value_str = getenv(RT_PRINT_BUFFERS_COUNT_ENV);
> +		if (value_str) {
> +			errno = 0;
> +			pool_capacity = strtoul(value_str, NULL, 0);
> +			if (errno) {
> +				fprintf(stderr, "Invalid %s\n",
> +					RT_PRINT_BUFFERS_COUNT_ENV);
> +				exit(1);
> +			}
> +		}
> +		for (i = 0; i < pool_capacity; i++) {
> +			struct print_buffer *buffer;
> +
> +			buffer = rt_print_init_inner(default_buffer_size);
> +			if (!buffer) {
> +				fprintf(stderr, "Error allocating rt_printf "
> +					"buffer\n");
> +				exit(1);
> +			}
> +
> +			buffer->pool_next = buf_pool_get();
> +			buf_pool_set(buffer);
> +		}
> +		xnarch_atomic_set(&pool_count, pool_capacity);
> +	}
> +#endif /* CONFIG_XENO_FASTSYNCH */
> +
>  	pthread_mutex_init(&buffer_lock, NULL);
>  	pthread_key_create(&buffer_key, (void (*)(void*))cleanup_buffer);
>  
> 
> 

Jan



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-23  7:38 ` Jan Kiszka
@ 2011-06-23  9:05   ` Gilles Chanteperdrix
  2011-06-23  9:09     ` Jan Kiszka
  0 siblings, 1 reply; 10+ messages in thread
From: Gilles Chanteperdrix @ 2011-06-23  9:05 UTC (permalink / raw)
  To: Jan Kiszka; +Cc: Xenomai core

On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>> +		struct print_buffer *old;
>> +
>> +		old = buf_pool_get();
>> +		while (old != buffer) {
>> +			buffer = old;
>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
> 
> Though unlikely, it's still possible: The buffer obtained in the last
> round may have been dequeued meanwhile and then freed (in case a third
> buffer was released to the pool, filling it up to pool_capacity).

I do not get it: it seems to me that if the current head (that is
buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
and try again.

>> @@ -346,12 +396,36 @@ static void cleanup_buffer(struct print_buffer *buffer)
>>  {
>>  	struct print_buffer *prev, *next;
>>  
>> +	assert_nrt();
>> +
>>  	pthread_setspecific(buffer_key, NULL);
>>  
>>  	pthread_mutex_lock(&buffer_lock);
>>  
>>  	print_buffers();
> 
> You also need to unhook the buffer from the active list before returning
> it to the pool.

We cannot do that: when rt_print_init takes a buffer from the pool, we
want it not to need any operation which would require switching to
secondary mode. This includes taking the active list mutex. So the
buffers in the pool are also in the active list, but they remain empty,
so nothing will be printed.

> This looks strange:
> 
> 	buffer->pool_next = buffer;
> 	...
> 	buf_pool_cmpxchg(buffer->pool_next, buffer);
> 
> Don't you want
> 
> 	buffer->pool_next = old;
> 	buf_pool_cmpxchg(old, buffer);

Yes, search and replace error.

-- 
					    Gilles.



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-23  9:05   ` Gilles Chanteperdrix
@ 2011-06-23  9:09     ` Jan Kiszka
  2011-06-23 11:29       ` Gilles Chanteperdrix
  0 siblings, 1 reply; 10+ messages in thread
From: Jan Kiszka @ 2011-06-23  9:09 UTC (permalink / raw)
  To: Gilles Chanteperdrix; +Cc: Xenomai core


On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>> +		struct print_buffer *old;
>>> +
>>> +		old = buf_pool_get();
>>> +		while (old != buffer) {
>>> +			buffer = old;
>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>
>> Though unlikely, it's still possible: The buffer obtained in the last
>> round may have been dequeued meanwhile and then freed (in case a third
>> buffer was released to the pool, filling it up to pool_capacity).
> 
> I do not get it: it seems to me that if the current head (that is
> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
> and try again.

The problem is the dereferencing of the stale buffer pointer obtained
during the last cmpxchg or via buf_pool_get. This happens before the
next cmpxchg.
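
In other words, annotating the loop from the patch (illustration only):

	old = buf_pool_get();
	while (old != buffer) {
		buffer = old;
		/* 'buffer' may have been popped by another thread and
		   free()d by now, so reading buffer->pool_next below is a
		   potential use-after-free; the cmpxchg only detects the
		   change after the dereference has already happened. */
		old = buf_pool_cmpxchg(buffer, buffer->pool_next);
	}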

> 
>>> @@ -346,12 +396,36 @@ static void cleanup_buffer(struct print_buffer *buffer)
>>>  {
>>>  	struct print_buffer *prev, *next;
>>>  
>>> +	assert_nrt();
>>> +
>>>  	pthread_setspecific(buffer_key, NULL);
>>>  
>>>  	pthread_mutex_lock(&buffer_lock);
>>>  
>>>  	print_buffers();
>>
>> You also need to unhook the buffer from the active list before returning
>> it to the pool.
> 
> We can not do that: we want that when rt_print_init takes a buffer from
> the pool, it does not need to do any operation which would need to
> switch to secondary mode. This includes getting the active list mutex.
> So, the buffer in the pool are also in the active list, but they remain
> empty, so, nothing will be printed.

Ah, I see.

Jan



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-23  9:09     ` Jan Kiszka
@ 2011-06-23 11:29       ` Gilles Chanteperdrix
  2011-06-23 11:36         ` Jan Kiszka
  0 siblings, 1 reply; 10+ messages in thread
From: Gilles Chanteperdrix @ 2011-06-23 11:29 UTC (permalink / raw)
  To: Jan Kiszka; +Cc: Xenomai core

On 06/23/2011 11:09 AM, Jan Kiszka wrote:
> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>> +		struct print_buffer *old;
>>>> +
>>>> +		old = buf_pool_get();
>>>> +		while (old != buffer) {
>>>> +			buffer = old;
>>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>
>>> Though unlikely, it's still possible: The buffer obtained in the last
>>> round may have been dequeued meanwhile and then freed (in case a third
>>> buffer was released to the pool, filling it up to pool_capacity).
>>
>> I do not get it: it seems to me that if the current head (that is
>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>> and try again.
> 
> Problematic is the dereferencing of the stale buffer pointer obtained
> during the last cmpxchg or via buf_pool_get. This happens before the new
> cmpxchg.

Ok. Got it, that would be a problem only if the stale pointer pointed to
an unmapped area, but ok, better avoid freeing the buffers. I guess it
would not be a problem as applications tend to have a fixed number of
threads anyway.

-- 
                                                                Gilles.



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-23 11:29       ` Gilles Chanteperdrix
@ 2011-06-23 11:36         ` Jan Kiszka
  2011-06-23 11:40           ` Gilles Chanteperdrix
  2011-06-24 19:58           ` Gilles Chanteperdrix
  0 siblings, 2 replies; 10+ messages in thread
From: Jan Kiszka @ 2011-06-23 11:36 UTC (permalink / raw)
  To: Gilles Chanteperdrix; +Cc: Xenomai core


On 2011-06-23 13:29, Gilles Chanteperdrix wrote:
> On 06/23/2011 11:09 AM, Jan Kiszka wrote:
>> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>>> +		struct print_buffer *old;
>>>>> +
>>>>> +		old = buf_pool_get();
>>>>> +		while (old != buffer) {
>>>>> +			buffer = old;
>>>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>>
>>>> Though unlikely, it's still possible: The buffer obtained in the last
>>>> round may have been dequeued meanwhile and then freed (in case a third
>>>> buffer was released to the pool, filling it up to pool_capacity).
>>>
>>> I do not get it: it seems to me that if the current head (that is
>>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>>> and try again.
>>
>> Problematic is the dereferencing of the stale buffer pointer obtained
>> during the last cmpxchg or via buf_pool_get. This happens before the new
>> cmpxchg.
> 
> Ok. Got it, that would be a problem only if the stale pointer pointed to
> an unmapped area, but ok, better avoid freeing the buffers. I guess it
> would not be a problem as applications tend to have a fixed number of
> threads anyway.

That is a risky assumption, proven wrong e.g. by one of our
applications. Threads may always be created or destroyed depending on
the mode of an application, specifically if it is a larger one.

Jan



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-23 11:36         ` Jan Kiszka
@ 2011-06-23 11:40           ` Gilles Chanteperdrix
  2011-06-24 19:58           ` Gilles Chanteperdrix
  1 sibling, 0 replies; 10+ messages in thread
From: Gilles Chanteperdrix @ 2011-06-23 11:40 UTC (permalink / raw)
  To: Jan Kiszka; +Cc: Xenomai core

On 06/23/2011 01:36 PM, Jan Kiszka wrote:
> On 2011-06-23 13:29, Gilles Chanteperdrix wrote:
>> On 06/23/2011 11:09 AM, Jan Kiszka wrote:
>>> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>>>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>>>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>>>> +		struct print_buffer *old;
>>>>>> +
>>>>>> +		old = buf_pool_get();
>>>>>> +		while (old != buffer) {
>>>>>> +			buffer = old;
>>>>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>>>
>>>>> Though unlikely, it's still possible: The buffer obtained in the last
>>>>> round may have been dequeued meanwhile and then freed (in case a third
>>>>> buffer was released to the pool, filling it up to pool_capacity).
>>>>
>>>> I do not get it: it seems to me that if the current head (that is
>>>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>>>> and try again.
>>>
>>> Problematic is the dereferencing of the stale buffer pointer obtained
>>> during the last cmpxchg or via buf_pool_get. This happens before the new
>>> cmpxchg.
>>
>> Ok. Got it, that would be a problem only if the stale pointer pointed to
>> an unmapped area, but ok, better avoid freeing the buffers. I guess it
>> would not be a problem as applications tend to have a fixed number of
>> threads anyway.
> 
> That is a risky assumption, proven wrong e.g. by one of our
> applications. Threads may always be created or destroyed depending on
> the mode of an application, specifically if it is a larger one.

The situation which would be problematic is creating many threads at
once which use printf, then continuing to run with just a few threads.
In that case, we would have many unfreed buffers. But as long as the
application creates and destroys threads over time, the pool will
simply keep the buffers until their next use.

> 
> Jan
> 


-- 
                                                                Gilles.



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-23 11:36         ` Jan Kiszka
  2011-06-23 11:40           ` Gilles Chanteperdrix
@ 2011-06-24 19:58           ` Gilles Chanteperdrix
  2011-06-27  6:32             ` Jan Kiszka
  1 sibling, 1 reply; 10+ messages in thread
From: Gilles Chanteperdrix @ 2011-06-24 19:58 UTC (permalink / raw)
  To: Jan Kiszka; +Cc: Xenomai core

On 06/23/2011 01:36 PM, Jan Kiszka wrote:
> On 2011-06-23 13:29, Gilles Chanteperdrix wrote:
>> On 06/23/2011 11:09 AM, Jan Kiszka wrote:
>>> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>>>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>>>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>>>> +		struct print_buffer *old;
>>>>>> +
>>>>>> +		old = buf_pool_get();
>>>>>> +		while (old != buffer) {
>>>>>> +			buffer = old;
>>>>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>>>
>>>>> Though unlikely, it's still possible: The buffer obtained in the last
>>>>> round may have been dequeued meanwhile and then freed (in case a third
>>>>> buffer was released to the pool, filling it up to pool_capacity).
>>>>
>>>> I do not get it: it seems to me that if the current head (that is
>>>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>>>> and try again.
>>>
>>> Problematic is the dereferencing of the stale buffer pointer obtained
>>> during the last cmpxchg or via buf_pool_get. This happens before the new
>>> cmpxchg.
>>
>> Ok. Got it, that would be a problem only if the stale pointer pointed to
>> an unmapped area, but ok, better avoid freeing the buffers. I guess it
>> would not be a problem as applications tend to have a fixed number of
>> threads anyway.
> 
> That is a risky assumption, proven wrong e.g. by one of our
> applications. Threads may always be created or destroyed depending on
> the mode of an application, specifically if it is a larger one.

Here is another attempt, based on a bitmap.

diff --git a/src/rtdk/rt_print.c b/src/rtdk/rt_print.c
index 93b711a..fb4820f 100644
--- a/src/rtdk/rt_print.c
+++ b/src/rtdk/rt_print.c
@@ -28,7 +28,9 @@
 #include <syslog.h>
 
 #include <rtdk.h>
+#include <nucleus/types.h>	/* For BITS_PER_LONG */
 #include <asm/xenomai/system.h>
+#include <asm/xenomai/atomic.h>	/* For atomic_cmpxchg */
 #include <asm-generic/stack.h>
 
 #define RT_PRINT_BUFFER_ENV		"RT_PRINT_BUFFER"
@@ -37,6 +39,9 @@
 #define RT_PRINT_PERIOD_ENV		"RT_PRINT_PERIOD"
 #define RT_PRINT_DEFAULT_PERIOD		100 /* ms */
 
+#define RT_PRINT_BUFFERS_COUNT_ENV      "RT_PRINT_BUFFERS_COUNT"
+#define RT_PRINT_DEFAULT_BUFFERS_COUNT  4
+
 #define RT_PRINT_LINE_BREAK		256
 
 #define RT_PRINT_SYSLOG_STREAM		NULL
@@ -75,6 +80,12 @@ static pthread_mutex_t buffer_lock;
 static pthread_cond_t printer_wakeup;
 static pthread_key_t buffer_key;
 static pthread_t printer_thread;
+#ifdef CONFIG_XENO_FASTSYNCH
+static xnarch_atomic_t *pool_bitmap;
+static unsigned pool_bitmap_len;
+static unsigned pool_buf_size;
+static unsigned long pool_start, pool_len;
+#endif /* CONFIG_XENO_FASTSYNCH */
 
 static void cleanup_buffer(struct print_buffer *buffer);
 static void print_buffers(void);
@@ -243,10 +254,36 @@ static void set_buffer_name(struct print_buffer *buffer, const char *name)
 	}
 }
 
+void rt_print_init_inner(struct print_buffer *buffer, size_t size)
+{
+	buffer->size = size;
+
+	memset(buffer->ring, 0, size);
+
+	buffer->read_pos  = 0;
+	buffer->write_pos = 0;
+
+	buffer->prev = NULL;
+
+	pthread_mutex_lock(&buffer_lock);
+
+	buffer->next = first_buffer;
+	if (first_buffer)
+		first_buffer->prev = buffer;
+	first_buffer = buffer;
+
+	buffers++;
+	pthread_cond_signal(&printer_wakeup);
+
+	pthread_mutex_unlock(&buffer_lock);
+}
+
 int rt_print_init(size_t buffer_size, const char *buffer_name)
 {
 	struct print_buffer *buffer = pthread_getspecific(buffer_key);
 	size_t size = buffer_size;
+	unsigned long old_bitmap;
+	unsigned j;
 
 	if (!size)
 		size = default_buffer_size;
@@ -260,39 +297,56 @@ int rt_print_init(size_t buffer_size, const char *buffer_name)
 			return 0;
 		}
 		cleanup_buffer(buffer);
+		buffer = NULL;
 	}
 
-	buffer = malloc(sizeof(*buffer));
-	if (!buffer)
-		return ENOMEM;
+#ifdef CONFIG_XENO_FASTSYNCH
+	/* Find a free buffer in the pool */
+	do {
+		unsigned long bitmap;
+		unsigned i;
 
-	buffer->ring = malloc(size);
-	if (!buffer->ring) {
-		free(buffer);
-		return ENOMEM;
-	}
-	memset(buffer->ring, 0, size);
+		for (i = 0; i < pool_bitmap_len; i++) {
+			old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
+			if (old_bitmap)
+				goto acquire;
+		}
 
-	buffer->read_pos  = 0;
-	buffer->write_pos = 0;
+		goto not_found;
 
-	buffer->size = size;
+	  acquire:
+		do {
+			bitmap = old_bitmap;
+			j = __builtin_ffsl(bitmap) - 1;
+			old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
+							   bitmap,
+							   bitmap & ~(1UL << j));
+		} while (old_bitmap != bitmap && old_bitmap);
+		j += i * BITS_PER_LONG;
+	} while (!old_bitmap);
 
-	set_buffer_name(buffer, buffer_name);
+	buffer = (struct print_buffer *)(pool_start + j * pool_buf_size);
 
-	buffer->prev = NULL;
+  not_found:
+#endif /* CONFIG_XENO_FASTSYNCH */
 
-	pthread_mutex_lock(&buffer_lock);
+	if (!buffer) {
+		assert_nrt();
 
-	buffer->next = first_buffer;
-	if (first_buffer)
-		first_buffer->prev = buffer;
-	first_buffer = buffer;
+		buffer = malloc(sizeof(*buffer));
+		if (!buffer)
+			return ENOMEM;
 
-	buffers++;
-	pthread_cond_signal(&printer_wakeup);
+		buffer->ring = malloc(size);
+		if (!buffer->ring) {
+			free(buffer);
+			return ENOMEM;
+		}
 
-	pthread_mutex_unlock(&buffer_lock);
+		rt_print_init_inner(buffer, size);
+	}
+
+	set_buffer_name(buffer, buffer_name);
 
 	pthread_setspecific(buffer_key, buffer);
 
@@ -346,12 +400,44 @@ static void cleanup_buffer(struct print_buffer *buffer)
 {
 	struct print_buffer *prev, *next;
 
+	assert_nrt();
+
 	pthread_setspecific(buffer_key, NULL);
 
 	pthread_mutex_lock(&buffer_lock);
 
 	print_buffers();
 
+	pthread_mutex_unlock(&buffer_lock);
+
+#ifdef CONFIG_XENO_FASTSYNCH
+	/* Return the buffer to the pool */
+	{
+		unsigned long old_bitmap, bitmap;
+		unsigned i, j;
+
+		if ((unsigned long)buffer - pool_start >= pool_len)
+			goto dofree;
+
+		j = ((unsigned long)buffer - pool_start) / pool_buf_size;
+		i = j / BITS_PER_LONG;
+		j = j % BITS_PER_LONG;
+
+		old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
+		do {
+			bitmap = old_bitmap;
+			old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
+							   bitmap,
+							   bitmap | (1UL << j));
+		} while (old_bitmap != bitmap);
+
+		return;
+	}
+  dofree:
+#endif /* CONFIG_XENO_FASTSYNCH */
+
+	pthread_mutex_lock(&buffer_lock);
+
 	prev = buffer->prev;
 	next = buffer->next;
 
@@ -523,6 +609,64 @@ void __rt_print_init(void)
 	print_period.tv_sec  = period / 1000;
 	print_period.tv_nsec = (period % 1000) * 1000000;
 
+#ifdef CONFIG_XENO_FASTSYNCH
+	/* Fill the buffer pool */
+	{
+		unsigned buffers_count, i;
+
+		buffers_count = RT_PRINT_DEFAULT_BUFFERS_COUNT;
+
+		value_str = getenv(RT_PRINT_BUFFERS_COUNT_ENV);
+		if (value_str) {
+			errno = 0;
+			buffers_count = strtoul(value_str, NULL, 0);
+			if (errno) {
+				fprintf(stderr, "Invalid %s\n",
+					RT_PRINT_BUFFERS_COUNT_ENV);
+				exit(1);
+			}
+		}
+
+		pool_bitmap_len = (buffers_count + BITS_PER_LONG - 1)
+			/ BITS_PER_LONG;
+		if (!pool_bitmap_len)
+			goto done;
+
+		pool_bitmap = malloc(pool_bitmap_len * sizeof(*pool_bitmap));
+		if (!pool_bitmap) {
+			fprintf(stderr, "Error allocating rt_printf "
+				"buffers\n");
+			exit(1);
+		}
+
+		pool_buf_size = sizeof(struct print_buffer) + default_buffer_size;
+		pool_len = buffers_count * pool_buf_size;
+		pool_start = (unsigned long)malloc(pool_len);
+		if (!pool_start) {
+			fprintf(stderr, "Error allocating rt_printf "
+				"buffers\n");
+			exit(1);
+		}
+
+		for (i = 0; i < buffers_count / BITS_PER_LONG; i++)
+			xnarch_atomic_set(&pool_bitmap[i], ~0UL);
+		if (buffers_count % BITS_PER_LONG)
+			xnarch_atomic_set(&pool_bitmap[i],
+					  (1UL << (buffers_count % BITS_PER_LONG))
+					  - 1);
+
+		for (i = 0; i < buffers_count; i++) {
+			struct print_buffer *buffer =
+				(struct print_buffer *)
+				(pool_start + i * pool_buf_size);
+
+			buffer->ring = (char *)(buffer + 1);
+
+			rt_print_init_inner(buffer, default_buffer_size);
+		}
+	}
+  done:
+#endif /* CONFIG_XENO_FASTSYNCH */
+
 	pthread_mutex_init(&buffer_lock, NULL);
 	pthread_key_create(&buffer_key, (void (*)(void*))cleanup_buffer);
 

-- 
                                                                Gilles.



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-24 19:58           ` Gilles Chanteperdrix
@ 2011-06-27  6:32             ` Jan Kiszka
  2011-06-27 10:57               ` Gilles Chanteperdrix
  0 siblings, 1 reply; 10+ messages in thread
From: Jan Kiszka @ 2011-06-27  6:32 UTC (permalink / raw)
  To: Gilles Chanteperdrix; +Cc: Xenomai core


On 2011-06-24 21:58, Gilles Chanteperdrix wrote:
> On 06/23/2011 01:36 PM, Jan Kiszka wrote:
>> On 2011-06-23 13:29, Gilles Chanteperdrix wrote:
>>> On 06/23/2011 11:09 AM, Jan Kiszka wrote:
>>>> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>>>>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>>>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>>>>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>>>>> +		struct print_buffer *old;
>>>>>>> +
>>>>>>> +		old = buf_pool_get();
>>>>>>> +		while (old != buffer) {
>>>>>>> +			buffer = old;
>>>>>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>>>>
>>>>>> Though unlikely, it's still possible: The buffer obtained in the last
>>>>>> round may have been dequeued meanwhile and then freed (in case a third
>>>>>> buffer was released to the pool, filling it up to pool_capacity).
>>>>>
>>>>> I do not get it: it seems to me that if the current head (that is
>>>>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>>>>> and try again.
>>>>
>>>> Problematic is the dereferencing of the stale buffer pointer obtained
>>>> during the last cmpxchg or via buf_pool_get. This happens before the new
>>>> cmpxchg.
>>>
>>> Ok. Got it, that would be a problem only if the stale pointer pointed to
>>> an unmapped area, but ok, better avoid freeing the buffers. I guess it
>>> would not be a problem as applications tend to have a fixed number of
>>> threads anyway.
>>
>> That is a risky assumption, proven wrong e.g. by one of our
>> applications. Threads may always be created or destroyed depending on
>> the mode of an application, specifically if it is a larger one.
> 
> Here is another attempt, based on a bitmap.
> 
> diff --git a/src/rtdk/rt_print.c b/src/rtdk/rt_print.c
> index 93b711a..fb4820f 100644
> --- a/src/rtdk/rt_print.c
> +++ b/src/rtdk/rt_print.c
> @@ -28,7 +28,9 @@
>  #include <syslog.h>
>  
>  #include <rtdk.h>
> +#include <nucleus/types.h>	/* For BITS_PER_LONG */
>  #include <asm/xenomai/system.h>
> +#include <asm/xenomai/atomic.h>	/* For atomic_cmpxchg */
>  #include <asm-generic/stack.h>
>  
>  #define RT_PRINT_BUFFER_ENV		"RT_PRINT_BUFFER"
> @@ -37,6 +39,9 @@
>  #define RT_PRINT_PERIOD_ENV		"RT_PRINT_PERIOD"
>  #define RT_PRINT_DEFAULT_PERIOD		100 /* ms */
>  
> +#define RT_PRINT_BUFFERS_COUNT_ENV      "RT_PRINT_BUFFERS_COUNT"
> +#define RT_PRINT_DEFAULT_BUFFERS_COUNT  4
> +
>  #define RT_PRINT_LINE_BREAK		256
>  
>  #define RT_PRINT_SYSLOG_STREAM		NULL
> @@ -75,6 +80,12 @@ static pthread_mutex_t buffer_lock;
>  static pthread_cond_t printer_wakeup;
>  static pthread_key_t buffer_key;
>  static pthread_t printer_thread;
> +#ifdef CONFIG_XENO_FASTSYNCH
> +static xnarch_atomic_t *pool_bitmap;
> +static unsigned pool_bitmap_len;
> +static unsigned pool_buf_size;
> +static unsigned long pool_start, pool_len;
> +#endif /* CONFIG_XENO_FASTSYNCH */
>  
>  static void cleanup_buffer(struct print_buffer *buffer);
>  static void print_buffers(void);
> @@ -243,10 +254,36 @@ static void set_buffer_name(struct print_buffer *buffer, const char *name)
>  	}
>  }
>  
> +void rt_print_init_inner(struct print_buffer *buffer, size_t size)
> +{
> +	buffer->size = size;
> +
> +	memset(buffer->ring, 0, size);
> +
> +	buffer->read_pos  = 0;
> +	buffer->write_pos = 0;
> +
> +	buffer->prev = NULL;
> +
> +	pthread_mutex_lock(&buffer_lock);
> +
> +	buffer->next = first_buffer;
> +	if (first_buffer)
> +		first_buffer->prev = buffer;
> +	first_buffer = buffer;
> +
> +	buffers++;
> +	pthread_cond_signal(&printer_wakeup);
> +
> +	pthread_mutex_unlock(&buffer_lock);
> +}
> +
>  int rt_print_init(size_t buffer_size, const char *buffer_name)
>  {
>  	struct print_buffer *buffer = pthread_getspecific(buffer_key);
>  	size_t size = buffer_size;
> +	unsigned long old_bitmap;
> +	unsigned j;
>  
>  	if (!size)
>  		size = default_buffer_size;
> @@ -260,39 +297,56 @@ int rt_print_init(size_t buffer_size, const char *buffer_name)
>  			return 0;
>  		}
>  		cleanup_buffer(buffer);
> +		buffer = NULL;
>  	}
>  
> -	buffer = malloc(sizeof(*buffer));
> -	if (!buffer)
> -		return ENOMEM;
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	/* Find a free buffer in the pool */
> +	do {
> +		unsigned long bitmap;
> +		unsigned i;
>  
> -	buffer->ring = malloc(size);
> -	if (!buffer->ring) {
> -		free(buffer);
> -		return ENOMEM;
> -	}
> -	memset(buffer->ring, 0, size);
> +		for (i = 0; i < pool_bitmap_len; i++) {
> +			old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
> +			if (old_bitmap)
> +				goto acquire;
> +		}
>  
> -	buffer->read_pos  = 0;
> -	buffer->write_pos = 0;
> +		goto not_found;
>  
> -	buffer->size = size;
> +	  acquire:
> +		do {
> +			bitmap = old_bitmap;
> +			j = __builtin_ffsl(bitmap) - 1;
> +			old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
> +							   bitmap,
> +							   bitmap & ~(1UL << j));
> +		} while (old_bitmap != bitmap && old_bitmap);
> +		j += i * BITS_PER_LONG;
> +	} while (!old_bitmap);
>  
> -	set_buffer_name(buffer, buffer_name);
> +	buffer = (struct print_buffer *)(pool_start + j * pool_buf_size);
>  
> -	buffer->prev = NULL;
> +  not_found:
> +#endif /* CONFIG_XENO_FASTSYNCH */
>  
> -	pthread_mutex_lock(&buffer_lock);
> +	if (!buffer) {
> +		assert_nrt();
>  
> -	buffer->next = first_buffer;
> -	if (first_buffer)
> -		first_buffer->prev = buffer;
> -	first_buffer = buffer;
> +		buffer = malloc(sizeof(*buffer));
> +		if (!buffer)
> +			return ENOMEM;
>  
> -	buffers++;
> -	pthread_cond_signal(&printer_wakeup);
> +		buffer->ring = malloc(size);
> +		if (!buffer->ring) {
> +			free(buffer);
> +			return ENOMEM;
> +		}
>  
> -	pthread_mutex_unlock(&buffer_lock);
> +		rt_print_init_inner(buffer, size);
> +	}
> +
> +	set_buffer_name(buffer, buffer_name);
>  
>  	pthread_setspecific(buffer_key, buffer);
>  
> @@ -346,12 +400,44 @@ static void cleanup_buffer(struct print_buffer *buffer)
>  {
>  	struct print_buffer *prev, *next;
>  
> +	assert_nrt();
> +
>  	pthread_setspecific(buffer_key, NULL);
>  
>  	pthread_mutex_lock(&buffer_lock);
>  
>  	print_buffers();
>  
> +	pthread_mutex_unlock(&buffer_lock);
> +
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	/* Return the buffer to the pool */
> +	{
> +		unsigned long old_bitmap, bitmap;
> +		unsigned i, j;
> +
> +		if ((unsigned long)buffer - pool_start >= pool_len)
> +			goto dofree;
> +
> +		j = ((unsigned long)buffer - pool_start) / pool_buf_size;
> +		i = j / BITS_PER_LONG;
> +		j = j % BITS_PER_LONG;
> +
> +		old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
> +		do {
> +			bitmap = old_bitmap;
> +			old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
> +							   bitmap,
> +							   bitmap | (1UL << j));
> +		} while (old_bitmap != bitmap);
> +
> +		return;
> +	}
> +  dofree:
> +#endif /* CONFIG_XENO_FASTSYNCH */
> +
> +	pthread_mutex_lock(&buffer_lock);
> +
>  	prev = buffer->prev;
>  	next = buffer->next;
>  
> @@ -523,6 +609,64 @@ void __rt_print_init(void)
>  	print_period.tv_sec  = period / 1000;
>  	print_period.tv_nsec = (period % 1000) * 1000000;
>  
> +#ifdef CONFIG_XENO_FASTSYNCH
> +	/* Fill the buffer pool */
> +	{
> +		unsigned buffers_count, i;
> +
> +		buffers_count = RT_PRINT_DEFAULT_BUFFERS_COUNT;
> +
> +		value_str = getenv(RT_PRINT_BUFFERS_COUNT_ENV);
> +		if (value_str) {
> +			errno = 0;
> +			buffers_count = strtoul(value_str, NULL, 0);
> +			if (errno) {
> +				fprintf(stderr, "Invalid %s\n",
> +					RT_PRINT_BUFFERS_COUNT_ENV);
> +				exit(1);
> +			}
> +		}
> +
> +		pool_bitmap_len = (buffers_count + BITS_PER_LONG - 1)
> +			/ BITS_PER_LONG;
> +		if (!pool_bitmap_len)
> +			goto done;
> +
> +		pool_bitmap = malloc(pool_bitmap_len * sizeof(*pool_bitmap));
> +		if (!pool_bitmap) {
> +			fprintf(stderr, "Error allocating rt_printf "
> +				"buffers\n");
> +			exit(1);
> +		}
> +
> +		pool_buf_size = sizeof(struct print_buffer) + default_buffer_size;
> +		pool_len = buffers_count * pool_buf_size;
> +		pool_start = (unsigned long)malloc(pool_len);
> +		if (!pool_start) {
> +			fprintf(stderr, "Error allocating rt_printf "
> +				"buffers\n");
> +			exit(1);
> +		}
> +
> +		for (i = 0; i < buffers_count / BITS_PER_LONG; i++)
> +			xnarch_atomic_set(&pool_bitmap[i], ~0UL);
> +		if (buffers_count % BITS_PER_LONG)
> +			xnarch_atomic_set(&pool_bitmap[i],
> +					  (1UL << (buffers_count % BITS_PER_LONG))
> +					  - 1);
> +
> +		for (i = 0; i < buffers_count; i++) {
> +			struct print_buffer *buffer =
> +				(struct print_buffer *)
> +				(pool_start + i * pool_buf_size);
> +
> +			buffer->ring = (char *)(buffer + 1);
> +
> +			rt_print_init_inner(buffer, default_buffer_size);
> +		}
> +	}
> +  done:
> +#endif /* CONFIG_XENO_FASTSYNCH */
> +
>  	pthread_mutex_init(&buffer_lock, NULL);
>  	pthread_key_create(&buffer_key, (void (*)(void*))cleanup_buffer);
>  
> 

Looks good to me.

[ As you now ensure that buffers taken from the pool are always
returned to it, so that the objects remain valid, you could
theoretically also use a linked list again. ]

Jan



* Re: [Xenomai-core] RFC: use a pool of pre-allocated buffers in rt_printf
  2011-06-27  6:32             ` Jan Kiszka
@ 2011-06-27 10:57               ` Gilles Chanteperdrix
  0 siblings, 0 replies; 10+ messages in thread
From: Gilles Chanteperdrix @ 2011-06-27 10:57 UTC (permalink / raw)
  To: Jan Kiszka; +Cc: Xenomai core

On 06/27/2011 08:32 AM, Jan Kiszka wrote:
> On 2011-06-24 21:58, Gilles Chanteperdrix wrote:
>> On 06/23/2011 01:36 PM, Jan Kiszka wrote:
>>> On 2011-06-23 13:29, Gilles Chanteperdrix wrote:
>>>> On 06/23/2011 11:09 AM, Jan Kiszka wrote:
>>>>> On 2011-06-23 11:05, Gilles Chanteperdrix wrote:
>>>>>> On 06/23/2011 09:38 AM, Jan Kiszka wrote:
>>>>>>>> +#ifdef CONFIG_XENO_FASTSYNCH
>>>>>>>> +	if (xeno_get_current() != XN_NO_HANDLE &&
>>>>>>>> +	    !(xeno_get_current_mode() & XNRELAX) && buf_pool_get()) {
>>>>>>>> +		struct print_buffer *old;
>>>>>>>> +
>>>>>>>> +		old = buf_pool_get();
>>>>>>>> +		while (old != buffer) {
>>>>>>>> +			buffer = old;
>>>>>>>> +			old = buf_pool_cmpxchg(buffer, buffer->pool_next);
>>>>>>>
>>>>>>> Though unlikely, it's still possible: The buffer obtained in the last
>>>>>>> round may have been dequeued meanwhile and then freed (in case a third
>>>>>>> buffer was released to the pool, filling it up to pool_capacity).
>>>>>>
>>>>>> I do not get it: it seems to me that if the current head (that is
>>>>>> buf_pool_get()) is freed, then the cmpxchg will fail, so we will loop
>>>>>> and try again.
>>>>>
>>>>> Problematic is the dereferencing of the stale buffer pointer obtained
>>>>> during the last cmpxchg or via buf_pool_get. This happens before the new
>>>>> cmpxchg.
>>>>
>>>> Ok. Got it, that would be a problem only if the stale pointer pointed to
>>>> an unmapped area, but ok, better avoid freeing the buffers. I guess it
>>>> would not be a problem as applications tend to have a fixed number of
>>>> threads anyway.
>>>
>>> That is a risky assumption, proven wrong e.g. by one of our
>>> applications. Threads may always be created or destroyed depending on
>>> the mode of an application, specifically if it is a larger one.
>>
>> Here is another attempt, based on a bitmap.
>>
>> diff --git a/src/rtdk/rt_print.c b/src/rtdk/rt_print.c
>> index 93b711a..fb4820f 100644
>> --- a/src/rtdk/rt_print.c
>> +++ b/src/rtdk/rt_print.c
>> @@ -28,7 +28,9 @@
>>  #include <syslog.h>
>>  
>>  #include <rtdk.h>
>> +#include <nucleus/types.h>	/* For BITS_PER_LONG */
>>  #include <asm/xenomai/system.h>
>> +#include <asm/xenomai/atomic.h>	/* For atomic_cmpxchg */
>>  #include <asm-generic/stack.h>
>>  
>>  #define RT_PRINT_BUFFER_ENV		"RT_PRINT_BUFFER"
>> @@ -37,6 +39,9 @@
>>  #define RT_PRINT_PERIOD_ENV		"RT_PRINT_PERIOD"
>>  #define RT_PRINT_DEFAULT_PERIOD		100 /* ms */
>>  
>> +#define RT_PRINT_BUFFERS_COUNT_ENV      "RT_PRINT_BUFFERS_COUNT"
>> +#define RT_PRINT_DEFAULT_BUFFERS_COUNT  4
>> +
>>  #define RT_PRINT_LINE_BREAK		256
>>  
>>  #define RT_PRINT_SYSLOG_STREAM		NULL
>> @@ -75,6 +80,12 @@ static pthread_mutex_t buffer_lock;
>>  static pthread_cond_t printer_wakeup;
>>  static pthread_key_t buffer_key;
>>  static pthread_t printer_thread;
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +static xnarch_atomic_t *pool_bitmap;
>> +static unsigned pool_bitmap_len;
>> +static unsigned pool_buf_size;
>> +static unsigned long pool_start, pool_len;
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>>  
>>  static void cleanup_buffer(struct print_buffer *buffer);
>>  static void print_buffers(void);
>> @@ -243,10 +254,36 @@ static void set_buffer_name(struct print_buffer *buffer, const char *name)
>>  	}
>>  }
>>  
>> +void rt_print_init_inner(struct print_buffer *buffer, size_t size)
>> +{
>> +	buffer->size = size;
>> +
>> +	memset(buffer->ring, 0, size);
>> +
>> +	buffer->read_pos  = 0;
>> +	buffer->write_pos = 0;
>> +
>> +	buffer->prev = NULL;
>> +
>> +	pthread_mutex_lock(&buffer_lock);
>> +
>> +	buffer->next = first_buffer;
>> +	if (first_buffer)
>> +		first_buffer->prev = buffer;
>> +	first_buffer = buffer;
>> +
>> +	buffers++;
>> +	pthread_cond_signal(&printer_wakeup);
>> +
>> +	pthread_mutex_unlock(&buffer_lock);
>> +}
>> +
>>  int rt_print_init(size_t buffer_size, const char *buffer_name)
>>  {
>>  	struct print_buffer *buffer = pthread_getspecific(buffer_key);
>>  	size_t size = buffer_size;
>> +	unsigned long old_bitmap;
>> +	unsigned j;
>>  
>>  	if (!size)
>>  		size = default_buffer_size;
>> @@ -260,39 +297,56 @@ int rt_print_init(size_t buffer_size, const char *buffer_name)
>>  			return 0;
>>  		}
>>  		cleanup_buffer(buffer);
>> +		buffer = NULL;
>>  	}
>>  
>> -	buffer = malloc(sizeof(*buffer));
>> -	if (!buffer)
>> -		return ENOMEM;
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +	/* Find a free buffer in the pool */
>> +	do {
>> +		unsigned long bitmap;
>> +		unsigned i;
>>  
>> -	buffer->ring = malloc(size);
>> -	if (!buffer->ring) {
>> -		free(buffer);
>> -		return ENOMEM;
>> -	}
>> -	memset(buffer->ring, 0, size);
>> +		for (i = 0; i < pool_bitmap_len; i++) {
>> +			old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
>> +			if (old_bitmap)
>> +				goto acquire;
>> +		}
>>  
>> -	buffer->read_pos  = 0;
>> -	buffer->write_pos = 0;
>> +		goto not_found;
>>  
>> -	buffer->size = size;
>> +	  acquire:
>> +		do {
>> +			bitmap = old_bitmap;
>> +			j = __builtin_ffsl(bitmap) - 1;
>> +			old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
>> +							   bitmap,
>> +							   bitmap & ~(1UL << j));
>> +		} while (old_bitmap != bitmap && old_bitmap);
>> +		j += i * BITS_PER_LONG;
>> +	} while (!old_bitmap);
>>  
>> -	set_buffer_name(buffer, buffer_name);
>> +	buffer = (struct print_buffer *)(pool_start + j * pool_buf_size);
>>  
>> -	buffer->prev = NULL;
>> +  not_found:
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>>  
>> -	pthread_mutex_lock(&buffer_lock);
>> +	if (!buffer) {
>> +		assert_nrt();
>>  
>> -	buffer->next = first_buffer;
>> -	if (first_buffer)
>> -		first_buffer->prev = buffer;
>> -	first_buffer = buffer;
>> +		buffer = malloc(sizeof(*buffer));
>> +		if (!buffer)
>> +			return ENOMEM;
>>  
>> -	buffers++;
>> -	pthread_cond_signal(&printer_wakeup);
>> +		buffer->ring = malloc(size);
>> +		if (!buffer->ring) {
>> +			free(buffer);
>> +			return ENOMEM;
>> +		}
>>  
>> -	pthread_mutex_unlock(&buffer_lock);
>> +		rt_print_init_inner(buffer, size);
>> +	}
>> +
>> +	set_buffer_name(buffer, buffer_name);
>>  
>>  	pthread_setspecific(buffer_key, buffer);
>>  
>> @@ -346,12 +400,44 @@ static void cleanup_buffer(struct print_buffer *buffer)
>>  {
>>  	struct print_buffer *prev, *next;
>>  
>> +	assert_nrt();
>> +
>>  	pthread_setspecific(buffer_key, NULL);
>>  
>>  	pthread_mutex_lock(&buffer_lock);
>>  
>>  	print_buffers();
>>  
>> +	pthread_mutex_unlock(&buffer_lock);
>> +
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +	/* Return the buffer to the pool */
>> +	{
>> +		unsigned long old_bitmap, bitmap;
>> +		unsigned i, j;
>> +
>> +		if ((unsigned long)buffer - pool_start >= pool_len)
>> +			goto dofree;
>> +
>> +		j = ((unsigned long)buffer - pool_start) / pool_buf_size;
>> +		i = j / BITS_PER_LONG;
>> +		j = j % BITS_PER_LONG;
>> +
>> +		old_bitmap = xnarch_atomic_get(&pool_bitmap[i]);
>> +		do {
>> +			bitmap = old_bitmap;
>> +			old_bitmap = xnarch_atomic_cmpxchg(&pool_bitmap[i],
>> +							   bitmap,
>> +							   bitmap | (1UL << j));
>> +		} while (old_bitmap != bitmap);
>> +
>> +		return;
>> +	}
>> +  dofree:
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>> +
>> +	pthread_mutex_lock(&buffer_lock);
>> +
>>  	prev = buffer->prev;
>>  	next = buffer->next;
>>  
>> @@ -523,6 +609,64 @@ void __rt_print_init(void)
>>  	print_period.tv_sec  = period / 1000;
>>  	print_period.tv_nsec = (period % 1000) * 1000000;
>>  
>> +#ifdef CONFIG_XENO_FASTSYNCH
>> +	/* Fill the buffer pool */
>> +	{
>> +		unsigned buffers_count, i;
>> +
>> +		buffers_count = RT_PRINT_DEFAULT_BUFFERS_COUNT;
>> +
>> +		value_str = getenv(RT_PRINT_BUFFERS_COUNT_ENV);
>> +		if (value_str) {
>> +			errno = 0;
>> +			buffers_count = strtoul(value_str, NULL, 0);
>> +			if (errno) {
>> +				fprintf(stderr, "Invalid %s\n",
>> +					RT_PRINT_BUFFERS_COUNT_ENV);
>> +				exit(1);
>> +			}
>> +		}
>> +
>> +		pool_bitmap_len = (buffers_count + BITS_PER_LONG - 1)
>> +			/ BITS_PER_LONG;
>> +		if (!pool_bitmap_len)
>> +			goto done;
>> +
>> +		pool_bitmap = malloc(pool_bitmap_len * sizeof(*pool_bitmap));
>> +		if (!pool_bitmap) {
>> +			fprintf(stderr, "Error allocating rt_printf "
>> +				"buffers\n");
>> +			exit(1);
>> +		}
>> +
>> +		pool_buf_size = sizeof(struct print_buffer) + default_buffer_size;
>> +		pool_len = buffers_count * pool_buf_size;
>> +		pool_start = (unsigned long)malloc(pool_len);
>> +		if (!pool_start) {
>> +			fprintf(stderr, "Error allocating rt_printf "
>> +				"buffers\n");
>> +			exit(1);
>> +		}
>> +
>> +		for (i = 0; i < buffers_count / BITS_PER_LONG; i++)
>> +			xnarch_atomic_set(&pool_bitmap[i], ~0UL);
>> +		if (buffers_count % BITS_PER_LONG)
>> +			xnarch_atomic_set(&pool_bitmap[i],
>> +					  (1UL << (buffers_count % BITS_PER_LONG))
>> +					  - 1);
>> +
>> +		for (i = 0; i < buffers_count; i++) {
>> +			struct print_buffer *buffer =
>> +				(struct print_buffer *)
>> +				(pool_start + i * pool_buf_size);
>> +
>> +			buffer->ring = (char *)(buffer + 1);
>> +
>> +			rt_print_init_inner(buffer, default_buffer_size);
>> +		}
>> +	}
>> +  done:
>> +#endif /* CONFIG_XENO_FASTSYNCH */
>> +
>>  	pthread_mutex_init(&buffer_lock, NULL);
>>  	pthread_key_create(&buffer_key, (void (*)(void*))cleanup_buffer);
>>  
>>
> 
> Looks good to me.
> 
> [ As you know ensure that buffers taken from the pool are always
> returned to it, thus the objects remain valid, you could theoretically
> also use a linked list again. ]

In the meantime, I have read a few things about LIFO implementations
using cmpxchg, and learned about the ABA problem. So it is not going to
be as simple as I imagined; I prefer to stick to the bitmap approach, if
you have no objection.
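
For reference, a minimal sketch (not taken from either patch) of a naive
cmpxchg-based LIFO pop, showing where ABA bites:

struct node { struct node *next; };

static xnarch_atomic_t lifo_head;	/* holds a struct node pointer */

static struct node *lifo_pop(void)
{
	struct node *old, *head, *next;

	old = (struct node *)xnarch_atomic_get(&lifo_head);
	do {
		head = old;
		if (!head)
			return NULL;
		next = head->next;
		/* ABA window: if 'head' is popped by another thread, reused
		   and pushed back right here, the cmpxchg below still
		   succeeds (it only compares pointer values), and 'next'
		   may then point to a node which is no longer on the list. */
		old = (struct node *)xnarch_atomic_cmpxchg(&lifo_head,
				(unsigned long)head, (unsigned long)next);
	} while (old != head);

	return head;
}

The bitmap does not have this problem, since a set bit carries no
pointer that can go stale between the read and the cmpxchg.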

-- 
                                                                Gilles.

