All of lore.kernel.org
 help / color / mirror / Atom feed
From: Leonard Crestez <cdleonard@gmail.com>
To: David Ahern <dsahern@kernel.org>, Shuah Khan <shuah@kernel.org>,
	Dmitry Safonov <0x7f454c46@gmail.com>,
	Eric Dumazet <edumazet@google.com>
Cc: "David S. Miller" <davem@davemloft.net>,
	Herbert Xu <herbert@gondor.apana.org.au>,
	Kuniyuki Iwashima <kuniyu@amazon.co.jp>,
	Hideaki YOSHIFUJI <yoshfuji@linux-ipv6.org>,
	Jakub Kicinski <kuba@kernel.org>,
	Yuchung Cheng <ycheng@google.com>,
	Francesco Ruggeri <fruggeri@arista.com>,
	Mat Martineau <mathew.j.martineau@linux.intel.com>,
	Christoph Paasch <cpaasch@apple.com>,
	Ivan Delalande <colona@arista.com>,
	Priyaranjan Jha <priyarjha@google.com>,
	netdev@vger.kernel.org, linux-crypto@vger.kernel.org,
	linux-kselftest@vger.kernel.org, linux-kernel@vger.kernel.org
Subject: [PATCH v2 04/25] selftests: tcp_authopt: Initial sockopt manipulation
Date: Mon,  1 Nov 2021 18:34:39 +0200	[thread overview]
Message-ID: <f4f651740378974dad93bc97ced6d5cc97ffb373.1635784253.git.cdleonard@gmail.com> (raw)
In-Reply-To: <cover.1635784253.git.cdleonard@gmail.com>

Add a python translation of the linux ABI for tcpao and test the
behavior of TCP_AUTHOPT and TCP_AUTHOPT_KEY sockopts.

This includes several corner cases not normally covered by traffic
tests.

Signed-off-by: Leonard Crestez <cdleonard@gmail.com>
---
 .../tcp_authopt/tcp_authopt_test/conftest.py  |  71 +++++
 .../tcp_authopt_test/linux_tcp_authopt.py     | 266 ++++++++++++++++++
 .../tcp_authopt/tcp_authopt_test/sockaddr.py  | 122 ++++++++
 .../tcp_authopt_test/test_sockopt.py          | 203 +++++++++++++
 4 files changed, 662 insertions(+)
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py

diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
new file mode 100644
index 000000000000..a06ba848669d
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/conftest.py
@@ -0,0 +1,71 @@
+# SPDX-License-Identifier: GPL-2.0
+import logging
+import os
+from contextlib import ExitStack, nullcontext
+from typing import ContextManager
+
+import pytest
+
+from .linux_tcp_authopt import enable_sysctl_tcp_authopt, has_tcp_authopt
+
+logger = logging.getLogger(__name__)
+
+skipif_missing_tcp_authopt = pytest.mark.skipif(
+    not has_tcp_authopt(), reason="Need CONFIG_TCP_AUTHOPT"
+)
+
+
+def get_effective_capabilities():
+    for line in open("/proc/self/status", "r"):
+        if line.startswith("CapEff:"):
+            return int(line.split(":")[1], 16)
+
+
+def has_effective_capability(bit) -> bool:
+    return get_effective_capabilities() & (1 << bit) != 0
+
+
+def can_capture() -> bool:
+    return has_effective_capability(13)
+
+
+def raise_skip_no_netns():
+    if not has_effective_capability(12):
+        pytest.skip("Need CAP_NET_ADMIN for network namespaces")
+
+
+skipif_cant_capture = pytest.mark.skipif(
+    not can_capture(), reason="run as root to capture packets"
+)
+
+
+@pytest.fixture
+def exit_stack():
+    """Return a contextlib.ExitStack as a pytest fixture
+
+    This reduces indentation making code more readable
+    """
+    with ExitStack() as exit_stack:
+        yield exit_stack
+
+
+def pytest_configure():
+    # Silence messages regarding netns enter/exit:
+    logging.getLogger("nsenter").setLevel(logging.INFO)
+    if has_tcp_authopt():
+        enable_sysctl_tcp_authopt()
+
+
+def parametrize_product(**kw):
+    """Parametrize each key to each item in the value list"""
+    import itertools
+
+    return pytest.mark.parametrize(",".join(kw.keys()), itertools.product(*kw.values()))
+
+
+def raises_optional_exception(expected_exception, **kw) -> ContextManager:
+    """Like pytest.raises except accept expected_exception=None"""
+    if expected_exception is None:
+        return nullcontext()
+    else:
+        return pytest.raises(expected_exception, **kw)
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
new file mode 100644
index 000000000000..b9dc9decda07
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/linux_tcp_authopt.py
@@ -0,0 +1,266 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Python wrapper around linux TCP_AUTHOPT ABI"""
+
+import errno
+import logging
+import socket
+import struct
+import typing
+from dataclasses import dataclass
+from enum import IntEnum, IntFlag
+
+from .sockaddr import (
+    SockaddrConvertType,
+    sockaddr_base,
+    sockaddr_convert,
+    sockaddr_storage,
+    sockaddr_unpack,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def BIT(x):
+    return 1 << x
+
+
+TCP_AUTHOPT = 38
+TCP_AUTHOPT_KEY = 39
+
+TCP_AUTHOPT_MAXKEYLEN = 80
+
+
+class TCP_AUTHOPT_FLAG(IntFlag):
+    REJECT_UNEXPECTED = BIT(2)
+
+
+class TCP_AUTHOPT_KEY_FLAG(IntFlag):
+    DEL = BIT(0)
+    EXCLUDE_OPTS = BIT(1)
+    BIND_ADDR = BIT(2)
+
+
+class TCP_AUTHOPT_ALG(IntEnum):
+    HMAC_SHA_1_96 = 1
+    AES_128_CMAC_96 = 2
+
+
+@dataclass
+class tcp_authopt:
+    """Like linux struct tcp_authopt"""
+
+    flags: int = 0
+    sizeof = 4
+
+    def pack(self) -> bytes:
+        return struct.pack(
+            "I",
+            self.flags,
+        )
+
+    def __bytes__(self):
+        return self.pack()
+
+    @classmethod
+    def unpack(cls, b: bytes):
+        tup = struct.unpack("I", b)
+        return cls(*tup)
+
+
+def set_tcp_authopt(sock, opt: tcp_authopt):
+    return sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, bytes(opt))
+
+
+def get_tcp_authopt(sock: socket.socket) -> tcp_authopt:
+    b = sock.getsockopt(socket.SOL_TCP, TCP_AUTHOPT, tcp_authopt.sizeof)
+    return tcp_authopt.unpack(b)
+
+
+class tcp_authopt_key:
+    """Like linux struct tcp_authopt_key
+
+    :ivar auto_flags: If true(default) then set "binding" flags based on non-null values attributes.
+    """
+
+    KeyArgType = typing.Union[str, bytes]
+    AddrArgType = typing.Union[None, str, bytes, SockaddrConvertType]
+
+    def __init__(
+        self,
+        flags: TCP_AUTHOPT_KEY_FLAG = TCP_AUTHOPT_KEY_FLAG(0),
+        send_id: int = 0,
+        recv_id: int = 0,
+        alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96,
+        key: KeyArgType = b"",
+        addr: AddrArgType = None,
+        auto_flags: bool = True,
+        include_options=None,
+    ):
+        self.flags = flags
+        self.send_id = send_id
+        self.recv_id = recv_id
+        self.alg = alg
+        self.key = key
+        self.addr = addr
+        self.auto_flags = auto_flags
+        if include_options is not None:
+            self.include_options = include_options
+
+    def get_real_flags(self) -> TCP_AUTHOPT_KEY_FLAG:
+        result = self.flags
+        if self.auto_flags:
+            if self.addr is not None:
+                result |= TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+            else:
+                result &= ~TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+        return result
+
+    def pack(self):
+        if len(self.key) > TCP_AUTHOPT_MAXKEYLEN:
+            raise ValueError(f"Max key length is {TCP_AUTHOPT_MAXKEYLEN}")
+        data = struct.pack(
+            "IBBBB80s",
+            self.get_real_flags(),
+            self.send_id,
+            self.recv_id,
+            self.alg,
+            len(self.key),
+            self.key,
+        )
+        data += bytes(self.addrbuf.ljust(sockaddr_storage.sizeof, b"\x00"))
+        return data
+
+    def __bytes__(self):
+        return self.pack()
+
+    @property
+    def key(self) -> KeyArgType:
+        return self._key
+
+    @key.setter
+    def key(self, val: KeyArgType) -> bytes:
+        if isinstance(val, str):
+            val = val.encode("utf-8")
+        if len(val) > TCP_AUTHOPT_MAXKEYLEN:
+            raise ValueError(f"Max key length is {TCP_AUTHOPT_MAXKEYLEN}")
+        self._key = val
+        return val
+
+    @property
+    def addr(self):
+        if not self.addrbuf:
+            return None
+        else:
+            return sockaddr_unpack(bytes(self.addrbuf))
+
+    @addr.setter
+    def addr(self, val: AddrArgType):
+        if isinstance(val, bytes):
+            if len(val) > sockaddr_storage.sizeof:
+                raise ValueError(f"Must be up to {sockaddr_storage.sizeof}")
+            self.addrbuf = val
+        elif val is None:
+            self.addrbuf = b""
+        elif isinstance(val, sockaddr_base):
+            self.addr = bytes(val)
+        else:
+            self.addr = sockaddr_convert(val)
+        return self.addr
+
+    @property
+    def include_options(self) -> bool:
+        return not self.flags & TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+
+    @include_options.setter
+    def include_options(self, value) -> bool:
+        if value:
+            self.flags &= ~TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+        else:
+            self.flags |= TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+        return value
+
+    @property
+    def delete_flag(self) -> bool:
+        return bool(self.flags & TCP_AUTHOPT_KEY_FLAG.DEL)
+
+    @delete_flag.setter
+    def delete_flag(self, value) -> bool:
+        if value:
+            self.flags |= TCP_AUTHOPT_KEY_FLAG.DEL
+        else:
+            self.flags &= ~TCP_AUTHOPT_KEY_FLAG.DEL
+        return value
+
+
+def set_tcp_authopt_key(sock, keyopt: tcp_authopt_key):
+    return sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, bytes(keyopt))
+
+
+def set_tcp_authopt_key_kwargs(sock, keyopt: tcp_authopt_key = None, **kw):
+    if keyopt is None:
+        keyopt = tcp_authopt_key()
+    for k, v in kw.items():
+        setattr(keyopt, k, v)
+    return set_tcp_authopt_key(sock, keyopt)
+
+
+def del_tcp_authopt_key(sock, key: tcp_authopt_key) -> bool:
+    """Try to delete an authopt key
+
+    :return: True if a key was deleted, False if it was not present
+    """
+    import copy
+
+    key = copy.copy(key)
+    key.delete_flag = True
+    try:
+        sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, bytes(key))
+        return True
+    except OSError as e:
+        if e.errno == errno.ENOENT:
+            return False
+        raise
+
+
+def get_sysctl_tcp_authopt() -> typing.Optional[bool]:
+    from pathlib import Path
+
+    path = Path("/proc/sys/net/ipv4/tcp_authopt")
+    if path.exists():
+        return path.read_text().strip() != "0"
+    else:
+        return None
+
+
+def enable_sysctl_tcp_authopt():
+    from pathlib import Path
+
+    path = Path("/proc/sys/net/ipv4/tcp_authopt")
+    # Do nothing if absent
+    if not path.exists():
+        return
+    try:
+        if path.read_text().strip() == "0":
+            path.write_text("1")
+    except:
+        raise Exception("Failed to enable /proc/sys/net/ipv4/tcp_authopt")
+
+
+def has_tcp_authopt() -> bool:
+    """Check is TCP_AUTHOPT is implemented by the OS
+
+    Returns True if implemented but disabled by sysctl
+    Returns False if disabled at compile time
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        try:
+            optbuf = bytes(4)
+            sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, optbuf)
+            return True
+        except OSError as e:
+            if e.errno == errno.ENOPROTOOPT:
+                return False
+            elif e.errno == errno.EPERM and get_sysctl_tcp_authopt() is False:
+                return True
+            else:
+                raise
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
new file mode 100644
index 000000000000..3ad22c0b4015
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/sockaddr.py
@@ -0,0 +1,122 @@
+# SPDX-License-Identifier: GPL-2.0
+"""pack/unpack wrappers for sockaddr"""
+import socket
+import struct
+import typing
+from dataclasses import dataclass
+from ipaddress import IPv4Address, IPv6Address, ip_address
+
+
+class sockaddr_base:
+    def pack(self) -> bytes:
+        raise NotImplementedError()
+
+    def __bytes__(self):
+        return self.pack()
+
+
+class sockaddr_in(sockaddr_base):
+    port: int
+    addr: IPv4Address
+    sizeof = 8
+
+    def __init__(self, port=0, addr=None):
+        self.port = port
+        if addr is None:
+            addr = IPv4Address(0)
+        self.addr = IPv4Address(addr)
+
+    def pack(self):
+        return struct.pack("HH4s", socket.AF_INET, self.port, self.addr.packed)
+
+    @classmethod
+    def unpack(cls, buffer):
+        family, port, addr_packed = struct.unpack("HH4s", buffer[:8])
+        if family != socket.AF_INET:
+            raise ValueError(f"Must be AF_INET not {family}")
+        return cls(port, addr_packed)
+
+
+@dataclass
+class sockaddr_in6(sockaddr_base):
+    """Like sockaddr_in6 but for python. Always contains scope_id"""
+
+    port: int
+    addr: IPv6Address
+    flowinfo: int
+    scope_id: int
+    sizeof = 28
+
+    def __init__(self, port=0, addr=None, flowinfo=0, scope_id=0):
+        self.port = port
+        if addr is None:
+            addr = IPv6Address(0)
+        self.addr = IPv6Address(addr)
+        self.flowinfo = flowinfo
+        self.scope_id = scope_id
+
+    def pack(self):
+        return struct.pack(
+            "HHI16sI",
+            socket.AF_INET6,
+            self.port,
+            self.flowinfo,
+            self.addr.packed,
+            self.scope_id,
+        )
+
+    @classmethod
+    def unpack(cls, buffer):
+        family, port, flowinfo, addr_packed, scope_id = struct.unpack(
+            "HHI16sI", buffer[:28]
+        )
+        if family != socket.AF_INET6:
+            raise ValueError(f"Must be AF_INET6 not {family}")
+        return cls(port, addr_packed, flowinfo=flowinfo, scope_id=scope_id)
+
+
+@dataclass
+class sockaddr_storage(sockaddr_base):
+    family: int
+    data: bytes
+    sizeof = 128
+
+    def pack(self):
+        return struct.pack("H126s", self.family, self.data)
+
+    @classmethod
+    def unpack(cls, buffer):
+        return cls(*struct.unpack("H126s", buffer))
+
+
+def sockaddr_unpack(buffer: bytes):
+    """Unpack based on family"""
+    family = struct.unpack("H", buffer[:2])[0]
+    if family == socket.AF_INET:
+        return sockaddr_in.unpack(buffer)
+    elif family == socket.AF_INET6:
+        return sockaddr_in6.unpack(buffer)
+    else:
+        return sockaddr_storage.unpack(buffer)
+
+
+SockaddrConvertType = typing.Union[
+    sockaddr_in, sockaddr_in6, sockaddr_storage, IPv4Address, IPv6Address, str
+]
+
+
+def sockaddr_convert(val: SockaddrConvertType) -> sockaddr_base:
+    """Try to convert address into some sort of sockaddr"""
+    if (
+        isinstance(val, sockaddr_in)
+        or isinstance(val, sockaddr_in6)
+        or isinstance(val, sockaddr_storage)
+    ):
+        return val
+    if isinstance(val, IPv4Address):
+        return sockaddr_in(addr=val)
+    if isinstance(val, IPv6Address):
+        return sockaddr_in6(addr=val)
+    if isinstance(val, str):
+        return sockaddr_convert(ip_address(val))
+    raise TypeError(f"Don't know how to convert {val!r} to sockaddr")
diff --git a/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
new file mode 100644
index 000000000000..41ebde8b1b7c
--- /dev/null
+++ b/tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sockopt.py
@@ -0,0 +1,203 @@
+# SPDX-License-Identifier: GPL-2.0
+"""Test TCP_AUTHOPT sockopt API"""
+import errno
+import socket
+import struct
+from ipaddress import IPv4Address, IPv6Address
+
+import pytest
+
+from .conftest import skipif_missing_tcp_authopt
+from .linux_tcp_authopt import (
+    TCP_AUTHOPT,
+    TCP_AUTHOPT_ALG,
+    TCP_AUTHOPT_FLAG,
+    TCP_AUTHOPT_KEY,
+    TCP_AUTHOPT_KEY_FLAG,
+    del_tcp_authopt_key,
+    get_tcp_authopt,
+    set_tcp_authopt,
+    set_tcp_authopt_key,
+    tcp_authopt,
+    tcp_authopt_key,
+)
+from .sockaddr import sockaddr_in, sockaddr_in6, sockaddr_unpack
+
+pytestmark = skipif_missing_tcp_authopt
+
+
+def test_authopt_key_pack_noaddr():
+    b = bytes(tcp_authopt_key(key=b"a\x00b"))
+    assert b[7] == 3
+    assert b[8:13] == b"a\x00b\x00\x00"
+
+
+def test_authopt_key_pack_addr():
+    b = bytes(tcp_authopt_key(key=b"a\x00b", addr="10.0.0.1"))
+    assert struct.unpack("H", b[88:90])[0] == socket.AF_INET
+    assert sockaddr_unpack(b[88:]).addr == IPv4Address("10.0.0.1")
+
+
+def test_authopt_key_pack_addr6():
+    b = bytes(tcp_authopt_key(key=b"abc", addr="fd00::1"))
+    assert struct.unpack("H", b[88:90])[0] == socket.AF_INET6
+    assert sockaddr_unpack(b[88:]).addr == IPv6Address("fd00::1")
+
+
+def test_tcp_authopt_key_del_without_active(exit_stack):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    exit_stack.push(sock)
+
+    # nothing happens:
+    key = tcp_authopt_key()
+    assert key.delete_flag is False
+    key.delete_flag = True
+    assert key.delete_flag is True
+    with pytest.raises(OSError) as e:
+        set_tcp_authopt_key(sock, key)
+    assert e.value.errno in [errno.EINVAL, errno.ENOENT]
+
+
+def test_tcp_authopt_key_setdel(exit_stack):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    exit_stack.push(sock)
+    set_tcp_authopt(sock, tcp_authopt())
+
+    # delete returns ENOENT
+    key = tcp_authopt_key()
+    key.delete_flag = True
+    with pytest.raises(OSError) as e:
+        set_tcp_authopt_key(sock, key)
+    assert e.value.errno == errno.ENOENT
+
+    key = tcp_authopt_key(send_id=1, recv_id=2)
+    set_tcp_authopt_key(sock, key)
+    # First delete works fine:
+    key.delete_flag = True
+    set_tcp_authopt_key(sock, key)
+    # Duplicate delete returns ENOENT
+    with pytest.raises(OSError) as e:
+        set_tcp_authopt_key(sock, key)
+    assert e.value.errno == errno.ENOENT
+
+
+def test_get_tcp_authopt():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        with pytest.raises(OSError) as e:
+            sock.getsockopt(socket.SOL_TCP, TCP_AUTHOPT, 4)
+        assert e.value.errno == errno.ENOENT
+
+
+def test_set_get_tcp_authopt_flags():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        # No flags by default
+        set_tcp_authopt(sock, tcp_authopt())
+        opt = get_tcp_authopt(sock)
+        assert opt.flags == 0
+
+        # simple flags are echoed
+        goodflag = TCP_AUTHOPT_FLAG.REJECT_UNEXPECTED
+        set_tcp_authopt(sock, tcp_authopt(flags=goodflag))
+        opt = get_tcp_authopt(sock)
+        assert opt.flags == goodflag
+
+        # attempting to set a badflag returns an error and has no effect
+        badflag = 1 << 27
+        with pytest.raises(OSError) as e:
+            set_tcp_authopt(sock, tcp_authopt(flags=badflag))
+        opt = get_tcp_authopt(sock)
+        assert opt.flags == goodflag
+
+
+def test_set_ipv6_key_on_ipv4():
+    """Binding a key to an ipv6 address on an ipv4 socket makes no sense"""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key("abc")
+        key.flags = TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+        key.addr = IPv6Address("::1234")
+        with pytest.raises(OSError):
+            set_tcp_authopt_key(sock, key)
+
+
+def test_set_ipv4_key_on_ipv6():
+    """This could be implemented for ipv6-mapped-ipv4 but it is not
+
+    TCP_MD5SIG has a similar limitation
+    """
+    with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key("abc")
+        key.flags = TCP_AUTHOPT_KEY_FLAG.BIND_ADDR
+        key.addr = IPv4Address("1.2.3.4")
+        with pytest.raises(OSError):
+            set_tcp_authopt_key(sock, key)
+
+
+def test_authopt_key_badflags():
+    """Don't pretend to handle unknown flags"""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        with pytest.raises(OSError):
+            set_tcp_authopt_key(sock, tcp_authopt_key(flags=0xABCDEF))
+
+
+def test_authopt_key_longer_bad():
+    """Test that pass a longer sockopt with unknown data fails
+
+    Old kernels won't pretend to handle features they don't know about
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96, key="aaa")
+        optbuf = bytes(key)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x5a")
+        with pytest.raises(OSError):
+            sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, optbuf)
+
+
+def test_authopt_key_longer_zeros():
+    """Test that passing a longer sockopt padded with zeros works
+
+    This ensures applications using a larger struct tcp_authopt_key won't have
+    to pass a shorter optlen on old kernels.
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(alg=TCP_AUTHOPT_ALG.HMAC_SHA_1_96, key="aaa")
+        optbuf = bytes(key)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x00")
+        sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT_KEY, optbuf)
+        # the key was added and can be deleted normally
+        assert del_tcp_authopt_key(sock, key) == True
+        assert del_tcp_authopt_key(sock, key) == False
+
+
+def test_authopt_longer_baddata():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        opt = tcp_authopt()
+        optbuf = bytes(opt)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x5a")
+        with pytest.raises(OSError):
+            sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, optbuf)
+
+
+def test_authopt_longer_zeros():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        opt = tcp_authopt()
+        optbuf = bytes(opt)
+        optbuf = optbuf.ljust(len(optbuf) + 256, b"\x00")
+        sock.setsockopt(socket.SOL_TCP, TCP_AUTHOPT, optbuf)
+
+
+def test_authopt_setdel_addrbind():
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        key = tcp_authopt_key(addr="1.1.1.1", recv_id=1, send_id=1)
+        key2 = tcp_authopt_key(addr="1.1.1.2", recv_id=1, send_id=1)
+        set_tcp_authopt_key(sock, key)
+        assert del_tcp_authopt_key(sock, key2) == False
+        assert del_tcp_authopt_key(sock, key) == True
+        assert del_tcp_authopt_key(sock, key) == False
+
+
+def test_authopt_include_options():
+    key = tcp_authopt_key()
+    assert key.include_options
+    key.include_options = False
+    assert key.flags & TCP_AUTHOPT_KEY_FLAG.EXCLUDE_OPTS
+    assert not key.include_options
-- 
2.25.1


  parent reply	other threads:[~2021-11-01 16:35 UTC|newest]

