晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/subsys/ |
Upload File : |
import asyncio
import logging
import os
import subprocess as su
from typing import Iterable, Union
from defence360agent.contracts.config import Core
from defence360agent.utils import check_run, run
# Module-level logger named after this module; the helpers below use it for
# debug output of executed commands and for "service not active" warnings.
logger = logging.getLogger(__name__)
def _apply_cmd(func):
    """Turn a command-building function into a coroutine that runs it.

    ``func`` is called synchronously with the caller's arguments and must
    return the command to execute (the methods in this module return an
    argv-style list). The command is logged at DEBUG level and then handed
    to ``check_run``.

    The returned coroutine function yields ``None``; whatever ``check_run``
    raises propagates to the awaiting caller.
    NOTE(review): ``check_run`` presumably raises on a non-zero exit status
    -- confirm against ``defence360agent.utils``.

    :param func: callable returning the command to run.
    :return: ``async`` wrapper carrying ``func``'s name and docstring.
    """
    # Function-scope import: the module does not import functools at the top.
    import functools

    # Preserve __name__/__doc__/__qualname__ of the decorated method so that
    # tracebacks, logs and introspection do not all show "wrapper".
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        cmd = func(*args, **kwargs)
        logger.debug("check_call(%r)", cmd)
        await check_run(cmd)

    return wrapper
async def _reset_failed_state(
    services: Iterable[Union["_CentOs6", "_SystemctlBased"]]
):
    """Reset, restart and wait for each of the given services.

    Services are handled one after another, in iteration order. For every
    service the failed state is reset, the service is restarted, and then
    its status is polled up to ten times with a one second pause between
    polls. If the service is still inactive after the last poll the function
    simply moves on to the next service; no exception is raised here.

    :param services: adaptor objects exposing awaitable ``reset_failed()``,
        ``restart()`` and ``is_active()``.
    """
    max_polls = 10
    pause_seconds = 1

    for service in services:
        await service.reset_failed()
        await service.restart()

        polls_done = 0
        while polls_done < max_polls:
            if await service.is_active():
                break
            logger.warning(
                "Service %s is still not active, sleep for %s seconds",
                service,
                pause_seconds,
            )
            await asyncio.sleep(pause_seconds)
            polls_done += 1
class _CentOs6:
    """Service adaptor built on ``/sbin/service`` and ``/sbin/chkconfig``.

    Methods decorated with ``_apply_cmd`` only build the command; the
    decorator turns them into coroutines that execute it.
    """

    SVC_CTL_BIN = "/sbin/service"
    _CHKCONFIG = "/sbin/chkconfig"

    def __init__(self, service_name):
        self._service_name = service_name

    def _service_cmd(self, action):
        """Return the argv for ``service <name> <action>``."""
        return [self.SVC_CTL_BIN, self._service_name, action]

    def _chkconfig_cmd(self, option):
        """Return the argv for ``chkconfig <option> <name>``."""
        return [self._CHKCONFIG, option, self._service_name]

    @_apply_cmd
    def start(self):
        """Command that starts the service."""
        return self._service_cmd("start")

    @_apply_cmd
    def stop(self):
        """Command that stops the service."""
        return self._service_cmd("stop")

    @_apply_cmd
    def restart(self):
        """Command that restarts the service."""
        return self._service_cmd("restart")

    async def reset_failed(self):
        """Do nothing: resetting a failed state is not implemented here."""
        return None

    @_apply_cmd
    def enable(self):
        """Command that registers the service via ``chkconfig --add``."""
        return self._chkconfig_cmd("--add")

    async def is_enabled(self):
        """Return True if ``chkconfig --list`` succeeds and reports ``:on``."""
        listing = await asyncio.create_subprocess_exec(
            *self._chkconfig_cmd("--list"),
            stdout=su.PIPE,
            stderr=su.DEVNULL,
        )
        listing_output, _ = await listing.communicate()
        exit_status = await listing.wait()
        if exit_status != 0:
            return False
        return b":on" in listing_output

    def is_enabled_sync(self):
        """Blocking counterpart of :meth:`is_enabled`."""
        completed = su.run(
            self._chkconfig_cmd("--list"),
            stdout=su.PIPE,
            stderr=su.DEVNULL,
        )
        if completed.returncode != 0:
            return False
        return b":on" in completed.stdout

    @_apply_cmd
    def disable(self):
        """Command that unregisters the service via ``chkconfig --del``."""
        return self._chkconfig_cmd("--del")

    # Masking has no separate command in this adaptor; the names are plain
    # aliases of disable/enable.
    mask = disable
    unmask = enable

    async def is_active(self):
        """Return True if ``service <name> status`` exits with status 0."""
        status, _stdout, _stderr = await run(self._service_cmd("status"))
        return status == 0

    async def activate_socket_service(self):
        """Reset and restart the service if it is enabled but not active."""
        if not await self.is_enabled():
            return
        if await self.is_active():
            return
        await _reset_failed_state((self,))
