linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Jakub Jelinek <jakub@redhat.com>
To: GOTO Masanori <gotom@debian.or.jp>
Cc: linux-kernel@vger.kernel.org
Subject: POSIX message queue passing (was Re: State of Posix compliance in v2.2/v2.4 kernel?)
Date: Sun, 19 Nov 2000 08:30:14 -0500	[thread overview]
Message-ID: <20001119083014.A1514@devserv.devel.redhat.com> (raw)
In-Reply-To: <88256996.00577D9E.00@hqoutbound.ops.3com.com> <3A101009.5F05DA18@mandrakesoft.com> <20001113111319.E1514@devserv.devel.redhat.com> <14871.43600.833808.90123Q@fe.dis.titech.ac.jp>
In-Reply-To: <14871.43600.833808.90123Q@fe.dis.titech.ac.jp>; from gotom@debian.or.jp on Sun, Nov 19, 2000 at 07:24:16PM +0900

[-- Attachment #1: Type: text/plain, Size: 2052 bytes --]

On Sun, Nov 19, 2000 at 07:24:16PM +0900, GOTO Masanori wrote:
> At Mon, 13 Nov 2000 11:13:19 -0500,
> Jakub Jelinek <jakub@redhat.com> wrote:
> > ago were done in the kernel, POSIX message queue passing is not doable in
> > userland without kernel help either (I have a message queue filesystem
> > kernel patch for this, but it is a 2.5 thing).
> 
> Interesting. Is yours ready for?
> (I'm also working with it. I agree it's for 2.5)

Below is my preliminary version from Sep, 16th if you're interested.
I haven't had time for it since then, so it most probably will not apply
cleanly to current kernel.
Things still to do:
- clean it up
- implement poll on message queues
- handle __SI_RT in architectural copy_siginfo_to_user routines
- test much more than I have done so far
- fix mq_notify - see below
- avoid doing linear searches - see below

Message queues are presented as a new filesystem, mounted usually on
/dev/msg. The objects in that filesystems are fifos with special MQ
semantics.
One can use normal open/read/write on fifos in /dev/msg, which
means mq_open with mq_attr NULL, mq_receive which does not tell the priority
and mq_send with default priority.
Then there are a few ioctls which allow to open with special queue
attributes, send with priority and receive so that you get priority back,
etc.
Things I'm not sure about is mq_notify, because it states the signal should
be sent to the process (ie. I'd think it is tgid, not pid in 2.4.0-test8,
but then I don't know which close/exit should cause the notification
registration to be freed).
Also, I wonder how many pending messages typical message queues have
pending, if not too many, then the current linear search is fine, otherwise
I should put the messages into some heap which would allow O(1) mq_receive.
If you find any races/problems, please let me know.

I've coded mqueue.h public glibc userland header and mqueue.c which has
hacks on top and then basically what could end up in glibc's mq_*.c (after
shm_open.c code for locating mount points is copied in).

	Jakub

