晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/ |
Upload File : |
import datetime
import logging
import math
import os
import re
from collections import namedtuple
from functools import wraps
from cerberus.validator import Validator
from defence360agent.contracts.config import (
ANTIVIRUS_MODE,
BackupRestore,
Malware,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.backup_systems import BackupSystem, get_backend
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Exactly 64 hexadecimal characters (upper or lower case): the textual
# form of a SHA-256 digest.
SHA256_REGEXP = re.compile(r"^[A-Fa-f0-9]{64}$")
class ValidationError(Exception):
    """
    Raised when parameters fail validation.

    :param errors: a single message (stored as a one-element list) or any
        other error container, which is stored as given
    :param extra_data: optional additional data; any falsy value is
        replaced with an empty dict
    """

    def __init__(self, errors, extra_data=None):
        # A bare string is wrapped so that ``errors`` is a list in that case.
        self.errors = [errors] if isinstance(errors, str) else errors
        self.extra_data = extra_data if extra_data else {}
# Plain record behind OrderBy: the column name and a descending flag.
OrderByBase = namedtuple("OrderByBase", ["column_name", "desc"])


class OrderBy(OrderByBase):
    """Sort specification: a column name plus a ``desc`` flag."""

    # One or more characters of column name followed by exactly one
    # direction sign. It is applied with fullmatch(), so nothing may follow
    # the sign, and '|' is not a valid sign.
    _SPEC_RE = re.compile(r"(.+)([+-])")

    def __new__(cls, column_name, desc):
        return super().__new__(cls, column_name, desc)

    @classmethod
    def fromstring(cls, ob_string):
        """
        Build an OrderBy from its textual form.

        :param ob_string: for example: 'user+', 'id-'
        :return: OrderBy whose ``desc`` is True for a trailing '-' and
            False for a trailing '+'
        :raises ValueError: if ob_string is not '<column>+' or '<column>-'
        """
        parsed = cls._SPEC_RE.fullmatch(ob_string)
        if parsed is None:
            raise ValueError(
                "Incorrect order_by: ({}): {}".format(
                    "expected '<column>+' or '<column>-'", ob_string
                )
            )
        col_name, sign = parsed.groups()
        return cls(col_name, sign == "-")
class SchemaValidator(Validator):
    """
    Validator subclass adding custom coercers, type checks and rules.

    NOTE: the docstrings of the ``_validate_<rule>`` methods below are not
    free-form documentation. Per the Cerberus docs they hold the schema
    that the rule's own argument is validated against, so they are kept
    exactly as written and explanations live in ``#`` comments.
    """

    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Starts empty; validate() passes it to ValidationError on failure.
        self.extra_data = {}

    def _normalize_coerce_order_by(self, value):
        """Return ``value`` as an OrderBy, parsing it unless it already is one."""
        return value if isinstance(value, OrderBy) else OrderBy.fromstring(value)

    def _normalize_coerce_sha256hash(self, value):
        """Return ``str(value)`` stripped of surrounding whitespace, lowercased."""
        text = str(value)
        return text.strip().lower()

    def _normalize_coerce_scan_db(self, value):
        """
        Return False when ANTIVIRUS_MODE is set; otherwise return ``value``,
        substituting Malware.DATABASE_SCAN_ENABLED when it is None.
        """
        if ANTIVIRUS_MODE:
            return False
        return Malware.DATABASE_SCAN_ENABLED if value is None else value

    def _validate_type_order_by(self, value):
        """Type check: True only for OrderBy instances."""
        return isinstance(value, OrderBy)

    def _validate_type_sha256hash(self, value: str):
        """Type check: truthy when the stripped value matches SHA256_REGEXP."""
        candidate = str(value).strip()
        return SHA256_REGEXP.match(candidate)

    # Rule `is_absolute_path`: when enabled, the value must be an absolute
    # path according to os.path.isabs().
    def _validate_is_absolute_path(self, is_absolute_path, field, value):
        """{'type': 'boolean', 'empty': False}"""
        if not is_absolute_path or os.path.isabs(value):
            return
        self._error(field, "Path {} should be absolute".format(value))

    # Rule `isascii`: when enabled, the value must be encodable as ASCII.
    def _validate_isascii(self, isascii, field, value):
        """{'type': 'boolean'}"""
        if not isascii:
            return
        try:
            value.encode("ascii")
        except UnicodeEncodeError:
            self._error(field, "Must only contain ascii symbols")

    def _normalize_coerce_int(self, value):
        """Return ``int(value)``."""
        return int(value)

    def _normalize_default_setter_now(self, document) -> int:
        """Return the current local timestamp, rounded up to whole seconds."""
        current = datetime.datetime.now()
        return math.ceil(current.timestamp())

    # for argparser support
    def _validate_cli(self, *args, **kwargs):
        """{'type': 'dict', 'empty': False, 'schema': {
        'users': {'type': 'list', 'allowed': ['non-root', 'root'],
        'empty': False},
        'require_rpc': {'type': 'string', 'empty': True, 'default': 'running',
        'allowed': ['running', 'stopped', 'any', 'direct']}
        }}
        """

    # for argparser support
    def _validate_help(self, *args, **kwargs):
        """{'type': 'string', 'empty': False}"""

    # for argparser support
    def _validate_positional(self, *args, **kwargs):
        """{'type': 'boolean', 'empty': True, 'default': False}"""

    # metadata for response validation
    def _validate_return_type(self, *args, **kwargs):
        """{'type': 'string', 'empty': True}"""

    def _validate_envvar(self, *args, **kwargs):
        """
        Parameter can be passed via the specified environment variable.
        The value specified via a CLI argument takes precedence.
        The rule's arguments are validated against this schema:
        {'type': 'string', 'empty': False}
        """

    def _validate_envvar_only(self, *args, **kwargs):
        """
        Parameter will only be accepted if provided via environment
        variable specified by `envvar`. It will be rejected if passed as
        a CLI argument.
        The rule's arguments are validated against this schema:
        {'type': 'boolean', 'default': False}
        """

    def _normalize_coerce_path(self, value: str):
        """Return the absolute form of a non-empty path; pass falsy values through."""
        return os.path.abspath(value) if value else value

    def _normalize_coerce_backup_system(self, value):
        """Return ``value`` if it is a BackupSystem, else ``get_backend(value)``."""
        return value if isinstance(value, BackupSystem) else get_backend(value)

    def _validator_backup_is_enabled(self, field, value):
        """Report an error unless backup is enabled and a backup system is set."""
        if BackupRestore.ENABLED and BackupRestore.backup_system():
            return
        self._error(field, "Backup is not enabled!")
def validate(validator, hashable, params):
    """
    Normalize and validate ``params`` under the key ``hashable``.

    :param validator: object providing ``normalized()``, ``validate()``,
        ``errors``, ``extra_data`` and ``document``
    :param hashable: key under which ``params`` is wrapped before
        normalization and validation
    :param params: the parameters to check
    :return: ``validator.document[hashable]`` after successful validation
    :raises ValidationError: with the validator's errors and extra_data
        when validation fails (a warning is logged first)
    """
    normalized_doc = validator.normalized(
        {hashable: params}, always_return_document=True
    )
    candidate = {hashable: normalized_doc[hashable]}
    if validator.validate(candidate):
        return validator.document[hashable]

    message = "Validation error with command {}, params {}, errors {}".format(
        hashable, params, validator.errors
    )
    logger.warning(message)
    raise ValidationError(validator.errors, validator.extra_data)
def validate_middleware(validator):
    """
    Build a decorator that validates a request's params before the
    wrapped coroutine runs.

    The wrapper calls ``validate()`` with ``tuple(request["command"])`` as
    the key, stores the result back into ``request["params"]``, then awaits
    the wrapped coroutine with the updated request.

    :param validator: validator object handed to ``validate()``
    :return: decorator for ``async`` handlers taking ``request`` first
    """

    def decorator(handler):
        @wraps(handler)
        async def checked(request, *args, **kwargs):
            command_key = tuple(request["command"])
            validated = validate(validator, command_key, request["params"])
            request["params"] = validated
            return await handler(request, *args, **kwargs)

        return checked

    return decorator
def validate_av_plus_license(func):
    """
    Decorator for CLI commands methods that ensures that the AV+ license
    is valid.

    The check is only installed when ANTIVIRUS_MODE is truthy; otherwise
    ``func`` is returned unchanged.

    :param func: coroutine function to guard
    :return: ``func`` itself, or an async wrapper that checks
        ``LicenseCLN.is_valid_av_plus()`` before awaiting ``func``
    :raises ValidationError: from the wrapper, when the license is not valid
    """
    if not ANTIVIRUS_MODE:
        return func

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        if LicenseCLN.is_valid_av_plus():
            return await func(*args, **kwargs)
        # Build a fresh exception for every failure. Re-raising one shared
        # instance would keep extending its __traceback__ and would share
        # its mutable errors/extra_data between unrelated calls.
        raise ValidationError("ImunifyAV+ license required")

    return async_wrapper