晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /opt/alt/python311/lib64/python3.11/multiprocessing/ |
Upload File : |
"""Provides shared memory for direct access across processes.
The API of this package is currently provisional. Refer to the
documentation for details.
"""
__all__ = [ 'SharedMemory', 'ShareableList' ]
from functools import partial
import mmap
import os
import errno
import struct
import secrets
import types
if os.name == "nt":
import _winapi
_USE_POSIX = False
else:
import _posixshmem
_USE_POSIX = True
from . import resource_tracker
_O_CREX = os.O_CREAT | os.O_EXCL
# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
_SHM_SAFE_NAME_LENGTH = 14
# Shared memory block name prefix
if _USE_POSIX:
_SHM_NAME_PREFIX = '/psm_'
else:
_SHM_NAME_PREFIX = 'wnsm_'
def _make_filename():
"Create a random filename for the shared memory object."
# number of random bytes to use for name
nbytes = (_SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)) // 2
assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
name = _SHM_NAME_PREFIX + secrets.token_hex(nbytes)
assert len(name) <= _SHM_SAFE_NAME_LENGTH
return name
class SharedMemory:
"""Creates a new shared memory block or attaches to an existing
shared memory block.
Every shared memory block is assigned a unique name. This enables
one process to create a shared memory block with a particular name
so that a different process can attach to that same shared memory
block using that same name.
As a resource for sharing data across processes, shared memory blocks
may outlive the original process that created them. When one process
no longer needs access to a shared memory block that might still be
needed by other processes, the close() method should be called.
When a shared memory block is no longer needed by any process, the
unlink() method should be called to ensure proper cleanup."""
# Defaults; enables close() and unlink() to run without errors.
_name = None
_fd = -1
_mmap = None
_buf = None
_flags = os.O_RDWR
_mode = 0o600
_prepend_leading_slash = True if _USE_POSIX else False
def __init__(self, name=None, create=False, size=0):
if not size >= 0:
raise ValueError("'size' must be a positive integer")
if create:
self._flags = _O_CREX | os.O_RDWR
if size == 0:
raise ValueError("'size' must be a positive number different from zero")
if name is None and not self._flags & os.O_EXCL:
raise ValueError("'name' can only be None if create=True")
if _USE_POSIX:
# POSIX Shared Memory
if name is None:
while True:
name = _make_filename()
try:
self._fd = _posixshmem.shm_open(
name,
self._flags,
mode=self._mode
)
except FileExistsError:
continue
self._name = name
break
else:
name = "/" + name if self._prepend_leading_slash else name
self._fd = _posixshmem.shm_open(
name,
self._flags,
mode=self._mode
)
self._name = name
try:
if create and size:
os.ftruncate(self._fd, size)
stats = os.fstat(self._fd)
size = stats.st_size
self._mmap = mmap.mmap(self._fd, size)
except OSError:
self.unlink()
raise
resource_tracker.register(self._name, "shared_memory")
else:
# Windows Named Shared Memory
if create:
while True:
temp_name = _make_filename() if name is None else name
# Create and reserve shared memory block with this name
# until it can be attached to by mmap.
h_map = _winapi.CreateFileMapping(
_winapi.INVALID_HANDLE_VALUE,
_winapi.NULL,
_winapi.PAGE_READWRITE,
(size >> 32) & 0xFFFFFFFF,
size & 0xFFFFFFFF,
temp_name
)
try:
last_error_code = _winapi.GetLastError()
if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
if name is not None:
raise FileExistsError(
errno.EEXIST,
os.strerror(errno.EEXIST),
name,
_winapi.ERROR_ALREADY_EXISTS
)
else:
continue
self._mmap = mmap.mmap(-1, size, tagname=temp_name)
finally:
_winapi.CloseHandle(h_map)
self._name = temp_name
break
else:
self._name = name
# Dynamically determine the existing named shared memory
# block's size which is likely a multiple of mmap.PAGESIZE.
h_map = _winapi.OpenFileMapping(
_winapi.FILE_MAP_READ,
False,
name
)
try:
p_buf = _winapi.MapViewOfFile(
h_map,
_winapi.FILE_MAP_READ,
0,
0,
0
)
finally:
_winapi.CloseHandle(h_map)
try:
size = _winapi.VirtualQuerySize(p_buf)
finally:
_winapi.UnmapViewOfFile(p_buf)
self._mmap = mmap.mmap(-1, size, tagname=name)
self._size = size
self._buf = memoryview(self._mmap)
def __del__(self):
try:
self.close()
except OSError:
pass
def __reduce__(self):
return (
self.__class__,
(
self.name,
False,
self.size,
),
)
def __repr__(self):
return f'{self.__class__.__name__}({self.name!r}, size={self.size})'
@property
def buf(self):
"A memoryview of contents of the shared memory block."
return self._buf
@property
def name(self):
"Unique name that identifies the shared memory block."
reported_name = self._name
if _USE_POSIX and self._prepend_leading_slash:
if self._name.startswith("/"):
reported_name = self._name[1:]
return reported_name
@property
def size(self):
"Size in bytes."
return self._size
def close(self):
"""Closes access to the shared memory from this instance but does
not destroy the shared memory block."""
if self._buf is not None:
self._buf.release()
self._buf = None
if self._mmap is not None:
self._mmap.close()
self._mmap = None
if _USE_POSIX and self._fd >= 0:
os.close(self._fd)
self._fd = -1
def unlink(self):
"""Requests that the underlying shared memory block be destroyed.
In order to ensure proper cleanup of resources, unlink should be
called once (and only once) across all processes which have access
to the shared memory block."""
if _USE_POSIX and self._name:
_posixshmem.shm_unlink(self._name)
resource_tracker.unregister(self._name, "shared_memory")
_encoding = "utf8"
class ShareableList:
"""Pattern for a mutable list-like object shareable via a shared
memory block. It differs from the built-in list type in that these
lists can not change their overall length (i.e. no append, insert,
etc.)
Because values are packed into a memoryview as bytes, the struct
packing format for any storable value must require no more than 8
characters to describe its format."""
# The shared memory area is organized as follows:
# - 8 bytes: number of items (N) as a 64-bit integer
# - (N + 1) * 8 bytes: offsets of each element from the start of the
# data area
# - K bytes: the data area storing item values (with encoding and size
# depending on their respective types)
# - N * 8 bytes: `struct` format string for each element
# - N bytes: index into _back_transforms_mapping for each element
# (for reconstructing the corresponding Python value)
_types_mapping = {
int: "q",
float: "d",
bool: "xxxxxxx?",
str: "%ds",
bytes: "%ds",
None.__class__: "xxxxxx?x",
}
_alignment = 8
_back_transforms_mapping = {
0: lambda value: value, # int, float, bool
1: lambda value: value.rstrip(b'\x00').decode(_encoding), # str
2: lambda value: value.rstrip(b'\x00'), # bytes
3: lambda _value: None, # None
}
@staticmethod
def _extract_recreation_code(value):
"""Used in concert with _back_transforms_mapping to convert values
into the appropriate Python objects when retrieving them from
the list as well as when storing them."""
if not isinstance(value, (str, bytes, None.__class__)):
return 0
elif isinstance(value, str):
return 1
elif isinstance(value, bytes):
return 2
else:
return 3 # NoneType
def __init__(self, sequence=None, *, name=None):
if name is None or sequence is not None:
sequence = sequence or ()
_formats = [
self._types_mapping[type(item)]
if not isinstance(item, (str, bytes))
else self._types_mapping[type(item)] % (
self._alignment * (len(item) // self._alignment + 1),
)
for item in sequence
]
self._list_len = len(_formats)
assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len
offset = 0
# The offsets of each list element into the shared memory's
# data area (0 meaning the start of the data area, not the start
# of the shared memory area).
self._allocated_offsets = [0]
for fmt in _formats:
offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
self._allocated_offsets.append(offset)
_recreation_codes = [
self._extract_recreation_code(item) for item in sequence
]
requested_size = struct.calcsize(
"q" + self._format_size_metainfo +
"".join(_formats) +
self._format_packing_metainfo +
self._format_back_transform_codes
)
self.shm = SharedMemory(name, create=True, size=requested_size)
else:
self.shm = SharedMemory(name)
if sequence is not None:
_enc = _encoding
struct.pack_into(
"q" + self._format_size_metainfo,
self.shm.buf,
0,
self._list_len,
*(self._allocated_offsets)
)
struct.pack_into(
"".join(_formats),
self.shm.buf,
self._offset_data_start,
*(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
)
struct.pack_into(
self._format_packing_metainfo,
self.shm.buf,
self._offset_packing_formats,
*(v.encode(_enc) for v in _formats)
)
struct.pack_into(
self._format_back_transform_codes,
self.shm.buf,
self._offset_back_transform_codes,
*(_recreation_codes)
)
else:
self._list_len = len(self) # Obtains size from offset 0 in buffer.
self._allocated_offsets = list(
struct.unpack_from(
self._format_size_metainfo,
self.shm.buf,
1 * 8
)
)
def _get_packing_format(self, position):
"Gets the packing format for a single value stored in the list."
position = position if position >= 0 else position + self._list_len
if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")
v = struct.unpack_from(
"8s",
self.shm.buf,
self._offset_packing_formats + position * 8
)[0]
fmt = v.rstrip(b'\x00')
fmt_as_str = fmt.decode(_encoding)
return fmt_as_str
def _get_back_transform(self, position):
"Gets the back transformation function for a single value."
if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")
transform_code = struct.unpack_from(
"b",
self.shm.buf,
self._offset_back_transform_codes + position
)[0]
transform_function = self._back_transforms_mapping[transform_code]
return transform_function
def _set_packing_format_and_transform(self, position, fmt_as_str, value):
"""Sets the packing format and back transformation code for a
single value in the list at the specified position."""
if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")
struct.pack_into(
"8s",
self.shm.buf,
self._offset_packing_formats + position * 8,
fmt_as_str.encode(_encoding)
)
transform_code = self._extract_recreation_code(value)
struct.pack_into(
"b",
self.shm.buf,
self._offset_back_transform_codes + position,
transform_code
)
def __getitem__(self, position):
position = position if position >= 0 else position + self._list_len
try:
offset = self._offset_data_start + self._allocated_offsets[position]
(v,) = struct.unpack_from(
self._get_packing_format(position),
self.shm.buf,
offset
)
except IndexError:
raise IndexError("index out of range")
back_transform = self._get_back_transform(position)
v = back_transform(v)
return v
def __setitem__(self, position, value):
position = position if position >= 0 else position + self._list_len
try:
item_offset = self._allocated_offsets[position]
offset = self._offset_data_start + item_offset
current_format = self._get_packing_format(position)
except IndexError:
raise IndexError("assignment index out of range")
if not isinstance(value, (str, bytes)):
new_format = self._types_mapping[type(value)]
encoded_value = value
else:
allocated_length = self._allocated_offsets[position + 1] - item_offset
encoded_value = (value.encode(_encoding)
if isinstance(value, str) else value)
if len(encoded_value) > allocated_length:
raise ValueError("bytes/str item exceeds available storage")
if current_format[-1] == "s":
new_format = current_format
else:
new_format = self._types_mapping[str] % (
allocated_length,
)
self._set_packing_format_and_transform(
position,
new_format,
value
)
struct.pack_into(new_format, self.shm.buf, offset, encoded_value)
def __reduce__(self):
return partial(self.__class__, name=self.shm.name), ()
def __len__(self):
return struct.unpack_from("q", self.shm.buf, 0)[0]
def __repr__(self):
return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'
@property
def format(self):
"The struct packing format used by all currently stored items."
return "".join(
self._get_packing_format(i) for i in range(self._list_len)
)
@property
def _format_size_metainfo(self):
"The struct packing format used for the items' storage offsets."
return "q" * (self._list_len + 1)
@property
def _format_packing_metainfo(self):
"The struct packing format used for the items' packing formats."
return "8s" * self._list_len
@property
def _format_back_transform_codes(self):
"The struct packing format used for the items' back transforms."
return "b" * self._list_len
@property
def _offset_data_start(self):
# - 8 bytes for the list length
# - (N + 1) * 8 bytes for the element offsets
return (self._list_len + 2) * 8
@property
def _offset_packing_formats(self):
return self._offset_data_start + self._allocated_offsets[-1]
@property
def _offset_back_transform_codes(self):
return self._offset_packing_formats + self._list_len * 8
def count(self, value):
"L.count(value) -> integer -- return number of occurrences of value."
return sum(value == entry for entry in self)
def index(self, value):
"""L.index(value) -> integer -- return first index of value.
Raises ValueError if the value is not present."""
for position, entry in enumerate(self):
if value == entry:
return position
else:
raise ValueError(f"{value!r} not in this container")
__class_getitem__ = classmethod(types.GenericAlias)