import os
import atexit
-VERSION = "1.40"
+VERSION = "1.41"
exceptions = True
PI_BAD_SPI_BAUD =-141
PI_NOT_SPI_GPIO =-142
PI_BAD_EVENT_ID =-143
+PI_CMD_INTERRUPTED =-144
# pigpio error text
[PI_BAD_SPI_BAUD , "bad SPI baud rate, not 50-500k"],
[PI_NOT_SPI_GPIO , "no bit bang SPI in progress on GPIO"],
[PI_BAD_EVENT_ID , "bad event id"],
+ [PI_CMD_INTERRUPTED , "pigpio command interrupted"],
]
_except_a = "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n{}"
p1:= command parameter 1 (if applicable).
p2:= command parameter 2 (if applicable).
"""
- sl.l.acquire()
- try:
+ res = PI_CMD_INTERRUPTED
+ with sl.l:
sl.s.send(struct.pack('IIII', cmd, p1, p2, 0))
dummy, res = struct.unpack('12sI', sl.s.recv(_SOCK_CMD_LEN))
- finally:
- sl.l.release()
return res
def _pigpio_command_nolock(sl, cmd, p1, p2):
p1:= command parameter 1 (if applicable).
p2:= command parameter 2 (if applicable).
"""
+ res = PI_CMD_INTERRUPTED
sl.s.send(struct.pack('IIII', cmd, p1, p2, 0))
dummy, res = struct.unpack('12sI', sl.s.recv(_SOCK_CMD_LEN))
return res
ext.extend(_b(x))
else:
ext.extend(x)
- sl.l.acquire()
- try:
+ res = PI_CMD_INTERRUPTED
+ with sl.l:
sl.s.sendall(ext)
dummy, res = struct.unpack('12sI', sl.s.recv(_SOCK_CMD_LEN))
- finally:
- sl.l.release()
return res
def _pigpio_command_ext_nolock(sl, cmd, p1, p2, p3, extents):
p3:= total size in bytes of following extents
extents:= additional data blocks
"""
+ res = PI_CMD_INTERRUPTED
ext = bytearray(struct.pack('IIII', cmd, p1, p2, p3))
for x in extents:
if type(x) == type(""):
# process read failure
...
"""
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_nolock(
self.sl, _PI_CMD_I2CRK, handle, reg))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def i2c_block_process_call(self, handle, reg, data):
"""
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_I2CPK, handle, reg, len(data), [data]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def i2c_write_i2c_block_data(self, handle, reg, data):
"""
# I count
extents = [struct.pack("I", count)]
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
- self.sl, _PI_CMD_I2CRI, handle, reg, 4, extentse))
+ self.sl, _PI_CMD_I2CRI, handle, reg, 4, extents))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def i2c_read_device(self, handle, count):
"""
(count, data) = pi.i2c_read_device(h, 12)
...
"""
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(
_pigpio_command_nolock(self.sl, _PI_CMD_I2CRD, handle, count))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def i2c_write_device(self, handle, data):
"""
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_I2CZ, handle, 0, len(data), [data]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def bb_spi_open(self, CS, MISO, MOSI, SCLK, baud=100000, spi_flags=0):
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_BSPIX, CS, 0, len(data), [data]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def bb_i2c_open(self, SDA, SCL, baud=100000):
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_BI2CZ, SDA, 0, len(data), [data]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def event_trigger(self, event):
"""
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ status = PI_CMD_INTERRUPTED
+ bytes = 0
+ rdata = bytearray(b'')
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_BSCX, bsc_control, 0, len(data), [data]))
if bytes > 0:
rx = self._rxbuf(bytes)
status = struct.unpack('I', rx[0:4])[0]
bytes -= 4
- data = rx[4:]
+ rdata = rx[4:]
else:
status = bytes
bytes = 0
- data = bytearray(b'')
- finally:
- self.sl.l.release()
- return status, bytes, data
+ return status, bytes, rdata
def bsc_i2c(self, i2c_address, data=[]):
"""
# error path
...
"""
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_nolock(
self.sl, _PI_CMD_SPIR, handle, count))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def spi_write(self, handle, data):
"""
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_SPIX, handle, 0, len(data), [data]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def serial_open(self, tty, baud, ser_flags=0):
"""
# process read data
...
"""
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(
_pigpio_command_nolock(self.sl, _PI_CMD_SERR, handle, count))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def serial_write(self, handle, data):
"""
(s, pars) = pi.script_status(sid)
...
"""
- self.sl.l.acquire()
- try:
+ status = PI_CMD_INTERRUPTED
+ params = ()
+ with self.sl.l:
bytes = u2i(
_pigpio_command_nolock(self.sl, _PI_CMD_PROCP, script_id, 0))
if bytes > 0:
params = pars[1:]
else:
status = bytes
- params = ()
- finally:
- self.sl.l.release()
return status, params
def stop_script(self, script_id):
(count, data) = pi.bb_serial_read(4)
...
"""
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(
_pigpio_command_nolock(self.sl, _PI_CMD_SLR, user_gpio, 10000))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def bb_serial_read_close(self, user_gpio):
## extension ##
# s len argx bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_CF2, arg1, retMax, len(argx), [argx]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def get_pad_strength(self, pad):
"""
# process read data
...
"""
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(
_pigpio_command_nolock(self.sl, _PI_CMD_FR, handle, count))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def file_write(self, handle, data):
"""
## extension ##
# s len data bytes
- self.sl.l.acquire()
- try:
+ bytes = PI_CMD_INTERRUPTED
+ rdata = ""
+ with self.sl.l:
bytes = u2i(_pigpio_command_ext_nolock(
self.sl, _PI_CMD_FL, 60000, 0, len(fpattern), [fpattern]))
if bytes > 0:
- data = self._rxbuf(bytes)
- else:
- data = ""
- finally:
- self.sl.l.release()
- return bytes, data
+ rdata = self._rxbuf(bytes)
+ return bytes, rdata
def shell(self, shellscr, pstring=""):
"""
def __init__(self,
host = os.getenv("PIGPIO_ADDR", 'localhost'),
- port = os.getenv("PIGPIO_PORT", 8888)):
+ port = os.getenv("PIGPIO_PORT", 8888),
+ show_errors = True):
"""
Grants access to a Pi's GPIO.
exception = 2
except error:
+ # assumed to be no handle available
exception = 3
else:
if self.sl.s is not None:
self.sl.s = None
- s = "Can't connect to pigpio at {}({})".format(host, str(port))
+ if show_errors:
- print(_except_a.format(s))
- if exception == 1:
- print(_except_1)
- elif exception == 2:
- print(_except_2)
- else:
- print(_except_3)
- print(_except_z)
+ s = "Can't connect to pigpio at {}({})".format(host, str(port))
+
+
+ print(_except_a.format(s))
+ if exception == 1:
+ print(_except_1)
+ elif exception == 2:
+ print(_except_2)
+ else:
+ print(_except_3)
+ print(_except_z)
def stop(self):
"""Release pigpio resources.
PI_BAD_SCRIPT_NAME = -140
PI_BAD_SPI_BAUD = -141
PI_NOT_SPI_GPIO = -142
- PI_BAD_EVENT_ID = -143
+ PI_BAD_EVENT_ID = -143
+ PI_CMD_INTERRUPTED = -144
. .
event:0-31
The name of a shell script. The script must exist
in /opt/pigpio/cgi and must be executable.
+ show_errors:
+ Controls the display of pigpio daemon connection failures.
+ The default of True prints the probable failure reasons to
+ standard output.
+
spi_*:
One of the spi_ functions.