晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /lib64/python3.6/site-packages/systemd/test/ |
Upload File : |
import sys
import os
import posix
import socket
import contextlib
import errno
from systemd.daemon import (booted,
is_fifo, _is_fifo,
is_socket, _is_socket,
is_socket_inet, _is_socket_inet,
is_socket_unix, _is_socket_unix,
is_socket_sockaddr, _is_socket_sockaddr,
is_mq, _is_mq,
listen_fds,
notify)
import pytest
@contextlib.contextmanager
def skip_enosys():
    """Skip the running test when the wrapped code raises ENOSYS.

    Intended for use as ``with skip_enosys(): ...``.  An ``OSError`` whose
    ``errno`` is ``errno.ENOSYS`` is turned into a pytest skip carrying a
    short reason; any other exception propagates unchanged.
    """
    try:
        yield
    except OSError as e:
        if e.errno == errno.ENOSYS:
            # pytest.skip() raises, so the bare ``raise`` below is only
            # reached for other errno values.
            pytest.skip('operation not implemented (ENOSYS): %s' % e)
        raise
@contextlib.contextmanager
def closing_socketpair(family):
    """Yield a connected socket pair and close both ends on exit.

    :param family: address family passed to ``socket.socketpair``.
    :returns: context manager yielding the tuple returned by
        ``socket.socketpair(family)``.
    """
    pair = socket.socketpair(family)
    try:
        yield pair
    finally:
        # Nested so the second socket is closed even if closing the first
        # one raises.
        try:
            pair[0].close()
        finally:
            pair[1].close()
def test_booted():
    """Check ``booted()`` against the presence of /run/systemd/system."""
    systemd_dir_present = os.path.exists('/run/systemd/system')
    if not systemd_dir_present:
        # don't assume anything beyond a boolean-like answer
        assert booted() in {False, True}
        return
    # the directory exists, so assume we are running under systemd
    assert booted()
def test__is_fifo(tmpdir):
    """``_is_fifo`` accepts a descriptor opened on a freshly created FIFO."""
    path = tmpdir.join('test.fifo').strpath
    posix.mkfifo(path)
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        assert _is_fifo(fd, None)
        assert _is_fifo(fd, path)
    finally:
        # Do not leak the descriptor into the rest of the test session.
        os.close(fd)
def test__is_fifo_file(tmpdir):
    """``_is_fifo`` rejects a descriptor opened on a regular file."""
    # Named ``target`` rather than ``file`` to avoid shadowing the py2 builtin.
    target = tmpdir.join('test.fifo')
    target.write('boo')
    path = target.strpath
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        assert not _is_fifo(fd, None)
        assert not _is_fifo(fd, path)
    finally:
        # Do not leak the descriptor into the rest of the test session.
        os.close(fd)
def test__is_fifo_bad_fd(tmpdir):
    """``_is_fifo`` raises ``OSError`` for fd -1, with or without a path."""
    fifo_path = tmpdir.join('test.fifo').strpath
    for path_arg in (None, fifo_path):
        with pytest.raises(OSError):
            assert not _is_fifo(-1, path_arg)
def test_is_fifo(tmpdir):
    """``is_fifo`` accepts both a file object and a raw fd for a FIFO."""
    path = tmpdir.join('test.fifo').strpath
    posix.mkfifo(path)
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    # The file object owns fd; leaving the ``with`` block closes it, so the
    # descriptor is not leaked.
    with os.fdopen(fd, 'r') as stream:
        assert is_fifo(stream, None)
        assert is_fifo(stream, path)
        assert is_fifo(fd, None)
        assert is_fifo(fd, path)
def test_is_fifo_file(tmpdir):
    """``is_fifo`` rejects a regular file, given as file object or raw fd."""
    # Distinct names for the path object and the stream; the original reused
    # ``file`` for both.
    target = tmpdir.join('test.fifo')
    target.write('boo')
    path = target.strpath
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    # The file object owns fd; leaving the ``with`` block closes it, so the
    # descriptor is not leaked.
    with os.fdopen(fd, 'r') as stream:
        assert not is_fifo(stream, None)
        assert not is_fifo(stream, path)
        assert not is_fifo(fd, None)
        assert not is_fifo(fd, path)