Thread overview: 55+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-01 16:34 [PATCH v2] tcp: Initial support for RFC5925 auth option Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 01/25] tcp: authopt: Initial support and key management Leonard Crestez
2021-11-03  2:29   ` David Ahern
2021-11-05 12:10     ` Leonard Crestez
2021-11-05  1:22   ` Dmitry Safonov
2021-11-05  7:04     ` Leonard Crestez
2021-11-05 14:50       ` Dmitry Safonov
2021-11-05 18:00         ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 02/25] docs: Add user documentation for tcp_authopt Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 03/25] selftests: Initial tcp_authopt test module Leonard Crestez
2021-11-01 16:34 ` Leonard Crestez [this message]
2021-11-01 16:34 ` [PATCH v2 05/25] tcp: authopt: Add crypto initialization Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 06/25] tcp: authopt: Compute packet signatures Leonard Crestez
2021-11-05  1:53   ` Dmitry Safonov
2021-11-05  6:39     ` Leonard Crestez
2021-11-05  2:08   ` Dmitry Safonov
2021-11-05  6:09     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 07/25] tcp: Use BIT() for OPTION_* constants Leonard Crestez
2021-11-03  2:31   ` David Ahern
2021-11-03 22:19     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 08/25] tcp: authopt: Hook into tcp core Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 09/25] tcp: authopt: Disable via sysctl by default Leonard Crestez
2021-11-03  2:39   ` David Ahern
2021-11-05  8:50     ` Leonard Crestez
2021-11-05  1:46   ` Dmitry Safonov
2021-11-01 16:34 ` [PATCH v2 10/25] selftests: tcp_authopt: Test key address binding Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 11/25] tcp: authopt: Implement Sequence Number Extension Leonard Crestez
2021-11-01 19:22   ` Francesco Ruggeri
2021-11-02 10:03     ` Leonard Crestez
2021-11-02 19:21       ` Francesco Ruggeri
2021-11-03 22:01         ` Leonard Crestez
2021-11-01 20:54   ` Eric Dumazet
2021-11-02  9:50     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 12/25] tcp: ipv6: Add AO signing for tcp_v6_send_response Leonard Crestez
2021-11-03  2:44   ` David Ahern
2021-11-03 22:09     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 13/25] tcp: authopt: Add support for signing skb-less replies Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 14/25] tcp: ipv4: Add AO signing for " Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 15/25] selftests: tcp_authopt: Implement SNE in python Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 16/25] selftests: tcp_authopt: Add scapy-based packet signing code Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 17/25] selftests: tcp_authopt: Add packet-level tests Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 18/25] selftests: tcp_authopt: Initial sne test Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 19/25] tcp: authopt: Add key selection controls Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 20/25] selftests: tcp_authopt: Add tests for rollover Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 21/25] tcp: authopt: Add initial l3index support Leonard Crestez
2021-11-03  3:06   ` David Ahern
2021-11-05 12:26     ` Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 22/25] selftests: tcp_authopt: Initial tests for l3mdev handling Leonard Crestez
2021-11-01 16:34 ` [PATCH v2 23/25] selftests: nettest: Rename md5_prefix to key_addr_prefix Leonard Crestez
2021-11-03  3:08   ` David Ahern
2021-11-01 16:34 ` [PATCH v2 24/25] selftests: nettest: Initial tcp_authopt support Leonard Crestez
2021-11-03  3:09   ` David Ahern
2021-11-01 16:35 ` [PATCH v2 25/25] selftests: net/fcnal: " Leonard Crestez
2021-11-03  3:18 ` [PATCH v2] tcp: Initial support for RFC5925 auth option David Ahern
2021-11-03 22:22   ` Leonard Crestez

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=f4f651740378974dad93bc97ced6d5cc97ffb373.1635784253.git.cdleonard@gmail.com \
    --to=cdleonard@gmail.com \
    --cc=0x7f454c46@gmail.com \
    --cc=colona@arista.com \
    --cc=cpaasch@apple.com \
    --cc=davem@davemloft.net \
    --cc=dsahern@kernel.org \
    --cc=edumazet@google.com \
    --cc=fruggeri@arista.com \
    --cc=herbert@gondor.apana.org.au \
    --cc=kuba@kernel.org \
    --cc=kuniyu@amazon.co.jp \
    --cc=linux-crypto@vger.kernel.org \
    --cc=linux-kernel@vger.kernel.org \
    --cc=linux-kselftest@vger.kernel.org \
    --cc=mathew.j.martineau@linux.intel.com \
    --cc=netdev@vger.kernel.org \
    --cc=priyarjha@google.com \
    --cc=shuah@kernel.org \
    --cc=ycheng@google.com \
    --cc=yoshfuji@linux-ipv6.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.