class _SystemctlBased:
    """Service adaptor that drives a unit through ``systemctl``.

    ``SVC_CTL_BIN`` is the executable used for every command; subclasses
    override it with an absolute path. Methods decorated with ``_apply_cmd``
    only build the command; the decorator turns them into coroutines that
    execute it.
    """

    SVC_CTL_BIN = "systemctl"

    def __init__(self, service_name):
        self._service_name = service_name

    def _systemctl(self, verb):
        """Return the argv for ``systemctl <verb> <name>``."""
        return [self.SVC_CTL_BIN, verb, self._service_name]

    @_apply_cmd
    def start(self):
        """Command that starts the unit."""
        return self._systemctl("start")

    @_apply_cmd
    def stop(self):
        """Command that stops the unit."""
        return self._systemctl("stop")

    @_apply_cmd
    def restart(self):
        """Command that restarts the unit."""
        return self._systemctl("restart")

    @_apply_cmd
    def enable(self):
        """Command that enables the unit."""
        return self._systemctl("enable")

    async def is_enabled(self):
        """Return True if ``systemctl is-enabled`` exits with status 0."""
        probe = await asyncio.create_subprocess_exec(
            *self._systemctl("is-enabled"),
            stdout=su.DEVNULL,
            stderr=su.DEVNULL,
        )
        await probe.communicate()
        exit_status = await probe.wait()
        return exit_status == 0

    def is_enabled_sync(self):
        """Blocking counterpart of :meth:`is_enabled`."""
        exit_status = su.call(
            self._systemctl("is-enabled"),
            stdout=su.DEVNULL,
            stderr=su.DEVNULL,
        )
        return exit_status == 0

    @_apply_cmd
    def disable(self):
        """Command that disables the unit."""
        return self._systemctl("disable")

    @_apply_cmd
    def mask(self):
        """Command that masks the unit.

        Historical note carried over from the previous docstring: this was
        created for imunify360-webshield, which required masking as far as
        it is started by 'Wants=' in imunify360.service. It is no longer
        relevant but has been left in place.
        """
        return self._systemctl("mask")

    @_apply_cmd
    def unmask(self):
        """Command that unmasks the unit.

        Historical note carried over from the previous docstring: this was
        created for imunify360-webshield, which required masking as far as
        it is started by 'Wants=' in imunify360.service. It is no longer
        relevant but has been left in place.
        """
        return self._systemctl("unmask")

    async def is_active(self):
        """Return True if ``systemctl is-active`` exits with status 0."""
        status, _stdout, _stderr = await run(self._systemctl("is-active"))
        return status == 0

    @_apply_cmd
    def reset_failed(self):
        """Command that clears the unit's failed state."""
        return self._systemctl("reset-failed")

    async def activate_socket_service(self):
        """Revive the service and its ``.socket`` unit when the socket is down.

        The check is made on the ``<name>.socket`` unit only: if it is
        enabled but not active, both this service and the socket unit are
        reset and restarted, in that order.
        """
        socket_unit = adaptor(f"{self._service_name}.socket")
        if not await socket_unit.is_enabled():
            return
        if await socket_unit.is_active():
            return
        await _reset_failed_state((self, socket_unit))
class _CentOs7(_SystemctlBased):
    """systemctl-based adaptor that uses the binary at /usr/bin/systemctl."""

    SVC_CTL_BIN = "/usr/bin/systemctl"
class _DebianUbuntu(_SystemctlBased):
    """systemctl-based adaptor that uses the binary at /bin/systemctl."""

    SVC_CTL_BIN = "/bin/systemctl"
def adaptor(service_name):
    """Return a service adaptor suited to the service manager on this host.

    The candidate classes are tried in a fixed order; the first one whose
    ``SVC_CTL_BIN`` path exists on disk is instantiated with
    ``service_name``.

    :param service_name: name of the service or unit to manage.
    :raises RuntimeError: if none of the candidate binaries exists.
    """
    candidates = (_DebianUbuntu, _CentOs7, _CentOs6)
    # Lazy generator: stops probing the filesystem at the first match.
    chosen = next(
        (cls for cls in candidates if os.path.exists(cls.SVC_CTL_BIN)),
        None,
    )
    if chosen is None:
        raise RuntimeError("Cannot instantiate appropriate adaptor.")
    return chosen(service_name)
def imunify360_service():
    """Return the adaptor for the service named by ``Core.SVC_NAME``."""
    service_name = Core.SVC_NAME
    return adaptor(service_name)