晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/imav/malwarelib/scan/ |
Upload File : |
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import collections
import logging
import time
from abc import ABC, abstractmethod
from base64 import b64encode
from functools import cached_property
from glob import iglob
from os import fsencode
from pathlib import Path
from typing import ClassVar, Deque, List, Optional
from uuid import uuid4
from defence360agent.contracts.config import (
Malware as Config,
MalwareScanIntensity,
)
from defence360agent.contracts.messages import MessageType
from defence360agent.internals.global_scope import g
from defence360agent.utils.common import HOUR, rate_limit
from imav.malwarelib.config import (
MalwareScanResourceType,
MalwareScanType,
QueuedScanState,
ExitDetachedScanType,
)
from imav.malwarelib.scan.ai_bolit.detached import (
AiBolitDetachedScan,
)
from imav.malwarelib.scan.detached import DetachedScan
from imav.malwarelib.utils.user_list import panel_users
from defence360agent.utils import (
antivirus_mode,
create_task_and_log_exceptions,
)
from defence360agent.utils.serialization import serialize_attr, unserialize
from imav.malwarelib.scan.app_version_detector import (
AppVersionDetector,
AVDExecutionError,
)
from imav.malwarelib.scan.mds.detached import MDSDetachedScan
from imav.malwarelib.scan.mds.scanner import MalwareDatabaseScanner
logger = logging.getLogger(__name__)
throttled_log_error = rate_limit(period=HOUR, on_drop=logger.warning)(
logger.error
)
INTENSITY_FALLBACK = {
"low": {
"intensity_cpu": 1,
"intensity_io": 1,
"intensity_ram": 1024,
},
"moderate": {
"intensity_cpu": 4,
"intensity_io": 4,
"intensity_ram": 2048,
},
"high": {
"intensity_cpu": 7,
"intensity_io": 7,
"intensity_ram": 4096,
},
}
class QueuedScanBase(ABC):
resource_type: ClassVar[MalwareScanResourceType]
detached_scan: DetachedScan
state = None
def __init__(
self,
path,
*,
scanid=None,
scan_type: str = MalwareScanType.ON_DEMAND,
created: int = None,
started: Optional[float] = None,
intensity=None,
home_dirs=None,
intensity_cpu=None,
intensity_io=None,
intensity_ram=None,
initiator=None,
state: Optional[str] = None,
**_,
):
self.path = path
if intensity:
intensity_fallback = INTENSITY_FALLBACK[intensity]
else:
intensity_fallback = {
"intensity_cpu": MalwareScanIntensity.CPU,
"intensity_io": MalwareScanIntensity.IO,
"intensity_ram": MalwareScanIntensity.RAM,
}
self.args = {
"intensity_cpu": intensity_cpu
or intensity_fallback["intensity_cpu"],
"intensity_io": intensity_io or intensity_fallback["intensity_io"],
"intensity_ram": intensity_ram
or intensity_fallback["intensity_ram"],
"initiator": initiator,
}
home_dirs = home_dirs or []
if scan_type == MalwareScanType.ON_DEMAND and Path(path) in home_dirs:
scan_type = MalwareScanType.USER
self.scanid = scanid or uuid4().hex
self.scan_type = scan_type
self.created = created or int(time.time())
self.state = (
QueuedScanState.queued if state is None else QueuedScanState(state)
)
self.started = started
self.scanner_task = None
@abstractmethod
async def start(self):
pass
@property
@abstractmethod
def total_resources(self):
pass
def stop(self):
if self.scanner_task:
self.scanner_task.cancel()
def status(self):
result = {
"status": self.state.value,
"path": self.path,
"scanid": self.scanid,
"started": self.started,
"created": self.created,
"scan_type": self.scan_type,
"resource_type": self.resource_type.value,
**self.args,
}
if self.state == QueuedScanState.running:
result["phase"] = self.detached_scan.phase
result["progress"] = self.detached_scan.progress
return result
def __getstate__(self):
state = self.__dict__.copy()
del state["detached_scan"]
del state["scanner_task"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.scanner_task = None
def __eq__(self, other):
return (
self.resource_type == other.resource_type
and self.path == other.path
)
def __repr__(self):
return "<{}({!r}, scanid={})>".format(
self.__class__.__qualname__, self.path, self.scanid
)
# We don't need to hash queued scans
__hash__ = None # type: ignore
def to_dict(self):
return {
"path": self.path,
"scanid": self.scanid,
"scan_type": self.scan_type,
"created": self.created,
"started": self.started,
"args": self.args,
"resource_type": self.resource_type.value,
"state": self.state.value,
}
@classmethod
def from_dict(cls, kwargs):
for arg, value in kwargs.pop("args", {}).items():
kwargs[arg] = value
return cls(**kwargs)
class QueuedFileScan(QueuedScanBase):
resource_type = MalwareScanResourceType.FILE
def __init__(
self,
path,
*,
scanid=None,
scan_type: str = MalwareScanType.ON_DEMAND,
created: int = None,
started: Optional[float] = None,
intensity=None,
home_dirs=None,
intensity_cpu=None,
intensity_io=None,
intensity_ram=None,
file_patterns=None,
exclude_patterns=None,
follow_symlinks=None,
detect_elf=None,
initiator=None,
state=None,
**_,
):
super().__init__(
path,
scanid=scanid,
scan_type=scan_type,
created=created,
started=started,
intensity=intensity,
home_dirs=home_dirs,
intensity_cpu=intensity_cpu,
intensity_io=intensity_io,
intensity_ram=intensity_ram,
initiator=initiator,
state=state,
)
self.args.update(
file_patterns=file_patterns or None,
exclude_patterns=exclude_patterns or None,
follow_symlinks=follow_symlinks or False,
detect_elf=detect_elf,
)
self.detached_scan = AiBolitDetachedScan(self.scanid)
def __setstate__(self, state):
# WARNING: Avoid adding a new attribute to a serializable class.
# If an object deserializes after a package upgrade, it will lack it
super().__setstate__(state)
self.detached_scan = AiBolitDetachedScan(self.scanid)
async def start(self):
self.started = time.time()
from imav.malwarelib.scan.scanner import MalwareScanner
scanner = MalwareScanner(sink=g.sink, hooks=True)
self.scanner_task = scanner.start(
self.path,
scan_id=self.scanid,
scan_type=self.scan_type,
started=self.started,
**self.args,
)
scan_data = await scanner.async_wait()
if scan_data is None:
logger.info("Scan cancelled for %s", self.path)
self.state = QueuedScanState.stopped
else:
self.state = QueuedScanState.running
scan_data["initiator"] = (
# using `get` because there is no initiator before version 6.8
self.args.get("initiator")
# for compatibility reason: when `self.initiator` is available
or getattr(self, "initiator", "undefined")
)
return scan_data
def stop(self):
if self.scanner_task:
self.scanner_task.cancel()
@property
def total_resources(self):
return self.detached_scan.total_resources
class QueuedDbScan(QueuedScanBase):
resource_type = MalwareScanResourceType.DB
def __init__(
self,
path: str,
scanid: Optional[str] = None,
# FIXME: Use Enum instead of a class with str attributes.
scan_type: str = MalwareScanType.ON_DEMAND,
created: int = None,
started: Optional[float] = None,
intensity: Optional[str] = None,
home_dirs=None,
intensity_cpu=None,
intensity_io=None,
intensity_ram=None,
state=None,
**_,
):
super().__init__(
path=path,
scanid=scanid,
scan_type=scan_type,
created=created,
started=started,
intensity=intensity,
home_dirs=home_dirs,
intensity_cpu=intensity_cpu,
intensity_io=intensity_io,
intensity_ram=intensity_ram,
state=state,
)
self.detached_scan = MDSDetachedScan(self.scanid)
def __setstate__(self, state):
super().__setstate__(state)
self.detached_scan = MDSDetachedScan(self.scanid)
@property
def total_resources(self) -> int:
return self.detached_scan.total_resources
async def _scan(self):
# app-version-detector should recursive check all directories,
# no need to extract them explicitly.
# Used to make files and db scans idempotent (DEF-19264)
# MDS scanner (php) should alerady handle /path/* as /path/ (DEF-19096)
apps_path = (
self.path.rstrip("*") if self.path.endswith("/*") else self.path
)
unglobbed_paths = [b64encode(fsencode(d)) for d in iglob(apps_path)]
try:
await AppVersionDetector().start(unglobbed_paths)
except AVDExecutionError as exc: # Exited with non-zero return code
await g.sink.process_message(
self._generate_scan_failed_message(
exc.command, exc.out, exc.err
)
)
else:
await MalwareDatabaseScanner(
self.path,
[self.path], # FIXME: pass unglobbed_paths here
**self.args,
scan_type=self.scan_type,
scan_id=self.scanid,
).scan()
async def start(self) -> None:
self.started = time.time()
self.scanner_task = create_task_and_log_exceptions(
asyncio.get_event_loop(), self._scan
)
@staticmethod
def _generate_scan_failed_message(cmd, stdout, stderr):
"""Generate ScanFailed message"""
msg = MessageType.ScanFailed(
out=stdout,
err=stderr,
command=" ".join(cmd),
message=ExitDetachedScanType.ABORTED,
)
logger.warning("Scan was aborted: %s, %s", msg["err"], msg["out"])
return msg
# For backward compatibility to deserialize an old queue
State = QueuedScanState
QueuedScan = QueuedFileScan
serialize_scans = serialize_attr(path=Config.SCANS_PATH, attr="_scans_info")
SCAN_TYPE_CLASSES = {
MalwareScanResourceType.FILE.value: QueuedFileScan,
MalwareScanResourceType.DB.value: QueuedDbScan,
}
class ScanQueue:
@property
def _scans_info(self):
return collections.deque(item.to_dict() for item in self._scans)
@cached_property
def _scans(self) -> Deque[QueuedScanBase]:
# it should be loaded once per instance
scans = collections.deque()
for scan_info in unserialize(
path=Config.SCANS_PATH, fallback=collections.deque
):
try:
cls = SCAN_TYPE_CLASSES[scan_info["resource_type"]]
scans.append(cls.from_dict(scan_info))
except Exception as exc:
# don't flood Sentry, send one error message
throttled_log_error(
"Can't get scan class for %s due to %s", scan_info, exc
)
return scans
@property
def current_scan(self):
return self.peek(0)
@serialize_scans
async def put(
self,
paths,
resource_type: MalwareScanResourceType,
prioritize=False,
**scan_args,
):
home_dirs = [Path(user["home"]) for user in await panel_users()]
if resource_type == MalwareScanResourceType.FILE:
scans_to_add: List[QueuedScanBase] = [
QueuedFileScan(path, home_dirs=home_dirs, **scan_args)
for path in paths
]
elif (
antivirus_mode.disabled
and resource_type == MalwareScanResourceType.DB
):
scans_to_add = db_scans(paths, home_dirs, scan_args)
else:
raise ValueError("Unknown resource_type: {}".format(resource_type))
if prioritize and self._scans:
running = self._scans.popleft()
self._scans.extendleft(reversed(scans_to_add))
self._scans.appendleft(running)
else:
self._scans.extend(scans_to_add)
@serialize_scans
def remove(self, scan=None):
if len(self) == 0:
return
scan = scan or self.current_scan
self._scans.remove(scan)
scan.stop()
logger.info("Scans pending: %d", len(self))
def peek(self, priority):
if -1 < priority < len(self):
return self._scans[priority]
def find_all(self, scan_ids):
return [scan for scan in self._scans if scan.scanid in scan_ids]
def find(self, **kwargs) -> Optional[QueuedScanBase]:
for scan in self._scans:
if all([getattr(scan, k) == v for k, v in kwargs.items()]):
return scan
return None
def update(self, scan_ids, status) -> None:
for scan in self._scans:
if scan.scanid in scan_ids:
scan.state = status
def get_scans_from_paths(self, paths):
for scan in self.scans:
if scan.path in paths:
yield scan, scan.state.value
def scan_summaries(self, scans=None):
scans = scans or self._scans
return collections.OrderedDict(
(
scan.scanid,
{
"path": scan.path,
"scan_status": scan.state.value,
"scan_type": scan.scan_type,
"started": scan.started,
"created": scan.created,
"error": None,
"total_resources": scan.total_resources,
"total_malicious": 0,
"resource_type": scan.resource_type.value,
},
)
for scan in scans
)
@property
def scans(self):
return list(self._scans)
def __bool__(self):
return len(self._scans) > 0
def __contains__(self, scan):
return scan in self._scans
def __len__(self):
return len(self._scans)
def db_scans(paths, home_dirs, scan_args):
return [
QueuedDbScan(path, home_dirs=home_dirs, **scan_args) for path in paths
]