def test_is_fifo_bad_fd(tmpdir):
    """``is_fifo`` raises ``OSError`` for fd -1, with or without a path."""
    fifo_path = tmpdir.join('test.fifo').strpath
    for path_arg in (None, fifo_path):
        with pytest.raises(OSError):
            assert not is_fifo(-1, path_arg)
def is_mq_wrapper(arg):
    """Return ``is_mq(arg)``, mapping an EBADF ``OSError`` to ``False``.

    Any ``OSError`` with a different errno fails the embedded assertion.
    """
    try:
        outcome = is_mq(arg)
    except OSError as exc:
        # systemd < 227 compatibility
        assert exc.errno == errno.EBADF
        outcome = False
    return outcome
def _is_mq_wrapper(arg):
    """Return ``_is_mq(arg)``, mapping an EBADF ``OSError`` to ``False``.

    Any ``OSError`` with a different errno fails the embedded assertion.
    """
    try:
        outcome = _is_mq(arg)
    except OSError as exc:
        # systemd < 227 compatibility
        assert exc.errno == errno.EBADF
        outcome = False
    return outcome
def test_no_mismatch():
    """A UNIX socketpair endpoint is not a FIFO, mqueue or inet socket."""
    inet_address = '127.0.0.1:2000'
    with closing_socketpair(socket.AF_UNIX) as endpoints:
        for endpoint in endpoints:
            # Public helpers are checked with the socket object first.
            assert not is_fifo(endpoint)
            assert not is_mq_wrapper(endpoint)
            assert not is_socket_inet(endpoint)
            with skip_enosys():
                assert not is_socket_sockaddr(endpoint, inet_address)

            # ... and then with its descriptor.
            descriptor = endpoint.fileno()
            assert not is_fifo(descriptor)
            assert not is_mq_wrapper(descriptor)
            assert not is_socket_inet(descriptor)
            with skip_enosys():
                assert not is_socket_sockaddr(descriptor, inet_address)

            # Underscore-prefixed helpers are called with the descriptor only.
            assert not _is_fifo(descriptor)
            assert not _is_mq_wrapper(descriptor)
            assert not _is_socket_inet(descriptor)
            with skip_enosys():
                assert not _is_socket_sockaddr(descriptor, inet_address)
def test_is_socket():
    """``is_socket``/``_is_socket`` match family and type of a UNIX stream pair."""
    with closing_socketpair(socket.AF_UNIX) as endpoints:
        for endpoint in endpoints:
            descriptor = endpoint.fileno()

            # Public helper: checked with the socket object and with its fd.
            for candidate in (endpoint, descriptor):
                assert is_socket(candidate)
                assert is_socket(candidate, socket.AF_UNIX)
                assert not is_socket(candidate, socket.AF_INET)
                assert is_socket(candidate, socket.AF_UNIX, socket.SOCK_STREAM)
                assert not is_socket(candidate, socket.AF_INET, socket.SOCK_DGRAM)
                with skip_enosys():
                    assert not is_socket_sockaddr(candidate, '8.8.8.8:2000',
                                                  socket.SOCK_DGRAM, 0, 0)

            # Underscore-prefixed helper: checked with the descriptor only.
            assert _is_socket(descriptor)
            assert _is_socket(descriptor, socket.AF_UNIX)
            assert not _is_socket(descriptor, socket.AF_INET)
            assert _is_socket(descriptor, socket.AF_UNIX, socket.SOCK_STREAM)
            assert not _is_socket(descriptor, socket.AF_INET, socket.SOCK_DGRAM)
            with skip_enosys():
                assert not _is_socket_sockaddr(descriptor, '8.8.8.8:2000',
                                               socket.SOCK_DGRAM, 0, 0)
