linux-kernel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Leonard Crestez <cdleonard@gmail.com>
To: David Ahern <dsahern@kernel.org>, Shuah Khan <shuah@kernel.org>,
	Dmitry Safonov <0x7f454c46@gmail.com>,
	Eric Dumazet <edumazet@google.com>
Cc: "David S. Miller" <davem@davemloft.net>,
	Herbert Xu <herbert@gondor.apana.org.au>,
	Kuniyuki Iwashima <kuniyu@amazon.co.jp>,
	Hideaki YOSHIFUJI <yoshfuji@linux-ipv6.org>,
	Jakub Kicinski <kuba@kernel.org>,
	Yuchung Cheng <ycheng@google.com>,
	Francesco Ruggeri <fruggeri@arista.com>,
	Mat Martineau <mathew.j.martineau@linux.intel.com>,
	Christoph Paasch <cpaasch@apple.com>,
	Ivan Delalande <colona@arista.com>,
	Priyaranjan Jha <priyarjha@google.com>,
	netdev@vger.kernel.org, linux-crypto@vger.kernel.org,
	linux-kselftest@vger.kernel.org, linux-kernel@vger.kernel.org
Subject: [PATCH v2 04/25] selftests: tcp_authopt: Initial sockopt manipulation
Date: Mon,  1 Nov 2021 18:34:39 +0200	[thread overview]
Message-ID: <f4f651740378974dad93bc97ced6d5cc97ffb373.1635784253.git.cdleonard@gmail.com> (raw)
In-Reply-To: <cover.1635784253.git.cdleonard@gmail.com>

Add a Python translation of the Linux TCP-AO ABI and test the
behavior of the TCP_AUTHOPT and TCP_AUTHOPT_KEY sockopts.

This includes several corner cases not normally covered by traffic
tests.

Signed-off-by: Leonard Crestez <cdleonard@gmail.com>
---
 .../tcp_authopt/tcp_authopt_test/conftest.py  |  71 +++++
 .../tcp_authopt_test/linux_tcp_authopt.py     | 266 ++++++++++++++++++
 .../tcp_authopt/tcp_authopt_test/sockaddr.py  | 122 ++++++++
 .../tcp_authopt_test/test_sockopt.py          | 203 +++++++++++++
 4 files changed, 662 insertions(+)
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py

diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
new file mode 100644
index 000000000000..a06ba848669d
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
@@ -0,0 +1,71 @@
+# SPDX-License-Identifier: GPL-2.0
+import logging
+import os
+from contextlib import ExitStack, nullcontext
+from typing import ContextManager
+
+import pytest
+
+from .linux_tcp_authopt import enable_sysctl_tcp_authopt, has_tcp_authopt
+
+logger = logging.getLogger(__name__)
+
+skipif_missing_tcp_authopt = pytest.mark.skipif(
+    not has_tcp_authopt(), reason="Need CONFIG_TCP_AUTHOPT"
+)
+
+
+def get_effective_capabilities():  # CapEff bitmask as int, None if not listed
+    with open("/proc/self/status", "r") as status:  # closed on block exit
+        capeff = [line for line in status if line.startswith("CapEff:")]
+    return int(capeff[0].split(":")[1], 16) if capeff else None
+
+
+def has_effective_capability(bit) -> bool:
+    return bool(get_effective_capabilities() & (1 << bit))  # test one CapEff bit
+
+
+def can_capture() -> bool:
+    return has_effective_capability(13)
+
+
+def raise_skip_no_netns():
+    if not has_effective_capability(12):
+        pytest.skip("Need CAP_NET_ADMIN for network namespaces")
+
+
+skipif_cant_capture = pytest.mark.skipif(
+    not can_capture(), reason="run as root to capture packets"
+)
+
+
+@pytest.fixture
+def exit_stack():
+    """Return a contextlib.ExitStack as a pytest fixture
+
+    This reduces indentation making code more readable
+    """
+    with ExitStack() as exit_stack:
+        yield exit_stack
+
+
+def pytest_configure():
+    # Silence messages regarding netns enter/exit:
+    logging.getLogger("nsenter").setLevel(logging.INFO)
+    if has_tcp_authopt():
+        enable_sysctl_tcp_authopt()
+
+
+def parametrize_product(**kw):
+    """Parametrize over the cartesian product of every keyword's value list"""
+    from itertools import product
+
+    return pytest.mark.parametrize(",".join(kw), product(*kw.values()))
+
+
+def raises_optional_exception(expected_exception, **kw) -> ContextManager:
+    """Return pytest.raises(...), or a no-op context if expected_exception is None"""
+    if expected_exception is not None:
+        return pytest.raises(expected_exception, **kw)
+    # Nothing is expected to be raised: hand back a do-nothing context manager
+    return nullcontext()
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
new file mode 100644
index 000000000000..b9dc9decda07
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
@@ -0,0 +1,266 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Python wrapper around linux TCP_AUTHOPT ABI"""
+
+import errno
+import logging
+import socket
+import struct
+import typing
+from dataclasses import dataclass
+from enum import IntEnum, IntFlag
+
+from .sockaddr import (
+    SockaddrConvertType,
+    sockaddr_base,
+    sockaddr_convert,
+    sockaddr_storage,
+    sockaddr_unpack,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def BIT(x):
+    return 1 << x
+
+
+TCP_AUTHOPT = 38
+TCP_AUTHOPT_KEY = 39
+
+TCP_AUTHOPT_MAXKEYLEN = 80
+
+
+class TCP_AUTHOPT_FLAG(IntFlag):
+    REJECT_UNEXPECTED = BIT(2)
+
+
+class TCP_AUTHOPT_KEY_FLAG(IntFlag):
+    DEL = BIT(0)
+    EXCLUDE_OPTS = BIT(1)
+    BIND_ADDR = BIT(2)
+
+
+class TCP_AUTHOPT_ALG(IntEnum):
+    HMAC_SHA_1_96 = 1
+    AES_128_CMAC_96 = 2
+
+
+@dataclass
+class tcp_authopt:
+    """Like linux struct tcp_authopt"""
+
+    flags: int = 0
+    sizeof = 4
+
+    def pack(self) -> bytes:
+        return struct.pack(
+            "I",
+            self.flags,
+        )
+
+    def __bytes__(self):
+        return self.pack()
+
+    @classmethod
+    def unpack(cls, b: bytes):
+        tup = struct.unpack("I", b)
+        return cls(*tup)
+
+
+def set_tcp_authopt(sock, opt: tcp_authopt):
+    return sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, bytes(opt))
+
+
+def get_tcp_authopt(sock: socket.socket) -> tcp_authopt:
+    b = sock.getsockopt(socket.SOL_TCP, TCP_AUTHOPT, tcp_authopt.sizeof)
+    return tcp_authopt.unpack(b)
+
+
+class tcp_authopt_key:
+    """Like linux struct tcp_authopt_key
+
+    :ivar auto_flags: If true (the default), pack() sets BIND_ADDR when addr is not None and clears it otherwise.
+    """
+
+    KeyArgType = typing.Union[str, bytes]
+    AddrArgType = typing.Union[None, str, bytes, SockaddrConvertType]
+
+    def __init__(
+        self,
+        flags: TCP_AUTHOPT_KEY_FLAG = TCP_AUTHOPT_KEY_FLAG(0),
+        send_id: int = 0,
+        recv_id: int = 0,
+        alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96,
+        key: KeyArgType = b"",
+        addr: AddrArgType = None,
+        auto_flags: bool = True,
+        include_options=None,
+    ):
+        self.flags = flags
+        self.send_id = send_id
+        self.recv_id = recv_id
+        self.alg = alg
+        self.key = key
+        self.addr = addr
+        self.auto_flags = auto_flags
+        if include_options is not None:
+            self.include_options = include_options
+
+    def get_real_flags(self) -> TCP_AUTHOPT_KEY_FLAG:
+        result = self.flags
+        if self.auto_flags:
+            if self.addr is not None:
+                result |= TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+            else:
+                result &= ~TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+        return result
+
+    def pack(self):
+        if len(self.key) > TCP_AUTHOPT_MAXKEYLEN:
+            raise ValueError(f"Max key length is {TCP_AUTHOPT_MAXKEYLEN}")
+        data = struct.pack(
+            "IBBBB80s",  # flags, send_id, recv_id, alg, key length, key bytes
+            self.get_real_flags(),
+            self.send_id,
+            self.recv_id,
+            self.alg,
+            len(self.key),
+            self.key,
+        )
+        data += bytes(self.addrbuf.ljust(sockaddr_storage.sizeof, b"\x00"))
+        return data
+
+    def __bytes__(self):
+        return self.pack()
+
+    @property
+    def key(self) -> bytes:
+        return self._key
+
+    @key.setter
+    def key(self, val: KeyArgType) -> bytes:
+        if isinstance(val, str):
+            val = val.encode("utf-8")
+        if len(val) > TCP_AUTHOPT_MAXKEYLEN:
+            raise ValueError(f"Max key length is {TCP_AUTHOPT_MAXKEYLEN}")
+        self._key = val
+        return val
+
+    @property
+    def addr(self):
+        if not self.addrbuf:
+            return None
+        else:
+            return sockaddr_unpack(bytes(self.addrbuf))
+
+    @addr.setter
+    def addr(self, val: AddrArgType):
+        if isinstance(val, bytes):
+            if len(val) > sockaddr_storage.sizeof:
+                raise ValueError(f"Must be up to {sockaddr_storage.sizeof}")
+            self.addrbuf = val
+        elif val is None:
+            self.addrbuf = b""
+        elif isinstance(val, sockaddr_base):
+            self.addr = bytes(val)
+        else:
+            self.addr = sockaddr_convert(val)
+        return self.addr
+
+    @property
+    def include_options(self) -> bool:
+        return not self.flags & TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+
+    @include_options.setter
+    def include_options(self, value) -> bool:
+        if value:
+            self.flags &= ~TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+        else:
+            self.flags |= TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+        return value
+
+    @property
+    def delete_flag(self) -> bool:
+        return bool(self.flags & TCP_AUTHOPT_KEY_FLAG.DEL)
+
+    @delete_flag.setter
+    def delete_flag(self, value) -> bool:
+        if value:
+            self.flags |= TCP_AUTHOPT_KEY_FLAG.DEL
+        else:
+            self.flags &= ~TCP_AUTHOPT_KEY_FLAG.DEL
+        return value
+
+
+def set_tcp_authopt_key(sock, keyopt: tcp_authopt_key):
+    return sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, bytes(keyopt))
+
+
+def set_tcp_authopt_key_kwargs(sock, keyopt: tcp_authopt_key = None, **kw):
+    if keyopt is None:
+        keyopt = tcp_authopt_key()
+    for k, v in kw.items():
+        setattr(keyopt, k, v)
+    return set_tcp_authopt_key(sock, keyopt)
+
+
+def del_tcp_authopt_key(sock, key: tcp_authopt_key) -> bool:
+    """Try to delete an authopt key
+
+    :return: True if a key was deleted, False if it was not present
+    """
+    import copy
+
+    key = copy.copy(key)
+    key.delete_flag = True
+    try:
+        sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, bytes(key))
+        return True
+    except OSError as e:
+        if e.errno == errno.ENOENT:
+            return False
+        raise
+
+
+def get_sysctl_tcp_authopt() -> typing.Optional[bool]:
+    """Read the tcp_authopt sysctl; None means the sysctl file does not exist"""
+    from pathlib import Path
+
+    sysctl = Path("/proc/sys/net/ipv4/tcp_authopt")
+    if not sysctl.exists():
+        return None
+    return sysctl.read_text().strip() != "0"
+
+
+def enable_sysctl_tcp_authopt():
+    """Write 1 to the tcp_authopt sysctl if the file exists and reads 0"""
+    from pathlib import Path
+
+    path = Path("/proc/sys/net/ipv4/tcp_authopt")
+    if not path.exists():  # Do nothing if absent
+        return
+    try:
+        if path.read_text().strip() == "0":
+            path.write_text("1")
+    except OSError as e:  # I/O failures only; let KeyboardInterrupt etc. through
+        raise Exception("Failed to enable /proc/sys/net/ipv4/tcp_authopt") from e
+
+
+def has_tcp_authopt() -> bool:
+    """Check if TCP_AUTHOPT is implemented by the OS
+
+    Returns True if implemented but disabled by sysctl
+    Returns False if disabled at compile time
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        try:
+            optbuf = bytes(4)  # four zero bytes used as the probe option value
+            sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, optbuf)
+            return True
+        except OSError as e:
+            if e.errno == errno.ENOPROTOOPT:
+                return False
+            elif e.errno == errno.EPERM and get_sysctl_tcp_authopt() is False:
+                return True
+            else:
+                raise
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
new file mode 100644
index 000000000000..3ad22c0b4015
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
@@ -0,0 +1,122 @@
+# SPDX-License-Identifier: GPL-2.0
+"""pack/unpack wrappers for sockaddr"""
+import socket
+import struct
+import typing
+from dataclasses import dataclass
+from ipaddress import IPv4Address, IPv6Address, ip_address
+
+
+class sockaddr_base:
+    def pack(self) -> bytes:
+        raise NotImplementedError()
+
+    def __bytes__(self):
+        return self.pack()
+
+
+class sockaddr_in(sockaddr_base):
+    port: int
+    addr: IPv4Address
+    sizeof = 8
+
+    def __init__(self, port=0, addr=None):
+        self.port = port
+        if addr is None:
+            addr = IPv4Address(0)
+        self.addr = IPv4Address(addr)
+
+    def pack(self):
+        return struct.pack("HH4s", socket.AF_INET, self.port, self.addr.packed)
+
+    @classmethod
+    def unpack(cls, buffer):
+        family, port, addr_packed = struct.unpack("HH4s", buffer[:8])
+        if family != socket.AF_INET:
+            raise ValueError(f"Must be AF_INET not {family}")
+        return cls(port, addr_packed)
+
+
+@dataclass
+class sockaddr_in6(sockaddr_base):
+    """Like sockaddr_in6 but for python. Always contains scope_id"""
+
+    port: int
+    addr: IPv6Address
+    flowinfo: int
+    scope_id: int
+    sizeof = 28
+
+    def __init__(self, port=0, addr=None, flowinfo=0, scope_id=0):
+        self.port = port
+        if addr is None:
+            addr = IPv6Address(0)
+        self.addr = IPv6Address(addr)
+        self.flowinfo = flowinfo
+        self.scope_id = scope_id
+
+    def pack(self):
+        return struct.pack(
+            "HHI16sI",
+            socket.AF_INET6,
+            self.port,
+            self.flowinfo,
+            self.addr.packed,
+            self.scope_id,
+        )
+
+    @classmethod
+    def unpack(cls, buffer):
+        family, port, flowinfo, addr_packed, scope_id = struct.unpack(
+            "HHI16sI", buffer[:28]
+        )
+        if family != socket.AF_INET6:
+            raise ValueError(f"Must be AF_INET6 not {family}")
+        return cls(port, addr_packed, flowinfo=flowinfo, scope_id=scope_id)
+
+
+@dataclass
+class sockaddr_storage(sockaddr_base):
+    family: int
+    data: bytes
+    sizeof = 128
+
+    def pack(self):
+        return struct.pack("H126s", self.family, self.data)
+
+    @classmethod
+    def unpack(cls, buffer):
+        return cls(*struct.unpack("H126s", buffer))
+
+
+def sockaddr_unpack(buffer: bytes):
+    """Decode a raw sockaddr, picking the class from its leading family field"""
+    (family,) = struct.unpack("H", buffer[:2])
+    if family == socket.AF_INET6:
+        return sockaddr_in6.unpack(buffer)
+    if family == socket.AF_INET:
+        return sockaddr_in.unpack(buffer)
+    # Any other family is kept as an opaque sockaddr_storage
+    return sockaddr_storage.unpack(buffer)
+
+
+SockaddrConvertType = typing.Union[
+    sockaddr_in, sockaddr_in6, sockaddr_storage, IPv4Address, IPv6Address, str
+]
+
+
+def sockaddr_convert(val: SockaddrConvertType) -> sockaddr_base:
+    """Coerce a sockaddr object, an ipaddress object or an IP string to a sockaddr
+
+    Existing sockaddr_in/sockaddr_in6/sockaddr_storage objects are returned
+    unchanged; anything unrecognized raises TypeError.
+    """
+    if isinstance(val, str):
+        val = ip_address(val)
+    if isinstance(val, (sockaddr_in, sockaddr_in6, sockaddr_storage)):
+        return val
+    if isinstance(val, IPv4Address):
+        return sockaddr_in(addr=val)
+    if isinstance(val, IPv6Address):
+        return sockaddr_in6(addr=val)
+    raise TypeError(f"Don't know how to convert {val!r} to sockaddr")
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
new file mode 100644
index 000000000000..41ebde8b1b7c
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
@@ -0,0 +1,203 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Test TCP_AUTHOPT sockopt API"""
+import errno
+import socket
+import struct
+from ipaddress import IPv4Address, IPv6Address
+
+import pytest
+
+from .conftest import skipif_missing_tcp_authopt
+from .linux_tcp_authopt import (
+    TCP_AUTHOPT,
+    TCP_AUTHOPT_ALG,
+    TCP_AUTHOPT_FLAG,
+    TCP_AUTHOPT_KEY,
+    TCP_AUTHOPT_KEY_FLAG,
+    del_tcp_authopt_key,
+    get_tcp_authopt,
+    set_tcp_authopt,
+    set_tcp_authopt_key,
+    tcp_authopt,
+    tcp_authopt_key,
+)
+from .sockaddr import sockaddr_in, sockaddr_in6, sockaddr_unpack
+
+pytestmark = skipif_missing_tcp_authopt
+
+
+def test_authopt_key_pack_noaddr():
+    b = bytes(tcp_authopt_key(key=b"a\x00b"))
+    assert b[7] == 3
+    assert b[8:13] == b"a\x00b\x00\x00"
+
+
+def test_authopt_key_pack_addr():
+    b = bytes(tcp_authopt_key(key=b"a\x00b", addr="10.0.0.1"))
+    assert struct.unpack("H", b[88:90])[0] == socket.AF_INET
+    assert sockaddr_unpack(b[88:]).addr == IPv4Address("10.0.0.1")
+
+
+def test_authopt_key_pack_addr6():
+    b = bytes(tcp_authopt_key(key=b"abc", addr="fd00::1"))
+    assert struct.unpack("H", b[88:90])[0] == socket.AF_INET6
+    assert sockaddr_unpack(b[88:]).addr == IPv6Address("fd00::1")
+
+
+def test_tcp_authopt_key_del_without_active(exit_stack):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    exit_stack.push(sock)
+
+    # deleting a key when none was ever added must fail with EINVAL or ENOENT:
+    key = tcp_authopt_key()
+    assert key.delete_flag is False
+    key.delete_flag = True
+    assert key.delete_flag is True
+    with pytest.raises(OSError) as e:
+        set_tcp_authopt_key(sock, key)
+    assert e.value.errno in [errno.EINVAL, errno.ENOENT]
+
+
+def test_tcp_authopt_key_setdel(exit_stack):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    exit_stack.push(sock)
+    set_tcp_authopt(sock, tcp_authopt())
+
+    # delete returns ENOENT
+    key = tcp_authopt_key()
+    key.delete_flag = True
+    with pytest.raises(OSError) as e:
+        set_tcp_authopt_key(sock, key)
+    assert e.value.errno == errno.ENOENT
+
+    key = tcp_authopt_key(send_id=1, recv_id=2)
+    set_tcp_authopt_key(sock, key)
+    # First delete works fine:
+    key.delete_flag = True
+    set_tcp_authopt_key(sock, key)
+    # Duplicate delete returns ENOENT
+    with pytest.raises(OSError) as e:
+        set_tcp_authopt_key(sock, key)
+    assert e.value.errno == errno.ENOENT
+
+
+def test_get_tcp_authopt():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        with pytest.raises(OSError) as e:
+            sock.getsockopt(socket.SOL_TCP, TCP_AUTHOPT, 4)
+        assert e.value.errno == errno.ENOENT
+
+
+def test_set_get_tcp_authopt_flags():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        # No flags by default
+        set_tcp_authopt(sock, tcp_authopt())
+        opt = get_tcp_authopt(sock)
+        assert opt.flags == 0
+
+        # simple flags are echoed
+        goodflag = TCP_AUTHOPT_FLAG.REJECT_UNEXPECTED
+        set_tcp_authopt(sock, tcp_authopt(flags=goodflag))
+        opt = get_tcp_authopt(sock)
+        assert opt.flags == goodflag
+
+        # attempting to set a badflag returns an error and has no effect
+        badflag = 1 << 27
+        with pytest.raises(OSError):
+            set_tcp_authopt(sock, tcp_authopt(flags=badflag))
+        opt = get_tcp_authopt(sock)
+        assert opt.flags == goodflag
+
+
+def test_set_ipv6_key_on_ipv4():
+    """Binding a key to an ipv6 address on an ipv4 socket makes no sense"""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(key="abc")  # keyword: first positional is flags
+        key.flags = TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+        key.addr = IPv6Address("::1234")
+        with pytest.raises(OSError):
+            set_tcp_authopt_key(sock, key)
+
+
+def test_set_ipv4_key_on_ipv6():
+    """This could be implemented for ipv6-mapped-ipv4 but it is not
+
+    TCP_MD5SIG has a similar limitation
+    """
+    with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(key="abc")  # keyword: first positional is flags
+        key.flags = TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+        key.addr = IPv4Address("1.2.3.4")
+        with pytest.raises(OSError):
+            set_tcp_authopt_key(sock, key)
+
+
+def test_authopt_key_badflags():
+    """Don't pretend to handle unknown flags"""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        with pytest.raises(OSError):
+            set_tcp_authopt_key(sock, tcp_authopt_key(flags=0xABCDEF))
+
+
+def test_authopt_key_longer_bad():
+    """Test that passing a longer sockopt with non-zero trailing data fails
+
+    Old kernels won't pretend to handle features they don't know about
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96, key="aaa")
+        optbuf = bytes(key)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x5a")
+        with pytest.raises(OSError):
+            sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, optbuf)
+
+
+def test_authopt_key_longer_zeros():
+    """Test that passing a longer sockopt padded with zeros works
+
+    This ensures applications using a larger struct tcp_authopt_key won't have
+    to pass a shorter optlen on old kernels.
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96, key="aaa")
+        optbuf = bytes(key)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x00")
+        sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, optbuf)
+        # the key was added and can be deleted normally
+        assert del_tcp_authopt_key(sock, key) is True
+        assert del_tcp_authopt_key(sock, key) is False
+
+
+def test_authopt_longer_baddata():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        opt = tcp_authopt()
+        optbuf = bytes(opt)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x5a")
+        with pytest.raises(OSError):
+            sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, optbuf)
+
+
+def test_authopt_longer_zeros():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        opt = tcp_authopt()
+        optbuf = bytes(opt)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x00")
+        sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, optbuf)
+
+
+def test_authopt_setdel_addrbind():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(addr="1.1.1.1", recv_id=1, send_id=1)
+        key2 = tcp_authopt_key(addr="1.1.1.2", recv_id=1, send_id=1)
+        set_tcp_authopt_key(sock, key)
+        assert del_tcp_authopt_key(sock, key2) is False  # other address: no match
+        assert del_tcp_authopt_key(sock, key) is True
+        assert del_tcp_authopt_key(sock, key) is False  # already deleted
+
+
+def test_authopt_include_options():
+    key = tcp_authopt_key()
+    assert key.include_options
+    key.include_options = False
+    assert key.flags & TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+    assert not key.include_options
-- 
2.25.1


  parent reply	other threads:[~2021-11-01 16:35 UTC|newest]

