* [PATCH 1/8] af_unix: Documentation on multicast unix sockets
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Reviewed-by: Ian Molton <ian.molton@collabora.co.uk>
---
 .../networking/multicast-unix-sockets.txt          |  171 ++++++++++++++++++++
 1 files changed, 171 insertions(+), 0 deletions(-)
 create mode 100644 Documentation/networking/multicast-unix-sockets.txt

diff --git a/Documentation/networking/multicast-unix-sockets.txt b/Documentation/networking/multicast-unix-sockets.txt
new file mode 100644
index 0000000..0cc30cb
--- /dev/null
+++ b/Documentation/networking/multicast-unix-sockets.txt
@@ -0,0 +1,171 @@
+Multicast Unix sockets
+======================
+
+Multicast is implemented on SOCK_DGRAM and SOCK_SEQPACKET Unix sockets.
+
+A userspace application can create a multicast group with:
+
+  struct unix_mreq mreq = {0,};
+  mreq.address.sun_family = AF_UNIX;
+  mreq.address.sun_path[0] = '\0';
+  strcpy(mreq.address.sun_path + 1, "socket-address");
+
+  sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
+  ret = setsockopt(sockfd, SOL_UNIX, UNIX_CREATE_GROUP, &mreq, sizeof(mreq));
+
+This allocates a struct unix_mcast_group, which is reference counted and exists
+as long as the socket that created it exists or the group has at least one
+member.
+
+Then a multicast group can be joined with:
+
+  ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));
+
+This allocates a struct unix_mcast, which holds the settings of the membership,
+mainly whether loopback is enabled. A socket can be a member of several
+multicast groups.
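+
+For example, a member that also wants to receive its own messages can set the
+loopback flag when joining (sketch; error handling omitted):
+
+  struct unix_mreq mreq = {0,};
+  mreq.address.sun_family = AF_UNIX;
+  strcpy(mreq.address.sun_path + 1, "socket-address");
+  mreq.flags = UNIX_MREQ_LOOPBACK;
+  ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));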
+
+The socket remains part of the multicast group until it is released, shut down
+with RCV_SHUTDOWN, or explicitly leaves the group:
+
+  ret = setsockopt(sockfd, SOL_UNIX, UNIX_LEAVE_GROUP, &mreq, sizeof(mreq));
+
+Struct unix_mcast nodes are linked in two RCU lists:
+- (struct unix_sock)->mcast_subscriptions
+- (struct unix_mcast_group)->mcast_members
+
+              unix_mcast_group  unix_mcast_group
+                      |                 |
+                      v                 v
+unix_sock  ---->  unix_mcast  ----> unix_mcast
+                      |
+                      v
+unix_sock  ---->  unix_mcast
+                      |
+                      v
+unix_sock  ---->  unix_mcast
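+
+For example, the implementation walks a socket's subscription list under RCU
+protection with this kind of pattern (sketch, as in unix_stream_connect()):
+
+  struct unix_mcast *node;
+  struct hlist_node *pos;
+
+  rcu_read_lock();
+  hlist_for_each_entry_rcu(node, pos, &u->mcast_subscriptions,
+                           subscription_node) {
+          /* node->group is one of the groups the socket u subscribed to */
+  }
+  rcu_read_unlock();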
+
+
+SOCK_DGRAM semantics
+====================
+
+          G          The socket which created the group
+       /  |  \
+     P1  P2  P3      The member sockets
+
+Messages sent to the group are received by all members except the sender itself
+unless the sending socket has UNIX_MREQ_LOOPBACK set.
+
+Non-members can also send to the group socket G, and the message will be
+broadcast to the group members; however, socket G itself does not receive the
+messages sent to the group through it.
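+
+A non-member could, for instance, address a datagram directly to the group's
+abstract socket address (sketch; error handling omitted):
+
+  struct sockaddr_un addr = {0,};
+  addr.sun_family = AF_UNIX;
+  strcpy(addr.sun_path + 1, "socket-address");
+  sendto(fd, buf, len, 0, (struct sockaddr *)&addr,
+         offsetof(struct sockaddr_un, sun_path) + 1 + strlen("socket-address"));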
+
+
+SOCK_SEQPACKET semantics
+========================
+
+When a connection is made to a SOCK_SEQPACKET multicast socket, a new socket
+is created and its file descriptor is returned by accept().
+
+          L          The listening socket
+       /  |  \
+     A1  A2  A3      The accepted sockets
+      |   |   |
+     C1  C2  C3      The connected sockets
+
+Messages sent on the C1 socket are received by:
+- C1 itself if UNIX_MREQ_LOOPBACK is set.
+- The peer socket A1 if UNIX_MREQ_SEND_TO_PEER is set.
+- The other members of the multicast group C2 and C3.
+
+Only members can send to the group in this case.
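+
+A connected socket could, for example, join the group with delivery to its
+accepted peer enabled (sketch; addr refers to the group's address):
+
+  connfd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+  ret = connect(connfd, (struct sockaddr *)&addr, addrlen);
+  mreq.flags = UNIX_MREQ_SEND_TO_PEER;
+  ret = setsockopt(connfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));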
+
+
+Atomic delivery and ordering
+============================
+
+Each message sent is delivered atomically either to none of the recipients or
+to all of them, even in the presence of interruptions and errors.
+
+Locking is used to keep the ordering consistent for all recipients. We want
+to avoid the following scenario, with two emitters A and B and two recipients
+C and D:
+
+           C    D
+A -------->|    |    Step 1: A's message is delivered to C
+B -------->|    |    Step 2: B's message is delivered to C
+B ---------|--->|    Step 3: B's message is delivered to D
+A ---------|--->|    Step 4: A's message is delivered to D
+
+Result: - C received (A, B)
+        - D received (B, A)
+
+Although A and B had a list of recipients (C, D) in the same order, C and D
+received the messages in a different order. To avoid this scenario, we need a
+locking mechanism while the messages are being delivered with skb_queue_tail().
+
+Solution 1:
+The easiest implementation would be to use a global spinlock on the group,
+but it would create avoidable contention, especially when two independent
+streams are set up with socket filters; e.g. if A sends messages received
+only by C, and B sends messages received only by D.
+
+Solution 2:
+Fine-grained locking could be implemented with a spinlock on each recipient.
+Before delivering the message to the recipients, the sender takes the
+spinlocks of all the recipients at the same time.
+
+Taking several spinlocks of the same lock class can be dangerous and can lead to
+deadlocks. This is prevented by sorting the list of sockets by memory address
+and taking the spinlocks in that order. The ordered list of recipients is
+computed on demand when a message is sent and the list is cached for
+performance. When the group membership changes, the generation of the
+membership is incremented and the ordered recipient list is invalidated.
+
+With this solution, the number of spinlocks taken simultaneously can be
+arbitrarily large. While it works, it breaks the lockdep mechanism.
+
+Solution 3:
+The current implementation is similar to solution 2 but with a limit on the
+number of spinlocks taken simultaneously (8), so lockdep works fine. A hash
+function and a bit array of n=8 bits specify which spinlocks to take.
+Contention on independent streams can still happen, but it is less likely.
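+
+A sketch of the lock selection (hash computation as in the implementation,
+simplified; MCAST_LOCK_CLASS_COUNT is 8):
+
+  u8 hash = 0;
+  int i;
+
+  /* one bit per recipient socket s */
+  hash |= 1 << (((unsigned long)s >> 6) & 0x07);
+
+  /* take the selected spinlocks, always in index order */
+  for (i = 0; i < MCAST_LOCK_CLASS_COUNT; i++)
+          if (hash & (1 << i))
+                  spin_lock_nested(&group->lock[i], i);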
+
+
+Flow control
+============
+
+When a socket's receiving queue is full, the default behavior is to block
+senders (or to return -EAGAIN on non-blocking sockets). The socket can also
+join a multicast group with the flag UNIX_MREQ_DROP_WHEN_FULL. In this case,
+messages sent to the group will not be delivered to that socket when its
+receiving queue is full.
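+
+For example (sketch):
+
+  mreq.flags = UNIX_MREQ_DROP_WHEN_FULL;
+  ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));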
+
+Messages are still delivered atomically to all members that don't have the
+flag UNIX_MREQ_DROP_WHEN_FULL. If send() returns -EAGAIN, nobody received the
+message. If send() blocks because of one member, the other members don't
+receive the message until all sockets (except those with
+UNIX_MREQ_DROP_WHEN_FULL set) can receive at the same time.
+
+poll/epoll/select on POLLOUT events behave consistently: they block as long
+as at least one member of the multicast group without UNIX_MREQ_DROP_WHEN_FULL
+has a full receiving queue.
+
+
+Multicast socket reference counting
+===================================
+
+A poller for POLLOUT events can block for any member of the group. The poller
+can use the wait queue "peer_wait" of any member. So it is important that Unix
+sockets are not released before all pollers exit. This is achieved by:
+
+- Incrementing the reference counter of a socket when it joins a multicast
+  group.
+- Decrementing it when the group is destroyed, that is, when all sockets
+  keeping a reference on the group have released their reference.
+
+struct unix_mcast_group keeps track of both current members and previous
+members. When a socket leaves a group, it is removed from the members list and
+put in the dead members list. This is done in order to take advantage of RCU
+lists, which reduces lock contention.
-- 
1.7.2.3


* [PATCH 2/8] af_unix: Add constant for unix socket options level
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy

Assign the next free socket option level to be used by the unix
protocol and address family.
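
Userspace then passes SOL_UNIX as the level argument of setsockopt(), for
example (sketch; the option names are introduced later in this series):

  setsockopt(fd, SOL_UNIX, UNIX_CREATE_GROUP, &mreq, sizeof(mreq));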

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Reviewed-by: Ian Molton <ian.molton@collabora.co.uk>
---
 include/linux/socket.h |    1 +
 1 files changed, 1 insertions(+), 0 deletions(-)

diff --git a/include/linux/socket.h b/include/linux/socket.h
index edbb1d0..a257d1c 100644
--- a/include/linux/socket.h
+++ b/include/linux/socket.h
@@ -308,6 +308,7 @@ struct ucred {
 #define SOL_IUCV	277
 #define SOL_CAIF	278
 #define SOL_ALG		279
+#define SOL_UNIX	280
 
 /* IPX options */
 #define IPX_TYPE	1
-- 
1.7.2.3


* [PATCH 3/8] af_unix: add setsockopt on unix sockets
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy

unix_setsockopt() is called only on SOCK_DGRAM and SOCK_SEQPACKET unix sockets.
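
With only this patch applied, unix_setsockopt() rejects every option. A
minimal userspace check (hypothetical; the option constants are introduced
later in the series):

  ret = setsockopt(sockfd, SOL_UNIX, UNIX_CREATE_GROUP, &mreq, sizeof(mreq));
  /* expected at this point in the series: ret == -1 && errno == EOPNOTSUPP */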

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Reviewed-by: Ian Molton <ian.molton@collabora.co.uk>
---
 net/unix/af_unix.c |   13 +++++++++++--
 1 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index d8d98d5..7ea85de 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -512,6 +512,8 @@ static unsigned int unix_dgram_poll(struct file *, struct socket *,
 				    poll_table *);
 static int unix_ioctl(struct socket *, unsigned int, unsigned long);
 static int unix_shutdown(struct socket *, int);
+static int unix_setsockopt(struct socket *, int, int,
+			   char __user *, unsigned int);
 static int unix_stream_sendmsg(struct kiocb *, struct socket *,
 			       struct msghdr *, size_t);
 static int unix_stream_recvmsg(struct kiocb *, struct socket *,
@@ -559,7 +561,7 @@ static const struct proto_ops unix_dgram_ops = {
 	.ioctl =	unix_ioctl,
 	.listen =	sock_no_listen,
 	.shutdown =	unix_shutdown,
-	.setsockopt =	sock_no_setsockopt,
+	.setsockopt =	unix_setsockopt,
 	.getsockopt =	sock_no_getsockopt,
 	.sendmsg =	unix_dgram_sendmsg,
 	.recvmsg =	unix_dgram_recvmsg,
@@ -580,7 +582,7 @@ static const struct proto_ops unix_seqpacket_ops = {
 	.ioctl =	unix_ioctl,
 	.listen =	unix_listen,
 	.shutdown =	unix_shutdown,
-	.setsockopt =	sock_no_setsockopt,
+	.setsockopt =	unix_setsockopt,
 	.getsockopt =	sock_no_getsockopt,
 	.sendmsg =	unix_seqpacket_sendmsg,
 	.recvmsg =	unix_dgram_recvmsg,
@@ -1561,6 +1563,13 @@ out:
 }
 
 
+static int unix_setsockopt(struct socket *sock, int level, int optname,
+			   char __user *optval, unsigned int optlen)
+{
+	return -EOPNOTSUPP;
+}
+
+
 static int unix_stream_sendmsg(struct kiocb *kiocb, struct socket *sock,
 			       struct msghdr *msg, size_t len)
 {
-- 
1.7.2.3


* [PATCH 4/8] af_unix: create, join and leave multicast groups with setsockopt
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy, Ian Molton

Multicast is implemented on SOCK_DGRAM and SOCK_SEQPACKET unix sockets.

A userspace application can create a multicast group with:
  struct unix_mreq mreq;
  mreq.address.sun_family = AF_UNIX;
  mreq.address.sun_path[0] = '\0';
  strcpy(mreq.address.sun_path + 1, "socket-address");
  mreq.flags = 0;

  sockfd = socket(AF_UNIX, SOCK_DGRAM, 0);
  ret = setsockopt(sockfd, SOL_UNIX, UNIX_CREATE_GROUP, &mreq, sizeof(mreq));

Then a multicast group can be joined and left with:
  ret = setsockopt(sockfd, SOL_UNIX, UNIX_JOIN_GROUP, &mreq, sizeof(mreq));
  ret = setsockopt(sockfd, SOL_UNIX, UNIX_LEAVE_GROUP, &mreq, sizeof(mreq));

A socket can be a member of several multicast groups.

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Signed-off-by: Ian Molton <ian.molton@collabora.co.uk>
---
 include/net/af_unix.h |   77 +++++++++++
 net/unix/Kconfig      |   10 ++
 net/unix/af_unix.c    |  339 ++++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 424 insertions(+), 2 deletions(-)

diff --git a/include/net/af_unix.h b/include/net/af_unix.h
index 18e5c3f..f2b605b 100644
--- a/include/net/af_unix.h
+++ b/include/net/af_unix.h
@@ -41,7 +41,62 @@ struct unix_skb_parms {
 				spin_lock_nested(&unix_sk(s)->lock, \
 				SINGLE_DEPTH_NESTING)
 
+/* UNIX socket options */
+#define UNIX_CREATE_GROUP	1
+#define UNIX_JOIN_GROUP		2
+#define UNIX_LEAVE_GROUP	3
+
+/* Flags on unix_mreq */
+
+/* On UNIX_JOIN_GROUP: the socket will receive its own messages */
+#define UNIX_MREQ_LOOPBACK		0x01
+
+/* On UNIX_JOIN_GROUP: the messages will also be received by the peer */
+#define UNIX_MREQ_SEND_TO_PEER		0x02
+
+/* On UNIX_JOIN_GROUP: just drop the message instead of blocking if the
+ * receiving queue is full */
+#define UNIX_MREQ_DROP_WHEN_FULL	0x04
+
+struct unix_mreq {
+	struct sockaddr_un	address;
+	unsigned int		flags;
+};
+
 #ifdef __KERNEL__
+
+struct unix_mcast_group {
+	/* RCU list of (struct unix_mcast)->member_node
+	 * Messages sent to the multicast group are delivered to this list of
+	 * members */
+	struct hlist_head	mcast_members;
+
+	/* RCU list of (struct unix_mcast)->member_dead_node
+	 * When the group dies, previous members' reference counters must be
+	 * decremented */
+	struct hlist_head	mcast_dead_members;
+
+	/* RCU list of (struct sock_set)->list */
+	struct hlist_head	mcast_members_lists;
+
+	atomic_t		mcast_members_cnt;
+
+	/* The generation is incremented each time a peer joins or
+	 * leaves the group. It is used to invalidate old lists
+	 * struct sock_set */
+	atomic_t		mcast_membership_generation;
+
+	/* Locks to guarantee causal order in deliveries */
+#define MCAST_LOCK_CLASS_COUNT	8
+	spinlock_t		lock[MCAST_LOCK_CLASS_COUNT];
+
+	/* The group is referenced by:
+	 * - the socket who created the multicast group
+	 * - the accepted sockets (SOCK_SEQPACKET only)
+	 * - the current members of the group */
+	atomic_t		refcnt;
+};
+
 /* The AF_UNIX socket */
 struct unix_sock {
 	/* WARNING: sk has to be the first member */
@@ -57,9 +112,31 @@ struct unix_sock {
 	spinlock_t		lock;
 	unsigned int		gc_candidate : 1;
 	unsigned int		gc_maybe_cycle : 1;
+	unsigned int		mcast_send_to_peer : 1;
+	unsigned int		mcast_drop_when_peer_full : 1;
 	unsigned char		recursion_level;
+	struct unix_mcast_group	*mcast_group;
+
+	/* RCU List of (struct unix_mcast)->subscription_node
+	 * A socket can subscribe to several multicast groups
+	 */
+	struct hlist_head	mcast_subscriptions;
+
 	struct socket_wq	peer_wq;
 };
+
+struct unix_mcast {
+	struct unix_sock	*member;
+	struct unix_mcast_group	*group;
+	unsigned int		flags;
+	struct hlist_node	subscription_node;
+	/* A subscription cannot be both alive and dead but we cannot use the
+	 * same field because RCU readers run lockless. member_dead_node is
+	 * not read by lockless RCU readers. */
+	struct hlist_node	member_node;
+	struct hlist_node	member_dead_node;
+};
+
 #define unix_sk(__sk) ((struct unix_sock *)__sk)
 
 #define peer_wait peer_wq.wait
diff --git a/net/unix/Kconfig b/net/unix/Kconfig
index 5a69733..e3e5d9b 100644
--- a/net/unix/Kconfig
+++ b/net/unix/Kconfig
@@ -19,3 +19,13 @@ config UNIX
 
 	  Say Y unless you know what you are doing.
 
+config UNIX_MULTICAST
+	depends on UNIX && EXPERIMENTAL
+	bool "Multicast over Unix domain sockets"
+	---help---
+	  If you say Y here, you will include support for multicasting on Unix
+	  domain sockets. Support is available for SOCK_DGRAM and
+	  SOCK_SEQPACKET. Certain types of delivery synchronisation are
+	  provided, see Documentation/networking/multicast-unix-sockets.txt
+
+
diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 7ea85de..f25c020 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -117,6 +117,9 @@
 
 static struct hlist_head unix_socket_table[UNIX_HASH_SIZE + 1];
 static DEFINE_SPINLOCK(unix_table_lock);
+#ifdef CONFIG_UNIX_MULTICAST
+static DEFINE_SPINLOCK(unix_multicast_lock);
+#endif
 static atomic_long_t unix_nr_socks;
 
 #define unix_sockets_unbound	(&unix_socket_table[UNIX_HASH_SIZE])
@@ -371,6 +374,28 @@ static void unix_sock_destructor(struct sock *sk)
 #endif
 }
 
+#ifdef CONFIG_UNIX_MULTICAST
+static void
+destroy_mcast_group(struct unix_mcast_group *group)
+{
+	struct unix_mcast *node;
+	struct hlist_node *pos;
+	struct hlist_node *pos_tmp;
+
+	BUG_ON(atomic_read(&group->refcnt) != 0);
+	BUG_ON(!hlist_empty(&group->mcast_members));
+
+	hlist_for_each_entry_safe(node, pos, pos_tmp,
+				  &group->mcast_dead_members,
+				  member_dead_node) {
+		hlist_del_rcu(&node->member_dead_node);
+		sock_put(&node->member->sk);
+		kfree(node);
+	}
+	kfree(group);
+}
+#endif
+
 static int unix_release_sock(struct sock *sk, int embrion)
 {
 	struct unix_sock *u = unix_sk(sk);
@@ -379,6 +404,11 @@ static int unix_release_sock(struct sock *sk, int embrion)
 	struct sock *skpair;
 	struct sk_buff *skb;
 	int state;
+#ifdef CONFIG_UNIX_MULTICAST
+	struct unix_mcast *node;
+	struct hlist_node *pos;
+	struct hlist_node *pos_tmp;
+#endif
 
 	unix_remove_socket(sk);
 
@@ -392,6 +422,23 @@ static int unix_release_sock(struct sock *sk, int embrion)
 	u->mnt	     = NULL;
 	state = sk->sk_state;
 	sk->sk_state = TCP_CLOSE;
+#ifdef CONFIG_UNIX_MULTICAST
+	spin_lock(&unix_multicast_lock);
+	hlist_for_each_entry_safe(node, pos, pos_tmp, &u->mcast_subscriptions,
+				  subscription_node) {
+		hlist_del_rcu(&node->member_node);
+		hlist_del_rcu(&node->subscription_node);
+		atomic_dec(&node->group->mcast_members_cnt);
+		atomic_inc(&node->group->mcast_membership_generation);
+		hlist_add_head_rcu(&node->member_dead_node,
+				   &node->group->mcast_dead_members);
+		if (atomic_dec_and_test(&node->group->refcnt))
+			destroy_mcast_group(node->group);
+	}
+	if (u->mcast_group && atomic_dec_and_test(&u->mcast_group->refcnt))
+		destroy_mcast_group(u->mcast_group);
+	spin_unlock(&unix_multicast_lock);
+#endif
 	unix_state_unlock(sk);
 
 	wake_up_interruptible_all(&u->peer_wait);
@@ -631,6 +678,9 @@ static struct sock *unix_create1(struct net *net, struct socket *sock)
 	atomic_long_set(&u->inflight, 0);
 	INIT_LIST_HEAD(&u->link);
 	mutex_init(&u->readlock); /* single task reading lock */
+#ifdef CONFIG_UNIX_MULTICAST
+	INIT_HLIST_HEAD(&u->mcast_subscriptions);
+#endif
 	init_waitqueue_head(&u->peer_wait);
 	unix_insert_socket(unix_sockets_unbound, sk);
 out:
@@ -1055,6 +1105,10 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
 	struct sock *newsk = NULL;
 	struct sock *other = NULL;
 	struct sk_buff *skb = NULL;
+#ifdef CONFIG_UNIX_MULTICAST
+	struct unix_mcast *node;
+	struct hlist_node *pos;
+#endif
 	unsigned hash;
 	int st;
 	int err;
@@ -1082,6 +1136,7 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
 	newsk = unix_create1(sock_net(sk), NULL);
 	if (newsk == NULL)
 		goto out;
+	newu = unix_sk(newsk);
 
 	/* Allocate skb for sending to listening sock */
 	skb = sock_wmalloc(newsk, 1, 0, GFP_KERNEL);
@@ -1094,6 +1149,8 @@ restart:
 	if (!other)
 		goto out;
 
+	otheru = unix_sk(other);
+
 	/* Latch state of peer */
 	unix_state_lock(other);
 
@@ -1165,6 +1222,18 @@ restart:
 		goto out_unlock;
 	}
 
+#ifdef CONFIG_UNIX_MULTICAST
+	/* Multicast sockets */
+	hlist_for_each_entry_rcu(node, pos, &u->mcast_subscriptions,
+				 subscription_node) {
+		if (node->group == otheru->mcast_group) {
+			atomic_inc(&otheru->mcast_group->refcnt);
+			newu->mcast_group = otheru->mcast_group;
+			break;
+		}
+	}
+#endif
+
 	/* The way is open! Fastly set all the necessary fields... */
 
 	sock_hold(sk);
@@ -1172,9 +1241,7 @@ restart:
 	newsk->sk_state		= TCP_ESTABLISHED;
 	newsk->sk_type		= sk->sk_type;
 	init_peercred(newsk);
-	newu = unix_sk(newsk);
 	newsk->sk_wq		= &newu->peer_wq;
-	otheru = unix_sk(other);
 
 	/* copy address information from listening to new sock*/
 	if (otheru->addr) {
@@ -1563,10 +1630,278 @@ out:
 }
 
 
+#ifdef CONFIG_UNIX_MULTICAST
+static int unix_mc_create(struct socket *sock, struct unix_mreq *mreq)
+{
+	struct sock *other;
+	int err;
+	unsigned hash;
+	int namelen;
+	struct unix_mcast_group *mcast_group;
+	int i;
+
+	if (mreq->address.sun_family != AF_UNIX ||
+	    mreq->address.sun_path[0] != '\0')
+		return -EINVAL;
+
+	err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+	if (err < 0)
+		return err;
+
+	namelen = err;
+	other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
+				sock->type, hash, &err);
+	if (other) {
+		sock_put(other);
+		return -EADDRINUSE;
+	}
+
+	mcast_group = kmalloc(sizeof(struct unix_mcast_group), GFP_KERNEL);
+	if (!mcast_group)
+		return -ENOBUFS;
+
+	INIT_HLIST_HEAD(&mcast_group->mcast_members);
+	INIT_HLIST_HEAD(&mcast_group->mcast_dead_members);
+	INIT_HLIST_HEAD(&mcast_group->mcast_members_lists);
+	atomic_set(&mcast_group->mcast_members_cnt, 0);
+	atomic_set(&mcast_group->mcast_membership_generation, 1);
+	atomic_set(&mcast_group->refcnt, 1);
+	for (i = 0 ; i < MCAST_LOCK_CLASS_COUNT ; i++) {
+		spin_lock_init(&mcast_group->lock[i]);
+		lockdep_set_subclass(&mcast_group->lock[i], i);
+	}
+
+	err = sock->ops->bind(sock,
+		(struct sockaddr *)&mreq->address,
+		sizeof(struct sockaddr_un));
+	if (err < 0) {
+		kfree(mcast_group);
+		return err;
+	}
+
+	unix_state_lock(sock->sk);
+	unix_sk(sock->sk)->mcast_group = mcast_group;
+	unix_state_unlock(sock->sk);
+
+	return 0;
+}
+
+
+static int unix_mc_join(struct socket *sock, struct unix_mreq *mreq)
+{
+	struct unix_sock *u = unix_sk(sock->sk);
+	struct sock *other, *peer;
+	struct unix_mcast_group *group;
+	struct unix_mcast *node;
+	int err;
+	unsigned hash;
+	int namelen;
+
+	if (mreq->address.sun_family != AF_UNIX ||
+	    mreq->address.sun_path[0] != '\0')
+		return -EINVAL;
+
+	/* sockets which represent a group are not allowed to join another
+	 * group */
+	if (u->mcast_group)
+		return -EINVAL;
+
+	err = unix_autobind(sock);
+	if (err < 0)
+		return err;
+
+	err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+	if (err < 0)
+		return err;
+
+	namelen = err;
+	other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
+				sock->type, hash, &err);
+	if (!other)
+		return -EINVAL;
+
+	group = unix_sk(other)->mcast_group;
+
+	if (!group) {
+		err = -EADDRINUSE;
+		goto sock_put_out;
+	}
+
+	node = kmalloc(sizeof(struct unix_mcast), GFP_KERNEL);
+	if (!node) {
+		err = -ENOMEM;
+		goto sock_put_out;
+	}
+	node->member = u;
+	node->group = group;
+	node->flags = mreq->flags;
+
+	if (sock->sk->sk_type == SOCK_SEQPACKET) {
+		peer = unix_peer_get(sock->sk);
+		if (peer) {
+			atomic_inc(&group->refcnt);
+			unix_sk(peer)->mcast_group = group;
+			sock_put(peer);
+		}
+	}
+
+	unix_state_lock(sock->sk);
+	unix_sk(sock->sk)->mcast_send_to_peer =
+		!!(mreq->flags & UNIX_MREQ_SEND_TO_PEER);
+	unix_sk(sock->sk)->mcast_drop_when_peer_full =
+		!!(mreq->flags & UNIX_MREQ_DROP_WHEN_FULL);
+	unix_state_unlock(sock->sk);
+
+	/* Keep a reference */
+	sock_hold(sock->sk);
+	atomic_inc(&group->refcnt);
+
+	spin_lock(&unix_multicast_lock);
+	hlist_add_head_rcu(&node->member_node,
+			   &group->mcast_members);
+	hlist_add_head_rcu(&node->subscription_node, &u->mcast_subscriptions);
+	atomic_inc(&group->mcast_members_cnt);
+	atomic_inc(&group->mcast_membership_generation);
+	spin_unlock(&unix_multicast_lock);
+
+	return 0;
+
+sock_put_out:
+	sock_put(other);
+	return err;
+}
+
+
+static int unix_mc_leave(struct socket *sock, struct unix_mreq *mreq)
+{
+	struct unix_sock *u = unix_sk(sock->sk);
+	struct sock *other;
+	struct unix_mcast_group *group;
+	struct unix_mcast *node;
+	struct hlist_node *pos;
+	int err;
+	unsigned hash;
+	int namelen;
+
+	if (mreq->address.sun_family != AF_UNIX ||
+	    mreq->address.sun_path[0] != '\0')
+		return -EINVAL;
+
+	err = unix_mkname(&mreq->address, sizeof(struct sockaddr_un), &hash);
+	if (err < 0)
+		return err;
+
+	namelen = err;
+	other = unix_find_other(sock_net(sock->sk), &mreq->address, namelen,
+				sock->type, hash, &err);
+	if (!other)
+		return -EINVAL;
+
+	group = unix_sk(other)->mcast_group;
+
+	if (!group) {
+		err = -EINVAL;
+		goto sock_put_out;
+	}
+
+	spin_lock(&unix_multicast_lock);
+
+	hlist_for_each_entry_rcu(node, pos, &u->mcast_subscriptions,
+			     subscription_node) {
+		if (node->group == group)
+			break;
+	}
+
+	if (!pos) {
+		spin_unlock(&unix_multicast_lock);
+		err = -EINVAL;
+		goto sock_put_out;
+	}
+
+	hlist_del_rcu(&node->member_node);
+	hlist_del_rcu(&node->subscription_node);
+	atomic_dec(&group->mcast_members_cnt);
+	atomic_inc(&group->mcast_membership_generation);
+	hlist_add_head_rcu(&node->member_dead_node,
+			   &group->mcast_dead_members);
+	spin_unlock(&unix_multicast_lock);
+
+	if (sock->sk->sk_type == SOCK_SEQPACKET) {
+		struct sock *peer = unix_peer_get(sock->sk);
+		if (peer) {
+			unix_sk(peer)->mcast_group = NULL;
+			atomic_dec(&group->refcnt);
+			sock_put(peer);
+		}
+	}
+
+	synchronize_rcu();
+
+	if (atomic_dec_and_test(&group->refcnt)) {
+		spin_lock(&unix_multicast_lock);
+		destroy_mcast_group(group);
+		spin_unlock(&unix_multicast_lock);
+	}
+
+	err = 0;
+
+	/* If the receiving queue of that socket was full, some writers on the
+	 * multicast group may be blocked */
+	wake_up_interruptible_sync_poll(&u->peer_wait,
+					POLLOUT | POLLWRNORM | POLLWRBAND);
+
+sock_put_out:
+	sock_put(other);
+	return err;
+}
+#endif
+
 static int unix_setsockopt(struct socket *sock, int level, int optname,
 			   char __user *optval, unsigned int optlen)
 {
+#ifdef CONFIG_UNIX_MULTICAST
+	struct unix_mreq mreq;
+	int err = 0;
+
+	if (level != SOL_UNIX)
+		return -ENOPROTOOPT;
+
+	switch (optname) {
+	case UNIX_CREATE_GROUP:
+	case UNIX_JOIN_GROUP:
+	case UNIX_LEAVE_GROUP:
+		if (optlen < sizeof(struct unix_mreq))
+			return -EINVAL;
+		if (copy_from_user(&mreq, optval, sizeof(struct unix_mreq)))
+			return -EFAULT;
+		break;
+
+	default:
+		break;
+	}
+
+	switch (optname) {
+	case UNIX_CREATE_GROUP:
+		err = unix_mc_create(sock, &mreq);
+		break;
+
+	case UNIX_JOIN_GROUP:
+		err = unix_mc_join(sock, &mreq);
+		break;
+
+	case UNIX_LEAVE_GROUP:
+		err = unix_mc_leave(sock, &mreq);
+		break;
+
+	default:
+		err = -ENOPROTOOPT;
+		break;
+	}
+
+	return err;
+#else
 	return -EOPNOTSUPP;
+#endif
 }
 
 
-- 
1.7.2.3


* [PATCH 5/8] af_unix: find the recipients of a multicast group
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy

unix_find_multicast_recipients() returns a list of recipients for the specific
multicast address. It checks the options UNIX_MREQ_SEND_TO_PEER and
UNIX_MREQ_LOOPBACK to get the right recipients.

The list of recipients is ordered and guaranteed not to have duplicates.

When the caller has finished with the list of recipients, it calls
up_sock_set() so that the list can be reused by another sender.
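
A hypothetical caller would follow this pattern (sketch based on the API
described above; delivery details omitted):

  struct sock_set *set;
  int err = 0;

  set = unix_find_multicast_recipients(sk, group, &err);
  if (!set)
          return err;
  /* ... deliver to set->items[set->offset] .. set->items[set->cnt - 1] ... */
  up_sock_set(set);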

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Reviewed-by: Ian Molton <ian.molton@collabora.co.uk>
---
 net/unix/af_unix.c |  259 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 256 insertions(+), 3 deletions(-)

diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index f25c020..fe0d3bb 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -114,18 +114,84 @@
 #include <linux/mount.h>
 #include <net/checksum.h>
 #include <linux/security.h>
-
-static struct hlist_head unix_socket_table[UNIX_HASH_SIZE + 1];
-static DEFINE_SPINLOCK(unix_table_lock);
 #ifdef CONFIG_UNIX_MULTICAST
+#include <linux/sort.h>
+
 static DEFINE_SPINLOCK(unix_multicast_lock);
 #endif
+static struct hlist_head unix_socket_table[UNIX_HASH_SIZE + 1];
+static DEFINE_SPINLOCK(unix_table_lock);
 static atomic_long_t unix_nr_socks;
 
 #define unix_sockets_unbound	(&unix_socket_table[UNIX_HASH_SIZE])
 
 #define UNIX_ABSTRACT(sk)	(unix_sk(sk)->addr->hash != UNIX_HASH_SIZE)
 
+#ifdef CONFIG_UNIX_MULTICAST
+/* Array of sockets used in multicast deliveries */
+struct sock_item {
+	/* constant fields */
+	struct sock *s;
+	unsigned int flags;
+
+	/* fields reinitialized at every send */
+	struct sk_buff *skb;
+	unsigned int to_deliver:1;
+};
+
+struct sock_set {
+	/* struct sock_set is used by one sender at a time */
+	struct semaphore sem;
+	struct hlist_node list;
+	struct rcu_head rcu;
+	int generation;
+
+	/* the sender should consider only sockets from items[offset] to
+	 * item[cnt-1] */
+	int cnt;
+	int offset;
+	/* Bitfield of (struct unix_mcast_group)->lock spinlocks to take in
+	 * order to guarantee causal order of delivery */
+	u8 hash;
+	/* ordered list of sockets without duplicates. Cell zero is reserved
+	 * for sending a message to the accepted socket (SOCK_SEQPACKET only).
+	 */
+	struct sock_item items[0];
+};
+
+static void up_sock_set(struct sock_set *set)
+{
+	if ((set->offset == 0) && set->items[0].s) {
+		sock_put(set->items[0].s);
+		set->items[0].s = NULL;
+		set->items[0].skb = NULL;
+	}
+	up(&set->sem);
+}
+
+static void kfree_sock_set(struct sock_set *set)
+{
+	int i;
+	for (i = set->offset ; i < set->cnt ; i++) {
+		if (set->items[i].s)
+			sock_put(set->items[i].s);
+	}
+	kfree(set);
+}
+
+static int sock_item_compare(const void *_a, const void *_b)
+{
+	const struct sock_item *a = _a;
+	const struct sock_item *b = _b;
+	if (a->s > b->s)
+		return 1;
+	else if (a->s < b->s)
+		return -1;
+	else
+		return 0;
+}
+#endif
+
 #ifdef CONFIG_SECURITY_NETWORK
 static void unix_get_secdata(struct scm_cookie *scm, struct sk_buff *skb)
 {
@@ -379,6 +445,7 @@ static void
 destroy_mcast_group(struct unix_mcast_group *group)
 {
 	struct unix_mcast *node;
+	struct sock_set *set;
 	struct hlist_node *pos;
 	struct hlist_node *pos_tmp;
 
@@ -392,6 +459,12 @@ destroy_mcast_group(struct unix_mcast_group *group)
 		sock_put(&node->member->sk);
 		kfree(node);
 	}
+	hlist_for_each_entry_safe(set, pos, pos_tmp,
+				  &group->mcast_members_lists,
+				  list) {
+		hlist_del_rcu(&set->list);
+		kfree_sock_set(set);
+	}
 	kfree(group);
 }
 #endif
@@ -851,6 +924,186 @@ fail:
 	return NULL;
 }
 
+#ifdef CONFIG_UNIX_MULTICAST
+static int unix_find_multicast_members(struct sock_set *set,
+				       int recipient_cnt,
+				       struct hlist_head *list)
+{
+	struct unix_mcast *node;
+	struct hlist_node *pos;
+
+	hlist_for_each_entry_rcu(node, pos, list,
+			     member_node) {
+		struct sock *s;
+
+		if (set->cnt + 1 > recipient_cnt)
+			return -ENOMEM;
+
+		s = &node->member->sk;
+		sock_hold(s);
+		set->items[set->cnt].s = s;
+		set->items[set->cnt].flags = node->flags;
+		set->cnt++;
+
+		set->hash |= 1 << ((((unsigned long)s) >> 6) & 0x07);
+	}
+
+	return 0;
+}
+
+static void sock_set_reclaim(struct rcu_head *rp)
+{
+	struct sock_set *set = container_of(rp, struct sock_set, rcu);
+	kfree_sock_set(set);
+}
+
+static struct sock_set *unix_find_multicast_recipients(struct sock *sender,
+				struct unix_mcast_group *group,
+				int *err)
+{
+	struct sock_set *set = NULL; /* silence GCC's uninitialized warning */
+	struct sock_set *del_set;
+	struct hlist_node *pos;
+	int recipient_cnt;
+	int generation;
+	int i;
+
+	BUG_ON(sender == NULL);
+	BUG_ON(group == NULL);
+
+	/* Find an available set if any */
+	generation = atomic_read(&group->mcast_membership_generation);
+	rcu_read_lock();
+	hlist_for_each_entry_rcu(set, pos, &group->mcast_members_lists,
+			     list) {
+		if (down_trylock(&set->sem)) {
+			/* the set is being used by someone else */
+			continue;
+		}
+		if (set->generation == generation) {
+			/* the set is still valid, use it */
+			break;
+		}
+		/* The set is outdated. It will be removed from the RCU list
+		 * soon but not in this lockless RCU read */
+		up(&set->sem);
+	}
+	rcu_read_unlock();
+	if (pos)
+		goto list_found;
+
+	/* We cannot allocate in the spin lock. First, count the recipients */
+try_again:
+	generation = atomic_read(&group->mcast_membership_generation);
+	recipient_cnt = atomic_read(&group->mcast_members_cnt);
+
+	/* Allocate for the set and hope the number of recipients does not
+	 * change while the lock is released. If it changes, we have to try
+	 * again... We allocate a bit more than needed, so if a _few_ members
+	 * are added in a multicast group meanwhile, we don't always need to
+	 * try again. */
+	recipient_cnt += 5;
+
+	set = kmalloc(sizeof(struct sock_set)
+		      + sizeof(struct sock_item) * recipient_cnt,
+	    GFP_KERNEL);
+	if (!set) {
+		*err = -ENOMEM;
+		return NULL;
+	}
+	sema_init(&set->sem, 0);
+	set->cnt = 1;
+	set->offset = 1;
+	set->generation = generation;
+	set->hash = 0;
+
+	rcu_read_lock();
+	if (unix_find_multicast_members(set, recipient_cnt,
+			&group->mcast_members)) {
+		rcu_read_unlock();
+		kfree_sock_set(set);
+		goto try_again;
+	}
+	rcu_read_unlock();
+
+	/* Keep the array ordered to prevent deadlocks when locking the
+	 * receiving queues. The ordering is:
+	 * - First, the accepted socket (SOCK_SEQPACKET only)
+	 * - Then, the member sockets ordered by memory address
+	 * The accepted socket cannot be member of a multicast group.
+	 */
+	sort(set->items + 1, set->cnt - 1, sizeof(struct sock_item),
+	     sock_item_compare, NULL);
+	/* Avoid duplicates */
+	for (i = 2 ; i < set->cnt ; i++) {
+		if (set->items[i].s == set->items[i - 1].s) {
+			sock_put(set->items[i - 1].s);
+			set->items[i - 1].s = NULL;
+		}
+	}
+
+	if (generation != atomic_read(&group->mcast_membership_generation)) {
+		kfree_sock_set(set);
+		goto try_again;
+	}
+
+	/* Take the lock to insert the new list but take the opportunity to do
+	 * some garbage collection on outdated lists */
+	spin_lock(&unix_multicast_lock);
+	hlist_for_each_entry_rcu(del_set, pos, &group->mcast_members_lists,
+			     list) {
+		if (down_trylock(&del_set->sem)) {
+			/* the list is being used by someone else */
+			continue;
+		}
+		if (del_set->generation < generation) {
+			hlist_del_rcu(&del_set->list);
+			call_rcu(&del_set->rcu, sock_set_reclaim);
+		}
+		up(&del_set->sem);
+	}
+	hlist_add_head_rcu(&set->list,
+			   &group->mcast_members_lists);
+	spin_unlock(&unix_multicast_lock);
+
+list_found:
+	/* List found. Initialize the first item. */
+	if (sender->sk_type == SOCK_SEQPACKET
+	    && unix_peer(sender)
+	    && unix_sk(sender)->mcast_send_to_peer) {
+		set->offset = 0;
+		sock_hold(unix_peer(sender));
+		set->items[0].s = unix_peer(sender);
+		set->items[0].skb = NULL;
+		set->items[0].to_deliver = 1;
+		set->items[0].flags =
+			unix_sk(sender)->mcast_drop_when_peer_full
+			? UNIX_MREQ_DROP_WHEN_FULL : 0;
+	} else {
+		set->items[0].s = NULL;
+		set->items[0].skb = NULL;
+		set->items[0].to_deliver = 0;
+		set->offset = 1;
+	}
+
+	/* Initialize the other items. */
+	for (i = 1 ; i < set->cnt ; i++) {
+		set->items[i].skb = NULL;
+		if (set->items[i].s == NULL) {
+			set->items[i].to_deliver = 0;
+			continue;
+		}
+		if (set->items[i].flags & UNIX_MREQ_LOOPBACK
+		    || sender != set->items[i].s)
+			set->items[i].to_deliver = 1;
+		else
+			set->items[i].to_deliver = 0;
+	}
+
+	return set;
+}
+#endif
+
 
 static int unix_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
 {
-- 
1.7.2.3


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH 6/8] af_unix: Deliver message to several recipients in case of multicast
       [not found] <20110121143751.57b1453d@chocolatine.cbg.collabora.co.uk>
@ 2011-01-21 14:39   ` Alban Crequy
  2011-01-21 14:39   ` Alban Crequy
                     ` (6 subsequent siblings)
  7 siblings, 0 replies; 19+ messages in thread
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy, Ian Molton

unix_dgram_sendmsg() implements delivery for both SOCK_DGRAM and
SOCK_SEQPACKET unix sockets.

The delivery is atomic: either the message is delivered to all recipients
or to none, even in the case of interruptions or errors.
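
The atomicity comes from a prepare/commit split: an skb clone is prepared
and validated for every recipient first, and the clones are only queued
once no step can fail anymore. A simplified sketch of the shape of the
code below (error paths and checks elided):

  /* Phase 1: clone and prepare; a failure here frees all the clones
   * and aborts the send, since nothing has been queued yet. */
  for (i = others_set->offset ; i < others_set->cnt ; i++)
          if (others_set->items[i].to_deliver)
                  others_set->items[i].skb = skb_clone(skb, GFP_KERNEL);

  /* Phase 2: under the multicast locks, queue every clone; nothing
   * can fail at this point, so delivery is all-or-nothing. */
  unix_mcast_lock(group, others_set);
  for (i = others_set->offset ; i < others_set->cnt ; i++)
          if (others_set->items[i].to_deliver)
                  skb_queue_tail(&others_set->items[i].s->sk_receive_queue,
                                 others_set->items[i].skb);
  unix_mcast_unlock(group, others_set);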

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Signed-off-by: Ian Molton <ian.molton@collabora.co.uk>
---
 net/unix/af_unix.c |  242 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 242 insertions(+), 0 deletions(-)

diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index fe0d3bb..4147d64 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -1715,6 +1715,210 @@ static int unix_scm_to_skb(struct scm_cookie *scm, struct sk_buff *skb, bool sen
 	return err;
 }
 
+#ifdef CONFIG_UNIX_MULTICAST
+static void kfree_skb_sock_set(struct sock_set *set)
+{
+	int i;
+	for (i = set->offset ; i < set->cnt ; i++) {
+		if (set->items[i].skb) {
+			kfree_skb(set->items[i].skb);
+			set->items[i].skb = NULL;
+		}
+	}
+}
+
+static void unix_mcast_lock(struct unix_mcast_group *group,
+			    struct sock_set *set)
+{
+	int i;
+	for (i = 0 ; i < MCAST_LOCK_CLASS_COUNT ; i++) {
+		if (set->hash & (1 << i))
+			spin_lock_nested(&group->lock[i], i);
+	}
+}
+
+static void unix_mcast_unlock(struct unix_mcast_group *group,
+			      struct sock_set *set)
+{
+	int i;
+	for (i = MCAST_LOCK_CLASS_COUNT - 1 ; i >= 0 ; i--) {
+		if (set->hash & (1 << i))
+			spin_unlock(&group->lock[i]);
+	}
+}
+
+
+static int unix_dgram_sendmsg_multicast(struct sock_iocb *siocb,
+					struct sock *sk,
+					struct sk_buff *skb,
+					struct unix_mcast_group *group,
+					struct sock_set *others_set,
+					size_t len,
+					int max_level,
+					long timeo)
+{
+	int err;
+	int i;
+
+	BUG_ON(!others_set);
+
+restart:
+	for (i = others_set->offset ; i < others_set->cnt ; i++) {
+		struct sock *cur = others_set->items[i].s;
+		unsigned int pkt_len;
+		struct sk_filter *filter;
+
+		if (!others_set->items[i].to_deliver)
+			continue;
+
+		BUG_ON(others_set->items[i].skb);
+		BUG_ON(cur == NULL);
+
+		rcu_read_lock();
+		filter = rcu_dereference(cur->sk_filter);
+		if (filter)
+			pkt_len = sk_run_filter(skb, filter->insns);
+		else
+			pkt_len = 0xffffffff;
+		rcu_read_unlock();
+
+		if (pkt_len == 0) {
+			others_set->items[i].to_deliver = 0;
+			continue;
+		}
+
+		others_set->items[i].skb = skb_clone(skb, GFP_KERNEL);
+		if (!others_set->items[i].skb) {
+			kfree_skb_sock_set(others_set);
+			err = -ENOMEM;
+			goto out_free;
+		}
+		skb_set_owner_w(others_set->items[i].skb, sk);
+		err = unix_scm_to_skb(siocb->scm, others_set->items[i].skb,
+				      true);
+		if (err < 0)
+			goto out_free;
+		unix_get_secdata(siocb->scm, others_set->items[i].skb);
+		pskb_trim(others_set->items[i].skb, pkt_len);
+	}
+
+	for (i = others_set->offset ; i < others_set->cnt ; i++) {
+		struct sock *cur = others_set->items[i].s;
+
+		if (!others_set->items[i].to_deliver)
+			continue;
+
+		unix_state_lock(cur);
+
+		if (cur->sk_shutdown & RCV_SHUTDOWN) {
+			unix_state_unlock(cur);
+			kfree_skb(others_set->items[i].skb);
+			others_set->items[i].skb = NULL;
+			others_set->items[i].to_deliver = 0;
+			continue;
+		}
+
+		if (sk->sk_type != SOCK_SEQPACKET) {
+			err = security_unix_may_send(sk->sk_socket,
+						     cur->sk_socket);
+			if (err) {
+				unix_state_unlock(cur);
+				kfree_skb(others_set->items[i].skb);
+				others_set->items[i].skb = NULL;
+				others_set->items[i].to_deliver = 0;
+				continue;
+			}
+		}
+
+		if (unix_peer(cur) != sk && unix_recvq_full(cur)) {
+			kfree_skb(others_set->items[i].skb);
+			others_set->items[i].skb = NULL;
+
+			if (others_set->items[i].flags
+					& UNIX_MREQ_DROP_WHEN_FULL) {
+				/* Drop the skbs and continue */
+				unix_state_unlock(cur);
+				others_set->items[i].to_deliver = 0;
+				continue;
+			} else {
+				if (!timeo) {
+					unix_state_unlock(cur);
+					err = -EAGAIN;
+					goto out_free;
+				}
+
+				timeo = unix_wait_for_peer(cur, timeo);
+
+				err = sock_intr_errno(timeo);
+				if (signal_pending(current))
+					goto out_free;
+
+				kfree_skb_sock_set(others_set);
+				goto restart;
+			}
+		}
+		unix_state_unlock(cur);
+	}
+
+	unix_mcast_lock(group, others_set);
+	for (i = others_set->offset ; i < others_set->cnt ; i++) {
+		struct sock *cur = others_set->items[i].s;
+
+		if (!others_set->items[i].to_deliver)
+			continue;
+
+		BUG_ON(cur == NULL);
+		BUG_ON(others_set->items[i].skb == NULL);
+
+		unix_state_lock(cur);
+
+		if (sock_flag(cur, SOCK_DEAD)) {
+			unix_state_unlock(cur);
+
+			kfree_skb(others_set->items[i].skb);
+			others_set->items[i].skb = NULL;
+			others_set->items[i].to_deliver = 0;
+			continue;
+		}
+
+		if (sock_flag(cur, SOCK_RCVTSTAMP))
+			__net_timestamp(others_set->items[i].skb);
+
+		skb_queue_tail(&cur->sk_receive_queue,
+			       others_set->items[i].skb);
+		others_set->items[i].skb = NULL;
+		if (max_level > unix_sk(cur)->recursion_level)
+			unix_sk(cur)->recursion_level = max_level;
+
+		unix_state_unlock(cur);
+	}
+	unix_mcast_unlock(group, others_set);
+
+	for (i = others_set->offset ; i < others_set->cnt ; i++) {
+		struct sock *cur = others_set->items[i].s;
+
+		if (!others_set->items[i].to_deliver)
+			continue;
+
+		cur->sk_data_ready(cur, len);
+	}
+
+	kfree_skb(skb);
+	scm_destroy(siocb->scm);
+	up_sock_set(others_set);
+	return len;
+
+out_free:
+	kfree_skb(skb);
+	if (others_set) {
+		kfree_skb_sock_set(others_set);
+		up_sock_set(others_set);
+	}
+	return err;
+}
+#endif
+
+
 /*
  *	Send AF_UNIX data.
  */
@@ -1735,6 +1939,10 @@ static int unix_dgram_sendmsg(struct kiocb *kiocb, struct socket *sock,
 	long timeo;
 	struct scm_cookie tmp_scm;
 	int max_level;
+#ifdef CONFIG_UNIX_MULTICAST
+	struct unix_mcast_group *group = NULL;
+	struct sock_set *others_set = NULL;
+#endif
 
 	if (NULL == siocb->scm)
 		siocb->scm = &tmp_scm;
@@ -1756,8 +1964,20 @@ static int unix_dgram_sendmsg(struct kiocb *kiocb, struct socket *sock,
 		sunaddr = NULL;
 		err = -ENOTCONN;
 		other = unix_peer_get(sk);
+
 		if (!other)
 			goto out;
+
+#ifdef CONFIG_UNIX_MULTICAST
+		group = unix_sk(other)->mcast_group;
+		if (group) {
+			others_set = unix_find_multicast_recipients(sk,
+				group, &err);
+
+			if (!others_set)
+				goto out;
+		}
+#endif
 	}
 
 	if (test_bit(SOCK_PASSCRED, &sock->flags) && !u->addr
@@ -1795,6 +2015,28 @@ restart:
 					hash, &err);
 		if (other == NULL)
 			goto out_free;
+
+#ifdef CONFIG_UNIX_MULTICAST
+		group = unix_sk(other)->mcast_group;
+		if (group) {
+			others_set = unix_find_multicast_recipients(sk,
+				group, &err);
+
+			sock_put(other);
+			other = NULL;
+
+			if (!others_set)
+				goto out;
+		}
+	}
+
+	if (group) {
+		err = unix_dgram_sendmsg_multicast(siocb, sk, skb, group,
+			others_set, len, max_level, timeo);
+		if (err < 0)
+			goto out;
+		return err;
+#endif
 	}
 
 	if (sk_filter(other, skb) < 0) {
-- 
1.7.2.3


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH 7/8] af_unix: implement poll(POLLOUT) for multicast sockets
       [not found] <20110121143751.57b1453d@chocolatine.cbg.collabora.co.uk>
@ 2011-01-21 14:39   ` Alban Crequy
  2011-01-21 14:39   ` Alban Crequy
                     ` (6 subsequent siblings)
  7 siblings, 0 replies; 19+ messages in thread
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy

When a socket subscribed to a multicast group has its incoming queue full,
it can either block further sends to the multicast group or let the
messages addressed to it be dropped. The latter is useful for monitoring
all messages without slowing down the traffic.

It is specified with the flag UNIX_MREQ_DROP_WHEN_FULL when the multicast group
is joined.

poll(POLLOUT) is implemented by checking the receive queues of all
subscribed sockets. If even one of them has a full receive queue and did
not set UNIX_MREQ_DROP_WHEN_FULL, the multicast socket is not writable.
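
From userspace the usual poll() contract applies. A small sketch, where
mcast_fd, buf and len are placeholders and mcast_fd is assumed to be a
socket whose peer is a multicast group:

  struct pollfd pfd = { .fd = mcast_fd, .events = POLLOUT };

  /* blocks until every member that did not set UNIX_MREQ_DROP_WHEN_FULL
   * has room in its receive queue */
  if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLOUT))
          send(mcast_fd, buf, len, 0);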

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Reviewed-by: Ian Molton <ian.molton@collabora.co.uk>
---
 net/unix/af_unix.c |   33 +++++++++++++++++++++++++++++++++
 1 files changed, 33 insertions(+), 0 deletions(-)

diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 4147d64..138d9a2 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -2940,6 +2940,11 @@ static unsigned int unix_dgram_poll(struct file *file, struct socket *sock,
 {
 	struct sock *sk = sock->sk, *other;
 	unsigned int mask, writable;
+#ifdef CONFIG_UNIX_MULTICAST
+	struct sock_set *others;
+	int err = 0;
+	int i;
+#endif
 
 	sock_poll_wait(file, sk_sleep(sk), wait);
 	mask = 0;
@@ -2980,6 +2985,34 @@ static unsigned int unix_dgram_poll(struct file *file, struct socket *sock,
 		sock_put(other);
 	}
 
+#ifdef CONFIG_UNIX_MULTICAST
+	/*
+	 * On multicast sockets, we need to check if the receiving queue is
+	 * full on all peers who don't have UNIX_MREQ_DROP_WHEN_FULL.
+	 */
+	if (!other || !unix_sk(other)->mcast_group)
+		goto skip_multicast;
+	others = unix_find_multicast_recipients(sk,
+		unix_sk(other)->mcast_group, &err);
+	if (!others)
+		goto skip_multicast;
+	for (i = others->offset ; i < others->cnt ; i++) {
+		/* skip slots NULLed out by duplicate elimination */
+		if (!others->items[i].s)
+			continue;
+		if (others->items[i].flags & UNIX_MREQ_DROP_WHEN_FULL)
+			continue;
+		if (unix_peer(others->items[i].s) != sk) {
+			sock_poll_wait(file,
+				&unix_sk(others->items[i].s)->peer_wait, wait);
+			if (unix_recvq_full(others->items[i].s)) {
+				writable = 0;
+				break;
+			}
+		}
+	}
+	up_sock_set(others);
+
+skip_multicast:
+#endif
+
 	if (writable)
 		mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
 	else
-- 
1.7.2.3


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* [PATCH 8/8] af_unix: Unsubscribe sockets from their multicast groups on RCV_SHUTDOWN
       [not found] <20110121143751.57b1453d@chocolatine.cbg.collabora.co.uk>
@ 2011-01-21 14:39   ` Alban Crequy
  2011-01-21 14:39   ` Alban Crequy
                     ` (6 subsequent siblings)
  7 siblings, 0 replies; 19+ messages in thread
From: Alban Crequy @ 2011-01-21 14:39 UTC (permalink / raw)
  To: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-doc, linux-kernel, Alban Crequy, Ian Molton
  Cc: Alban Crequy

Signed-off-by: Alban Crequy <alban.crequy@collabora.co.uk>
Reviewed-by: Ian Molton <ian.molton@collabora.co.uk>
---
 net/unix/af_unix.c |   35 +++++++++++++++++++++++++++++++++++
 1 files changed, 35 insertions(+), 0 deletions(-)

diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
index 138d9a2..9b281cf 100644
--- a/net/unix/af_unix.c
+++ b/net/unix/af_unix.c
@@ -2820,6 +2820,10 @@ static int unix_shutdown(struct socket *sock, int mode)
 {
 	struct sock *sk = sock->sk;
 	struct sock *other;
+#ifdef CONFIG_UNIX_MULTICAST
+	struct unix_sock *u = unix_sk(sk);
+	int unsubscribed = 0;
+#endif
 
 	mode = (mode+1)&(RCV_SHUTDOWN|SEND_SHUTDOWN);
 
@@ -2831,7 +2835,38 @@ static int unix_shutdown(struct socket *sock, int mode)
 	other = unix_peer(sk);
 	if (other)
 		sock_hold(other);
+
+#ifdef CONFIG_UNIX_MULTICAST
+	/* If the socket is subscribed to a multicast group and is shut down
+	 * with (mode & RCV_SHUTDOWN), it should be unsubscribed, or at least
+	 * stop blocking its peers */
+	if (mode&RCV_SHUTDOWN) {
+		struct unix_mcast *node;
+		struct hlist_node *pos;
+		struct hlist_node *pos_tmp;
+
+		spin_lock(&unix_multicast_lock);
+		hlist_for_each_entry_safe(node, pos, pos_tmp,
+					  &u->mcast_subscriptions,
+					  subscription_node) {
+			hlist_del_rcu(&node->member_node);
+			hlist_del_rcu(&node->subscription_node);
+			atomic_dec(&node->group->mcast_members_cnt);
+			atomic_inc(&node->group->mcast_membership_generation);
+			hlist_add_head_rcu(&node->member_dead_node,
+					   &node->group->mcast_dead_members);
+			unsubscribed = 1;
+		}
+		spin_unlock(&unix_multicast_lock);
+	}
+#endif
 	unix_state_unlock(sk);
+
+#ifdef CONFIG_UNIX_MULTICAST
+	if (unsubscribed)
+		wake_up_interruptible_all(&u->peer_wait);
+#endif
+
 	sk->sk_state_change(sk);
 
 	if (other &&
-- 
1.7.2.3


^ permalink raw reply related	[flat|nested] 19+ messages in thread

* Re: [PATCH 5/8] af_unix: find the recipients of a multicast group
  2011-01-21 14:39   ` Alban Crequy
  (?)
@ 2011-01-21 17:24   ` Alban Crequy
  2011-01-22  0:58     ` David Miller
  -1 siblings, 1 reply; 19+ messages in thread
From: Alban Crequy @ 2011-01-21 17:24 UTC (permalink / raw)
  To: Alban Crequy
  Cc: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-kernel, Ian Molton

[drop Cc on linux-doc]

I got this message with my multicast patches:

[  109.314741] =================================
[  109.316007] [ INFO: inconsistent lock state ]
[  109.316007] 2.6.38-rc1+ #14
[  109.316007] ---------------------------------
[  109.316007] inconsistent {SOFTIRQ-ON-W} -> {IN-SOFTIRQ-W} usage.
[  109.316007] ksoftirqd/1/9 [HC0[0]:SC1[1]:HE0:SE0] takes:
[  109.316007]  (&af_unix_sk_receive_queue_lock_key){+.?...}, at: [<c1256028>] skb_dequeue+0x12/0x4a
[  109.316007] {SOFTIRQ-ON-W} state was registered at:
[  109.316007]   [<c105b9b9>] __lock_acquire+0x2df/0xb95
[  109.316007]   [<c105c334>] lock_acquire+0xc5/0xe6
[  109.316007]   [<c12fd21d>] _raw_spin_lock+0x33/0x40
[  109.316007]   [<e080cbc8>] unix_stream_connect+0x34f/0x3d5 [unix]
[  109.316007]   [<c1250918>] sys_connect+0x7c/0xb2
[  109.316007]   [<c125169e>] sys_socketcall+0xb0/0x289
[  109.316007]   [<c12fdb4c>] syscall_call+0x7/0xb
[  109.316007] irq event stamp: 463879
[  109.316007] hardirqs last  enabled at (463878): [<c10c8d3c>] kmem_cache_free+0xa4/0xe2
[  109.316007] hardirqs last disabled at (463879): [<c12fd2ed>] _raw_spin_lock_irqsave+0x1d/0x57
[  109.316007] softirqs last  enabled at (463638): [<c10385d9>] __do_softirq+0x17c/0x190
[  109.316007] softirqs last disabled at (463641): [<c1004bd3>] do_softirq+0x60/0xb9
[  109.316007] 
[  109.316007] other info that might help us debug this:
[  109.316007] no locks held by ksoftirqd/1/9.
[  109.316007] 
[  109.316007] stack backtrace:
[  109.316007] Pid: 9, comm: ksoftirqd/1 Not tainted 2.6.38-rc1+ #14
[  109.316007] Call Trace:
[  109.316007]  [<c105a70f>] ? valid_state+0x168/0x174
[  109.316007]  [<c105a803>] ? mark_lock+0xe8/0x1e8
[  109.316007]  [<c105aefb>] ? check_usage_forwards+0x0/0x77
[  109.316007]  [<c105b94b>] ? __lock_acquire+0x271/0xb95
[  109.316007]  [<c1059af3>] ? register_lock_class+0x17/0x2a4
[  109.316007]  [<c105a739>] ? mark_lock+0x1e/0x1e8
[  109.316007]  [<c1059787>] ? trace_hardirqs_off+0xb/0xd
[  109.316007]  [<c105ace5>] ? debug_check_no_locks_freed+0x115/0x12d
[  109.316007]  [<c1256028>] ? skb_dequeue+0x12/0x4a
[  109.316007]  [<c105c334>] ? lock_acquire+0xc5/0xe6
[  109.316007]  [<c1256028>] ? skb_dequeue+0x12/0x4a
[  109.316007]  [<c12fd317>] ? _raw_spin_lock_irqsave+0x47/0x57
[  109.316007]  [<c1256028>] ? skb_dequeue+0x12/0x4a
[  109.316007]  [<c1256028>] ? skb_dequeue+0x12/0x4a
[  109.316007]  [<c1256a75>] ? skb_queue_purge+0x14/0x1b
[  109.316007]  [<e080cc62>] ? unix_sock_destructor+0x14/0xb6 [unix]
[  109.316007]  [<c12532fe>] ? __sk_free+0x17/0x13f
[  109.316007]  [<c105ab89>] ? trace_hardirqs_on_caller+0xeb/0x125
[  109.316007]  [<c1253488>] ? sk_free+0x16/0x18
[  109.316007]  [<e0809f74>] ? sock_put+0x13/0x15 [unix]
[  109.316007]  [<e080a107>] ? kfree_sock_set+0x21/0x36 [unix]
[  109.316007]  [<e080a127>] ? sock_set_reclaim+0xb/0xd [unix]
[  109.316007]  [<c1080068>] ? __rcu_process_callbacks+0x176/0x26b
[  109.316007]  [<c108017b>] ? rcu_process_callbacks+0x1e/0x3b
[  109.316007]  [<c103850e>] ? __do_softirq+0xb1/0x190
[  109.316007]  [<c103845d>] ? __do_softirq+0x0/0x190
[  109.316007]  <IRQ>  [<c1037d27>] ? run_ksoftirqd+0x57/0xd3
[  109.316007]  [<c1037cd0>] ? run_ksoftirqd+0x0/0xd3
[  109.316007]  [<c104a930>] ? kthread+0x6d/0x72
[  109.316007]  [<c104a8c3>] ? kthread+0x0/0x72
[  109.316007]  [<c1003742>] ? kernel_thread_helper+0x6/0x10

The socket is released and skb is dequeued in a call_rcu() callback:

> +	/* Take the lock to insert the new list but take the opportunity to do
> +	 * some garbage collection on outdated lists */
> +	spin_lock(&unix_multicast_lock);
> +	hlist_for_each_entry_rcu(del_set, pos, &group->mcast_members_lists,
> +			     list) {
> +		if (down_trylock(&del_set->sem)) {
> +			/* the list is being used by someone else */
> +			continue;
> +		}
> +		if (del_set->generation < generation) {
> +			hlist_del_rcu(&del_set->list);
> +			call_rcu(&del_set->rcu, sock_set_reclaim);

The purpose of that chunk is to release outdated struct sock_set soon
enough instead of doing it in destroy_mcast_group(). So senders of
multicast messages don't have to iterate on outdated sock_set when
they are looking for an available set of sockets.

In af_unix.c, lockdep annotations (a09785a2):
/*
 * AF_UNIX sockets do not interact with hardware, hence they
 * dont trigger interrupts - so it's safe for them to have
 * bh-unsafe locking for their sk_receive_queue.lock. Split off
 * this special lock-class by reinitializing the spinlock key:
 */
static struct lock_class_key af_unix_sk_receive_queue_lock_key;

       lockdep_set_class(&sk->sk_receive_queue.lock,
                               &af_unix_sk_receive_queue_lock_key);


I don't know if I should avoid releasing sockets in RCU callbacks or
update the lockdep annotations.

-- 
Alban

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 5/8] af_unix: find the recipients of a multicast group
  2011-01-21 17:24   ` Alban Crequy
@ 2011-01-22  0:58     ` David Miller
  0 siblings, 0 replies; 19+ messages in thread
From: David Miller @ 2011-01-22  0:58 UTC (permalink / raw)
  To: alban.crequy; +Cc: eric.dumazet, lennart, netdev, linux-kernel, ian.molton

From: Alban Crequy <alban.crequy@collabora.co.uk>
Date: Fri, 21 Jan 2011 17:24:20 +0000

> I don't know if I should avoid releasing sockets in RCU callbacks or
> update the lockdep annotations.

Releasing sockets in RCU callbacks is dangerous at best.

^ permalink raw reply	[flat|nested] 19+ messages in thread

* Re: [PATCH 7/8] af_unix: implement poll(POLLOUT) for multicast sockets
  2011-01-21 14:39   ` Alban Crequy
  (?)
@ 2011-02-09 14:14   ` Alban Crequy
  -1 siblings, 0 replies; 19+ messages in thread
From: Alban Crequy @ 2011-02-09 14:14 UTC (permalink / raw)
  To: Alban Crequy
  Cc: David S. Miller, Eric Dumazet, Lennart Poettering, netdev,
	linux-kernel, Ian Molton

On Fri, 21 Jan 2011 14:39:47 +0000,
Alban Crequy <alban.crequy@collabora.co.uk> wrote:

> diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c
> index 4147d64..138d9a2 100644
> --- a/net/unix/af_unix.c
> +++ b/net/unix/af_unix.c
...
>  	sock_poll_wait(file, sk_sleep(sk), wait);
                             ^^^^^^^^^^^^
> +#ifdef CONFIG_UNIX_MULTICAST
> +	/*
> +	 * On multicast sockets, we need to check if the receiving queue is
> +	 * full on all peers who don't have UNIX_MREQ_DROP_WHEN_FULL.
> +	 */
> +	if (!other || !unix_sk(other)->mcast_group)
> +		goto skip_multicast;
> +	others = unix_find_multicast_recipients(sk,
> +		unix_sk(other)->mcast_group, &err);
> +	if (!others)
> +		goto skip_multicast;
> +	for (i = others->offset ; i < others->cnt ; i++) {
> +		if (others->items[i].flags & UNIX_MREQ_DROP_WHEN_FULL)
> +			continue;
> +		if (unix_peer(others->items[i].s) != sk) {
> +			sock_poll_wait(file,
> +				&unix_sk(others->items[i].s)->peer_wait, wait);
                                                              ^^^^^^^^^

This code does not work correctly: a poller cannot sleep on two wait
queues at the same time. When the poller is added to ->peer_wait, it is
not in sk_sleep(sk), so it will miss POLLIN events.

I think I need another wait queue at the group level for waiters of
POLLIN|POLLOUT events. Waiters on that group-wide wait queue would be
woken up when a message is delivered to any peer (in unix_dgram_sendmsg,
alongside sk_data_ready) and when a member of the group consumes a
message (in unix_dgram_recvmsg, alongside
wake_up_interruptible_sync_poll).
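
Something along these lines, as a purely hypothetical sketch (none of
these fields or wake-up calls exist in the posted patches):

  struct unix_mcast_group {
          ...
          /* waiters for POLLIN/POLLOUT on any member of the group */
          wait_queue_head_t poll_wait;
  };

  /* in unix_dgram_sendmsg(), after queuing a message to a member */
  wake_up_interruptible_sync_poll(&group->poll_wait, POLLIN);

  /* in unix_dgram_recvmsg(), after a member dequeued a message */
  wake_up_interruptible_sync_poll(&group->poll_wait, POLLOUT);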


^ permalink raw reply	[flat|nested] 19+ messages in thread

Thread overview: 19+ messages
     [not found] <20110121143751.57b1453d@chocolatine.cbg.collabora.co.uk>
2011-01-21 14:39 ` [PATCH 1/8] af_unix: Documentation on multicast unix sockets Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-01-21 14:39 ` [PATCH 2/8] af_unix: Add constant for unix socket options level Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-01-21 14:39 ` [PATCH 3/8] af_unix: add setsockopt on unix sockets Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-01-21 14:39 ` [PATCH 4/8] af_unix: create, join and leave multicast groups with setsockopt Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-01-21 14:39 ` [PATCH 5/8] af_unix: find the recipients of a multicast group Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-01-21 17:24   ` Alban Crequy
2011-01-22  0:58     ` David Miller
2011-01-21 14:39 ` [PATCH 6/8] af_unix: Deliver message to several recipients in case of multicast Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-01-21 14:39 ` [PATCH 7/8] af_unix: implement poll(POLLOUT) for multicast sockets Alban Crequy
2011-01-21 14:39   ` Alban Crequy
2011-02-09 14:14   ` Alban Crequy
2011-01-21 14:39 ` [PATCH 8/8] af_unix: Unsubscribe sockets from their multicast groups on RCV_SHUTDOWN Alban Crequy
2011-01-21 14:39   ` Alban Crequy