def test_is_socket_sockaddr():
    """Check ``is_socket_sockaddr`` and ``_is_socket_sockaddr`` on a bound socket.

    An AF_INET socket is bound to 127.0.0.1 on a kernel-chosen port.  The
    checks run twice: once before ``listen()`` has been called
    (``listening == 0``) and once after (``listening == 1``).
    """
    with contextlib.closing(socket.socket(socket.AF_INET)) as sock:
        sock.bind(('127.0.0.1', 0))
        # Only the assigned port is needed; the address part was unused.
        port = ':{}'.format(sock.getsockname()[1])
        # Named explicitly instead of relying on the inner loop variable
        # still holding the descriptor after the loop ends.
        fd = sock.fileno()

        for listening in (0, 1):
            for arg in (sock, fd):
                with skip_enosys():
                    assert is_socket_sockaddr(arg, '127.0.0.1', socket.SOCK_STREAM)
                with skip_enosys():
                    assert is_socket_sockaddr(arg, '127.0.0.1' + port, socket.SOCK_STREAM)

                with skip_enosys():
                    assert is_socket_sockaddr(arg, '127.0.0.1' + port, listening=listening)
                with skip_enosys():
                    assert is_socket_sockaddr(arg, '127.0.0.1' + port, listening=-1)
                with skip_enosys():
                    assert not is_socket_sockaddr(arg, '127.0.0.1' + port,
                                                  listening=not listening)

                with pytest.raises(ValueError):
                    is_socket_sockaddr(arg, '127.0.0.1', flowinfo=123456)

                with skip_enosys():
                    assert not is_socket_sockaddr(arg, '129.168.11.11:23', socket.SOCK_STREAM)
                with skip_enosys():
                    assert not is_socket_sockaddr(arg, '127.0.0.1', socket.SOCK_DGRAM)

            # The underscore-prefixed helper is exercised with the raw fd only.
            with pytest.raises(ValueError):
                _is_socket_sockaddr(fd, '127.0.0.1', 0, 123456)

            with skip_enosys():
                assert not _is_socket_sockaddr(fd, '129.168.11.11:23', socket.SOCK_STREAM)
            with skip_enosys():
                assert not _is_socket_sockaddr(fd, '127.0.0.1', socket.SOCK_DGRAM)

            # Start listening so the next pass runs against a listening socket.
            sock.listen(11)
def test__is_socket():
    """``_is_socket`` matches family and type for UNIX stream pair descriptors."""
    with closing_socketpair(socket.AF_UNIX) as endpoints:
        for endpoint in endpoints:
            descriptor = endpoint.fileno()
            # The same group of checks is performed twice in a row, exactly
            # as the original listing spelled it out.
            for _ in range(2):
                assert _is_socket(descriptor)
                assert _is_socket(descriptor, socket.AF_UNIX)
                assert not _is_socket(descriptor, socket.AF_INET)
                assert _is_socket(descriptor, socket.AF_UNIX, socket.SOCK_STREAM)
                assert not _is_socket(descriptor, socket.AF_INET, socket.SOCK_DGRAM)
def test_is_socket_unix():
    """``is_socket_unix`` recognises both ends of a UNIX stream socketpair."""
    with closing_socketpair(socket.AF_UNIX) as endpoints:
        for endpoint in endpoints:
            descriptor = endpoint.fileno()
            # Checked with the socket object and with its descriptor.
            for candidate in (endpoint, descriptor):
                assert is_socket_unix(candidate)
                assert not is_socket_unix(candidate, path="/no/such/path")
                assert is_socket_unix(candidate, socket.SOCK_STREAM)
                assert not is_socket_unix(candidate, socket.SOCK_DGRAM)
def test__is_socket_unix():
    """``_is_socket_unix`` recognises UNIX stream socketpair descriptors."""
    with closing_socketpair(socket.AF_UNIX) as endpoints:
        for descriptor in (endpoint.fileno() for endpoint in endpoints):
            assert _is_socket_unix(descriptor)
            assert not _is_socket_unix(descriptor, 0, -1, "/no/such/path")
            assert _is_socket_unix(descriptor, socket.SOCK_STREAM)
            assert not _is_socket_unix(descriptor, socket.SOCK_DGRAM)
def test_listen_fds_no_fds():
    """``listen_fds`` returns an empty list when no LISTEN_* variables are set."""
    # make sure we have no fds to listen to
    for name in ('LISTEN_FDS', 'LISTEN_PID'):
        # os.unsetenv() alone does not update os.environ, so drop the key
        # from the mapping as well to keep both views consistent.
        os.environ.pop(name, None)
        os.unsetenv(name)

    assert listen_fds() == []
    assert listen_fds(True) == []
    assert listen_fds(False) == []