[-- Attachment #2: 2.4.0-test8.patch --]
[-- Type: text/plain, Size: 43239 bytes --]

--- linux/Documentation/ioctl-number.txt.jj	Thu Jun 22 13:42:24 2000
+++ linux/Documentation/ioctl-number.txt	Fri Sep  8 13:16:42 2000
@@ -183,5 +183,6 @@ Code	Seq#	Include File		Comments
 0xB0	all	RATIO devices		in development:
 					<mailto:vgo@ratio.de>
 0xB1	00-1F	PPPoX			<mailto:mostrows@styx.uwaterloo.ca>
+0xB2	00-1F	linux/mqueue.h
 0xCB	00-1F	CBM serial IEC bus	in development:
 					<mailto:michael.klein@puffin.lb.shuttle.de>
--- linux/include/asm-alpha/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-alpha/siginfo.h	Mon Sep 11 13:30:50 2000
@@ -104,7 +104,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-arm/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-arm/siginfo.h	Mon Sep 11 13:31:02 2000
@@ -104,7 +104,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-i386/siginfo.h.jj	Thu Sep  7 10:38:08 2000
+++ linux/include/asm-i386/siginfo.h	Mon Sep 11 13:31:15 2000
@@ -104,7 +104,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-ia64/siginfo.h.jj	Tue Aug 15 10:09:41 2000
+++ linux/include/asm-ia64/siginfo.h	Mon Sep 11 13:31:23 2000
@@ -113,7 +113,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-m68k/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-m68k/siginfo.h	Mon Sep 11 13:31:31 2000
@@ -104,7 +104,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-mips/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-mips/siginfo.h	Mon Sep 11 13:31:49 2000
@@ -125,7 +125,7 @@ typedef struct siginfo {
 #define SI_QUEUE	-1	/* sent by sigqueue */
 #define SI_ASYNCIO	-2	/* sent by AIO completion */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-3) /* sent by timer expiration */
-#define SI_MESGQ	-4	/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-4)	/* sent by real time mesq state change */
 #define SI_SIGIO	-5	/* sent by queued SIGIO */
 
 #define SI_FROMUSER(siptr)	((siptr)->si_code <= 0)
--- linux/include/asm-mips64/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-mips64/siginfo.h	Mon Sep 11 13:31:59 2000
@@ -125,7 +125,7 @@ typedef struct siginfo {
 #define SI_QUEUE	-1	/* sent by sigqueue */
 #define SI_ASYNCIO	-2	/* sent by AIO completion */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-3) /* sent by timer expiration */
-#define SI_MESGQ	-4	/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-4)	/* sent by real time mesq state change */
 #define SI_SIGIO	-5	/* sent by queued SIGIO */
 
 #define SI_FROMUSER(siptr)	((siptr)->si_code <= 0)
--- linux/include/asm-ppc/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-ppc/siginfo.h	Mon Sep 11 13:32:07 2000
@@ -104,7 +104,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-sh/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-sh/siginfo.h	Mon Sep 11 13:32:41 2000
@@ -104,7 +104,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-sparc/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-sparc/siginfo.h	Mon Sep 11 13:32:49 2000
@@ -109,7 +109,7 @@ typedef struct siginfo {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/asm-sparc64/siginfo.h.jj	Sat May 27 02:49:37 2000
+++ linux/include/asm-sparc64/siginfo.h	Mon Sep 11 13:32:56 2000
@@ -169,7 +169,7 @@ typedef struct siginfo32 {
 #define SI_KERNEL	0x80		/* sent by the kernel from somewhere */
 #define SI_QUEUE	-1		/* sent by sigqueue */
 #define SI_TIMER __SI_CODE(__SI_TIMER,-2) /* sent by timer expiration */
-#define SI_MESGQ	-3		/* sent by real time mesq state change */
+#define SI_MESGQ __SI_CODE(__SI_RT,-3)	/* sent by real time mesq state change */
 #define SI_ASYNCIO	-4		/* sent by AIO completion */
 #define SI_SIGIO	-5		/* sent by queued SIGIO */
 
--- linux/include/linux/mqueue.h.jj	Fri Sep  8 10:21:42 2000
+++ linux/include/linux/mqueue.h	Mon Sep 11 10:01:06 2000
@@ -0,0 +1,37 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+#include <linux/types.h>
+#include <linux/ioctl.h>
+#include <asm/siginfo.h>
+
+struct mq_attr {
+	long		mq_flags;	/* O_NONBLOCK or 0 */
+	long		mq_maxmsg;	/* Maximum number of messages in the queue */
+	long		mq_msgsize;	/* Maximum size of one message in bytes */
+	long		mq_curmsgs;	/* Current number of messages in the queue */
+	long		__pad[2];
+};
+
+struct mq_open {
+	char		*mq_name;	/* pathname */
+	int		mq_oflag;	/* flags */
+	mode_t		mq_mode;	/* mode */
+	struct mq_attr	mq_attr;	/* attributes */
+};
+
+struct mq_sndrcv {
+	size_t		mq_len;		/* message length */
+	long		mq_type;	/* message type */
+	char		*mq_buf;	/* message buffer */
+};
+
+#define MQ_OPEN		_IOW(0xB2, 0, struct mq_open)
+#define MQ_GETATTR	_IOR(0xB2, 1, struct mq_attr)
+#define MQ_SEND		_IOW(0xB2, 2, struct mq_sndrcv)
+#define MQ_RECEIVE	_IOWR(0xB2, 3, struct mq_sndrcv)
+#define MQ_NOTIFY	_IOW(0xB2, 4, struct sigevent)
+
+#define MQ_DEFAULT_TYPE	0x7FFFFFFE
+
+#endif /* _LINUX_MQUEUE_H */
--- linux/ipc/msg.c.jj	Thu Jan 13 01:06:46 2000
+++ linux/ipc/msg.c	Mon Sep 11 19:13:47 2000
@@ -13,6 +13,9 @@
  * mostly rewritten, threaded and wake-one semantics added
  * MSGMAX limit removed, sysctl's added
  * (c) 1999 Manfred Spraul <manfreds@colorfullife.com>
+ *
+ * make it a filesystem (based on Christoph Rohland's work on shmfs),
+ * (c) 2000 Jakub Jelinek <jakub@redhat.com>
  */
 
 #include <linux/config.h>
@@ -22,6 +25,11 @@
 #include <linux/init.h>
 #include <linux/proc_fs.h>
 #include <linux/list.h>
+#include <linux/module.h>
+#include <linux/mqueue.h>
+#include <linux/poll.h>
+#include <linux/signal.h>
+#include <linux/smp_lock.h>
 #include <asm/uaccess.h>
 #include "util.h"
 
@@ -29,6 +37,78 @@
 int msg_ctlmax = MSGMAX;
 int msg_ctlmnb = MSGMNB;
 int msg_ctlmni = MSGMNI;
+static int msg_mode;
+
+#define MSG_FS_MAGIC	822419456
+
+#define MSG_NAME_LEN NAME_MAX
+#define MSG_FMT ".IPC_%08x"
+#define MSG_FMT_LEN 13
+
+#define MSG_UNLK	0010000 /* filename is unlinked */
+#define MSG_SYSV	0020000 /* It is a SYSV message queue */
+
+static struct super_block * msg_sb;
+
+static struct super_block *msg_read_super(struct super_block *,void *, int);
+static void msg_put_super(struct super_block *);
+static int msg_remount_fs(struct super_block *, int *, char *);
+static void msg_read_inode(struct inode *);
+static int msg_statfs(struct super_block *, struct statfs *);
+static int msg_create(struct inode *,struct dentry *,int);
+static struct dentry *msg_lookup(struct inode *,struct dentry *);
+static int msg_unlink(struct inode *,struct dentry *);
+static int msg_setattr(struct dentry *dent, struct iattr *attr);
+static void msg_delete(struct inode *);
+static int msg_readdir(struct file *, void *, filldir_t);
+static int msg_remove_name(int id);
+static int msg_ioctl(struct inode *, struct file *, unsigned int, unsigned long);
+static int msg_root_ioctl(struct inode *, struct file *, unsigned int, unsigned long);
+static ssize_t msg_read(struct file *, char *, size_t, loff_t *);
+static ssize_t msg_write(struct file *, const char *, size_t, loff_t *);
+/* FIXME: Support poll on mq
+static unsigned int msg_poll(struct file *, poll_table *);
+ */
+static ssize_t msg_send (struct inode *, struct file *, const char *, size_t, long);
+static ssize_t msg_receive (struct inode *, struct file *, char *, size_t, long *);
+static int msg_flush (struct file *);
+static int msg_release (struct inode *, struct file *);
+
+static DECLARE_FSTYPE(msg_fs_type, "msg", msg_read_super, FS_SINGLE);
+
+static struct super_operations msg_sops = {
+	read_inode:	msg_read_inode,
+	delete_inode:	msg_delete,
+	put_super:	msg_put_super,
+	statfs:		msg_statfs,
+	remount_fs:	msg_remount_fs,
+};
+
+static struct file_operations msg_root_operations = {
+	readdir:	msg_readdir,
+	ioctl:		msg_root_ioctl,
+};
+
+static struct inode_operations msg_root_inode_operations = {
+	create:		msg_create,
+	lookup:		msg_lookup,
+	unlink:		msg_unlink,
+};
+
+static struct file_operations msg_file_operations = {
+	read:		msg_read,
+	write:		msg_write,
+	ioctl:		msg_ioctl,
+/* FIXME: Support poll on mq *
+	poll:		msg_poll,
+ */
+	flush:		msg_flush,
+	release:	msg_release,
+};
+
+static struct inode_operations msg_inode_operations = {
+	setattr:	msg_setattr,
+};
 
 /* one msg_receiver structure for each sleeping receiver */
 struct msg_receiver {
@@ -55,7 +135,7 @@ struct msg_msgseg {
 /* one msg_msg structure for each message */
 struct msg_msg {
 	struct list_head m_list; 
-	long  m_type;          
+	long  m_type;
 	int m_ts;           /* message text size */
 	struct msg_msgseg* next;
 	/* the actual message follows immediately */
@@ -67,19 +147,36 @@ struct msg_msg {
 /* one msq_queue structure for each present queue on the system */
 struct msg_queue {
 	struct kern_ipc_perm q_perm;
+#define q_flags q_perm.mode
 	time_t q_stime;			/* last msgsnd time */
 	time_t q_rtime;			/* last msgrcv time */
 	time_t q_ctime;			/* last change time */
 	unsigned long q_cbytes;		/* current number of bytes on queue */
 	unsigned long q_qnum;		/* number of messages in queue */
 	unsigned long q_qbytes;		/* max number of bytes on queue */
+	unsigned int q_msgsize;		/* max number of bytes for one message */
+	unsigned int q_maxmsg;		/* max number of outstanding messages */
 	pid_t q_lspid;			/* pid of last msgsnd */
 	pid_t q_lrpid;			/* last receive pid */
+	int q_signo;			/* signal to be sent if empty queue with no waiting
+					   receivers should be sent */
+	pid_t q_pid;			/* to which pid */
+	sigval_t q_sigval;		/* which value to pass */
+	int id;
 
 	struct list_head q_messages;
 	struct list_head q_receivers;
 	struct list_head q_senders;
+	unsigned int q_namelen;
+	unsigned char q_name[0];
+};
+
+struct mq_link {
+	struct list_head link;
+	struct task_struct *tsk;
+	struct mq_attr *attr;
 };
+static LIST_HEAD(mq_open_links);
 
 #define SEARCH_ANY		1
 #define SEARCH_EQUAL		2
@@ -94,32 +191,529 @@ static struct ipc_ids msg_ids;
 #define msg_lock(id)	((struct msg_queue*)ipc_lock(&msg_ids,id))
 #define msg_unlock(id)	ipc_unlock(&msg_ids,id)
 #define msg_rmid(id)	((struct msg_queue*)ipc_rmid(&msg_ids,id))
-#define msg_checkid(msq, msgid)	\
-	ipc_checkid(&msg_ids,&msq->q_perm,msgid)
+#define msg_get(id)	((struct msg_queue*)ipc_get(&msg_ids,id))
 #define msg_buildid(id, seq) \
 	ipc_buildid(&msg_ids, id, seq)
 
 static void freeque (int id);
-static int newque (key_t key, int msgflg);
+static int newque (key_t key, const char *name, int namelen, struct mq_attr *attr, int msgflg);
 #ifdef CONFIG_PROC_FS
 static int sysvipc_msg_read_proc(char *buffer, char **start, off_t offset, int length, int *eof, void *data);
 #endif
 
 void __init msg_init (void)
 {
+	struct vfsmount *res;
 	ipc_init_ids(&msg_ids,msg_ctlmni);
 
+	register_filesystem (&msg_fs_type);
+	res = kern_mount(&msg_fs_type);
+	if (IS_ERR(res)) {
+		unregister_filesystem(&msg_fs_type);
+		return;
+	}
 #ifdef CONFIG_PROC_FS
 	create_proc_read_entry("sysvipc/msg", 0, 0, sysvipc_msg_read_proc, NULL);
 #endif
 }
 
-static int newque (key_t key, int msgflg)
+static int msg_parse_options(char *options)
+{
+	int blocks = msg_ctlmnb * msg_ctlmni;
+	int inodes = msg_ctlmni;
+	umode_t mode = msg_mode;
+	char *this_char, *value;
+
+	this_char = NULL;
+	if ( options )
+		this_char = strtok(options,",");
+	for ( ; this_char; this_char = strtok(NULL,",")) {
+		if ((value = strchr(this_char,'=')) != NULL)
+			*value++ = 0;
+		if (!strcmp(this_char,"nr_blocks")) {
+			if (!value || !*value)
+				return 1;
+			blocks = simple_strtoul(value,&value,0);
+			if (*value)
+				return 1;
+		}
+		else if (!strcmp(this_char,"nr_inodes")) {
+			if (!value || !*value)
+				return 1;
+			inodes = simple_strtoul(value,&value,0);
+			if (*value)
+				return 1;
+		}
+		else if (!strcmp(this_char,"mode")) {
+			if (!value || !*value)
+				return 1;
+			mode = simple_strtoul(value,&value,8);
+			if (*value)
+				return 1;
+		}
+		else
+			return 1;
+	}
+/* FIXME *
+	msg_ctlmni = inodes;
+	msg_ctlmnb = inodes ? blocks / inodes : 0;
+ */
+	msg_mode   = mode;
+
+	return 0;
+}
+
+static struct super_block *msg_read_super(struct super_block *s,void *data, 
+					  int silent)
+{
+	struct inode * root_inode;
+
+/* FIXME *
+	msg_ctlmnb = MSGMNB;
+	msg_ctlmni = MSGMNI;
+ */
+	msg_mode   = S_IRWXUGO | S_ISVTX;
+	if (msg_parse_options (data)) {
+		printk(KERN_ERR "msg fs invalid option\n");
+		goto out_unlock;
+	}
+
+	s->s_blocksize = PAGE_SIZE;
+	s->s_blocksize_bits = PAGE_SHIFT;
+	s->s_magic = MSG_FS_MAGIC;
+	s->s_op = &msg_sops;
+	root_inode = iget (s, SEQ_MULTIPLIER);
+	if (!root_inode)
+		goto out_no_root;
+	root_inode->i_op = &msg_root_inode_operations;
+	root_inode->i_sb = s;
+	root_inode->i_nlink = 2;
+	root_inode->i_mode = S_IFDIR | msg_mode;
+	s->s_root = d_alloc_root(root_inode);
+	if (!s->s_root)
+		goto out_no_root;
+	msg_sb = s;
+	return s;
+
+out_no_root:
+	printk(KERN_ERR "msg_read_super: get root inode failed\n");
+	iput(root_inode);
+out_unlock:
+	return NULL;
+}
+
+static int msg_remount_fs (struct super_block *sb, int *flags, char *data)
+{
+	if (msg_parse_options (data))
+		return -EINVAL;
+	return 0;
+}
+
+static inline int msg_checkid(struct msg_queue *msq, int id)
+{
+	if (!(msq->q_flags & MSG_SYSV))
+		return -EINVAL;
+	if (ipc_checkid(&msg_ids,&msq->q_perm,id))
+		return -EIDRM;
+	return 0;
+}
+
+static void msg_put_super(struct super_block *sb)
+{
+	int i;
+	struct msg_queue *msq;
+
+	down(&msg_ids.sem);
+	for(i = 0; i <= msg_ids.max_id; i++) {
+		if (!(msq = msg_lock (i)))
+			continue;
+		freeque(i);
+	}
+	dput (sb->s_root);
+	up(&msg_ids.sem);
+}
+
+static int msg_statfs(struct super_block *sb, struct statfs *buf)
+{
+	buf->f_type = MSG_FS_MAGIC;
+	buf->f_bsize = PAGE_SIZE;
+	buf->f_blocks = (msg_ctlmnb * msg_ctlmni) >> PAGE_SHIFT;
+	buf->f_bavail = buf->f_bfree = buf->f_blocks - (atomic_read(&msg_bytes) >> PAGE_SHIFT);
+	buf->f_files = msg_ctlmni;
+	buf->f_ffree = msg_ctlmni - atomic_read(&msg_hdrs);
+	buf->f_namelen = MSG_NAME_LEN;
+	return 0;
+}
+
+static void msg_read_inode(struct inode * inode)
 {
 	int id;
 	struct msg_queue *msq;
 
-	msq  = (struct msg_queue *) kmalloc (sizeof (*msq), GFP_KERNEL);
+	id = inode->i_ino;
+	inode->i_op = NULL;
+	inode->i_mode = 0;
+	
+	if (id < SEQ_MULTIPLIER) {
+		if (!(msq = msg_lock (id)))
+			return;
+		inode->i_mode = (msq->q_flags & S_IRWXUGO) | S_IFIFO;
+		inode->i_uid  = msq->q_perm.uid;
+		inode->i_gid  = msq->q_perm.gid;
+		inode->i_size = msq->q_cbytes;
+		inode->i_mtime = msq->q_stime;
+		inode->i_atime = msq->q_stime > msq->q_rtime ? msq->q_stime : msq->q_rtime;
+		inode->i_ctime = msq->q_ctime;
+		msg_unlock (id);
+		inode->i_op  = &msg_inode_operations;
+		inode->i_fop = &msg_file_operations;
+		return;
+	}
+	inode->i_mtime = inode->i_atime = inode->i_ctime = CURRENT_TIME;
+	inode->i_op    = &msg_root_inode_operations;
+	inode->i_fop   = &msg_root_operations;
+	inode->i_sb    = msg_sb;
+	inode->i_nlink = 2;
+	inode->i_mode  = S_IFDIR | msg_mode;
+	inode->i_uid   = inode->i_gid = 0;
+}
+
+static int msg_create (struct inode *dir, struct dentry *dent, int mode)
+{
+	int id, err;
+	struct inode *inode;
+	struct mq_attr attr, *p;
+	struct list_head *tmp;
+
+	attr.mq_maxmsg = 32;
+	attr.mq_msgsize = 64;
+	p = &attr;
+
+	down(&msg_ids.sem);
+	list_for_each(tmp, &mq_open_links) {
+		struct mq_link *l = list_entry(tmp, struct mq_link, link);
+		if (l->tsk == current) {
+			p = l->attr;
+			break;
+		}
+	}
+	err = id = newque (IPC_PRIVATE, dent->d_name.name, dent->d_name.len, p, mode);
+	if (err < 0)
+		goto out;
+
+	err = -ENOMEM;
+	inode = iget (msg_sb, id % SEQ_MULTIPLIER);
+	if (!inode)
+		goto out;
+
+	err = 0;
+	down (&inode->i_sem);
+	inode->i_mode = (mode & S_IRWXUGO) | S_IFIFO;
+	inode->i_op   = &msg_inode_operations;
+	d_instantiate(dent, inode);
+	up (&inode->i_sem);
+
+out:
+	up(&msg_ids.sem);
+	return err;
+}
+
+static int msg_readdir (struct file *filp, void *dirent, filldir_t filldir)
+{
+	struct inode * inode = filp->f_dentry->d_inode;
+	struct msg_queue *msq;
+	off_t nr;
+
+	nr = filp->f_pos;
+
+	switch(nr)
+	{
+	case 0:
+		if (filldir(dirent, ".", 1, nr, inode->i_ino, DT_DIR) < 0)
+			return 0;
+		filp->f_pos = ++nr;
+		/* fall through */
+	case 1:
+		if (filldir(dirent, "..", 2, nr, inode->i_ino, DT_DIR) < 0)
+			return 0;
+		filp->f_pos = ++nr;
+		/* fall through */
+	default:
+		down(&msg_ids.sem);
+		for (; nr-2 <= msg_ids.max_id; nr++) {
+			if (!(msq = msg_get (nr-2))) 
+				continue;
+			if (msq->q_flags & MSG_UNLK)
+				continue;
+			if (filldir(dirent, msq->q_name, msq->q_namelen, nr, nr, DT_FIFO) < 0)
+				break;;
+		}
+		filp->f_pos = nr;
+		up(&msg_ids.sem);
+		break;
+	}
+
+	UPDATE_ATIME(inode);
+	return 0;
+}
+
+static struct dentry *msg_lookup (struct inode *dir, struct dentry *dent)
+{
+	int i, err = 0;
+	struct msg_queue* msq;
+	struct inode *inode = NULL;
+
+	if (dent->d_name.len > MSG_NAME_LEN)
+		return ERR_PTR(-ENAMETOOLONG);
+
+	down(&msg_ids.sem);
+	for(i = 0; i <= msg_ids.max_id; i++) {
+		if (!(msq = msg_lock(i)))
+			continue;
+		if (!(msq->q_flags & MSG_UNLK) &&
+		    dent->d_name.len == msq->q_namelen &&
+		    strncmp(dent->d_name.name, msq->q_name, msq->q_namelen) == 0)
+			goto found;
+		msg_unlock(i);
+	}
+
+	/*
+	 * prevent the reserved names as negative dentries. 
+	 * This also prevents object creation through the filesystem
+	 */
+	if (dent->d_name.len == MSG_FMT_LEN &&
+	    memcmp (MSG_FMT, dent->d_name.name, MSG_FMT_LEN - 8) == 0)
+		err = -EINVAL;	/* EINVAL to give IPC_RMID the right error */
+
+	goto out;
+
+found:
+	msg_unlock(i);
+	inode = iget(dir->i_sb, i);
+
+	if (!inode)
+		err = -EACCES;
+out:
+	if (err == 0)
+		d_add (dent, inode);
+	up (&msg_ids.sem);
+	return ERR_PTR(err);
+}
+
+extern inline int msg_do_unlink (struct inode *dir, struct dentry *dent, int sysv)
+{
+	struct inode * inode = dent->d_inode;
+	struct msg_queue *msq;
+
+	down (&msg_ids.sem);
+	if (!(msq = msg_lock (inode->i_ino)))
+		BUG();
+	if (sysv) {
+		int ret = 0;
+
+		if (!(msq->q_flags & MSG_SYSV))
+			ret = -EINVAL;
+		else if (current->euid != msq->q_perm.cuid &&
+			 current->euid != msq->q_perm.uid && !capable(CAP_SYS_ADMIN))
+			ret = -EPERM;
+		if (ret) {
+			msg_unlock (inode->i_ino);
+			up (&msg_ids.sem);
+			return ret;
+		}
+	}
+	msq->q_flags |= MSG_UNLK;
+	msq->q_perm.key = IPC_PRIVATE; /* Do not find it any more */
+	msg_unlock (inode->i_ino);
+	up (&msg_ids.sem);
+	inode->i_nlink -= 1;
+	/*
+	 * If it's a reserved name we have to drop the dentry instead
+	 * of creating a negative dentry
+	 */
+	if (dent->d_name.len == MSG_FMT_LEN &&
+	    memcmp (MSG_FMT, dent->d_name.name, MSG_FMT_LEN - 8) == 0)
+		d_drop (dent);
+	return 0;
+}
+
+static int msg_unlink (struct inode *dir, struct dentry *dent)
+{
+	return msg_do_unlink (dir, dent, 0);
+}
+
+static int msg_setattr (struct dentry *dentry, struct iattr *attr)
+{
+	int error;
+	struct inode *inode = dentry->d_inode;
+	struct msg_queue *msq;
+
+	error = inode_change_ok(inode, attr);
+	if (error)
+		return error;
+	if (attr->ia_valid & ATTR_SIZE)
+		return -EINVAL;
+
+	if (attr->ia_valid & (ATTR_MODE | ATTR_UID | ATTR_GID)) {
+		if (!(msq = msg_lock(inode->i_ino)))
+			BUG();
+		if (attr->ia_valid & ATTR_MODE)
+			msq->q_flags = (msq->q_flags & ~S_IRWXUGO)
+				| (S_IRWXUGO & attr->ia_mode);
+		if (attr->ia_valid & ATTR_UID)
+			msq->q_perm.uid = attr->ia_uid;
+		if (attr->ia_valid & ATTR_GID)
+			msq->q_perm.gid = attr->ia_gid;
+		msq->q_ctime = attr->ia_ctime;
+		msg_unlock (inode->i_ino);
+	}
+
+	inode_setattr(inode, attr);
+	return error;
+}
+
+static int msg_root_ioctl (struct inode * inode, struct file * filp, unsigned int cmd, unsigned long arg)
+{
+	struct mq_open o;
+	struct mq_link link;
+	int ret;
+
+	if (cmd != MQ_OPEN)
+		return -EINVAL;
+	unlock_kernel();
+	ret = -EFAULT;
+	if (copy_from_user(&o, (struct mq_open *)arg, sizeof(struct mq_open)))
+		goto out;
+	ret = -EINVAL;
+	if ((unsigned long)o.mq_attr.mq_msgsize > msg_ctlmnb ||
+	    (unsigned long)o.mq_attr.mq_maxmsg > msg_ctlmnb ||
+	    o.mq_attr.mq_msgsize * o.mq_attr.mq_maxmsg > msg_ctlmnb)
+		goto out;
+	link.attr = &o.mq_attr;
+	link.tsk = current;
+	down(&msg_ids.sem);
+	list_add(&link.link, &mq_open_links);
+	up(&msg_ids.sem);
+	/* FIXME: Shouldn't we check here whether mq_name is really a file within the msg filesystem?
+	   Otherwise people tracing the open(2) syscall might miss this place... */
+	ret = sys_open(o.mq_name, o.mq_oflag, o.mq_mode);
+	down(&msg_ids.sem);
+	list_del(&link.link);
+	up(&msg_ids.sem);
+out:
+	lock_kernel();
+	return ret;
+}
+
+static int msg_ioctl (struct inode * inode, struct file * filp, unsigned int cmd, unsigned long arg)
+{
+	int ret = -EINVAL;
+	struct msg_queue *msq;
+	struct mq_sndrcv sr;
+
+	unlock_kernel();
+	switch (cmd) {
+	case MQ_GETATTR: {
+		struct mq_attr attr;
+		memset(&attr, 0, sizeof(attr));
+		msq = msg_lock (inode->i_ino);
+		if (msq == NULL)
+			BUG();
+		attr.mq_maxmsg = msq->q_maxmsg;
+		attr.mq_msgsize = msq->q_msgsize;
+		attr.mq_curmsgs = msq->q_qnum;
+		attr.mq_flags = filp->f_flags & O_NONBLOCK;
+		msg_unlock (inode->i_ino);
+		ret = copy_to_user((struct mq_attr *)arg, &attr, sizeof(attr)) ? -EFAULT : 0;
+		break;
+		}
+	case MQ_SEND:
+		ret = -EBADF;
+		if (!(filp->f_mode & FMODE_WRITE))
+			break;
+		ret = -EFAULT;
+		if (copy_from_user(&sr, (struct mq_sndrcv *)arg, sizeof(sr)))
+			break;
+		ret = -EINVAL;
+		if (sr.mq_type <= 0)
+			break;
+		ret = msg_send (inode, filp, sr.mq_buf, sr.mq_len, sr.mq_type);
+		break;
+	case MQ_RECEIVE:
+		ret = -EBADF;
+		if (!(filp->f_mode & FMODE_READ))
+			break;
+		ret = -EFAULT;
+		if (copy_from_user(&sr, (struct mq_sndrcv *)arg, sizeof(sr)))
+			break;
+		ret = msg_receive (inode, filp, sr.mq_buf, sr.mq_len, &sr.mq_type);
+		if (!ret && put_user (sr.mq_type, &((struct mq_sndrcv *)arg)->mq_type))
+			ret = -EFAULT;
+		break;
+	case MQ_NOTIFY: {
+		struct sigevent sev;
+		struct msg_queue *msg;
+		ret = -EFAULT;
+		if (copy_from_user(&sev, (struct sigevent *)arg, sizeof(sev)))
+			break;
+		ret = -EINVAL;
+		if (sev.sigev_notify != SIGEV_SIGNAL && sev.sigev_notify != SIGEV_NONE)
+			break;
+		if (sev.sigev_signo <= 0 || sev.sigev_signo > _NSIG)
+			break;
+		msg = msg_lock(inode->i_ino);
+		if (!msg) BUG();
+		ret = 0;
+		if (msg->q_signo)
+			ret = -EBUSY;
+		else if (sev.sigev_notify == SIGEV_SIGNAL) {
+			msg->q_signo = sev.sigev_signo;
+			msg->q_sigval = sev.sigev_value;
+		} else
+			msg->q_signo = 0;
+		msg_unlock(inode->i_ino);
+		}
+	default:
+		break;
+	}
+	lock_kernel();
+	return ret;
+}
+
+static ssize_t msg_write(struct file * file, const char * buf, size_t count, loff_t *ppos)
+{
+	int ret = msg_send(file->f_dentry->d_inode, file, buf, count, MQ_DEFAULT_TYPE);
+	return ret ?: count;
+}
+
+static ssize_t msg_read(struct file * file, char * buf, size_t count, loff_t *ppos)
+{
+	return msg_receive(file->f_dentry->d_inode, file, buf, count, NULL);
+}
+
+static int msg_release (struct inode *ino, struct file *filp)
+{
+	struct msg_queue *msq = msg_lock(ino->i_ino);
+	if (!msq) BUG();
+	if (msq->q_signo && msq->q_pid == current->pid)
+		msq->q_signo = 0;
+	msg_unlock(ino->i_ino);
+	return 0;
+}
+
+static int msg_flush (struct file *filp)
+{
+	return msg_release(filp->f_dentry->d_inode, filp);
+}
+
+static int newque (key_t key, const char *name, int namelen, struct mq_attr *attr, int msgflg)
+{
+	int id, bid;
+	struct msg_queue *msq;
+
+	if (namelen > MSG_NAME_LEN)
+		return -ENAMETOOLONG;
+	msq = (struct msg_queue *) kmalloc (sizeof (*msq) + namelen, GFP_KERNEL);
 	if (!msq) 
 		return -ENOMEM;
 	id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni);
@@ -127,20 +721,92 @@ static int newque (key_t key, int msgflg
 		kfree(msq);
 		return -ENOSPC;
 	}
-	msq->q_perm.mode = (msgflg & S_IRWXUGO);
+	msq->q_flags = (msgflg & S_IRWXUGO);
 	msq->q_perm.key = key;
 
 	msq->q_stime = msq->q_rtime = 0;
 	msq->q_ctime = CURRENT_TIME;
 	msq->q_cbytes = msq->q_qnum = 0;
-	msq->q_qbytes = msg_ctlmnb;
 	msq->q_lspid = msq->q_lrpid = 0;
+	msq->q_signo = 0;
 	INIT_LIST_HEAD(&msq->q_messages);
 	INIT_LIST_HEAD(&msq->q_receivers);
 	INIT_LIST_HEAD(&msq->q_senders);
+	msq->id = bid = msg_buildid(id, msq->q_perm.seq);
+
+	if (name) {
+		msq->q_maxmsg = attr->mq_maxmsg;
+		msq->q_msgsize = attr->mq_msgsize;
+		msq->q_qbytes = msq->q_maxmsg * msq->q_msgsize;
+		msq->q_namelen = namelen;
+		memcpy(msq->q_name, name, namelen);
+	} else {
+		msq->q_qbytes = msg_ctlmnb;
+		msq->q_maxmsg = msg_ctlmnb;
+		msq->q_msgsize = msg_ctlmax;
+		msq->q_flags |= MSG_SYSV;
+		msq->q_namelen = sprintf(msq->q_name, MSG_FMT, bid);
+	}
 	msg_unlock(id);
 
-	return msg_buildid(id,msq->q_perm.seq);
+	return bid;
+}
+
+/* FIXME: maybe we need lock_kernel() here */
+static void msg_delete (struct inode *ino)
+{
+	int msgid = ino->i_ino;
+	struct msg_queue *msq;
+
+	down(&msg_ids.sem);
+	msq = msg_lock(msgid);
+	if(msq==NULL)
+		BUG();
+	freeque(msgid);
+	up(&msg_ids.sem);
+	clear_inode(ino);
+}
+
+static int msg_remove_name(int msqid)
+{
+	struct dentry *dir;
+	struct dentry *dentry;
+	struct msg_queue *msq;
+	int error, id;
+	char name[MSG_FMT_LEN+1];
+
+	down(&msg_ids.sem);
+	msq = msg_lock(msqid);
+	if (msq == NULL)
+		return -EINVAL;
+	id = msq->id;
+	if (msg_checkid (msq, msqid)) {
+		msg_unlock(msqid);
+		return -EIDRM;
+	}
+	msg_unlock(msqid);
+	up(&msg_ids.sem);
+	sprintf (name, MSG_FMT, id);
+	dir = lock_parent(msg_sb->s_root);
+	dentry = lookup_one(name, dir);
+	error = PTR_ERR(dentry);
+	if (!IS_ERR(dentry)) {
+		/*
+		 * We have to do our own unlink to prevent the vfs
+		 * permission check. We'll do the SYSV IPC style check
+		 * inside of msg_do_unlink when we hold msg lock and
+		 * msg_ids semaphore.
+		 */
+		struct inode *inode = dir->d_inode;
+		down(&inode->i_zombie);
+		error = msg_do_unlink(inode, dentry, 1);
+		if (!error)
+			d_delete(dentry);
+		up(&inode->i_zombie);
+		dput(dentry);
+	}
+	unlock_dir(dir);
+	return error;
 }
 
 static void free_msg(struct msg_msg* msg)
@@ -155,7 +821,7 @@ static void free_msg(struct msg_msg* msg
 	}
 }
 
-static struct msg_msg* load_msg(void* src, int len)
+static struct msg_msg* load_msg(const char * src, int len)
 {
 	struct msg_msg* msg;
 	struct msg_msgseg** pseg;
@@ -207,9 +873,9 @@ out_err:
 	return ERR_PTR(err);
 }
 
-static int store_msg(void* dest, struct msg_msg* msg, int len)
+static int store_msg(void* dest, struct msg_msg* msg, size_t len)
 {
-	int alen;
+	size_t alen;
 	struct msg_msgseg *seg;
 
 	alen = len;
@@ -229,7 +895,7 @@ static int store_msg(void* dest, struct 
 			return -1;
 		len -= alen;
 		dest = ((char*)dest)+alen;
-		seg=seg->next;
+		seg = seg->next;
 	}
 	return 0;
 }
@@ -288,7 +954,7 @@ static void freeque (int id)
 	expunge_all(msq,-EIDRM);
 	ss_wakeup(&msq->q_senders,1);
 	msg_unlock(id);
-		
+
 	tmp = msq->q_messages.next;
 	while(tmp != &msq->q_messages) {
 		struct msg_msg* msg = list_entry(tmp,struct msg_msg,m_list);
@@ -307,12 +973,12 @@ asmlinkage long sys_msgget (key_t key, i
 	
 	down(&msg_ids.sem);
 	if (key == IPC_PRIVATE) 
-		ret = newque(key, msgflg);
+		ret = newque(key, NULL, MSG_FMT_LEN + 1, NULL, msgflg);
 	else if ((id = ipc_findkey(&msg_ids, key)) == -1) { /* key not used */
 		if (!(msgflg & IPC_CREAT))
 			ret = -ENOENT;
 		else
-			ret = newque(key, msgflg);
+			ret = newque(key, NULL, MSG_FMT_LEN + 1, NULL, msgflg);
 	} else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) {
 		ret = -EEXIST;
 	} else {
@@ -483,10 +1149,13 @@ asmlinkage long sys_msgctl (int msqid, i
 			return -EINVAL;
 
 		if(cmd == MSG_STAT) {
+			err = -EINVAL;
+			if (!(msq->q_flags & MSG_SYSV))
+				goto out_unlock;
 			success_return = msg_buildid(msqid, msq->q_perm.seq);
 		} else {
-			err = -EIDRM;
-			if (msg_checkid(msq,msqid))
+			err = msg_checkid(msq,msqid);
+			if (err)
 				goto out_unlock;
 			success_return = 0;
 		}
@@ -495,6 +1164,7 @@ asmlinkage long sys_msgctl (int msqid, i
 			goto out_unlock;
 
 		kernel_to_ipc64_perm(&msq->q_perm, &tbuf.msg_perm);
+		tbuf.msg_perm.mode &= S_IRWXUGO;
 		tbuf.msg_stime  = msq->q_stime;
 		tbuf.msg_rtime  = msq->q_rtime;
 		tbuf.msg_ctime  = msq->q_ctime;
@@ -515,7 +1185,7 @@ asmlinkage long sys_msgctl (int msqid, i
 			return -EFAULT;
 		break;
 	case IPC_RMID:
-		break;
+		return msg_remove_name(msqid);
 	default:
 		return  -EINVAL;
 	}
@@ -536,12 +1206,11 @@ asmlinkage long sys_msgctl (int msqid, i
 	    /* We _could_ check for CAP_CHOWN above, but we don't */
 		goto out_unlock_up;
 
-	switch (cmd) {
-	case IPC_SET:
-	{
+	if (cmd == IPC_SET) {
 		if (setbuf.qbytes > msg_ctlmnb && !capable(CAP_SYS_RESOURCE))
 			goto out_unlock_up;
 		msq->q_qbytes = setbuf.qbytes;
+		msq->q_maxmsg = setbuf.qbytes;
 
 		ipcp->uid = setbuf.uid;
 		ipcp->gid = setbuf.gid;
@@ -557,11 +1226,6 @@ asmlinkage long sys_msgctl (int msqid, i
 		 */
 		ss_wakeup(&msq->q_senders,0);
 		msg_unlock(msqid);
-		break;
-	}
-	case IPC_RMID:
-		freeque (msqid); 
-		break;
 	}
 	err = 0;
 out_up:
@@ -623,6 +1287,105 @@ int inline pipelined_send(struct msg_que
 	return 0;
 }
 
+static int msg_do_send (struct msg_queue **msqp, int msqid,
+			struct msg_msg *msg, size_t msgsz, int nowait)
+{
+	struct msg_queue *msq = *msqp;
+
+	if(msgsz + msq->q_cbytes > msq->q_qbytes ||
+	   1 + msq->q_qnum > msq->q_maxmsg) {
+		struct msg_sender s;
+
+		if(nowait)
+			return -EAGAIN;
+
+		ss_add(msq, &s);
+		msg_unlock(msqid);
+		schedule();
+		current->state = TASK_RUNNING;
+
+		*msqp = msq = msg_lock(msqid);
+		if(msq==NULL)
+			return -EIDRM;
+		ss_del(&s);
+		
+		if (signal_pending(current))
+			return -EINTR;
+		return -EBUSY;
+	}
+
+	if(!pipelined_send(msq,msg)) {
+		/* noone is waiting for this message, enqueue it */
+		list_add_tail(&msg->m_list,&msq->q_messages);
+		msq->q_cbytes += msgsz;
+		msq->q_qnum++;
+		atomic_add(msgsz,&msg_bytes);
+		atomic_inc(&msg_hdrs);
+		if (msq->q_qnum == 1 && msq->q_signo) {
+			struct task_struct *p;
+			siginfo_t si;
+			read_lock(&tasklist_lock);
+			p = find_task_by_pid(msq->q_pid);
+			if (p) {
+				si.si_signo = msq->q_signo;
+				si.si_errno = 0;
+				si.si_code = SI_MESGQ;
+				si.si_pid = current->pid;
+				si.si_uid = current->euid;
+				si.si_value = msq->q_sigval;
+				if (!send_sig_info(msq->q_signo, &si, p))
+					send_sig(msq->q_signo, p, 1);
+			}
+			read_unlock(&tasklist_lock);
+			msq->q_signo = 0;
+		}
+	}
+
+	msq->q_lspid = current->pid;
+	msq->q_stime = CURRENT_TIME;
+	return 0;
+}
+
+static ssize_t msg_send (struct inode *ino, struct file *filp, const char *mtext, size_t msgsz, long mtype)
+{
+	struct msg_queue *msq;
+	struct msg_msg *msg;
+	int err = 0;
+	
+	if (mtype < 1)
+		return -EINVAL;
+	msq = msg_lock(ino->i_ino);
+	if (!msq) BUG();
+	if (msgsz > msq->q_msgsize)
+		err = -EMSGSIZE;
+	msg_unlock(ino->i_ino);
+	if (err) return err;
+
+	msg = load_msg(mtext, msgsz);
+	if(IS_ERR(msg))
+		return PTR_ERR(msg);
+
+	msg->m_type = mtype;
+	msg->m_ts = msgsz;
+
+	msq = msg_lock(ino->i_ino);
+	if (!msq) BUG();
+
+	do {
+		err = -EACCES;
+		if (msq->q_flags & MSG_SYSV && ipcperms(&msq->q_perm, S_IWUGO))
+			break;
+
+		err = msg_do_send(&msq, ino->i_ino, msg, msgsz, filp->f_flags & O_NONBLOCK);
+
+	} while (err == -EBUSY);
+
+	msg_unlock(ino->i_ino);
+	if (msg && err)
+		free_msg(msg);
+	return err;
+}
+
 asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
 {
 	struct msg_queue *msq;
@@ -648,59 +1411,23 @@ asmlinkage long sys_msgsnd (int msqid, s
 	err=-EINVAL;
 	if(msq==NULL)
 		goto out_free;
-retry:
-	err= -EIDRM;
-	if (msg_checkid(msq,msqid))
-		goto out_unlock_free;
-
-	err=-EACCES;
-	if (ipcperms(&msq->q_perm, S_IWUGO)) 
-		goto out_unlock_free;
+	do {
+		err= -EIDRM;
+		if (msg_checkid(msq,msqid))
+			break;
 
-	if(msgsz + msq->q_cbytes > msq->q_qbytes ||
-		1 + msq->q_qnum > msq->q_qbytes) {
-		struct msg_sender s;
+		err=-EACCES;
+		if (ipcperms(&msq->q_perm, S_IWUGO)) 
+			break;
 
-		if(msgflg&IPC_NOWAIT) {
-			err=-EAGAIN;
-			goto out_unlock_free;
-		}
-		ss_add(msq, &s);
-		msg_unlock(msqid);
-		schedule();
-		current->state= TASK_RUNNING;
+		err = msg_do_send(&msq, msqid, msg, msgsz, msgflg & IPC_NOWAIT);
 
-		msq = msg_lock(msqid);
-		err = -EIDRM;
-		if(msq==NULL)
-			goto out_free;
-		ss_del(&s);
-		
-		if (signal_pending(current)) {
-			err=-EINTR;
-			goto out_unlock_free;
-		}
-		goto retry;
-	}
+	} while (err == -EBUSY);
 
-	if(!pipelined_send(msq,msg)) {
-		/* noone is waiting for this message, enqueue it */
-		list_add_tail(&msg->m_list,&msq->q_messages);
-		msq->q_cbytes += msgsz;
-		msq->q_qnum++;
-		atomic_add(msgsz,&msg_bytes);
-		atomic_inc(&msg_hdrs);
-	}
-	
-	err = 0;
-	msg = NULL;
-	msq->q_lspid = current->pid;
-	msq->q_stime = CURRENT_TIME;
-
-out_unlock_free:
-	msg_unlock(msqid);
+	if (msq)
+		msg_unlock(msqid);
 out_free:
-	if(msg!=NULL)
+	if (msg && err)
 		free_msg(msg);
 	return err;
 }
@@ -724,123 +1451,170 @@ int inline convert_mode(long* msgtyp, in
 	return SEARCH_EQUAL;
 }
 
+static struct msg_msg *
+msg_do_receive (struct msg_queue *msq, int *msqidp, size_t msgsz,
+		long msgtyp, int mode, int msgflg)
+{
+	struct msg_receiver msr_d;
+	struct list_head *tmp;
+	struct msg_msg *msg, *found_msg;
+	int msqid = *msqidp;
+
+	for (;;) {
+		if (msq->q_flags & MSG_SYSV && ipcperms (&msq->q_perm, S_IRUGO))
+			return ERR_PTR(-EACCES);
+
+		tmp = msq->q_messages.next;
+		found_msg = NULL;
+		while (tmp != &msq->q_messages) {
+			msg = list_entry(tmp,struct msg_msg,m_list);
+			if(testmsg(msg, msgtyp, mode)) {
+				found_msg = msg;
+				if(mode == SEARCH_LESSEQUAL && msg->m_type != 1)
+					msgtyp = msg->m_type - 1;
+				else
+					break;
+			}
+			tmp = tmp->next;
+		}
+		if (found_msg) {
+			msg = found_msg;
+			if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR))
+				return ERR_PTR(-E2BIG);
+			list_del(&msg->m_list);
+			msq->q_qnum--;
+			msq->q_rtime = CURRENT_TIME;
+			msq->q_lrpid = current->pid;
+			msq->q_cbytes -= msg->m_ts;
+			atomic_sub(msg->m_ts,&msg_bytes);
+			atomic_dec(&msg_hdrs);
+			ss_wakeup(&msq->q_senders,0);
+			msg_unlock(msqid);
+			return msg;
+		} else {
+			struct msg_queue *t;
+			/* no message waiting. Prepare for pipelined
+			 * receive.
+			 */
+			if (msgflg & IPC_NOWAIT)
+				return ERR_PTR(-ENOMSG);
+			list_add_tail(&msr_d.r_list,&msq->q_receivers);
+			msr_d.r_tsk = current;
+			msr_d.r_msgtype = msgtyp;
+			msr_d.r_mode = mode;
+			if(msgflg & MSG_NOERROR)
+				msr_d.r_maxsize = INT_MAX;
+			else
+				msr_d.r_maxsize = msgsz;
+			msr_d.r_msg = ERR_PTR(-EAGAIN);
+			current->state = TASK_INTERRUPTIBLE;
+			msg_unlock(msqid);
+
+			schedule();
+			current->state = TASK_RUNNING;
+
+			msg = (struct msg_msg*) msr_d.r_msg;
+			if(!IS_ERR(msg))
+				return msg;
+
+			t = msg_lock(msqid);
+			if(t == NULL)
+				*msqidp = msqid = -1;
+			msg = (struct msg_msg*)msr_d.r_msg;
+			if(!IS_ERR(msg)) {
+				/* our message arived while we waited for
+				 * the spinlock. Process it.
+				 */
+				if (msqid != -1)
+					msg_unlock(msqid);
+				return msg;
+			}
+			if(PTR_ERR(msg) == -EAGAIN) {
+				if(msqid == -1)
+					BUG();
+				list_del(&msr_d.r_list);
+				if (signal_pending(current))
+					return ERR_PTR(-EINTR);
+				else
+					continue;
+			}
+			return msg;
+		}
+	}
+}
+
+static int msg_receive (struct inode *ino, struct file *filp, char *mtext,
+			size_t msgsz, long *msgtypp)
+{
+	struct msg_queue *msq;
+	struct msg_msg *msg;
+	long msgtyp;
+	int err, mode, msqid = ino->i_ino;
+
+	if (msgtypp)
+		msgtyp = *msgtypp;
+	else
+		msgtyp = -MQ_DEFAULT_TYPE;
+	mode = convert_mode(&msgtyp, 0);
+	msq = msg_lock(msqid);
+	if (!msq) BUG();
+	if (msgtypp && msgsz < msq->q_msgsize) {
+		msg_unlock(msqid);
+		return -EMSGSIZE;
+	}
+
+	msg = msg_do_receive (msq, &msqid, msgsz, msgtyp, mode,
+			      (filp->f_flags & O_NONBLOCK) ? IPC_NOWAIT : 0);
+	if (!IS_ERR (msg)) {
+		msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
+		if (store_msg(mtext, msg, msgsz))
+			msgsz = -EFAULT;
+		else if (msgtypp)
+			*msgtypp = msg->m_type;
+		free_msg(msg);
+		return msgsz;
+	}
+	if (msqid != -1)
+		msg_unlock(msqid);
+	err = PTR_ERR(msg);
+	switch (err) {
+	case -ENOMSG: err = -EAGAIN; break;
+	case -E2BIG: err = -EMSGSIZE; break;
+	}
+	return err;
+}
+
 asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz,
 			    long msgtyp, int msgflg)
 {
 	struct msg_queue *msq;
-	struct msg_receiver msr_d;
-	struct list_head* tmp;
-	struct msg_msg* msg, *found_msg;
-	int err;
+	struct msg_msg *msg;
 	int mode;
 
 	if (msqid < 0 || (long) msgsz < 0)
 		return -EINVAL;
-	mode = convert_mode(&msgtyp,msgflg);
+	mode = convert_mode(&msgtyp, msgflg);
 
-	msq = msg_lock(msqid);
-	if(msq==NULL)
+	msq = msg_lock (msqid);
+	if (msq==NULL)
 		return -EINVAL;
-retry:
-	err=-EACCES;
-	if (ipcperms (&msq->q_perm, S_IRUGO))
-		goto out_unlock;
 
-	tmp = msq->q_messages.next;
-	found_msg=NULL;
-	while (tmp != &msq->q_messages) {
-		msg = list_entry(tmp,struct msg_msg,m_list);
-		if(testmsg(msg,msgtyp,mode)) {
-			found_msg = msg;
-			if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) {
-				found_msg=msg;
-				msgtyp=msg->m_type-1;
-			} else {
-				found_msg=msg;
-				break;
-			}
-		}
-		tmp = tmp->next;
+	if (!(msq->q_flags & MSG_SYSV)) {
+		msg_unlock (msqid);
+		return -EINVAL;
 	}
-	if(found_msg) {
-		msg=found_msg;
-		if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
-			err=-E2BIG;
-			goto out_unlock;
-		}
-		list_del(&msg->m_list);
-		msq->q_qnum--;
-		msq->q_rtime = CURRENT_TIME;
-		msq->q_lrpid = current->pid;
-		msq->q_cbytes -= msg->m_ts;
-		atomic_sub(msg->m_ts,&msg_bytes);
-		atomic_dec(&msg_hdrs);
-		ss_wakeup(&msq->q_senders,0);
-		msg_unlock(msqid);
-out_success:
+	msg = msg_do_receive (msq, &msqid, msgsz, msgtyp, mode, msgflg);
+	if (!IS_ERR (msg)) {
 		msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
 		if (put_user (msg->m_type, &msgp->mtype) ||
-		    store_msg(msgp->mtext, msg, msgsz)) {
-			    msgsz = -EFAULT;
-		}
+		    store_msg(msgp->mtext, msg, msgsz))
+			msgsz = -EFAULT;
 		free_msg(msg);
 		return msgsz;
-	} else
-	{
-		struct msg_queue *t;
-		/* no message waiting. Prepare for pipelined
-		 * receive.
-		 */
-		if (msgflg & IPC_NOWAIT) {
-			err=-ENOMSG;
-			goto out_unlock;
-		}
-		list_add_tail(&msr_d.r_list,&msq->q_receivers);
-		msr_d.r_tsk = current;
-		msr_d.r_msgtype = msgtyp;
-		msr_d.r_mode = mode;
-		if(msgflg & MSG_NOERROR)
-			msr_d.r_maxsize = INT_MAX;
-		 else
-		 	msr_d.r_maxsize = msgsz;
-		msr_d.r_msg = ERR_PTR(-EAGAIN);
-		current->state = TASK_INTERRUPTIBLE;
-		msg_unlock(msqid);
-
-		schedule();
-		current->state = TASK_RUNNING;
-
-		msg = (struct msg_msg*) msr_d.r_msg;
-		if(!IS_ERR(msg)) 
-			goto out_success;
-
-		t = msg_lock(msqid);
-		if(t==NULL)
-			msqid=-1;
-		msg = (struct msg_msg*)msr_d.r_msg;
-		if(!IS_ERR(msg)) {
-			/* our message arived while we waited for
-			 * the spinlock. Process it.
-			 */
-			if(msqid!=-1)
-				msg_unlock(msqid);
-			goto out_success;
-		}
-		err = PTR_ERR(msg);
-		if(err == -EAGAIN) {
-			if(msqid==-1)
-				BUG();
-			list_del(&msr_d.r_list);
-			if (signal_pending(current))
-				err=-EINTR;
-			 else
-				goto retry;
-		}
 	}
-out_unlock:
-	if(msqid!=-1)
+	if (msqid != -1)
 		msg_unlock(msqid);
-	return err;
+	return PTR_ERR(msg);
 }
 
 #ifdef CONFIG_PROC_FS
@@ -857,10 +1631,10 @@ static int sysvipc_msg_read_proc(char *b
 		struct msg_queue * msq;
 		msq = msg_lock(i);
 		if(msq != NULL) {
-			len += sprintf(buffer + len, "%10d %10d  %4o  %10lu %10lu %5u %5u %5u %5u %5u %5u %10lu %10lu %10lu\n",
+			len += sprintf(buffer + len, "%10d %10d  %4o  %10lu %10lu %5u %5u %5u %5u %5u %5u %10lu %10lu %10lu %.*s%s\n",
 				msq->q_perm.key,
 				msg_buildid(i,msq->q_perm.seq),
-				msq->q_perm.mode,
+				msq->q_flags & S_IRWXUGO,
 				msq->q_cbytes,
 				msq->q_qnum,
 				msq->q_lspid,
@@ -871,7 +1645,10 @@ static int sysvipc_msg_read_proc(char *b
 				msq->q_perm.cgid,
 				msq->q_stime,
 				msq->q_rtime,
-				msq->q_ctime);
+				msq->q_ctime,
+				msq->q_namelen,
+				msq->q_name,
+				msq->q_flags & MSG_UNLK ? " (deleted)" : "");
 			msg_unlock(i);
 
 			pos += len;

[-- Attachment #3: mqueue.h --]
[-- Type: text/plain, Size: 813 bytes --]

#ifndef _MQUEUE_H
#define _MQUEUE_H

#include <sys/types.h>
#include <signal.h>
#include <fcntl.h>

typedef int mqd_t;

struct mq_attr {
  long  mq_flags;	/* O_NONBLOCK or 0 */
  long  mq_maxmsg;	/* Maximum number of messages in the queue */
  long  mq_msgsize;	/* Maximum size of one message in bytes */
  long  mq_curmsgs;	/* Current number of messages in the queue */
  long  __pad[2];
};

#define MQ_PRIO_MAX	0x7FFFFFFE

int	mq_close (mqd_t);
int	mq_getattr (mqd_t, struct mq_attr *);
int	mq_notify (mqd_t, const struct sigevent *);
mqd_t	mq_open (const char *, int, ...);
ssize_t	mq_receive (mqd_t, char *, size_t, unsigned int *);
int	mq_send (mqd_t, const char *, size_t, unsigned int);
int	mq_setattr (mqd_t, const struct mq_attr *, struct mq_attr *);
int	mq_unlink (const char *);

#endif /* mqueue.h */

[-- Attachment #4: mqueue.c --]
[-- Type: text/plain, Size: 5905 bytes --]

#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/ioctl.h>
#include <stdarg.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <alloca.h>

struct mq_attr {
	long		mq_flags;	/* O_NONBLOCK or 0 */
	long		mq_maxmsg;	/* Maximum number of messages in the queue */
	long		mq_msgsize;	/* Maximum size of one message in bytes */
	long		mq_curmsgs;	/* Current number of messages in the queue */
	long		__pad[2];
};

struct mq_open {
	char		*mq_name;	/* pathname */
	int		mq_oflag;	/* flags */
	unsigned short	mq_mode;	/* mode */
	struct mq_attr	mq_attr;	/* attributes */
};

struct mq_sndrcv {
	size_t		mq_len;		/* message length */
	long		mq_type;	/* message type */
	char		*mq_buf;	/* message buffer */
};

#define MQ_OPEN		_IOW(0xB2, 0, struct mq_open)
#define MQ_GETATTR	_IOR(0xB2, 1, struct mq_attr)
#define MQ_SEND		_IOW(0xB2, 2, struct mq_sndrcv)
#define MQ_RECEIVE	_IOWR(0xB2, 3, struct mq_sndrcv)
#define MQ_NOTIFY	_IOW(0xB2, 4, struct sigevent)

#define MQ_PRIO_MAX	0x7FFFFFFE

typedef int mqd_t;

/* *********** */
#ifndef __set_errno
#define __set_errno(x) errno = x
#endif
#ifndef __memcpy
#define __memcpy(x,y,z) memcpy(x,y,z)
#endif
#ifndef __mempcpy
#define __mempcpy(x,y,z) mempcpy(x,y,z)
#endif
#ifndef __ioctl
#define __ioctl(x,y,z) ioctl(x,y,z)
#endif
#undef __libc_once
#define __libc_once(x,y) do { } while (0)
/* *********** */

/* Mount point of the shared memory filesystem.  */
static struct
{
  char *dir;
  size_t dirlen;
} mountpoint
= { "/dev/msg/", 9 };

mqd_t mq_open (const char *name, int oflag, ...)
{
  size_t namelen;
  char *fname, *p;
  int fd;
  mqd_t ret;
  mode_t mode = 0;
  struct mq_attr *attr = NULL;
  struct mq_open open_arg;

  /* Determine where the msgfs is mounted.  */
  __libc_once (once, where_is_msgfs);

  /* If we don't know the mount points there is nothing we can do.  Ever.  */
  if (mountpoint.dir == NULL)
    {
      __set_errno (ENOSYS);
      return -1;
    }
      
  /* Construct the filename.  */
  while (name[0] == '/')
    ++name;
                                          
  if (name[0] == '\0')
    {
      /* The name "/" is not supported.  */
      __set_errno (EINVAL);
      return -1;
    }
                                                                      
  namelen = strlen (name);
  fname = (char *) alloca (mountpoint.dirlen + namelen + 1);
  p = __mempcpy (fname, mountpoint.dir, mountpoint.dirlen);

  if (oflag & O_CREAT)
    {
      va_list ap;

      va_start (ap, oflag);
      /* Get the arguments.  */
      mode = va_arg (ap, mode_t);
      attr = va_arg (ap, struct mq_attr *);
      va_end (ap);

      if (attr != NULL)
	{
	  p[0] = '.';
	  p[1] = '\0';
	  fd = open (fname, O_RDONLY);

	  if (fd < 0)
	    {
	      __set_errno (ENOSYS);
	      return -1;
	    }

	  __memcpy (p, name, namelen + 1);
	  open_arg.mq_name = fname;
	  open_arg.mq_oflag = oflag;
	  open_arg.mq_mode = mode;
	  open_arg.mq_attr = *attr;

	  ret = __ioctl (fd, MQ_OPEN, &open_arg);
	  if (ret < 0 && errno == ENOTTY)
	    __set_errno (ENOSYS);

	  close (fd);
	  return ret;
	}
    }

  __memcpy (p, name, namelen + 1);
  return open (fname, oflag, mode);
}

int mq_close (mqd_t mqdes)
{
  return close (mqdes);
}

int mq_send (mqd_t mqdes, const char *buf, size_t len, unsigned int prio)
{
  struct mq_sndrcv send_req;

  if (prio > MQ_PRIO_MAX)
    {
      __set_errno (EINVAL);
      return -1;
    }

  send_req.mq_buf = (char *)buf;
  send_req.mq_len = len;
  send_req.mq_type = MQ_PRIO_MAX - prio;

  return __ioctl (mqdes, MQ_SEND, &send_req);
}

ssize_t mq_receive (mqd_t mqdes, char *buf, size_t len, unsigned int *prio)
{
  struct mq_sndrcv recv_req;
  ssize_t ret;

  recv_req.mq_buf = buf;
  recv_req.mq_len = len;
  recv_req.mq_type = -MQ_PRIO_MAX;

  ret = __ioctl (mqdes, MQ_RECEIVE, &recv_req);

  if (!ret && prio != NULL)
    *prio = MQ_PRIO_MAX - recv_req.mq_type;

  return ret;
}

int mq_unlink (const char *name)
{
  size_t namelen;
  char *fname;

  /* Determine where the msgfs is mounted.  */
  __libc_once (once, where_is_msgfs);

  if (mountpoint.dir == NULL)
    {
      /* We cannot find the shmfs.  If `name' is really a message
         queue object it must have been created by another process
         and we have no idea where that process found the mountpoint.  */
      __set_errno (ENOENT);
      return -1;
    }

  /* Construct the filename.  */
  while (name[0] == '/')
    ++name;

  if (name[0] == '\0')
    {
      /* The name "/" is not supported.  */
      __set_errno (ENOENT);
      return -1;
    }

  namelen = strlen (name);
  fname = (char *) alloca (mountpoint.dirlen + namelen + 1);
  __mempcpy (__mempcpy (fname, mountpoint.dir, mountpoint.dirlen),
             name, namelen + 1);

  /* And get the file descriptor.  */
  return unlink (fname);
}

int mq_getattr (mqd_t mqdes, const struct mq_attr *mqstat)
{
  return __ioctl (mqdes, MQ_GETATTR, mqstat);
}

int mq_setattr (mqd_t mqdes, const struct mq_attr *mqstat,
	        struct mq_attr *omqstat)
{
  int ret;

  if (omqstat != NULL)
    {
      ret = __ioctl (mqdes, MQ_GETATTR, omqstat);
      if (ret)
	return ret;
    }

  ret = fcntl (mqdes, F_GETFL);
  if (ret == -1)
    return ret;

  if ((ret ^ mqstat->mq_flags) & O_NONBLOCK)
    {
      ret = fcntl (mqdes, F_SETFL, (ret & ~O_NONBLOCK)
				   | (mqstat->mq_flags & O_NONBLOCK));
      if (ret == -1)
	return ret;
    }

  return 0;
}

int mq_notify (mqd_t mqdes, const struct sigevent *notification)
{
  struct sigevent null_notify, *n;

  n = (struct sigevent *)notification;
  if (notification == NULL)
    {
      null_notify.sigev_notify = SIGEV_NONE;
      n = &null_notify;
    }
  else if (notification->sigev_notify != SIGEV_NONE
	   && notification->sigev_notify != SIGEV_SIGNAL)
    {
      __set_errno (EINVAL);
      return -1;
    }

  return __ioctl (mqdes, MQ_NOTIFY, notification);
}

      reply	other threads:[~2000-11-19 14:00 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2000-11-13 15:54 State of Posix compliance in v2.2/v2.4 kernel? Steven_Snyder
2000-11-13 16:00 ` Jeff Garzik
2000-11-13 16:13   ` Jakub Jelinek
2000-11-13 16:17     ` Jeff Garzik
2000-11-13 18:31       ` Gary Lawrence Murphy
2000-11-13 16:40   ` Guest section DW
2000-11-13 20:28   ` Ingo Oeser
2000-11-19 10:24   ` GOTO Masanori
2000-11-19 13:30     ` Jakub Jelinek [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20001119083014.A1514@devserv.devel.redhat.com \
    --to=jakub@redhat.com \
    --cc=gotom@debian.or.jp \
    --cc=linux-kernel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).