Thread overview: 55+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-01 16:34 [PATCH v2] tcp: Initial support for RFC5925 auth option Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 01/25] tcp: authopt: Initial support and key management Leonard Crestez
2021-11-03  2:29   ` David Ahern
2021-11-05 12:10     ` Leonard Crestez
2021-11-05  1:22   ` Dmitry Safonov
2021-11-05  7:04     ` Leonard Crestez
2021-11-05 14:50       ` Dmitry Safonov
2021-11-05 18:00         ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 02/25] docs: Add user documentation for tcp_authopt Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 03/25] selftests: Initial tcp_authopt test module Leonard Crestez
2021-11-01 16:34 ` Leonard Crestez [this message]
2021-11-01 16:34 ` [PATCH v2 05/25] tcp: authopt: Add crypto initialization Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 06/25] tcp: authopt: Compute packet signatures Leonard Crestez
2021-11-05  1:53   ` Dmitry Safonov
2021-11-05  6:39     ` Leonard Crestez
2021-11-05  2:08   ` Dmitry Safonov
2021-11-05  6:09     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 07/25] tcp: Use BIT() for OPTION_* constants Leonard Crestez
2021-11-03  2:31   ` David Ahern
2021-11-03 22:19     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 08/25] tcp: authopt: Hook into tcp core Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 09/25] tcp: authopt: Disable via sysctl by default Leonard Crestez
2021-11-03  2:39   ` David Ahern
2021-11-05  8:50     ` Leonard Crestez
2021-11-05  1:46   ` Dmitry Safonov
2021-11-01 16:34 ` [PATCH v2 10/25] selftests: tcp_authopt: Test key address binding Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 11/25] tcp: authopt: Implement Sequence Number Extension Leonard Crestez
2021-11-01 19:22   ` Francesco Ruggeri
2021-11-02 10:03     ` Leonard Crestez
2021-11-02 19:21       ` Francesco Ruggeri
2021-11-03 22:01         ` Leonard Crestez
2021-11-01 20:54   ` Eric Dumazet
2021-11-02  9:50     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 12/25] tcp: ipv6: Add AO signing for tcp_v6_send_response Leonard Crestez
2021-11-03  2:44   ` David Ahern
2021-11-03 22:09     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 13/25] tcp: authopt: Add support for signing skb-less replies Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 14/25] tcp: ipv4: Add AO signing for " Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 15/25] selftests: tcp_authopt: Implement SNE in python Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 16/25] selftests: tcp_authopt: Add scapy-based packet signing code Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 17/25] selftests: tcp_authopt: Add packet-level tests Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 18/25] selftests: tcp_authopt: Initial sne test Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 19/25] tcp: authopt: Add key selection controls Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 20/25] selftests: tcp_authopt: Add tests for rollover Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 21/25] tcp: authopt: Add initial l3index support Leonard Crestez
2021-11-03  3:06   ` David Ahern
2021-11-05 12:26     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 22/25] selftests: tcp_authopt: Initial tests for l3mdev handling Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 23/25] selftests: nettest: Rename md5_prefix to key_addr_prefix Leonard Crestez
2021-11-03  3:08   ` David Ahern
2021-11-01 16:34 ` [PATCH v2 24/25] selftests: nettest: Initial tcp_authopt support Leonard Crestez
2021-11-03  3:09   ` David Ahern
2021-11-01 16:35 ` [PATCH v2 25/25] selftests: net/fcnal: " Leonard Crestez
2021-11-03  3:18 ` [PATCH v2] tcp: Initial support for RFC5925 auth option David Ahern
2021-11-03 22:22   ` Leonard Crestez

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=f4f651740378974dad93bc97ced6d5cc97ffb373.1635784253.git.cdleonard@gmail.com \
    --to=cdleonard@gmail.com \
    --cc=0x7f454c46@gmail.com \
    --cc=colona@arista.com \
    --cc=cpaasch@apple.com \
    --cc=davem@davemloft.net \
    --cc=dsahern@kernel.org \
    --cc=edumazet@google.com \
    --cc=fruggeri@arista.com \
    --cc=herbert@gondor.apana.org.au \
    --cc=kuba@kernel.org \
    --cc=kuniyu@amazon.co.jp \
    --cc=linux-crypto@vger.kernel.org \
    --cc=linux-kernel@vger.kernel.org \
    --cc=linux-kselftest@vger.kernel.org \
    --cc=mathew.j.martineau@linux.intel.com \
    --cc=netdev@vger.kernel.org \
    --cc=priyarjha@google.com \
    --cc=shuah@kernel.org \
    --cc=ycheng@google.com \
    --cc=yoshfuji@linux-ipv6.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).