def test_listen_fds():
    """``listen_fds`` reports three descriptors starting at 3 when LISTEN_FDS=3."""
    own_pid = str(os.getpid())
    os.environ['LISTEN_FDS'] = '3'
    os.environ['LISTEN_PID'] = own_pid

    expected = [3, 4, 5]
    assert listen_fds(False) == expected
    assert listen_fds(True) == expected
    # After the call made with True, nothing is expected to be reported.
    assert listen_fds() == []
def test_listen_fds_default_unset():
    """A ``listen_fds()`` call with default arguments consumes the LISTEN_* state."""
    own_pid = str(os.getpid())
    os.environ['LISTEN_FDS'] = '1'
    os.environ['LISTEN_PID'] = own_pid

    single_fd = [3]
    assert listen_fds(False) == single_fd
    assert listen_fds() == single_fd
    # The previous default call is expected to leave nothing behind.
    assert listen_fds() == []
def test_notify_no_socket():
    """``notify`` returns ``False`` for every call variant exercised here."""
    state = 'FDSTORE=1'
    own_pid = os.getpid()

    assert notify('READY=1') is False
    with skip_enosys():
        assert notify(state, fds=[]) is False
    assert notify(state, fds=[1, 2]) is False
    assert notify(state, pid=own_pid) is False
    assert notify(state, pid=own_pid, fds=(1,)) is False
# Exception type chosen by interpreter major version: ConnectionRefusedError
# on Python 3 and later, plain OSError otherwise.
connection_error = ConnectionRefusedError if sys.version_info >= (3,) else OSError
def test_notify_bad_socket():
    """``notify`` raises ``connection_error`` when NOTIFY_SOCKET is /dev/null."""
    # Remember the previous value so the bogus socket path does not leak
    # into tests that run afterwards.
    previous = os.environ.get('NOTIFY_SOCKET')
    os.environ['NOTIFY_SOCKET'] = '/dev/null'
    try:
        with pytest.raises(connection_error):
            notify('READY=1')
        with pytest.raises(connection_error):
            with skip_enosys():
                notify('FDSTORE=1', fds=[])
        with pytest.raises(connection_error):
            notify('FDSTORE=1', fds=[1, 2])
        with pytest.raises(connection_error):
            notify('FDSTORE=1', pid=os.getpid())
        with pytest.raises(connection_error):
            notify('FDSTORE=1', pid=os.getpid(), fds=(1,))
    finally:
        if previous is None:
            os.environ.pop('NOTIFY_SOCKET', None)
        else:
            os.environ['NOTIFY_SOCKET'] = previous
def test_notify_with_socket(tmpdir):
    """``notify`` succeeds when NOTIFY_SOCKET names a bound UNIX datagram socket.

    The test is marked xfail if the socket cannot be bound under ``tmpdir``.
    """
    path = tmpdir.join('socket').strpath
    # Remember the previous value so this test's socket path does not leak
    # into tests that run afterwards.
    previous = os.environ.get('NOTIFY_SOCKET')
    # contextlib.closing guarantees the socket is closed on every exit path,
    # including the xfail below.
    with contextlib.closing(socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)) as sock:
        try:
            sock.bind(path)
        except socket.error as e:
            pytest.xfail('failed to bind socket (%s)' % e)
        # SO_PASSCRED is not defined in python2.7
        SO_PASSCRED = getattr(socket, 'SO_PASSCRED', 16)
        sock.setsockopt(socket.SOL_SOCKET, SO_PASSCRED, 1)
        os.environ['NOTIFY_SOCKET'] = path
        try:
            assert notify('READY=1')
            with skip_enosys():
                assert notify('FDSTORE=1', fds=[])
            assert notify('FDSTORE=1', fds=[1, 2])
            assert notify('FDSTORE=1', pid=os.getpid())
            assert notify('FDSTORE=1', pid=os.getpid(), fds=(1,))
        finally:
            if previous is None:
                os.environ.pop('NOTIFY_SOCKET', None)
            else:
                os.environ['NOTIFY_SOCKET'] = previous