* [take36 3/10] kevent: poll/select() notifications.
       [not found] <11712796473213@2ka.mipt.ru>
@ 2007-02-12 11:27 ` Evgeniy Polyakov
  2007-02-12 11:27   ` [take36 4/10] kevent: Socket notifications Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


poll/select() notifications.

This patch includes generic poll/select notifications.
kevent_poll works similarly to epoll and has the same issues (the
callback is invoked not from the caller's internal state machine, but
through a process wakeup, with a lot of allocations and so on).

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
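
A minimal userspace sketch of registering such a poll kevent (this
assumes the struct ukevent layout and the kevent_ctl() syscall from the
core patches of this series; KEVENT_CTL_ADD and the argument order are
part of that assumption):

	struct ukevent uk;
	int err;

	memset(&uk, 0, sizeof(struct ukevent));
	uk.type = KEVENT_POLL;
	uk.id.raw[0] = fd;		/* kevent_poll_enqueue() does fget(id.raw[0]) */
	uk.event = POLLIN | POLLRDNORM;	/* mask checked against f_op->poll() result */
	uk.req_flags = KEVENT_REQ_ONESHOT;

	err = kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);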

diff --git a/fs/file_table.c b/fs/file_table.c
index 4c17a18..46f458c 100644
--- a/fs/file_table.c
+++ b/fs/file_table.c
@@ -20,6 +20,7 @@
 #include <linux/cdev.h>
 #include <linux/fsnotify.h>
 #include <linux/sysctl.h>
+#include <linux/kevent.h>
 #include <linux/percpu_counter.h>
 
 #include <asm/atomic.h>
@@ -119,6 +120,7 @@ struct file *get_empty_filp(void)
 	f->f_uid = tsk->fsuid;
 	f->f_gid = tsk->fsgid;
 	eventpoll_init_file(f);
+	kevent_init_file(f);
 	/* f->f_version: 0 */
 	return f;
 
@@ -164,6 +166,7 @@ void fastcall __fput(struct file *file)
 	 * in the file cleanup chain.
 	 */
 	eventpoll_release(file);
+	kevent_cleanup_file(file);
 	locks_remove_flock(file);
 
 	if (file->f_op && file->f_op->release)
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 1410e53..24f605c 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -280,6 +280,7 @@ extern int dir_notify_enable;
 #include <linux/init.h>
 #include <linux/pid.h>
 #include <linux/mutex.h>
+#include <linux/kevent_storage.h>
 
 #include <asm/atomic.h>
 #include <asm/semaphore.h>
@@ -408,6 +409,8 @@ struct address_space_operations {
 
 	int (*readpages)(struct file *filp, struct address_space *mapping,
 			struct list_head *pages, unsigned nr_pages);
+	int (*aio_readpages)(struct file *filp, struct address_space *mapping,
+			struct list_head *pages, unsigned nr_pages, void *priv);
 
 	/*
 	 * ext3 requires that a successful prepare_write() call be followed
@@ -579,6 +582,10 @@ struct inode {
 	struct mutex		inotify_mutex;	/* protects the watches list */
 #endif
 
+#if defined CONFIG_KEVENT_SOCKET || defined CONFIG_KEVENT_PIPE
+	struct kevent_storage	st;
+#endif
+
 	unsigned long		i_state;
 	unsigned long		dirtied_when;	/* jiffies of first dirtying */
 
@@ -738,6 +745,9 @@ struct file {
 	struct list_head	f_ep_links;
 	spinlock_t		f_ep_lock;
 #endif /* #ifdef CONFIG_EPOLL */
+#ifdef CONFIG_KEVENT_POLL
+	struct kevent_storage	st;
+#endif
 	struct address_space	*f_mapping;
 };
 extern spinlock_t files_lock;
diff --git a/kernel/kevent/kevent_poll.c b/kernel/kevent/kevent_poll.c
new file mode 100644
index 0000000..58129fa
--- /dev/null
+++ b/kernel/kevent/kevent_poll.c
@@ -0,0 +1,234 @@
+/*
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/file.h>
+#include <linux/kevent.h>
+#include <linux/poll.h>
+#include <linux/fs.h>
+
+static struct kmem_cache *kevent_poll_container_cache;
+static struct kmem_cache *kevent_poll_priv_cache;
+
+struct kevent_poll_ctl
+{
+	struct poll_table_struct 	pt;
+	struct kevent			*k;
+};
+
+struct kevent_poll_wait_container
+{
+	struct list_head		container_entry;
+	wait_queue_head_t		*whead;
+	wait_queue_t			wait;
+	struct kevent			*k;
+};
+
+struct kevent_poll_private
+{
+	struct list_head		container_list;
+	spinlock_t			container_lock;
+};
+
+static int kevent_poll_enqueue(struct kevent *k);
+static int kevent_poll_dequeue(struct kevent *k);
+static int kevent_poll_callback(struct kevent *k);
+
+static int kevent_poll_wait_callback(wait_queue_t *wait,
+		unsigned mode, int sync, void *key)
+{
+	struct kevent_poll_wait_container *cont =
+		container_of(wait, struct kevent_poll_wait_container, wait);
+	struct kevent *k = cont->k;
+
+	kevent_storage_ready(k->st, NULL, KEVENT_MASK_ALL);
+	return 0;
+}
+
+static void kevent_poll_qproc(struct file *file, wait_queue_head_t *whead,
+		struct poll_table_struct *poll_table)
+{
+	struct kevent *k =
+		container_of(poll_table, struct kevent_poll_ctl, pt)->k;
+	struct kevent_poll_private *priv = k->priv;
+	struct kevent_poll_wait_container *cont;
+	unsigned long flags;
+
+	cont = kmem_cache_alloc(kevent_poll_container_cache, GFP_KERNEL);
+	if (!cont) {
+		kevent_break(k);
+		return;
+	}
+
+	cont->k = k;
+	init_waitqueue_func_entry(&cont->wait, kevent_poll_wait_callback);
+	cont->whead = whead;
+
+	spin_lock_irqsave(&priv->container_lock, flags);
+	list_add_tail(&cont->container_entry, &priv->container_list);
+	spin_unlock_irqrestore(&priv->container_lock, flags);
+
+	add_wait_queue(whead, &cont->wait);
+}
+
+static int kevent_poll_enqueue(struct kevent *k)
+{
+	struct file *file;
+	int err;
+	unsigned int revents;
+	unsigned long flags;
+	struct kevent_poll_ctl ctl;
+	struct kevent_poll_private *priv;
+
+	file = fget(k->event.id.raw[0]);
+	if (!file)
+		return -EBADF;
+	
+	err = -EINVAL;
+	if (!file->f_op || !file->f_op->poll)
+		goto err_out_fput;
+	
+	err = -ENOMEM;
+	priv = kmem_cache_alloc(kevent_poll_priv_cache, GFP_KERNEL);
+	if (!priv)
+		goto err_out_fput;
+
+	spin_lock_init(&priv->container_lock);
+	INIT_LIST_HEAD(&priv->container_list);
+
+	k->priv = priv;
+
+	ctl.k = k;
+	init_poll_funcptr(&ctl.pt, &kevent_poll_qproc);
+	
+	err = kevent_storage_enqueue(&file->st, k);
+	if (err)
+		goto err_out_free;
+
+	revents = file->f_op->poll(file, &ctl.pt);
+	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE) {
+		kevent_requeue(k);
+	} else {
+		if (revents & k->event.event) {
+			err = 1;
+			goto out_dequeue;
+		}
+	}
+
+	spin_lock_irqsave(&k->ulock, flags);
+	k->event.req_flags |= KEVENT_REQ_LAST_CHECK;
+	spin_unlock_irqrestore(&k->ulock, flags);
+
+	return 0;
+
+out_dequeue:
+	kevent_storage_dequeue(k->st, k);
+err_out_free:
+	kmem_cache_free(kevent_poll_priv_cache, priv);
+err_out_fput:
+	fput(file);
+	return err;
+}
+
+static int kevent_poll_dequeue(struct kevent *k)
+{
+	struct file *file = k->st->origin;
+	struct kevent_poll_private *priv = k->priv;
+	struct kevent_poll_wait_container *w, *n;
+	unsigned long flags;
+
+	kevent_storage_dequeue(k->st, k);
+
+	spin_lock_irqsave(&priv->container_lock, flags);
+	list_for_each_entry_safe(w, n, &priv->container_list, container_entry) {
+		list_del(&w->container_entry);
+		remove_wait_queue(w->whead, &w->wait);
+		kmem_cache_free(kevent_poll_container_cache, w);
+	}
+	spin_unlock_irqrestore(&priv->container_lock, flags);
+
+	kmem_cache_free(kevent_poll_priv_cache, priv);
+	k->priv = NULL;
+
+	fput(file);
+
+	return 0;
+}
+
+static int kevent_poll_callback(struct kevent *k)
+{
+	if (k->event.req_flags & KEVENT_REQ_LAST_CHECK) {
+		return 1;
+	} else {
+		struct file *file = k->st->origin;
+		unsigned int revents = file->f_op->poll(file, NULL);
+
+		k->event.ret_data[0] = revents & k->event.event;
+
+		return (revents & k->event.event);
+	}
+}
+
+static int __init kevent_poll_sys_init(void)
+{
+	struct kevent_callbacks pc = {
+		.callback = &kevent_poll_callback,
+		.enqueue = &kevent_poll_enqueue,
+		.dequeue = &kevent_poll_dequeue,
+		.flags = 0,
+	};
+
+	kevent_poll_container_cache = kmem_cache_create("kevent_poll_container_cache",
+			sizeof(struct kevent_poll_wait_container), 0, 0, NULL, NULL);
+	if (!kevent_poll_container_cache) {
+		printk(KERN_ERR "Failed to create kevent poll container cache.\n");
+		return -ENOMEM;
+	}
+
+	kevent_poll_priv_cache = kmem_cache_create("kevent_poll_priv_cache",
+			sizeof(struct kevent_poll_private), 0, 0, NULL, NULL);
+	if (!kevent_poll_priv_cache) {
+		printk(KERN_ERR "Failed to create kevent poll private data cache.\n");
+		kmem_cache_destroy(kevent_poll_container_cache);
+		kevent_poll_container_cache = NULL;
+		return -ENOMEM;
+	}
+
+	kevent_add_callbacks(&pc, KEVENT_POLL);
+
+	printk(KERN_INFO "Kevent poll()/select() subsystem has been initialized.\n");
+	return 0;
+}
+
+static struct lock_class_key kevent_poll_key;
+
+void kevent_poll_reinit(struct file *file)
+{
+	lockdep_set_class(&file->st.lock, &kevent_poll_key);
+}
+
+static void __exit kevent_poll_sys_fini(void)
+{
+	kmem_cache_destroy(kevent_poll_priv_cache);
+	kmem_cache_destroy(kevent_poll_container_cache);
+}
+
+module_init(kevent_poll_sys_init);
+module_exit(kevent_poll_sys_fini);



* [take36 4/10] kevent: Socket notifications.
  2007-02-12 11:27 ` [take36 3/10] kevent: poll/select() notifications Evgeniy Polyakov
@ 2007-02-12 11:27   ` Evgeniy Polyakov
  2007-02-12 11:27     ` [take36 5/10] kevent: Timer notifications Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


Socket notifications.

This patch includes socket send/recv/accept notifications.

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
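
A hedged userspace sketch of waiting for incoming connections on a
listening socket (struct ukevent and kevent_ctl() are assumed from the
core patches; kevent_socket_enqueue() below resolves id.raw[0] via
sockfd_lookup()):

	struct ukevent uk;
	int err;

	memset(&uk, 0, sizeof(struct ukevent));
	uk.type = KEVENT_SOCKET;
	uk.id.raw[0] = listen_fd;
	uk.event = KEVENT_SOCKET_ACCEPT;	/* or KEVENT_SOCKET_RECV/SEND */
	err = kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);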

diff --git a/fs/inode.c b/fs/inode.c
index bf21dc6..82817b1 100644
--- a/fs/inode.c
+++ b/fs/inode.c
@@ -21,6 +21,7 @@
 #include <linux/cdev.h>
 #include <linux/bootmem.h>
 #include <linux/inotify.h>
+#include <linux/kevent.h>
 #include <linux/mount.h>
 
 /*
@@ -164,12 +165,18 @@ static struct inode *alloc_inode(struct super_block *sb)
 		}
 		inode->i_private = NULL;
 		inode->i_mapping = mapping;
+#if defined CONFIG_KEVENT_SOCKET || defined CONFIG_KEVENT_PIPE
+		kevent_storage_init(inode, &inode->st);
+#endif
 	}
 	return inode;
 }
 
 void destroy_inode(struct inode *inode) 
 {
+#if defined CONFIG_KEVENT_SOCKET || defined CONFIG_KEVENT_PIPE
+	kevent_storage_fini(&inode->st);
+#endif
 	BUG_ON(inode_has_buffers(inode));
 	security_inode_free(inode);
 	if (inode->i_sb->s_op->destroy_inode)
diff --git a/include/net/sock.h b/include/net/sock.h
index 03684e7..d840399 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -49,6 +49,7 @@
 #include <linux/skbuff.h>	/* struct sk_buff */
 #include <linux/mm.h>
 #include <linux/security.h>
+#include <linux/kevent.h>
 
 #include <linux/filter.h>
 
@@ -451,6 +452,21 @@ static inline int sk_stream_memory_free(struct sock *sk)
 
 extern void sk_stream_rfree(struct sk_buff *skb);
 
+struct socket_alloc {
+	struct socket socket;
+	struct inode vfs_inode;
+};
+
+static inline struct socket *SOCKET_I(struct inode *inode)
+{
+	return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
+}
+
+static inline struct inode *SOCK_INODE(struct socket *socket)
+{
+	return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
+}
+
 static inline void sk_stream_set_owner_r(struct sk_buff *skb, struct sock *sk)
 {
 	skb->sk = sk;
@@ -478,6 +494,7 @@ static inline void sk_add_backlog(struct sock *sk, struct sk_buff *skb)
 		sk->sk_backlog.tail = skb;
 	}
 	skb->next = NULL;
+	kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
 }
 
 #define sk_wait_event(__sk, __timeo, __condition)		\
@@ -679,21 +696,6 @@ static inline struct kiocb *siocb_to_kiocb(struct sock_iocb *si)
 	return si->kiocb;
 }
 
-struct socket_alloc {
-	struct socket socket;
-	struct inode vfs_inode;
-};
-
-static inline struct socket *SOCKET_I(struct inode *inode)
-{
-	return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
-}
-
-static inline struct inode *SOCK_INODE(struct socket *socket)
-{
-	return &container_of(socket, struct socket_alloc, socket)->vfs_inode;
-}
-
 extern void __sk_stream_mem_reclaim(struct sock *sk);
 extern int sk_stream_mem_schedule(struct sock *sk, int size, int kind);
 
diff --git a/include/net/tcp.h b/include/net/tcp.h
index cd8fa0c..7344d61 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -864,6 +864,7 @@ static inline int tcp_prequeue(struct sock *sk, struct sk_buff *skb)
 			tp->ucopy.memory = 0;
 		} else if (skb_queue_len(&tp->ucopy.prequeue) == 1) {
 			wake_up_interruptible(sk->sk_sleep);
+			kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 			if (!inet_csk_ack_scheduled(sk))
 				inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK,
 						          (3 * TCP_RTO_MIN) / 4,
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
new file mode 100644
index 0000000..d1a2701
--- /dev/null
+++ b/kernel/kevent/kevent_socket.c
@@ -0,0 +1,144 @@
+/*
+ * 	kevent_socket.c
+ * 
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/timer.h>
+#include <linux/file.h>
+#include <linux/tcp.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/request_sock.h>
+#include <net/inet_connection_sock.h>
+
+static int kevent_socket_callback(struct kevent *k)
+{
+	struct inode *inode = k->st->origin;
+	unsigned int events = SOCKET_I(inode)->ops->poll(SOCKET_I(inode)->file, SOCKET_I(inode), NULL);
+
+	if ((events & (POLLIN | POLLRDNORM)) && (k->event.event & (KEVENT_SOCKET_RECV | KEVENT_SOCKET_ACCEPT)))
+		return 1;
+	if ((events & (POLLOUT | POLLWRNORM)) && (k->event.event & KEVENT_SOCKET_SEND))
+		return 1;
+	if (events & (POLLERR | POLLHUP | POLLRDHUP | POLLREMOVE))
+		return -1;
+	return 0;
+}
+
+int kevent_socket_enqueue(struct kevent *k)
+{
+	struct inode *inode;
+	struct socket *sock;
+	int err = -EBADF;
+
+	sock = sockfd_lookup(k->event.id.raw[0], &err);
+	if (!sock)
+		goto err_out_exit;
+
+	inode = igrab(SOCK_INODE(sock));
+	if (!inode)
+		goto err_out_fput;
+
+	err = kevent_storage_enqueue(&inode->st, k);
+	if (err)
+		goto err_out_iput;
+
+	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE) {
+		kevent_requeue(k);
+		err = 0;
+	} else {
+		err = k->callbacks.callback(k);
+		if (err)
+			goto err_out_dequeue;
+	}
+
+	return err;
+
+err_out_dequeue:
+	kevent_storage_dequeue(k->st, k);
+err_out_iput:
+	iput(inode);
+err_out_fput:
+	sockfd_put(sock);
+err_out_exit:
+	return err;
+}
+
+int kevent_socket_dequeue(struct kevent *k)
+{
+	struct inode *inode = k->st->origin;
+	struct socket *sock;
+
+	kevent_storage_dequeue(k->st, k);
+
+	sock = SOCKET_I(inode);
+	iput(inode);
+	sockfd_put(sock);
+
+	return 0;
+}
+
+void kevent_socket_notify(struct sock *sk, u32 event)
+{
+	if (sk->sk_socket)
+		kevent_storage_ready(&SOCK_INODE(sk->sk_socket)->st, NULL, event);
+}
+
+/*
+ * It is required for network protocols compiled as modules, like IPv6.
+ */
+EXPORT_SYMBOL_GPL(kevent_socket_notify);
+
+#ifdef CONFIG_LOCKDEP
+static struct lock_class_key kevent_sock_key;
+
+void kevent_socket_reinit(struct socket *sock)
+{
+	struct inode *inode = SOCK_INODE(sock);
+
+	lockdep_set_class(&inode->st.lock, &kevent_sock_key);
+}
+
+void kevent_sk_reinit(struct sock *sk)
+{
+	if (sk->sk_socket) {
+		struct inode *inode = SOCK_INODE(sk->sk_socket);
+
+		lockdep_set_class(&inode->st.lock, &kevent_sock_key);
+	}
+}
+#endif
+static int __init kevent_init_socket(void)
+{
+	struct kevent_callbacks sc = {
+		.callback = &kevent_socket_callback,
+		.enqueue = &kevent_socket_enqueue,
+		.dequeue = &kevent_socket_dequeue,
+		.flags = 0,
+	};
+
+	return kevent_add_callbacks(&sc, KEVENT_SOCKET);
+}
+module_init(kevent_init_socket);
diff --git a/net/core/sock.c b/net/core/sock.c
index 0ed5b4f..e687f54 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -1393,6 +1393,7 @@ static void sock_def_wakeup(struct sock *sk)
 	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
 		wake_up_interruptible_all(sk->sk_sleep);
 	read_unlock(&sk->sk_callback_lock);
+	kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 }
 
 static void sock_def_error_report(struct sock *sk)
@@ -1402,6 +1403,7 @@ static void sock_def_error_report(struct sock *sk)
 		wake_up_interruptible(sk->sk_sleep);
 	sk_wake_async(sk,0,POLL_ERR); 
 	read_unlock(&sk->sk_callback_lock);
+	kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 }
 
 static void sock_def_readable(struct sock *sk, int len)
@@ -1411,6 +1413,7 @@ static void sock_def_readable(struct sock *sk, int len)
 		wake_up_interruptible(sk->sk_sleep);
 	sk_wake_async(sk,1,POLL_IN);
 	read_unlock(&sk->sk_callback_lock);
+	kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
 }
 
 static void sock_def_write_space(struct sock *sk)
@@ -1430,6 +1433,7 @@ static void sock_def_write_space(struct sock *sk)
 	}
 
 	read_unlock(&sk->sk_callback_lock);
+	kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
 }
 
 static void sock_def_destruct(struct sock *sk)
@@ -1480,6 +1484,8 @@ void sock_init_data(struct socket *sock, struct sock *sk)
 	sk->sk_state		=	TCP_CLOSE;
 	sk->sk_socket		=	sock;
 
+	kevent_sk_reinit(sk);
+
 	sock_set_flag(sk, SOCK_ZAPPED);
 
 	if(sock)
@@ -1546,8 +1552,10 @@ void fastcall release_sock(struct sock *sk)
 	if (sk->sk_backlog.tail)
 		__release_sock(sk);
 	sk->sk_lock.owner = NULL;
-	if (waitqueue_active(&sk->sk_lock.wq))
+	if (waitqueue_active(&sk->sk_lock.wq)) {
 		wake_up(&sk->sk_lock.wq);
+		kevent_socket_notify(sk, KEVENT_SOCKET_RECV|KEVENT_SOCKET_SEND);
+	}
 	spin_unlock_bh(&sk->sk_lock.slock);
 }
 EXPORT_SYMBOL(release_sock);
diff --git a/net/core/stream.c b/net/core/stream.c
index d1d7dec..2878c2a 100644
--- a/net/core/stream.c
+++ b/net/core/stream.c
@@ -36,6 +36,7 @@ void sk_stream_write_space(struct sock *sk)
 			wake_up_interruptible(sk->sk_sleep);
 		if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
 			sock_wake_async(sock, 2, POLL_OUT);
+		kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
 	}
 }
 
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index c26076f..8e56c71 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3128,6 +3128,7 @@ static void tcp_ofo_queue(struct sock *sk)
 
 		__skb_unlink(skb, &tp->out_of_order_queue);
 		__skb_queue_tail(&sk->sk_receive_queue, skb);
+		kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
 		tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
 		if(skb->h.th->fin)
 			tcp_fin(skb, sk, skb->h.th);
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index 12de90a..fca282e 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -61,6 +61,7 @@
 #include <linux/jhash.h>
 #include <linux/init.h>
 #include <linux/times.h>
+#include <linux/kevent.h>
 
 #include <net/icmp.h>
 #include <net/inet_hashtables.h>
@@ -1391,6 +1392,7 @@ int tcp_v4_conn_request(struct sock *sk, struct sk_buff *skb)
 	   	reqsk_free(req);
 	} else {
 		inet_csk_reqsk_queue_hash_add(sk, req, TCP_TIMEOUT_INIT);
+		kevent_socket_notify(sk, KEVENT_SOCKET_ACCEPT);
 	}
 	return 0;
 
diff --git a/net/socket.c b/net/socket.c
index 4e39631..776dc2e 100644
--- a/net/socket.c
+++ b/net/socket.c
@@ -84,6 +84,7 @@
 #include <linux/kmod.h>
 #include <linux/audit.h>
 #include <linux/wireless.h>
+#include <linux/kevent.h>
 
 #include <asm/uaccess.h>
 #include <asm/unistd.h>
@@ -496,6 +497,8 @@ static struct socket *sock_alloc(void)
 	inode->i_uid = current->fsuid;
 	inode->i_gid = current->fsgid;
 
+	kevent_socket_reinit(sock);
+
 	get_cpu_var(sockets_in_use)++;
 	put_cpu_var(sockets_in_use);
 	return sock;
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 2f208c7..835e20f 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -1563,8 +1563,10 @@ static int unix_dgram_recvmsg(struct kiocb *iocb, struct socket *sock,
 	struct scm_cookie tmp_scm;
 	struct sock *sk = sock->sk;
 	struct unix_sock *u = unix_sk(sk);
+	struct sock *other;
 	int noblock = flags & MSG_DONTWAIT;
 	struct sk_buff *skb;
+
 	int err;
 
 	err = -EOPNOTSUPP;
@@ -1580,6 +1582,12 @@ static int unix_dgram_recvmsg(struct kiocb *iocb, struct socket *sock,
 		goto out_unlock;
 
 	wake_up_interruptible(&u->peer_wait);
+	other = unix_peer_get(sk);
+	if (other) {
+		kevent_socket_notify(other, KEVENT_SOCKET_SEND);
+		sock_put(other);
+	} else
+		kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
 
 	if (msg->msg_name)
 		unix_copy_addr(msg, skb->sk);
@@ -1674,7 +1682,7 @@ static int unix_stream_recvmsg(struct kiocb *iocb, struct socket *sock,
 {
 	struct sock_iocb *siocb = kiocb_to_siocb(iocb);
 	struct scm_cookie tmp_scm;
-	struct sock *sk = sock->sk;
+	struct sock *sk = sock->sk, *other;
 	struct unix_sock *u = unix_sk(sk);
 	struct sockaddr_un *sunaddr=msg->msg_name;
 	int copied = 0;
@@ -1803,6 +1811,14 @@ static int unix_stream_recvmsg(struct kiocb *iocb, struct socket *sock,
 		}
 	} while (size);
 
+	other = unix_peer_get(sk);
+	if (other) {
+		kevent_socket_notify(other, KEVENT_SOCKET_SEND);
+		sock_put(other);
+	}
+	if (sk->sk_shutdown & RCV_SHUTDOWN || !other)
+		kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
+
 	mutex_unlock(&u->readlock);
 	scm_recv(sock, msg, siocb->scm, flags);
 out:
@@ -1824,6 +1840,9 @@ static int unix_shutdown(struct socket *sock, int mode)
 			sock_hold(other);
 		unix_state_wunlock(sk);
 		sk->sk_state_change(sk);
+		kevent_socket_notify(sk, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
+		if (other)
+			kevent_socket_notify(other, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
 
 		if (other &&
 			(sk->sk_type == SOCK_STREAM || sk->sk_type == SOCK_SEQPACKET)) {



* [take36 6/10] kevent: Pipe notifications.
  2007-02-12 11:27     ` [take36 5/10] kevent: Timer notifications Evgeniy Polyakov
@ 2007-02-12 11:27       ` Evgeniy Polyakov
  2007-02-12 11:27         ` [take36 7/10] kevent: Signal notifications Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


Pipe notifications.
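
A hedged sketch of registering for pipe readability (struct ukevent and
kevent_ctl() are assumed from the core patches; as the hunks below show,
pipe notifications reuse the KEVENT_SOCKET_RECV/SEND event bits, and the
descriptor must refer to a FIFO):

	struct ukevent uk;
	int err;

	memset(&uk, 0, sizeof(struct ukevent));
	uk.type = KEVENT_PIPE;
	uk.id.raw[0] = pipe_fd;		/* checked with S_ISFIFO() on enqueue */
	uk.event = KEVENT_SOCKET_RECV;	/* readable: data and a live writer */
	err = kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);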


diff --git a/fs/pipe.c b/fs/pipe.c
index 68090e8..0c75bf1 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -16,6 +16,7 @@
 #include <linux/uio.h>
 #include <linux/highmem.h>
 #include <linux/pagemap.h>
+#include <linux/kevent.h>
 
 #include <asm/uaccess.h>
 #include <asm/ioctls.h>
@@ -313,6 +314,7 @@ redo:
 			break;
 		}
 		if (do_wakeup) {
+			kevent_pipe_notify(inode, KEVENT_SOCKET_SEND);
 			wake_up_interruptible_sync(&pipe->wait);
  			kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 		}
@@ -322,6 +324,7 @@ redo:
 
 	/* Signal writers asynchronously that there is more room. */
 	if (do_wakeup) {
+		kevent_pipe_notify(inode, KEVENT_SOCKET_SEND);
 		wake_up_interruptible(&pipe->wait);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 	}
@@ -484,6 +487,7 @@ redo2:
 			break;
 		}
 		if (do_wakeup) {
+			kevent_pipe_notify(inode, KEVENT_SOCKET_RECV);
 			wake_up_interruptible_sync(&pipe->wait);
 			kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 			do_wakeup = 0;
@@ -495,6 +499,7 @@ redo2:
 out:
 	mutex_unlock(&inode->i_mutex);
 	if (do_wakeup) {
+		kevent_pipe_notify(inode, KEVENT_SOCKET_RECV);
 		wake_up_interruptible(&pipe->wait);
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 	}
@@ -590,6 +595,7 @@ pipe_release(struct inode *inode, int decr, int decw)
 		free_pipe_info(inode);
 	} else {
 		wake_up_interruptible(&pipe->wait);
+		kevent_pipe_notify(inode, KEVENT_SOCKET_SEND|KEVENT_SOCKET_RECV);
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 	}
diff --git a/kernel/kevent/kevent_pipe.c b/kernel/kevent/kevent_pipe.c
new file mode 100644
index 0000000..91dc1eb
--- /dev/null
+++ b/kernel/kevent/kevent_pipe.c
@@ -0,0 +1,123 @@
+/*
+ * 	kevent_pipe.c
+ * 
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/fs.h>
+#include <linux/kevent.h>
+#include <linux/pipe_fs_i.h>
+
+static int kevent_pipe_callback(struct kevent *k)
+{
+	struct inode *inode = k->st->origin;
+	struct pipe_inode_info *pipe = inode->i_pipe;
+	int nrbufs = pipe->nrbufs;
+
+	if (k->event.event & KEVENT_SOCKET_RECV && nrbufs > 0) {
+		if (!pipe->writers)
+			return -1;
+		return 1;
+	}
+	
+	if (k->event.event & KEVENT_SOCKET_SEND && nrbufs < PIPE_BUFFERS) {
+		if (!pipe->readers)
+			return -1;
+		return 1;
+	}
+
+	return 0;
+}
+
+int kevent_pipe_enqueue(struct kevent *k)
+{
+	struct file *pipe;
+	int err = -EBADF;
+	struct inode *inode;
+
+	pipe = fget(k->event.id.raw[0]);
+	if (!pipe)
+		goto err_out_exit;
+
+	inode = igrab(pipe->f_dentry->d_inode);
+	if (!inode)
+		goto err_out_fput;
+
+	err = -EINVAL;
+	if (!S_ISFIFO(inode->i_mode))
+		goto err_out_iput;
+
+	err = kevent_storage_enqueue(&inode->st, k);
+	if (err)
+		goto err_out_iput;
+
+	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE) {
+		kevent_requeue(k);
+		err = 0;
+	} else {
+		err = k->callbacks.callback(k);
+		if (err)
+			goto err_out_dequeue;
+	}
+
+	fput(pipe);
+
+	return err;
+
+err_out_dequeue:
+	kevent_storage_dequeue(k->st, k);
+err_out_iput:
+	iput(inode);
+err_out_fput:
+	fput(pipe);
+err_out_exit:
+	return err;
+}
+
+int kevent_pipe_dequeue(struct kevent *k)
+{
+	struct inode *inode = k->st->origin;
+
+	kevent_storage_dequeue(k->st, k);
+	iput(inode);
+
+	return 0;
+}
+
+void kevent_pipe_notify(struct inode *inode, u32 event)
+{
+	kevent_storage_ready(&inode->st, NULL, event);
+}
+
+static int __init kevent_init_pipe(void)
+{
+	struct kevent_callbacks sc = {
+		.callback = &kevent_pipe_callback,
+		.enqueue = &kevent_pipe_enqueue,
+		.dequeue = &kevent_pipe_dequeue,
+		.flags = 0,
+	};
+
+	return kevent_add_callbacks(&sc, KEVENT_PIPE);
+}
+module_init(kevent_init_pipe);



* [take36 7/10] kevent: Signal notifications.
  2007-02-12 11:27       ` [take36 6/10] kevent: Pipe notifications Evgeniy Polyakov
@ 2007-02-12 11:27         ` Evgeniy Polyakov
  2007-02-12 11:27           ` [take36 8/10] kevent: Kevent posix timer notifications Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


Signal notifications.

This type of notification allows signals to be delivered through the
kevent queue. An example application, signal.c, can be found on the
project homepage.

If the KEVENT_SIGNAL_NOMASK bit is set in the raw_u64 id, the signal
will be delivered only through the queue; otherwise both delivery types
are used - the old one, through an update of the pending signal mask,
and delivery through the queue.

If a signal is delivered only through the kevent queue, the mask of
pending signals is not updated at all, which is equivalent to putting
the signal into the blocked mask, except that the signal is still
delivered through the kevent queue.

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
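
A hedged sketch of requesting queue-only delivery of SIGUSR1 (struct
ukevent and kevent_ctl() are assumed from the core patches, as is the
exact bit used for KEVENT_SIGNAL_NOMASK inside raw_u64; it must live
outside raw[0], which carries the signal number):

	struct ukevent uk;
	int err;

	memset(&uk, 0, sizeof(struct ukevent));
	uk.type = KEVENT_SIGNAL;
	uk.id.raw[0] = SIGUSR1;			/* compared in kevent_signal_callback() */
	uk.id.raw_u64 |= KEVENT_SIGNAL_NOMASK;	/* skip the usual delivery path */
	uk.event = KEVENT_SIGNAL_DELIVERY;
	err = kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);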


diff --git a/include/linux/sched.h b/include/linux/sched.h
index 4463735..e7372f2 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -82,6 +82,7 @@ struct sched_param {
 #include <linux/resource.h>
 #include <linux/timer.h>
 #include <linux/hrtimer.h>
+#include <linux/kevent_storage.h>
 #include <linux/task_io_accounting.h>
 
 #include <asm/processor.h>
@@ -1048,6 +1049,10 @@ struct task_struct {
 #ifdef	CONFIG_TASK_DELAY_ACCT
 	struct task_delay_info *delays;
 #endif
+#ifdef CONFIG_KEVENT_SIGNAL
+	struct kevent_storage st;
+	u32 kevent_signals;
+#endif
 #ifdef CONFIG_FAULT_INJECTION
 	int make_it_fail;
 #endif
diff --git a/kernel/fork.c b/kernel/fork.c
index fc723e5..fd7c749 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -49,6 +49,7 @@
 #include <linux/delayacct.h>
 #include <linux/taskstats_kern.h>
 #include <linux/random.h>
+#include <linux/kevent.h>
 
 #include <asm/pgtable.h>
 #include <asm/pgalloc.h>
@@ -118,6 +119,9 @@ void __put_task_struct(struct task_struct *tsk)
 	WARN_ON(atomic_read(&tsk->usage));
 	WARN_ON(tsk == current);
 
+#ifdef CONFIG_KEVENT_SIGNAL
+	kevent_storage_fini(&tsk->st);
+#endif
 	security_task_free(tsk);
 	free_uid(tsk->user);
 	put_group_info(tsk->group_info);
@@ -1126,6 +1130,10 @@ static struct task_struct *copy_process(unsigned long clone_flags,
 	if (retval)
 		goto bad_fork_cleanup_namespaces;
 
+#ifdef CONFIG_KEVENT_SIGNAL
+	kevent_storage_init(p, &p->st);
+#endif
+
 	p->set_child_tid = (clone_flags & CLONE_CHILD_SETTID) ? child_tidptr : NULL;
 	/*
 	 * Clear TID on mm_release()?
diff --git a/kernel/kevent/kevent_signal.c b/kernel/kevent/kevent_signal.c
new file mode 100644
index 0000000..abe3972
--- /dev/null
+++ b/kernel/kevent/kevent_signal.c
@@ -0,0 +1,94 @@
+/*
+ * 	kevent_signal.c
+ * 
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/fs.h>
+#include <linux/kevent.h>
+
+static int kevent_signal_callback(struct kevent *k)
+{
+	struct task_struct *tsk = k->st->origin;
+	int sig = k->event.id.raw[0];
+	int ret = 0;
+
+	if (sig == tsk->kevent_signals)
+		ret = 1;
+
+	if (ret && (k->event.id.raw_u64 & KEVENT_SIGNAL_NOMASK))
+		tsk->kevent_signals |= 0x80000000;
+
+	return ret;
+}
+
+int kevent_signal_enqueue(struct kevent *k)
+{
+	int err;
+
+	err = kevent_storage_enqueue(&current->st, k);
+	if (err)
+		goto err_out_exit;
+
+	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE) {
+		kevent_requeue(k);
+		err = 0;
+	} else {
+		err = k->callbacks.callback(k);
+		if (err)
+			goto err_out_dequeue;
+	}
+
+	return err;
+
+err_out_dequeue:
+	kevent_storage_dequeue(k->st, k);
+err_out_exit:
+	return err;
+}
+
+int kevent_signal_dequeue(struct kevent *k)
+{
+	kevent_storage_dequeue(k->st, k);
+	return 0;
+}
+
+int kevent_signal_notify(struct task_struct *tsk, int sig)
+{
+	tsk->kevent_signals = sig;
+	kevent_storage_ready(&tsk->st, NULL, KEVENT_SIGNAL_DELIVERY);
+	return (tsk->kevent_signals & 0x80000000);
+}
+
+static int __init kevent_init_signal(void)
+{
+	struct kevent_callbacks sc = {
+		.callback = &kevent_signal_callback,
+		.enqueue = &kevent_signal_enqueue,
+		.dequeue = &kevent_signal_dequeue,
+		.flags = 0,
+	};
+
+	return kevent_add_callbacks(&sc, KEVENT_SIGNAL);
+}
+module_init(kevent_init_signal);
diff --git a/kernel/signal.c b/kernel/signal.c
index 5630255..f12ebc0 100644
--- a/kernel/signal.c
+++ b/kernel/signal.c
@@ -23,6 +23,7 @@
 #include <linux/ptrace.h>
 #include <linux/signal.h>
 #include <linux/capability.h>
+#include <linux/kevent.h>
 #include <linux/freezer.h>
 #include <linux/pid_namespace.h>
 #include <linux/nsproxy.h>
@@ -714,6 +715,9 @@ static int send_signal(int sig, struct siginfo *info, struct task_struct *t,
 {
 	struct sigqueue * q = NULL;
 	int ret = 0;
+	
+	if (kevent_signal_notify(t, sig))
+		return 1;
 
 	/*
 	 * fast-pathed signals for kernel-internal things like SIGSTOP
@@ -793,6 +797,17 @@ specific_send_sig_info(int sig, struct siginfo *info, struct task_struct *t)
 	ret = send_signal(sig, info, t, &t->pending);
 	if (!ret && !sigismember(&t->blocked, sig))
 		signal_wake_up(t, sig == SIGKILL);
+#ifdef CONFIG_KEVENT_SIGNAL
+	/*
+	 * Kevent allows signals to be delivered through the kevent queue,
+	 * and a kevent can be set up so that the signal is not delivered
+	 * the usual way; in that case send_signal() returns 1 and the
+	 * signal is delivered only through the kevent queue.
+	 * We simulate successful delivery notification through this hack:
+	 */
+	if (ret == 1)
+		ret = 0;
+#endif
 out:
 	return ret;
 }
@@ -982,6 +997,17 @@ __group_send_sig_info(int sig, struct siginfo *info, struct task_struct *p)
 	 * to avoid several races.
 	 */
 	ret = send_signal(sig, info, p, &p->signal->shared_pending);
+#ifdef CONFIG_KEVENT_SIGNAL
+	/*
+	 * Kevent allows signals to be delivered through the kevent queue,
+	 * and a kevent can be set up so that the signal is not delivered
+	 * the usual way; in that case send_signal() returns 1 and the
+	 * signal is delivered only through the kevent queue.
+	 * We simulate successful delivery notification through this hack:
+	 */
+	if (ret == 1)
+		ret = 0;
+#endif
 	if (unlikely(ret))
 		return ret;
 



* [take36 9/10] kevent: Private userspace notifications.
  2007-02-12 11:27           ` [take36 8/10] kevent: Kevent posix timer notifications Evgeniy Polyakov
@ 2007-02-12 11:27             ` Evgeniy Polyakov
  2007-02-12 11:27               ` [take36 10/10] kevent: Kevent based generic AIO Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


Private userspace notifications.

Allows registering notifications of arbitrary private userspace events
over kevent. Events can be marked as ready using the
kevent_ctl(KEVENT_READY) command.

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
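
A hedged sketch of the round trip (struct ukevent and kevent_ctl() are
assumed from the core patches; the KEVENT_READY command name follows
the description above):

	struct ukevent uk;
	int err;

	memset(&uk, 0, sizeof(struct ukevent));
	uk.type = KEVENT_UNOTIFY;
	uk.id.raw[0] = 0xdead;		/* arbitrary private cookie */
	err = kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);

	/* ... later, possibly from another thread: */
	err = kevent_ctl(kevent_fd, KEVENT_READY, 1, &uk);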

diff --git a/kernel/kevent/kevent_unotify.c b/kernel/kevent/kevent_unotify.c
new file mode 100644
index 0000000..618c09c
--- /dev/null
+++ b/kernel/kevent/kevent_unotify.c
@@ -0,0 +1,62 @@
+/*
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/kevent.h>
+
+static int kevent_unotify_callback(struct kevent *k)
+{
+	return 1;
+}
+
+int kevent_unotify_enqueue(struct kevent *k)
+{
+	int err;
+
+	err = kevent_storage_enqueue(&k->user->st, k);
+	if (err)
+		goto err_out_exit;
+
+	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE)
+		kevent_requeue(k);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+int kevent_unotify_dequeue(struct kevent *k)
+{
+	kevent_storage_dequeue(k->st, k);
+	return 0;
+}
+
+static int __init kevent_init_unotify(void)
+{
+	struct kevent_callbacks sc = {
+		.callback = &kevent_unotify_callback,
+		.enqueue = &kevent_unotify_enqueue,
+		.dequeue = &kevent_unotify_dequeue,
+		.flags = 0,
+	};
+
+	return kevent_add_callbacks(&sc, KEVENT_UNOTIFY);
+}
+module_init(kevent_init_unotify);



* [take36 8/10] kevent: Kevent posix timer notifications.
  2007-02-12 11:27         ` [take36 7/10] kevent: Signal notifications Evgeniy Polyakov
@ 2007-02-12 11:27           ` Evgeniy Polyakov
  2007-02-12 11:27             ` [take36 9/10] kevent: Private userspace notifications Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


Kevent posix timer notifications.

A simple extension to POSIX timers which allows notification of timer
expiration to be delivered through the kevent queue.

The example application posix_timer.c can be found in the archive on
the project homepage.

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
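
A hedged sketch of arming such a timer from userspace (the
_sigev_un.kevent_fd naming follows the siginfo.h hunk below; whether a
libc wrapper exposes that field directly is an assumption here):

	struct sigevent ev;
	timer_t tid;
	struct itimerspec its = { .it_value = { .tv_sec = 1 } };

	memset(&ev, 0, sizeof(ev));
	ev.sigev_notify = SIGEV_KEVENT;		/* deliver through the kevent queue */
	ev._sigev_un.kevent_fd = kevent_fd;	/* queue to attach the timer to */
	ev.sigev_value.sival_ptr = cookie;	/* comes back in ukevent.ptr */

	if (timer_create(CLOCK_REALTIME, &ev, &tid) == 0)
		timer_settime(tid, 0, &its, NULL);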


diff --git a/include/asm-generic/siginfo.h b/include/asm-generic/siginfo.h
index 8786e01..3768746 100644
--- a/include/asm-generic/siginfo.h
+++ b/include/asm-generic/siginfo.h
@@ -235,6 +235,7 @@ typedef struct siginfo {
 #define SIGEV_NONE	1	/* other notification: meaningless */
 #define SIGEV_THREAD	2	/* deliver via thread creation */
 #define SIGEV_THREAD_ID 4	/* deliver to thread */
+#define SIGEV_KEVENT	8	/* deliver through kevent queue */
 
 /*
  * This works because the alignment is ok on all current architectures
@@ -260,6 +261,8 @@ typedef struct sigevent {
 			void (*_function)(sigval_t);
 			void *_attribute;	/* really pthread_attr_t */
 		} _sigev_thread;
+
+		int kevent_fd;
 	} _sigev_un;
 } sigevent_t;
 
diff --git a/include/linux/posix-timers.h b/include/linux/posix-timers.h
index a7dd38f..4b9deb4 100644
--- a/include/linux/posix-timers.h
+++ b/include/linux/posix-timers.h
@@ -4,6 +4,7 @@
 #include <linux/spinlock.h>
 #include <linux/list.h>
 #include <linux/sched.h>
+#include <linux/kevent_storage.h>
 
 union cpu_time_count {
 	cputime_t cpu;
@@ -49,6 +50,9 @@ struct k_itimer {
 	sigval_t it_sigev_value;	/* value word of sigevent struct */
 	struct task_struct *it_process;	/* process to send signal to */
 	struct sigqueue *sigq;		/* signal queue entry. */
+#ifdef CONFIG_KEVENT_TIMER
+	struct kevent_storage st;
+#endif
 	union {
 		struct {
 			struct hrtimer timer;
diff --git a/kernel/posix-timers.c b/kernel/posix-timers.c
index 5fe87de..5ec805e 100644
--- a/kernel/posix-timers.c
+++ b/kernel/posix-timers.c
@@ -48,6 +48,8 @@
 #include <linux/wait.h>
 #include <linux/workqueue.h>
 #include <linux/module.h>
+#include <linux/kevent.h>
+#include <linux/file.h>
 
 /*
  * Management arrays for POSIX timers.	 Timers are kept in slab memory
@@ -224,6 +226,100 @@ static int posix_ktime_get_ts(clockid_t which_clock, struct timespec *tp)
 	return 0;
 }
 
+#ifdef CONFIG_KEVENT_TIMER
+static int posix_kevent_enqueue(struct kevent *k)
+{
+	/*
+	 * It is not as ugly as it looks - there is no pointer in the id field
+	 * union, but its size is 64 bits, which is ok for any known pointer size.
+	 */
+	struct k_itimer *tmr = (struct k_itimer *)(unsigned long)k->event.id.raw_u64;
+	return kevent_storage_enqueue(&tmr->st, k);
+}
+static int posix_kevent_dequeue(struct kevent *k)
+{
+	struct k_itimer *tmr = (struct k_itimer *)(unsigned long)k->event.id.raw_u64;
+	kevent_storage_dequeue(&tmr->st, k);
+	return 0;
+}
+static int posix_kevent_callback(struct kevent *k)
+{
+	return 1;
+}
+static int posix_kevent_init(void)
+{
+	struct kevent_callbacks tc = {
+		.callback = &posix_kevent_callback,
+		.enqueue = &posix_kevent_enqueue,
+		.dequeue = &posix_kevent_dequeue,
+		.flags = KEVENT_CALLBACKS_KERNELONLY};
+
+	return kevent_add_callbacks(&tc, KEVENT_POSIX_TIMER);
+}
+
+extern struct file_operations kevent_user_fops;
+
+static int posix_kevent_init_timer(struct k_itimer *tmr, int fd)
+{
+	struct ukevent uk;
+	struct file *file;
+	struct kevent_user *u;
+	int err;
+
+	file = fget(fd);
+	if (!file) {
+		err = -EBADF;
+		goto err_out;
+	}
+
+	if (file->f_op != &kevent_user_fops) {
+		err = -EINVAL;
+		goto err_out_fput;
+	}
+
+	u = file->private_data;
+
+	memset(&uk, 0, sizeof(struct ukevent));
+
+	uk.event = KEVENT_MASK_ALL;
+	uk.type = KEVENT_POSIX_TIMER;
+	uk.id.raw_u64 = (unsigned long)(tmr); /* Just cast to something unique */
+	uk.req_flags = KEVENT_REQ_ONESHOT | KEVENT_REQ_ALWAYS_QUEUE;
+	uk.ptr = tmr->it_sigev_value.sival_ptr;
+
+	err = kevent_user_add_ukevent(&uk, u);
+	if (err)
+		goto err_out_fput;
+
+	fput(file);
+
+	return 0;
+
+err_out_fput:
+	fput(file);
+err_out:
+	return err;
+}
+
+static void posix_kevent_fini_timer(struct k_itimer *tmr)
+{
+	kevent_storage_fini(&tmr->st);
+}
+#else
+static int posix_kevent_init_timer(struct k_itimer *tmr, int fd)
+{
+	return -ENOSYS;
+}
+static int posix_kevent_init(void)
+{
+	return 0;
+}
+static void posix_kevent_fini_timer(struct k_itimer *tmr)
+{
+}
+#endif
+
+
 /*
  * Initialize everything, well, just everything in Posix clocks/timers ;)
  */
@@ -241,6 +337,11 @@ static __init int init_posix_timers(void)
 	register_posix_clock(CLOCK_REALTIME, &clock_realtime);
 	register_posix_clock(CLOCK_MONOTONIC, &clock_monotonic);
 
+	if (posix_kevent_init()) {
+		printk(KERN_ERR "Failed to initialize kevent posix timers.\n");
+		BUG();
+	}
+
 	posix_timers_cache = kmem_cache_create("posix_timers_cache",
 					sizeof (struct k_itimer), 0, 0, NULL, NULL);
 	idr_init(&posix_timers_id);
@@ -343,23 +444,29 @@ static int posix_timer_fn(struct hrtimer *timer)
 
 	timr = container_of(timer, struct k_itimer, it.real.timer);
 	spin_lock_irqsave(&timr->it_lock, flags);
+	
+	if (timr->it_sigev_notify == SIGEV_KEVENT) {
+#ifdef CONFIG_KEVENT_TIMER
+		kevent_storage_ready(&timr->st, NULL, KEVENT_MASK_ALL);
+#endif
+	} else {
+		if (timr->it.real.interval.tv64 != 0)
+			si_private = ++timr->it_requeue_pending;
 
-	if (timr->it.real.interval.tv64 != 0)
-		si_private = ++timr->it_requeue_pending;
-
-	if (posix_timer_event(timr, si_private)) {
-		/*
-		 * signal was not sent because of sig_ignor
-		 * we will not get a call back to restart it AND
-		 * it should be restarted.
-		 */
-		if (timr->it.real.interval.tv64 != 0) {
-			timr->it_overrun +=
-				hrtimer_forward(timer,
-						timer->base->softirq_time,
-						timr->it.real.interval);
-			ret = HRTIMER_RESTART;
-			++timr->it_requeue_pending;
+		if (posix_timer_event(timr, si_private)) {
+			/*
+			 * signal was not sent because of sig_ignor
+			 * we will not get a call back to restart it AND
+			 * it should be restarted.
+			 */
+			if (timr->it.real.interval.tv64 != 0) {
+				timr->it_overrun +=
+					hrtimer_forward(timer,
+							timer->base->softirq_time,
+							timr->it.real.interval);
+				ret = HRTIMER_RESTART;
+				++timr->it_requeue_pending;
+			}
 		}
 	}
 
@@ -407,6 +514,9 @@ static struct k_itimer * alloc_posix_timer(void)
 		kmem_cache_free(posix_timers_cache, tmr);
 		tmr = NULL;
 	}
+#ifdef CONFIG_KEVENT_TIMER
+	kevent_storage_init(tmr, &tmr->st);
+#endif
 	return tmr;
 }
 
@@ -424,6 +534,7 @@ static void release_posix_timer(struct k_itimer *tmr, int it_id_set)
 	if (unlikely(tmr->it_process) &&
 	    tmr->it_sigev_notify == (SIGEV_SIGNAL|SIGEV_THREAD_ID))
 		put_task_struct(tmr->it_process);
+	posix_kevent_fini_timer(tmr);
 	kmem_cache_free(posix_timers_cache, tmr);
 }
 
@@ -496,40 +607,52 @@ sys_timer_create(const clockid_t which_clock,
 		new_timer->it_sigev_signo = event.sigev_signo;
 		new_timer->it_sigev_value = event.sigev_value;
 
-		read_lock(&tasklist_lock);
-		if ((process = good_sigevent(&event))) {
-			/*
-			 * We may be setting up this process for another
-			 * thread.  It may be exiting.  To catch this
-			 * case the we check the PF_EXITING flag.  If
-			 * the flag is not set, the siglock will catch
-			 * him before it is too late (in exit_itimers).
-			 *
-			 * The exec case is a bit more invloved but easy
-			 * to code.  If the process is in our thread
-			 * group (and it must be or we would not allow
-			 * it here) and is doing an exec, it will cause
-			 * us to be killed.  In this case it will wait
-			 * for us to die which means we can finish this
-			 * linkage with our last gasp. I.e. no code :)
-			 */
+		if (event.sigev_notify == SIGEV_KEVENT) {
+			error = posix_kevent_init_timer(new_timer, event._sigev_un.kevent_fd);
+			if (error)
+				goto out;
+
+			process = current->group_leader;
 			spin_lock_irqsave(&process->sighand->siglock, flags);
-			if (!(process->flags & PF_EXITING)) {
-				new_timer->it_process = process;
-				list_add(&new_timer->list,
-					 &process->signal->posix_timers);
-				spin_unlock_irqrestore(&process->sighand->siglock, flags);
-				if (new_timer->it_sigev_notify == (SIGEV_SIGNAL|SIGEV_THREAD_ID))
-					get_task_struct(process);
-			} else {
-				spin_unlock_irqrestore(&process->sighand->siglock, flags);
-				process = NULL;
+			new_timer->it_process = process;
+			list_add(&new_timer->list, &process->signal->posix_timers);
+			spin_unlock_irqrestore(&process->sighand->siglock, flags);
+		} else {
+			read_lock(&tasklist_lock);
+			if ((process = good_sigevent(&event))) {
+				/*
+				 * We may be setting up this process for another
+				 * thread.  It may be exiting.  To catch this
+				 * case the we check the PF_EXITING flag.  If
+				 * the flag is not set, the siglock will catch
+				 * him before it is too late (in exit_itimers).
+				 *
+				 * The exec case is a bit more invloved but easy
+				 * to code.  If the process is in our thread
+				 * group (and it must be or we would not allow
+				 * it here) and is doing an exec, it will cause
+				 * us to be killed.  In this case it will wait
+				 * for us to die which means we can finish this
+				 * linkage with our last gasp. I.e. no code :)
+				 */
+				spin_lock_irqsave(&process->sighand->siglock, flags);
+				if (!(process->flags & PF_EXITING)) {
+					new_timer->it_process = process;
+					list_add(&new_timer->list,
+						 &process->signal->posix_timers);
+					spin_unlock_irqrestore(&process->sighand->siglock, flags);
+					if (new_timer->it_sigev_notify == (SIGEV_SIGNAL|SIGEV_THREAD_ID))
+						get_task_struct(process);
+				} else {
+					spin_unlock_irqrestore(&process->sighand->siglock, flags);
+					process = NULL;
+				}
+			}
+			read_unlock(&tasklist_lock);
+			if (!process) {
+				error = -EINVAL;
+				goto out;
 			}
-		}
-		read_unlock(&tasklist_lock);
-		if (!process) {
-			error = -EINVAL;
-			goto out;
 		}
 	} else {
 		new_timer->it_sigev_notify = SIGEV_SIGNAL;



* [take36 5/10] kevent: Timer notifications.
  2007-02-12 11:27   ` [take36 4/10] kevent: Socket notifications Evgeniy Polyakov
@ 2007-02-12 11:27     ` Evgeniy Polyakov
  2007-02-12 11:27       ` [take36 6/10] kevent: Pipe notifications Evgeniy Polyakov
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar


Timer notifications.

Timer notifications can be used for fine-grained per-process time
management, since interval timers are very inconvenient to use and are
limited in number.

This subsystem uses high-resolution timers.

id.raw[0] is used as the number of seconds
id.raw[1] is used as the number of nanoseconds

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
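
A hedged sketch of a periodic 1.5 second timer (struct ukevent and
kevent_ctl() are assumed from the core patches):

	struct ukevent uk;
	int err;

	memset(&uk, 0, sizeof(struct ukevent));
	uk.type = KEVENT_TIMER;
	uk.id.raw[0] = 1;		/* seconds */
	uk.id.raw[1] = 500000000;	/* nanoseconds */
	err = kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);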

diff --git a/kernel/kevent/kevent_timer.c b/kernel/kevent/kevent_timer.c
new file mode 100644
index 0000000..c21a155
--- /dev/null
+++ b/kernel/kevent/kevent_timer.c
@@ -0,0 +1,114 @@
+/*
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/hrtimer.h>
+#include <linux/jiffies.h>
+#include <linux/kevent.h>
+
+struct kevent_timer
+{
+	struct hrtimer		ktimer;
+	struct kevent_storage	ktimer_storage;
+	struct kevent		*ktimer_event;
+};
+
+static int kevent_timer_func(struct hrtimer *timer)
+{
+	struct kevent_timer *t = container_of(timer, struct kevent_timer, ktimer);
+	struct kevent *k = t->ktimer_event;
+
+	kevent_storage_ready(&t->ktimer_storage, NULL, KEVENT_MASK_ALL);
+	hrtimer_forward(timer, timer->base->softirq_time,
+			ktime_set(k->event.id.raw[0], k->event.id.raw[1]));
+	return HRTIMER_RESTART;
+}
+
+static struct lock_class_key kevent_timer_key;
+
+static int kevent_timer_enqueue(struct kevent *k)
+{
+	int err;
+	struct kevent_timer *t;
+
+	t = kmalloc(sizeof(struct kevent_timer), GFP_KERNEL);
+	if (!t)
+		return -ENOMEM;
+
+	hrtimer_init(&t->ktimer, CLOCK_MONOTONIC, HRTIMER_REL);
+	t->ktimer.expires = ktime_set(k->event.id.raw[0], k->event.id.raw[1]);
+	t->ktimer.function = kevent_timer_func;
+	t->ktimer_event = k;
+
+	err = kevent_storage_init(&t->ktimer, &t->ktimer_storage);
+	if (err)
+		goto err_out_free;
+	lockdep_set_class(&t->ktimer_storage.lock, &kevent_timer_key);
+
+	err = kevent_storage_enqueue(&t->ktimer_storage, k);
+	if (err)
+		goto err_out_st_fini;
+
+	hrtimer_start(&t->ktimer, t->ktimer.expires, HRTIMER_REL);
+
+	return 0;
+
+err_out_st_fini:
+	kevent_storage_fini(&t->ktimer_storage);
+err_out_free:
+	kfree(t);
+
+	return err;
+}
+
+static int kevent_timer_dequeue(struct kevent *k)
+{
+	struct kevent_storage *st = k->st;
+	struct kevent_timer *t = container_of(st, struct kevent_timer, ktimer_storage);
+
+	hrtimer_cancel(&t->ktimer);
+	kevent_storage_dequeue(st, k);
+	kfree(t);
+
+	return 0;
+}
+
+static int kevent_timer_callback(struct kevent *k)
+{
+	k->event.ret_data[0] = jiffies_to_msecs(jiffies);
+	return 1;
+}
+
+static int __init kevent_init_timer(void)
+{
+	struct kevent_callbacks tc = {
+		.callback = &kevent_timer_callback,
+		.enqueue = &kevent_timer_enqueue,
+		.dequeue = &kevent_timer_dequeue,
+		.flags = 0,
+	};
+
+	return kevent_add_callbacks(&tc, KEVENT_TIMER);
+}
+module_init(kevent_init_timer);
+



* [take36 10/10] kevent: Kevent based generic AIO.
  2007-02-12 11:27             ` [take36 9/10] kevent: Private userspace notifications Evgeniy Polyakov
@ 2007-02-12 11:27               ` Evgeniy Polyakov
  2007-02-12 13:08                 ` Andi Kleen
  0 siblings, 1 reply; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 11:27 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, Evgeniy Polyakov,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar, linux-fsdevel


Kevent based generic AIO.

This patch does not yet implement network AIO, which is _COMPLETELY_
impossible and broken in _ANY_ micro-thread design. For details and a
test, see the following link:
http://tservice.net.ru/~s0mbre/blog/2007/02/10#2007_02_10

Designing AIO without the network in mind can only be the result of a
heavy hangover.

Kevent AIO is implemented as a state machine.
This patch implements asynchronous open/send_header/sendfile/close.

aio_sendfile()/aio_sendfile_path() consists of two major parts: the AIO
state machine and the page processing code.
The former is just a small subsystem which allows callbacks to be
queued for invocation in process context on behalf of a pool of kernel
threads. Caches of callbacks can be queued to the local thread or to
any other specified one. Each cache of callbacks is processed while
there are callbacks in it; callbacks can requeue themselves into the
same cache.

The real work is done in the page processing code - code which
populates pages into the VFS cache and then sends them to the
destination socket via ->sendpage(). Unlike the previous aio_sendfile()
implementation, the new one does not require low-level
filesystem-specific callbacks (->get_block()) at all; instead I
extended struct address_space_operations with a new member called
->aio_readpages(), which is exactly the same as ->readpages() (read:
mpage_readpages()) except for different BIO allocation and submission
routines. I changed mpage_readpages() to provide mpage_alloc() and
mpage_bio_submit() to a new function called __mpage_readpages(), which
is exactly the old mpage_readpages() except that it invokes the
provided callbacks instead of the old functions directly.
mpage_readpages_aio() provides kevent-specific callbacks which call the
old functions, but with different destructor callbacks, which are
essentially the same except that they reschedule AIO processing.

aio_sendfile_path() is essentially aio_sendfile(), except that it takes
the source filename as a parameter and a pointer to a private header
and its size (which allows sending the header and the file's content in
one syscall instead of three: open, send, sendfile), and returns the
opened file descriptor.

A benchmark transferring 100 1MB files (already in the VFS cache) using
synchronous sendfile() against aio_sendfile_path() shows about a
10 MB/s performance win for aio_sendfile_path() (78 MB/s vs 66-72 MB/s
over a 1 Gb network; the sending server is a 1-way AMD Athlon 64
3500+).

The AIO state machine is a base for network AIO (which becomes quite
trivial on top of it), but I will not start that implementation until
the roadmap for kevent as a whole and for the AIO implementation
becomes more clear.

Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
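
A purely hypothetical usage sketch - the userspace wrapper is not part
of this posting, so the name, argument order and error convention of
aio_sendfile_path() below are all assumptions based on the description
above:

	char header[] = "HTTP/1.0 200 OK\r\n\r\n";
	int fd;

	fd = aio_sendfile_path(kevent_fd, sock_fd, "/var/www/index.html",
			header, sizeof(header) - 1);
	if (fd >= 0) {
		/* completion is reported through the kevent queue; fd is
		 * the descriptor of the file opened on our behalf */
	}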

diff --git a/fs/bio.c b/fs/bio.c
index 7618bcb..291e7e8 100644
--- a/fs/bio.c
+++ b/fs/bio.c
@@ -120,7 +120,7 @@ void bio_free(struct bio *bio, struct bio_set *bio_set)
 /*
  * default destructor for a bio allocated with bio_alloc_bioset()
  */
-static void bio_fs_destructor(struct bio *bio)
+void bio_fs_destructor(struct bio *bio)
 {
 	bio_free(bio, fs_bio_set);
 }
diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
index beaf25f..f08c957 100644
--- a/fs/ext3/inode.c
+++ b/fs/ext3/inode.c
@@ -1650,6 +1650,13 @@ ext3_readpages(struct file *file, struct address_space *mapping,
 	return mpage_readpages(mapping, pages, nr_pages, ext3_get_block);
 }
 
+static int
+ext3_readpages_aio(struct file *file, struct address_space *mapping,
+		struct list_head *pages, unsigned nr_pages, void *priv)
+{
+	return mpage_readpages_aio(mapping, pages, nr_pages, ext3_get_block, priv);
+}
+
 static void ext3_invalidatepage(struct page *page, unsigned long offset)
 {
 	journal_t *journal = EXT3_JOURNAL(page->mapping->host);
@@ -1768,6 +1775,7 @@ static int ext3_journalled_set_page_dirty(struct page *page)
 }
 
 static const struct address_space_operations ext3_ordered_aops = {
+	.aio_readpages	= ext3_readpages_aio,
 	.readpage	= ext3_readpage,
 	.readpages	= ext3_readpages,
 	.writepage	= ext3_ordered_writepage,
diff --git a/fs/mpage.c b/fs/mpage.c
index 692a3e5..e5ba44b 100644
--- a/fs/mpage.c
+++ b/fs/mpage.c
@@ -102,7 +102,7 @@ static struct bio *mpage_bio_submit(int rw, struct bio *bio)
 static struct bio *
 mpage_alloc(struct block_device *bdev,
 		sector_t first_sector, int nr_vecs,
-		gfp_t gfp_flags)
+		gfp_t gfp_flags, void *priv)
 {
 	struct bio *bio;
 
@@ -116,6 +116,7 @@ mpage_alloc(struct block_device *bdev,
 	if (bio) {
 		bio->bi_bdev = bdev;
 		bio->bi_sector = first_sector;
+		bio->bi_private = priv;
 	}
 	return bio;
 }
@@ -175,7 +176,10 @@ map_buffer_to_page(struct page *page, struct buffer_head *bh, int page_block)
 static struct bio *
 do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
 		sector_t *last_block_in_bio, struct buffer_head *map_bh,
-		unsigned long *first_logical_block, get_block_t get_block)
+		unsigned long *first_logical_block, get_block_t get_block,
+		struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector, 
+			int nr_vecs, gfp_t gfp_flags, void *priv),
+		struct bio *(*submit)(int rw, struct bio *bio), void *priv)
 {
 	struct inode *inode = page->mapping->host;
 	const unsigned blkbits = inode->i_blkbits;
@@ -302,25 +306,25 @@ do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
 	 * This page will go to BIO.  Do we need to send this BIO off first?
 	 */
 	if (bio && (*last_block_in_bio != blocks[0] - 1))
-		bio = mpage_bio_submit(READ, bio);
+		bio = submit(READ, bio);
 
 alloc_new:
 	if (bio == NULL) {
-		bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
+		bio = alloc(bdev, blocks[0] << (blkbits - 9),
 			  	min_t(int, nr_pages, bio_get_nr_vecs(bdev)),
-				GFP_KERNEL);
+				GFP_KERNEL, priv);
 		if (bio == NULL)
 			goto confused;
 	}
 
 	length = first_hole << blkbits;
 	if (bio_add_page(bio, page, length, 0) < length) {
-		bio = mpage_bio_submit(READ, bio);
+		bio = submit(READ, bio);
 		goto alloc_new;
 	}
 
 	if (buffer_boundary(map_bh) || (first_hole != blocks_per_page))
-		bio = mpage_bio_submit(READ, bio);
+		bio = submit(READ, bio);
 	else
 		*last_block_in_bio = blocks[blocks_per_page - 1];
 out:
@@ -328,7 +332,7 @@ out:
 
 confused:
 	if (bio)
-		bio = mpage_bio_submit(READ, bio);
+		bio = submit(READ, bio);
 	if (!PageUptodate(page))
 	        block_read_full_page(page, get_block);
 	else
@@ -336,6 +340,48 @@ confused:
 	goto out;
 }
 
+int
+__mpage_readpages(struct address_space *mapping, struct list_head *pages,
+				unsigned nr_pages, get_block_t get_block,
+		struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector, 
+			int nr_vecs, gfp_t gfp_flags, void *priv),
+		struct bio *(*submit)(int rw, struct bio *bio), 
+		void *priv)
+{
+	struct bio *bio = NULL;
+	unsigned page_idx;
+	sector_t last_block_in_bio = 0;
+	struct pagevec lru_pvec;
+	struct buffer_head map_bh;
+	unsigned long first_logical_block = 0;
+
+	clear_buffer_mapped(&map_bh);
+	pagevec_init(&lru_pvec, 0);
+	for (page_idx = 0; page_idx < nr_pages; page_idx++) {
+		struct page *page = list_entry(pages->prev, struct page, lru);
+
+		prefetchw(&page->flags);
+		list_del(&page->lru);
+		if (!add_to_page_cache(page, mapping,
+					page->index, GFP_KERNEL)) {
+			bio = do_mpage_readpage(bio, page,
+					nr_pages - page_idx,
+					&last_block_in_bio, &map_bh,
+					&first_logical_block,
+					get_block, alloc, submit, priv);
+			if (!pagevec_add(&lru_pvec, page))
+				__pagevec_lru_add(&lru_pvec);
+		} else {
+			page_cache_release(page);
+		}
+	}
+	pagevec_lru_add(&lru_pvec);
+	BUG_ON(!list_empty(pages));
+	if (bio)
+		submit(READ, bio);
+	return 0;
+}
+
 /**
  * mpage_readpages - populate an address space with some pages, and
  *                       start reads against them.
@@ -386,40 +432,28 @@ int
 mpage_readpages(struct address_space *mapping, struct list_head *pages,
 				unsigned nr_pages, get_block_t get_block)
 {
-	struct bio *bio = NULL;
-	unsigned page_idx;
-	sector_t last_block_in_bio = 0;
-	struct pagevec lru_pvec;
-	struct buffer_head map_bh;
-	unsigned long first_logical_block = 0;
+	return __mpage_readpages(mapping, pages, nr_pages, get_block,
+			mpage_alloc, mpage_bio_submit, NULL);
+}
+EXPORT_SYMBOL(mpage_readpages);
 
-	clear_buffer_mapped(&map_bh);
-	pagevec_init(&lru_pvec, 0);
-	for (page_idx = 0; page_idx < nr_pages; page_idx++) {
-		struct page *page = list_entry(pages->prev, struct page, lru);
+#ifdef CONFIG_KEVENT_AIO
+extern struct bio *kaio_mpage_alloc(struct block_device *bdev, sector_t first_sector, 
+		int nr_vecs, gfp_t gfp_flags, void *priv);
+extern struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio);
+#else
+#define kaio_mpage_alloc	mpage_alloc
+#define kaio_mpage_bio_submit	mpage_bio_submit
+#endif
 
-		prefetchw(&page->flags);
-		list_del(&page->lru);
-		if (!add_to_page_cache(page, mapping,
-					page->index, GFP_KERNEL)) {
-			bio = do_mpage_readpage(bio, page,
-					nr_pages - page_idx,
-					&last_block_in_bio, &map_bh,
-					&first_logical_block,
-					get_block);
-			if (!pagevec_add(&lru_pvec, page))
-				__pagevec_lru_add(&lru_pvec);
-		} else {
-			page_cache_release(page);
-		}
-	}
-	pagevec_lru_add(&lru_pvec);
-	BUG_ON(!list_empty(pages));
-	if (bio)
-		mpage_bio_submit(READ, bio);
-	return 0;
+int
+mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
+				unsigned nr_pages, get_block_t get_block, void *priv)
+{
+	return __mpage_readpages(mapping, pages, nr_pages, get_block,
+			kaio_mpage_alloc, kaio_mpage_bio_submit, priv);
 }
-EXPORT_SYMBOL(mpage_readpages);
+EXPORT_SYMBOL(mpage_readpages_aio);
 
 /*
  * This isn't called much at all
@@ -433,7 +467,8 @@ int mpage_readpage(struct page *page, get_block_t get_block)
 
 	clear_buffer_mapped(&map_bh);
 	bio = do_mpage_readpage(bio, page, 1, &last_block_in_bio,
-			&map_bh, &first_logical_block, get_block);
+			&map_bh, &first_logical_block, get_block,
+			mpage_alloc, mpage_bio_submit, NULL);
 	if (bio)
 		mpage_bio_submit(READ, bio);
 	return 0;
@@ -595,7 +630,7 @@ page_is_mapped:
 alloc_new:
 	if (bio == NULL) {
 		bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
-				bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH);
+				bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH, NULL);
 		if (bio == NULL)
 			goto confused;
 	}
diff --git a/include/linux/mpage.h b/include/linux/mpage.h
index cc5fb75..accdbdd 100644
--- a/include/linux/mpage.h
+++ b/include/linux/mpage.h
@@ -16,6 +16,8 @@ typedef int (writepage_t)(struct page *page, struct writeback_control *wbc);
 
 int mpage_readpages(struct address_space *mapping, struct list_head *pages,
 				unsigned nr_pages, get_block_t get_block);
+int mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
+				unsigned nr_pages, get_block_t get_block, void *priv);
 int mpage_readpage(struct page *page, get_block_t get_block);
 int mpage_writepages(struct address_space *mapping,
 		struct writeback_control *wbc, get_block_t get_block);
diff --git a/kernel/kevent/kevent_aio.c b/kernel/kevent/kevent_aio.c
new file mode 100644
index 0000000..9bb5e99
--- /dev/null
+++ b/kernel/kevent/kevent_aio.c
@@ -0,0 +1,939 @@
+/*
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/spinlock.h>
+#include <linux/list.h>
+#include <linux/kthread.h>
+#include <linux/socket.h>
+#include <linux/slab.h>
+#include <linux/bio.h>
+#include <linux/pagemap.h>
+#include <linux/file.h>
+#include <linux/swap.h>
+#include <linux/kevent.h>
+
+#define KAIO_CALL_NUM		8
+#define KAIO_THREAD_NUM		8
+
+//#define KEVENT_AIO_DEBUG
+
+#ifdef KEVENT_AIO_DEBUG
+#define dprintk(f, a...) printk(f, ##a)
+#else
+#define dprintk(f, a...) do {} while (0)
+#endif
+
+struct kaio_thread
+{
+	struct list_head	req_list;
+	spinlock_t		req_lock;
+	struct task_struct	*thread;
+	int			refcnt;
+	wait_queue_head_t 	wait;
+};
+
+extern struct file_operations kevent_user_fops;
+static DEFINE_PER_CPU(u32, kaio_req_counter);
+static DEFINE_PER_CPU(int, kaio_req_cpu);
+
+/*
+ * Array of working threads.
+ * It can only be accessed under RCU protection, 
+ * so the thread reference counters need not be atomic.
+ */
+static struct kaio_thread *kaio_threads[KAIO_THREAD_NUM] __read_mostly;
+static struct kmem_cache *kaio_req_cache __read_mostly;
+static struct kmem_cache *kaio_priv_cache __read_mostly;
+
+struct kaio_req;
+typedef int (* kaio_callback)(struct kaio_req *req, int direct);
+
+#define KAIO_REQ_PENDING	0
+
+/*
+ * Cache of kaio request callbacks.
+ * It is not allowed to change the same cache entry
+ * simultaneously (for example it is forbidden to add entries
+ * in parallel).
+ *
+ * When cache entry is scheduled for execution in one of the threads,
+ * it is forbidden to access it, since it will be freed when
+ * all callbacks have been invoked.
+ *
+ * It is possible to add callbacks to this cache from the callbacks themselves.
+ */
+struct kaio_req
+{
+	struct list_head	req_entry;
+	kaio_callback		call[KAIO_CALL_NUM];
+	int			read_idx, add_idx;
+	int			cpu;
+	long			flags;
+	atomic_t		refcnt;
+	void			(*destructor)(struct kaio_req *);
+	void			*priv;
+};
+
+/*
+ * Returns pointer to thread entry for given index.
+ * Must be called under RCU protection.
+ */
+static struct kaio_thread *kaio_get_thread(int cpu)
+{
+	struct kaio_thread *th;
+
+	if (cpu == -1) {
+#if 1
+		int *cnt = &__get_cpu_var(kaio_req_cpu);
+		cpu = *cnt;
+
+		*cnt = *cnt + 1;
+		if (*cnt >= KAIO_THREAD_NUM)
+			*cnt = 0;
+#else
+		cpu = 0;
+#endif
+	}
+
+	if (unlikely(cpu >= KAIO_THREAD_NUM || !kaio_threads[cpu]))
+		return NULL;
+
+	th = kaio_threads[cpu];
+	th->refcnt++;
+
+	return th;
+}
+
+/*
+ * Drops reference counter for given thread.
+ * Must be called under RCU protection.
+ */
+static inline void kaio_put_thread(struct kaio_thread *th)
+{
+	th->refcnt--;
+}
+
+void kaio_schedule_req(struct kaio_req *req)
+{
+	struct kaio_thread *th;
+	unsigned long flags;
+
+	rcu_read_lock();
+	th = kaio_get_thread(req->cpu);
+	if (!th) {
+		req->cpu = -1;
+		th = kaio_get_thread(-1);
+		BUG_ON(!th);
+	}
+
+	if (!test_and_set_bit(KAIO_REQ_PENDING, &req->flags)) {
+		spin_lock_irqsave(&th->req_lock, flags);
+		list_add_tail(&req->req_entry, &th->req_list);
+		spin_unlock_irqrestore(&th->req_lock, flags);
+	}
+
+	wake_up(&th->wait);
+
+	kaio_put_thread(th);
+	rcu_read_unlock();
+}
+
+EXPORT_SYMBOL_GPL(kaio_schedule_req);
+
+static inline void kaio_req_get(struct kaio_req *req)
+{
+	atomic_inc(&req->refcnt);
+}
+
+static inline int kaio_req_put(struct kaio_req *req)
+{
+	if (atomic_dec_and_test(&req->refcnt)) {
+		dprintk("%s: freeing req: %p, priv: %p.\n", __func__, req, req->priv);
+		if (req->destructor)
+			req->destructor(req);
+		kmem_cache_free(kaio_req_cache, req);
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Append a call request into cache.
+ * Returns -EOVERFLOW in case cache is full, and 0 otherwise.
+ */
+int kaio_append_call(struct kaio_req *req, kaio_callback call)
+{
+	if ((req->add_idx + 1 == req->read_idx) ||
+			((req->add_idx + 1 == KAIO_CALL_NUM) && req->read_idx == 0))
+		return -EOVERFLOW;
+
+	req->call[req->add_idx] = call;
+
+	dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
+			__func__, req, req->read_idx, req->add_idx, 
+			req->call[req->read_idx], req->call[req->add_idx]);
+	if (++req->add_idx == KAIO_CALL_NUM)
+		req->add_idx = 0;
+
+	kaio_req_get(req);
+	
+	return 0;
+}
+
+EXPORT_SYMBOL_GPL(kaio_append_call);
+
+/*
+ * Adds one call request into given cache.
+ * If cache is NULL or full, allocate new one.
+ */
+struct kaio_req *kaio_add_call(struct kaio_req *req, kaio_callback call, int cpu, gfp_t gflags)
+{
+	if (req && !kaio_append_call(req, call)) {
+		kaio_schedule_req(req);
+		return req;
+	}
+
+	req = kmem_cache_alloc(kaio_req_cache, gflags);
+	if (!req)
+		return NULL;
+
+	memset(req->call, 0, sizeof(req->call));
+
+	req->destructor = NULL;
+	req->cpu = cpu;
+	req->call[0] = call;
+	req->add_idx = 1;
+	req->read_idx = 0;
+	req->flags = 0;
+	atomic_set(&req->refcnt, 1);
+
+	dprintk("%s: req: %p, call: %p [%p].\n", __func__, req, call, req->call[0]);
+
+	return req;
+}
+
+EXPORT_SYMBOL_GPL(kaio_add_call);
+
+/*
+ * Call the appropriate callbacks in the cache.
+ * This can only be called by the worker threads, which means the cache
+ * is filled (possibly partially) and is no longer accessible from
+ * the originator of the requests, so the cache will be freed
+ * when all callbacks have been invoked.
+ *
+ * Callback itself can reschedule new callback into the same cache.
+ *
+ * If callback returns negative value, the whole cache will be freed.
+ * If positive value is returned, then further processing is stopped, 
+ * so cache can be queued into the end of the processing FIFO by callback.
+ * If zero is returned, next callback will be invoked if any.
+ */
+static int kaio_call(struct kaio_req *req)
+{
+	int err = -EINVAL;
+
+	if (likely(req->add_idx != req->read_idx)) {
+		dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
+				__func__, req, req->read_idx, req->add_idx, 
+				req->call[req->read_idx], req->call[0]);
+		err = (*req->call[req->read_idx])(req, 0);
+		if (++req->read_idx == KAIO_CALL_NUM)
+			req->read_idx = 0;
+
+		if (kaio_req_put(req))
+			err = 0;
+	}
+	return err;
+}
+
+static int kaio_thread_process(void *data)
+{
+	struct kaio_thread *th = data;
+	unsigned long flags;
+	struct kaio_req *req, *first;
+	DECLARE_WAITQUEUE(wait, current);
+	int err;
+
+	add_wait_queue_exclusive(&th->wait, &wait);
+
+	while (!kthread_should_stop()) {
+		first = req = NULL;
+		do {
+			req = NULL;
+			spin_lock_irqsave(&th->req_lock, flags);
+			if (!list_empty(&th->req_list)) {
+				req = list_entry(th->req_list.prev, struct kaio_req, req_entry);
+				if (first != req)
+					list_del(&req->req_entry);
+			}
+			spin_unlock_irqrestore(&th->req_lock, flags);
+				
+			if (!first)
+				first = req;
+			else if (first == req)
+				break;
+
+			if (req) {
+				err = 0;
+				while ((req->read_idx != req->add_idx) && !kthread_should_stop()) {
+					dprintk("%s: req: %p, read_idx: %d, add_idx: %d, err: %d.\n",
+							__func__, req, req->read_idx, req->add_idx, err);
+					err = kaio_call(req);
+					if (err != 0)
+						break;
+				}
+
+				if (err > 0) {
+					spin_lock_irqsave(&th->req_lock, flags);
+					list_add_tail(&req->req_entry, &th->req_list);
+					spin_unlock_irqrestore(&th->req_lock, flags);
+				}
+			}
+		} while (req);
+		__set_current_state(TASK_INTERRUPTIBLE);
+		schedule_timeout(HZ);
+		__set_current_state(TASK_RUNNING);
+	}
+	
+	remove_wait_queue(&th->wait, &wait);
+
+	return 0;
+}
+
+struct kaio_private
+{
+	union {
+		void 		*sptr;
+		__u64		sdata;
+	};
+	union {
+		void 		*dptr;
+		__u64		ddata;
+	};
+	void			__user *header;
+	size_t			header_size;
+	__u64			offset, processed;
+	__u64			count, limit;
+	struct kevent_user	*kevent_user;
+};
+
+extern void bio_fs_destructor(struct bio *bio);
+
+static void kaio_bio_destructor(struct bio *bio)
+{
+	dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
+	bio_fs_destructor(bio);
+}
+
+static int kaio_read_send_pages(struct kaio_req *req, int direct);
+
+static int kaio_mpage_end_io_read(struct bio *bio, unsigned int bytes_done, int err)
+{
+	const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
+	struct bio_vec *bvec = bio->bi_io_vec + bio->bi_vcnt - 1;
+	struct kaio_req *req = bio->bi_private;
+
+	if (bio->bi_size)
+		return 1;
+
+	do {
+		struct page *page = bvec->bv_page;
+
+		if (--bvec >= bio->bi_io_vec)
+			prefetchw(&bvec->bv_page->flags);
+
+		if (uptodate) {
+			SetPageUptodate(page);
+		} else {
+			ClearPageUptodate(page);
+			SetPageError(page);
+		}
+		unlock_page(page);
+	} while (bvec >= bio->bi_io_vec);
+
+	dprintk("%s: bio: %p, req: %p, pending: %d.\n", 
+			__func__, bio, req, test_bit(KAIO_REQ_PENDING, &req->flags));
+
+	kaio_append_call(req, kaio_read_send_pages);
+	kaio_req_put(req);
+	kaio_schedule_req(req);
+
+	bio_put(bio);
+	return 0;
+}
+
+struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio)
+{
+	if (bio) {
+		bio->bi_end_io = kaio_mpage_end_io_read;
+		dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
+		submit_bio(READ, bio);
+	}
+	return NULL;
+}
+
+struct bio *kaio_mpage_alloc(struct block_device *bdev,
+		sector_t first_sector, int nr_vecs, gfp_t gfp_flags, void *priv)
+{
+	struct bio *bio;
+
+	bio = bio_alloc(gfp_flags, nr_vecs);
+
+	if (bio == NULL && (current->flags & PF_MEMALLOC)) {
+		while (!bio && (nr_vecs /= 2))
+			bio = bio_alloc(gfp_flags, nr_vecs);
+	}
+
+	if (bio) {
+		struct kaio_req *req = priv;
+
+		bio->bi_bdev = bdev;
+		bio->bi_sector = first_sector;
+		bio->bi_private = priv;
+		bio->bi_destructor = kaio_bio_destructor;
+		kaio_req_get(req);
+		dprintk("%s: bio: %p, req: %p, num: %d.\n", __func__, bio, priv, nr_vecs);
+	}
+	return bio;
+}
+
+static ssize_t kaio_vfs_read_actor(struct kaio_private *priv, struct page *page, size_t len)
+{
+	struct socket *sock = priv->dptr;
+	struct file *file = sock->file;
+	
+	return file->f_op->sendpage(file, page, 0, len, &file->f_pos, 1);
+}
+
+static int kaio_vfs_read(struct kaio_private *priv,
+		ssize_t (*actor)(struct kaio_private *, struct page *, size_t))
+{
+	struct address_space *mapping;
+	struct file *file = priv->sptr;
+	ssize_t actor_size;
+	loff_t isize;
+	int i = 0, pg_num;
+
+	mapping = file->f_mapping;
+	isize = i_size_read(file->f_dentry->d_inode);
+
+	if (priv->processed >= isize) {
+		priv->count = 0;
+		return 0;
+	}
+	priv->count = isize - priv->processed;
+	pg_num = ALIGN(min_t(u64, isize, priv->count), PAGE_SIZE) >> PAGE_SHIFT;
+	
+	dprintk("%s: start: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n", 
+			__func__, priv, i, pg_num, priv->count, priv->offset, priv->processed);
+
+	for (i=0; i<pg_num && priv->count; ++i) {
+		struct page *page;
+		size_t nr = PAGE_CACHE_SIZE;
+
+		page = find_get_page(mapping, priv->processed >> PAGE_CACHE_SHIFT);
+		if (unlikely(page == NULL))
+			break;
+		if (!PageUptodate(page)) {
+			dprintk("%s: %2d: page=%p, processed=%Lu, count=%Lu not uptodate.\n", 
+					__func__, i, page, priv->processed, priv->count);
+			page_cache_release(page);
+			break;
+		}
+
+		if (mapping_writably_mapped(mapping))
+			flush_dcache_page(page);
+
+		mark_page_accessed(page);
+
+		if (nr + priv->processed > isize)
+			nr = isize - priv->processed;
+		if (nr > priv->count)
+			nr = priv->count;
+
+		actor_size = actor(priv, page, nr);
+		if (actor_size < 0) {
+			page_cache_release(page);
+			i = (int)actor_size;
+			break;
+		}
+
+		page_cache_release(page);
+
+		priv->processed += actor_size;
+		priv->count -= actor_size;
+	}
+
+	if (!priv->count)
+		i = pg_num;
+
+	dprintk("%s: end: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n", 
+			__func__, priv, i, pg_num, priv->count, priv->offset, priv->processed);
+
+	return i;
+}
+
+/*
+ * This function sends the header if one is available.
+ * Zero is returned if it was sent correctly,
+ * a negative value is returned on fatal error,
+ * and a positive one if -EAGAIN was returned by the underlying socket layer.
+ */
+static int kaio_send_header(struct kaio_private *priv)
+{
+	int err;
+	struct socket *sock = priv->dptr;
+	struct iovec iov;
+	struct msghdr msg;
+
+	memset(&msg, 0, sizeof(struct msghdr));
+	iov.iov_base = priv->header;
+	iov.iov_len = priv->header_size;
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	while (priv->header_size) {
+		iov.iov_base = priv->header;
+		iov.iov_len = priv->header_size;
+
+		err = sock_sendmsg(sock, &msg, priv->header_size);
+
+		if (err < 0)
+			return err;
+		if (err == 0)
+			return -1;
+
+		priv->header += err;
+		priv->header_size -= err;
+	}
+
+	return 0;
+}
+
+static int kaio_read_send_pages(struct kaio_req *req, int direct)
+{
+	struct kaio_private *priv = req->priv;
+	struct file *file = priv->sptr;
+	struct address_space *mapping = file->f_mapping;
+	struct page *page;
+	int err = 0, i, num;
+	u64 offset;
+	LIST_HEAD(page_pool);
+
+	if (priv->header_size) {
+		err = kaio_send_header(priv);
+		if (err < 0)
+			return err;
+	}
+
+	if (err > 0) {
+		kaio_append_call(req, kaio_read_send_pages);
+		return 0;
+	}
+
+	err = kaio_vfs_read(priv, &kaio_vfs_read_actor);
+	if (err < 0)
+		return err;
+
+	if (err == 0) {
+		priv->limit >>= 1;
+	} else {
+		if (priv->limit)
+			priv->limit <<= 1;
+		else
+			priv->limit = 8;
+	}
+
+	if (priv->offset < priv->processed)
+		priv->offset = priv->processed;
+
+	if (!priv->count) {
+		kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
+		return 0;
+	}
+
+	if (priv->offset >= priv->processed + priv->count) {
+		kaio_append_call(req, kaio_read_send_pages);
+		return 0;
+	}
+
+	num = min_t(int, max_sane_readahead(priv->limit), 
+			ALIGN(priv->count, PAGE_SIZE) >> PAGE_SHIFT);
+
+	offset = priv->offset;
+	for (i=0; i<num; ++i) {
+		page = page_cache_alloc_cold(mapping);
+		if (!page)
+			break;
+
+		page->index = priv->offset >> PAGE_CACHE_SHIFT;
+		list_add(&page->lru, &page_pool);
+
+		priv->offset += PAGE_CACHE_SIZE;
+	}
+
+	dprintk("%s: submit: req: %p, priv: %p, offset: %Lu, num: %d, limit: %Lu.\n",
+			__func__, req, priv, offset, i, priv->limit);
+
+	err = mapping->a_ops->aio_readpages(file, mapping, &page_pool, i, req);
+	if (err) {
+		dprintk("%s: kevent_mpage_readpages failed: err=%d, count=%Lu.\n",
+				__func__, err, priv->count);
+		kaio_schedule_req(req);
+		return err;
+	}
+
+	return 1;
+}
+
+static int kaio_add_kevent(int fd, struct kaio_req *req)
+{
+	struct ukevent uk;
+	struct file *file;
+	struct kevent_user *u;
+	int err, need_fput = 0;
+	u32 *cnt;
+
+	file = fget_light(fd, &need_fput);
+	if (!file) {
+		err = -EBADF;
+		goto err_out;
+	}
+
+	if (file->f_op != &kevent_user_fops) {
+		err = -EINVAL;
+		goto err_out_fput;
+	}
+
+	u = file->private_data;
+
+	memset(&uk, 0, sizeof(struct ukevent));
+
+	uk.event = KEVENT_MASK_ALL;
+	uk.type = KEVENT_AIO;
+
+	preempt_disable();
+	uk.id.raw[0] = smp_processor_id();
+	cnt = &__get_cpu_var(kaio_req_counter);
+	uk.id.raw[1] = *cnt;
+	*cnt = *cnt + 1;
+	preempt_enable();
+
+	uk.req_flags = KEVENT_REQ_ONESHOT | KEVENT_REQ_ALWAYS_QUEUE;
+	uk.ptr = req;
+
+	err = kevent_user_add_ukevent(&uk, u);
+	if (err)
+		goto err_out_fput;
+
+	kevent_user_get(u);
+
+	fput_light(file, need_fput);
+
+	return 0;
+
+err_out_fput:
+	fput_light(file, need_fput);
+err_out:
+	return err;
+}
+
+static void kaio_destructor(struct kaio_req *req)
+{
+	struct kaio_private *priv = req->priv;
+	struct socket *sock = priv->dptr;
+	struct file *file = priv->sptr;
+
+	fput(file);
+	sockfd_put(sock);
+
+	kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
+	kevent_user_put(priv->kevent_user);
+
+	kmem_cache_free(kaio_priv_cache, req->priv);
+}
+
+static struct kaio_req *kaio_sendfile(int kevent_fd, int sock_fd, 
+		void __user *header, size_t header_size, 
+		struct file *file, off_t offset, size_t count)
+{
+	struct kaio_req *req;
+	struct socket *sock;
+	struct kaio_private *priv;
+	int err;
+
+	sock = sockfd_lookup(sock_fd, &err);
+	if (!sock)
+		goto err_out_exit;
+
+	priv = kmem_cache_alloc(kaio_priv_cache, GFP_KERNEL);
+	if (!priv)
+		goto err_out_sput;
+
+	priv->sptr = file;
+	priv->dptr = sock;
+	priv->offset = offset;
+	priv->count = min_t(u64, i_size_read(file->f_dentry->d_inode), count);
+	priv->processed = offset;
+	priv->limit = 128;
+	priv->header = header;
+	priv->header_size = header_size;
+
+	req = kaio_add_call(NULL, &kaio_read_send_pages, -1, GFP_KERNEL);
+	if (!req)
+		goto err_out_free;
+
+	req->destructor = kaio_destructor;
+	req->priv = priv;
+
+	err = kaio_add_kevent(kevent_fd, req);
+
+	dprintk("%s: req: %p, priv: %p, call: %p [%p], offset: %Lu, processed: %Lu, count: %Lu, err: %d.\n",
+			__func__, req, priv, &kaio_read_send_pages, 
+			kaio_read_send_pages, priv->offset, priv->processed, priv->count, err);
+
+	if (err)
+		goto err_out_remove;
+
+	kaio_schedule_req(req);
+
+	return req;
+
+err_out_remove:
+	/* It is safe to just free the object since it is guaranteed that it was not
+	 * queued for processing.
+	 */
+	kmem_cache_free(kaio_req_cache, req);
+err_out_free:
+	kmem_cache_free(kaio_priv_cache, priv);
+err_out_sput:
+	sockfd_put(sock);
+err_out_exit:
+	return NULL;
+
+}
+
+asmlinkage long sys_aio_sendfile(int kevent_fd, int sock_fd, int in_fd, off_t offset, size_t count)
+{
+	struct kaio_req *req;
+	struct file *file;
+	int err;
+
+	file = fget(in_fd);
+	if (!file) {
+		err = -EBADF;
+		goto err_out_exit;
+	}
+
+	req = kaio_sendfile(kevent_fd, sock_fd, NULL, 0, file, offset, count);
+	if (!req) {
+		err = -EINVAL;
+		goto err_out_fput;
+	}
+
+	return (long)req;
+
+err_out_fput:
+	fput(file);
+err_out_exit:
+	return err;
+}
+
+asmlinkage long sys_aio_sendfile_path(int kevent_fd, int sock_fd, 
+		void __user *header, size_t header_size, 
+		char __user *filename, off_t offset, size_t count)
+{
+	char *tmp = getname(filename);
+	int fd = PTR_ERR(tmp);
+	int flags = O_RDONLY, err;
+	struct nameidata nd;
+	struct file *file;
+	struct kaio_req *req;
+
+	if (force_o_largefile())
+		flags = O_LARGEFILE;
+
+	if (IS_ERR(tmp)) {
+		err = fd;
+		goto err_out_exit;
+	}
+
+	fd = get_unused_fd();
+	if (fd < 0) {
+		err = fd;
+		goto err_out_put_name;
+	}
+
+	if ((flags+1) & O_ACCMODE)
+		flags++;
+
+	err = open_namei(AT_FDCWD, tmp, flags, 0400, &nd);
+	if (err)
+		goto err_out_fdput;
+
+	file = nameidata_to_filp(&nd, flags);
+	if (!file)
+		goto err_out_fdput;
+
+	/* One reference will be released in sys_close(), 
+	 * second one through req->destructor() 
+	 */
+	atomic_inc(&file->f_count);
+
+	req = kaio_sendfile(kevent_fd, sock_fd, header, header_size, 
+			file, offset, count);
+	if (!req) {
+		err = -EINVAL;
+		goto err_out_fput;
+	}
+
+	fd_install(fd, file);
+
+	return fd;
+
+err_out_fput:
+	fput(file);
+	fput(file);
+err_out_fdput:
+	put_unused_fd(fd);
+err_out_put_name:
+	putname(tmp);
+err_out_exit:
+	return err;
+}
+
+static int kevent_aio_callback(struct kevent *k)
+{
+	struct kaio_req *req = k->event.ptr;
+	struct kaio_private *priv = req->priv;
+
+	if (!priv->count) {
+		__u32 *processed = (__u32 *)&priv->processed;
+		k->event.ret_data[0] = processed[0];
+		k->event.ret_data[1] = processed[1];
+		return 1;
+	}
+
+	return 0;
+}
+
+int kevent_aio_enqueue(struct kevent *k)
+{
+	int err;
+	struct kaio_req *req = k->event.ptr;
+	struct kaio_private *priv = req->priv;
+
+	err = kevent_storage_enqueue(&k->user->st, k);
+	if (err)
+		goto err_out_exit;
+
+	priv->kevent_user = k->user;
+	if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE)
+		kevent_requeue(k);
+
+	return 0;
+
+err_out_exit:
+	return err;
+}
+
+int kevent_aio_dequeue(struct kevent *k)
+{
+	kevent_storage_dequeue(k->st, k);
+
+	return 0;
+}
+
+static void kaio_thread_stop(struct kaio_thread *th)
+{
+	kthread_stop(th->thread);
+	kfree(th);
+}
+
+static int kaio_thread_start(struct kaio_thread **thp, int num)
+{
+	struct kaio_thread *th;
+
+	th = kzalloc(sizeof(struct kaio_thread), GFP_KERNEL);
+	if (!th)
+		return -ENOMEM;
+
+	th->refcnt = 1;
+	spin_lock_init(&th->req_lock);
+	INIT_LIST_HEAD(&th->req_list);
+	init_waitqueue_head(&th->wait);
+
+	th->thread = kthread_run(kaio_thread_process, th, "kaio/%d", num);
+	if (IS_ERR(th->thread)) {
+		int err = PTR_ERR(th->thread);
+		kfree(th);
+		return err;
+	}
+
+	*thp = th;
+	wmb();
+
+	return 0;
+}
+
+static int __init kevent_init_aio(void)
+{
+	struct kevent_callbacks sc = {
+		.callback = &kevent_aio_callback,
+		.enqueue = &kevent_aio_enqueue,
+		.dequeue = &kevent_aio_dequeue,
+		.flags = 0,
+	};
+	int err, i;
+
+	kaio_req_cache = kmem_cache_create("kaio_req", sizeof(struct kaio_req), 
+			0, SLAB_PANIC, NULL, NULL);
+	kaio_priv_cache = kmem_cache_create("kaio_priv", sizeof(struct kaio_private), 
+			0, SLAB_PANIC, NULL, NULL);
+
+	memset(kaio_threads, 0, sizeof(kaio_threads));
+
+	for (i=0; i<KAIO_THREAD_NUM; ++i) {
+		err = kaio_thread_start(&kaio_threads[i], i);
+		if (err)
+			goto err_out_stop;
+	}
+
+	err = kevent_add_callbacks(&sc, KEVENT_AIO);
+	if (err)
+		goto err_out_stop;
+
+	return 0;
+
+err_out_stop:
+	while (--i >= 0) {
+		struct kaio_thread *th = kaio_threads[i];
+
+		kaio_threads[i] = NULL;
+		wmb();
+
+		kaio_thread_stop(th);
+	}
+	return err;
+}
+module_init(kevent_init_aio);



* Re: [take36 10/10] kevent: Kevent based generic AIO.
  2007-02-12 13:08                 ` Andi Kleen
@ 2007-02-12 12:19                   ` Evgeniy Polyakov
  2007-02-12 13:12                   ` Alan
  1 sibling, 0 replies; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 12:19 UTC (permalink / raw)
  To: Andi Kleen
  Cc: David Miller, Ulrich Drepper, Andrew Morton, netdev, Zach Brown,
	Christoph Hellwig, Chase Venters, Johann Borck, linux-kernel,
	Jeff Garzik, Jamal Hadi Salim, Ingo Molnar, linux-fsdevel

On Mon, Feb 12, 2007 at 02:08:10PM +0100, Andi Kleen (ak@suse.de) wrote:
> Evgeniy Polyakov <johnpol@2ka.mipt.ru> writes:
> > 
> > aio_sendfile_path() is essentially aio_sendfile(), except that it takes the
> > source filename as a parameter and a pointer to a private header
> > and its size (which allows sending the header and the file's content in one
> > syscall instead of three (open, send, sendfile)), and returns the opened
> > file descriptor.
> 
> Are you sure this is a useful optimization? Do you have numbers vs
> open+aio_sendfile+close?
> 
> Compared to the cost of sending a complete file, three system calls should
> be well in the noise. And Linux system calls are not that expensive
> (a few hundred cycles normally).
> 
> Adding such compound system calls would be a worrying precedent because
> I'm sure others would then want them for their favourite system call combo
> too. If they were really useful it might make more sense to have a batch()
> system call that works for arbitrary calls, but I'm not convinced yet
> it's even needed. It would certainly be ugly.

Yes, that call ends up about 10 MB/s faster for 100 1MB file transfers
over a 1 Gbit network (78 MB/s vs 66-72 MB/s; the sendfile sending server
is a one-way AMD Athlon 64 3500+), but indeed, it may be that the async IO
sending was the main speed factor.

I added the header at the request of Suparna Bhattacharya - my main position
on syscall overhead is the same as yours, but I do not care that much.

> -Andi

-- 
	Evgeniy Polyakov


* Re: [take36 10/10] kevent: Kevent based generic AIO.
  2007-02-12 11:27               ` [take36 10/10] kevent: Kevent based generic AIO Evgeniy Polyakov
@ 2007-02-12 13:08                 ` Andi Kleen
  2007-02-12 12:19                   ` Evgeniy Polyakov
  2007-02-12 13:12                   ` Alan
  0 siblings, 2 replies; 12+ messages in thread
From: Andi Kleen @ 2007-02-12 13:08 UTC (permalink / raw)
  To: Evgeniy Polyakov
  Cc: David Miller, Ulrich Drepper, Andrew Morton, netdev, Zach Brown,
	Christoph Hellwig, Chase Venters, Johann Borck, linux-kernel,
	Jeff Garzik, Jamal Hadi Salim, Ingo Molnar, linux-fsdevel

Evgeniy Polyakov <johnpol@2ka.mipt.ru> writes:
> 
> aio_sendfile_path() is essentially aio_sendfile(), except that it takes the
> source filename as a parameter and a pointer to a private header
> and its size (which allows sending the header and the file's content in one
> syscall instead of three (open, send, sendfile)), and returns the opened
> file descriptor.

Are you sure this is a useful optimization? Do you have numbers vs
open+aio_sendfile+close?

Compared to the cost of sending a complete file, three system calls should
be well in the noise. And Linux system calls are not that expensive
(a few hundred cycles normally).

Adding such compound system calls would be a worrying precedent because
I'm sure others would then want them for their favourite system call combo
too. If they were really useful it might make more sense to have a batch()
system call that works for arbitrary calls, but I'm not convinced yet
it's even needed. It would certainly be ugly.

-Andi


* Re: [take36 10/10] kevent: Kevent based generic AIO.
  2007-02-12 13:08                 ` Andi Kleen
  2007-02-12 12:19                   ` Evgeniy Polyakov
@ 2007-02-12 13:12                   ` Alan
  2007-02-12 13:24                     ` Evgeniy Polyakov
  1 sibling, 1 reply; 12+ messages in thread
From: Alan @ 2007-02-12 13:12 UTC (permalink / raw)
  To: Andi Kleen
  Cc: Evgeniy Polyakov, David Miller, Ulrich Drepper, Andrew Morton,
	netdev, Zach Brown, Christoph Hellwig, Chase Venters,
	Johann Borck, linux-kernel, Jeff Garzik, Jamal Hadi Salim,
	Ingo Molnar, linux-fsdevel

> I'm sure others would then want them for their favourite system call combo
> too. If they were really useful it might make more sense to have a batch()
> system call that works for arbitrary calls, but I'm not convinced yet
> it's even needed. It would certainly be ugly.

batch() would possibly make a lot of sense in terms of the fibril/thread
based removal of the need for all the AIO stuff, just to provide a
natural way to group and order sequences of synchronous operations into
asynchronous groups.
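
Purely as an illustration of that idea - no batch() syscall exists, and
every name in this sketch is invented - such an interface might take an
ordered vector of calls and run them as one asynchronous group:

	/* hypothetical, for discussion only */
	struct batch_call {
		long	nr;		/* syscall number, e.g. __NR_open */
		long	args[6];	/* its arguments */
		long	ret;		/* filled in on completion */
	};

	/* batch(calls, n) would run calls[0..n-1] in order as one async
	 * group, so open+send+sendfile+close becomes a single submission. */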

I am extremely sceptical about the need for aio_sendfile_path, since when
hacking around with sendfile/sendpath there didn't seem to be much gain.

I'm even more sceptical of the header buffer stuff: while other OSes do
that as a hack to make TCP packetising work, we simply fixed the root
problem with TCP_CORK.
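
(For reference, the TCP_CORK approach is plain userspace code - a sketch
with error handling omitted, assuming sock_fd/file_fd and an hdr buffer:

	#include <netinet/in.h>
	#include <netinet/tcp.h>
	#include <sys/sendfile.h>
	#include <unistd.h>

	int on = 1, off = 0;

	setsockopt(sock_fd, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
	write(sock_fd, hdr, hdr_len);			/* small header */
	sendfile(sock_fd, file_fd, NULL, count);	/* file body */
	setsockopt(sock_fd, IPPROTO_TCP, TCP_CORK, &off, sizeof(off));

Corking coalesces the header with the first file pages into full-sized
packets; uncorking flushes whatever remains.)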

Alan


* Re: [take36 10/10] kevent: Kevent based generic AIO.
  2007-02-12 13:12                   ` Alan
@ 2007-02-12 13:24                     ` Evgeniy Polyakov
  0 siblings, 0 replies; 12+ messages in thread
From: Evgeniy Polyakov @ 2007-02-12 13:24 UTC (permalink / raw)
  To: Alan
  Cc: Andi Kleen, David Miller, Ulrich Drepper, Andrew Morton, netdev,
	Zach Brown, Christoph Hellwig, Chase Venters, Johann Borck,
	linux-kernel, Jeff Garzik, Jamal Hadi Salim, Ingo Molnar,
	linux-fsdevel

On Mon, Feb 12, 2007 at 01:12:57PM +0000, Alan (alan@lxorguk.ukuu.org.uk) wrote:
> > I'm sure others would then want them for their favourite system call combo
> > too. If they were really useful it might make more sense to have a batch()
> > system call that works for arbitrary calls, but I'm not convinced yet
> > it's even needed. It would certainly be ugly.
> 
> batch() would possibly make a lot of sense in terms of the fibril/thread
> based removal of the need for all the AIO stuff, just to provide a
> natural way to group and order sequences of synchronous operations into
> asynchronous groups.
> 
> I am extremely sceptical about the need for aio_sendfile_path, since when
> hacking around with sendfile/sendpath there didn't seem to be much gain.
> 
> I'm even more sceptical of the header buffer stuff: while other OSes do
> that as a hack to make TCP packetising work, we simply fixed the root
> problem with TCP_CORK.

Well, that does not matter that much - that syscall is an example of how
the kevent AIO state machine works - it trivially allows async
open/send/sendfile/close, like aio_sendfile_path().

I would remove the header too (it was not there in the first release; it
was added on request), but the whole idea of async open/send/close seems
natural for web-like workloads, where the user should not even have to
care about a proper state machine.

Having a batch() mode would be good too - with the kevent state machine it
is quite trivial:

	req = kaio_add_call(NULL, &first_call, -1, GFP_KERNEL);
	kaio_append_call(req, &second_call);
	kaio_append_call(req, &third_call);
	...
	kaio_schedule_req(req);

Yes, state machine handling requires additional code, but when it ends
up in faster processing, the implementation complexity is worth its price.

> Alan

-- 
	Evgeniy Polyakov


end of thread

Thread overview: 12+ messages:
     [not found] <11712796473213@2ka.mipt.ru>
2007-02-12 11:27 ` [take36 3/10] kevent: poll/select() notifications Evgeniy Polyakov
2007-02-12 11:27   ` [take36 4/10] kevent: Socket notifications Evgeniy Polyakov
2007-02-12 11:27     ` [take36 5/10] kevent: Timer notifications Evgeniy Polyakov
2007-02-12 11:27       ` [take36 6/10] kevent: Pipe notifications Evgeniy Polyakov
2007-02-12 11:27         ` [take36 7/10] kevent: Signal notifications Evgeniy Polyakov
2007-02-12 11:27           ` [take36 8/10] kevent: Kevent posix timer notifications Evgeniy Polyakov
2007-02-12 11:27             ` [take36 9/10] kevent: Private userspace notifications Evgeniy Polyakov
2007-02-12 11:27               ` [take36 10/10] kevent: Kevent based generic AIO Evgeniy Polyakov
2007-02-12 13:08                 ` Andi Kleen
2007-02-12 12:19                   ` Evgeniy Polyakov
2007-02-12 13:12                   ` Alan
2007-02-12 13:24                     ` Evgeniy Polyakov
