晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/ |
Upload File : |
import asyncio
import concurrent.futures
import contextlib
import logging
import os
import uuid
from typing import Generator, List
from defence360agent.api.server import APIError, send_message
from defence360agent.contracts import license
from defence360agent.contracts.config import Core
from defence360agent.contracts.messages import Message, MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.utils import RecurringCheckStop, recurring_check, Scope
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SendToServer(MessageSink):
    """Send messages to server.

    * process Reportable messages;
    * add them to a pending messages list;
    * send all pending messages to server when list is full (contains
      _PENDING_MESSAGES_LIMIT items or more);
    * send all pending messages on plugin shutdown.
    """

    # TODO: consider reducing the limit,
    # since it can lead to messages loss on shutdown (DEF-20261)
    _PENDING_MESSAGES_LIMIT = int(
        os.environ.get("IMUNIFYAV_MESSAGES_COUNT_TO_SEND", 50)
    )
    # 50 seconds because it should be less than DefaultTimeoutStopSec
    _SHUTDOWN_SEND_TIMEOUT = 50
    SCOPE = Scope.AV

    async def create_sink(self, loop: asyncio.AbstractEventLoop) -> None:
        """Initialise the sink state and start the background sender task.

        :param loop: event loop used to schedule the sender task.
        """
        self._loop = loop
        # Messages accepted by send_to_server() and not yet delivered.
        self._pending: List[Message] = []
        # Set whenever a new message is queued; wakes up _send().
        self._try_send = asyncio.Event()
        # Held while a batch is being sent, so that stop() never cancels
        # the sender task in the middle of a sending round.
        self._lock = asyncio.Lock()
        self._sender_task = loop.create_task(self._send())

    async def shutdown(self) -> None:
        """Flush pending messages, giving up after _SHUTDOWN_SEND_TIMEOUT.

        When shutdown begins it gives 50 seconds to send _pending
        messages to the server (after 60 seconds the process will be
        killed by systemd).

        If stop() isn't done in 50 seconds, the sending of messages is
        terminated and an error is logged. Whatever is still left in
        _pending afterwards is reported as dropped.
        """
        try:
            await asyncio.wait_for(self.stop(), self._SHUTDOWN_SEND_TIMEOUT)
        except asyncio.TimeoutError:
            # Used logger.error to notify sentry
            logger.error(
                "Timeout (%ds) sending messages to server on shutdown.",
                self._SHUTDOWN_SEND_TIMEOUT,
            )
            # stop() may have timed out before it got to cancel the
            # sender task (e.g. while waiting for the lock), so make sure
            # the task is stopped here.
            if not self._sender_task.cancelled():
                self._sender_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self._sender_task
        if self._pending:
            logger.warning(
                "Dropped %s messages not sent to server", len(self._pending)
            )
            logger.warning("Dropped queue %r", self._pending)

    async def stop(self) -> None:
        """Stop the background sender and flush the pending messages.

        1. wait for the lock to become available, i.e. until
           _sender_task finishes the current round of sending messages
           (if it takes too long, the timeout in shutdown() is
           triggered);
        2. once the sending round is complete (we got the lock), cancel
           the next iteration of _sender_task (it exits);
        3. send _pending messages (again, if it takes too long, the
           timeout in shutdown() is triggered and the coroutine is
           cancelled).

        This ordering guarantees that a sending round started BEFORE
        shutdown() has finished before the final flush starts, so the
        final flush of _pending is not interfered with by it.
        """
        # The _lock allows you to be sure that the _send_pending_messages
        # coroutine is not running and _pending is not being used
        async with self._lock:
            # Cancel _sender_task. The lock ensures that the coroutine
            # is not in its critical part
            self._sender_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._sender_task
            # send messages that are in _pending at the time of agent shutdown
            await self._send_pending_messages()

    @contextlib.contextmanager
    def _get_api(self) -> Generator[send_message.SendMessageAPI, None, None]:
        """Yield a SendMessageAPI client backed by a one-worker process pool.

        The executor is shut down when the context exits. The base URL is
        read from the IMUNIFYAV_API_BASE environment variable (None when
        it is not set).
        """
        base_url = os.environ.get("IMUNIFYAV_API_BASE")
        # we send messages sequentially, so max_workers=1
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
            api = send_message.SendMessageAPI(
                Core.VERSION, base_url, executor=executor
            )
            api.set_product_name(license.LicenseCLN.get_product_name())
            api.set_server_id(license.LicenseCLN.get_server_id())
            api.set_license(license.LicenseCLN.get_token())
            yield api

    @expect(MessageType.Reportable)
    async def send_to_server(self, message: Message) -> None:
        """Queue *message* for delivery and wake up the sender task.

        A random "message_id" is assigned when the message has none.
        """
        if "message_id" not in message:
            message["message_id"] = uuid.uuid4().hex
        self._pending.append(message)
        self._try_send.set()

    @recurring_check(0)
    async def _send(self) -> None:
        """Wait for a new message and send the batch once it is full.

        Presumably re-invoked in a loop by recurring_check -- see that
        decorator for the exact scheduling and error handling.
        """
        await self._try_send.wait()
        self._try_send.clear()
        if len(self._pending) >= self._PENDING_MESSAGES_LIMIT:
            # The _lock protects critical part of _send method
            async with self._lock:
                await self._send_pending_messages()

    async def _send_pending_messages(self) -> None:
        """Send every currently pending message, one at a time.

        The pending list is swapped for an empty one up front, so messages
        queued while sending is in progress are kept for the next round.
        Messages that fail with APIError are put back at the front of
        _pending to be retried later.

        If the coroutine is cancelled in the middle of a batch (e.g. by
        the shutdown timeout), every message whose delivery was not
        confirmed is put back into _pending before the cancellation is
        propagated, so that shutdown() can report them as dropped instead
        of losing them silently.

        :raises asyncio.CancelledError: re-raised after the unsent
            messages have been restored.
        """
        batch, self._pending = self._pending, []
        unsuccessful = []
        # Number of messages whose send attempt has completed, either
        # successfully or with an APIError.
        attempted = 0
        logger.info("Sending %s messages", len(batch))
        try:
            with self._get_api() as api:
                # NOTE(review): when server_id is None the whole batch is
                # discarded without being sent -- confirm this is intended.
                if api.server_id is not None:
                    for message in batch:
                        try:
                            await api.send_message(message)
                            logger.info(
                                "message sent %s",
                                {
                                    "method": message.get("method"),
                                    "message_id": message.get("message_id"),
                                },
                            )
                        except APIError as exc:
                            logger.warning(
                                "Failed to send message %s to server: %s",
                                message,
                                exc,
                            )
                            unsuccessful.append(message)
                        attempted += 1
        except asyncio.CancelledError:
            # batch[attempted:] starts with the message that was in flight
            # when the cancellation arrived; its delivery is unconfirmed,
            # so it is kept together with the not yet attempted ones.
            self._pending = unsuccessful + batch[attempted:] + self._pending
            raise
        if unsuccessful:
            logger.info("Unsuccessful to send %s messages", len(unsuccessful))
            self._pending = unsuccessful + self._pending