import atexit
import codecs
-VERSION = "1.9"
+VERSION = "1.10"
exceptions = True
def _b(x):
return codecs.latin_1_encode(x)[0]
-def _u2i(number):
- """Converts a 32 bit unsigned number to signed."""
+def u2i(number):
+ """
+ Converts a 32 bit unsigned number to signed.
+
+ number:= an unsigned 32 bit number
+
+ ...
+ print(u2i(4294967272))
+ -24
+ print(u2i(37))
+ 37
+ ...
+ """
mask = (2 ** 32) - 1
if number & (1 << 31):
v = number | ~mask
else:
v = number & mask
- if v >= 0:
- return v;
- else:
+ return v
+
+def _u2i(number):
+ """
+ Converts a 32 bit unsigned number to signed. If the number
+ is negative it indicates an error. On error a pigpio
+ exception will be raised if exceptions is True.
+ """
+ v = u2i(number)
+ if v < 0:
if exceptions:
raise error(error_text(v))
- else:
- return v
+ return v
-def _pigpio_command(sl, cmd, p1, p2):
+def _pigpio_command(sl, cmd, p1, p2, rl=True):
"""
Runs a pigpio socket command.
sl.l.acquire()
sl.s.send(struct.pack('IIII', cmd, p1, p2, 0))
dummy, res = struct.unpack('12sI', sl.s.recv(16))
- sl.l.release()
+ if rl: sl.l.release()
return res
-def _pigpio_command_ext(sl, cmd, p1, p2, p3, extents):
+def _pigpio_command_ext(sl, cmd, p1, p2, p3, extents, rl=True):
"""
Runs an extended pigpio socket command.
sl.l.acquire()
sl.s.sendall(ext)
dummy, res = struct.unpack('12sI', sl.s.recv(16))
- sl.l.release()
+ if rl: sl.l.release()
return res
class _callback_ADT:
(count, data) = pi.i2c_read_device(h, 12)
...
"""
- bytes = _u2i(_pigpio_command(self.sl, _PI_CMD_I2CRD, handle, count))
+ # Don't raise exception. Must release lock.
+ bytes = u2i(
+ _pigpio_command(self.sl, _PI_CMD_I2CRD, handle, count, False))
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def i2c_write_device(self, handle, data):
"""
# process read failure
...
"""
- bytes = _u2i(_pigpio_command(self.sl, _PI_CMD_I2CRK, handle, reg))
+ # Don't raise exception. Must release lock.
+ bytes = u2i(_pigpio_command(self.sl, _PI_CMD_I2CRK, handle, reg, False))
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def i2c_block_process_call(self, handle, reg, data):
"""
# I p3 len
## extension ##
# s len data bytes
- bytes = _u2i(_pigpio_command_ext(
- self.sl, _PI_CMD_I2CPK, handle, reg, len(data), [data]))
+
+ # Don't raise exception. Must release lock.
+ bytes = u2i(_pigpio_command_ext(
+ self.sl, _PI_CMD_I2CPK, handle, reg, len(data), [data]), False)
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def i2c_write_i2c_block_data(self, handle, reg, data):
"""
## extension ##
# I count
extents = [struct.pack("I", count)]
- bytes = _u2i(_pigpio_command_ext(
- self.sl, _PI_CMD_I2CRI, handle, reg, 4, extents))
+
+ # Don't raise exception. Must release lock.
+ bytes = u2i(_pigpio_command_ext(
+ self.sl, _PI_CMD_I2CRI, handle, reg, 4, extents), False)
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def spi_open(self, spi_channel, spi_baud, spi_flags=0):
"""
# error path
...
"""
- bytes = _u2i(_pigpio_command(
- self.sl, _PI_CMD_SPIR, handle, count))
+ # Don't raise exception. Must release lock.
+ bytes = u2i(_pigpio_command(
+ self.sl, _PI_CMD_SPIR, handle, count, False))
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def spi_write(self, handle, data):
"""
# I p3 len
## extension ##
# s len data bytes
- bytes = _u2i(_pigpio_command_ext(
- self.sl, _PI_CMD_SPIX, handle, 0, len(data), [data]))
+
+ # Don't raise exception. Must release lock.
+ bytes = u2i(_pigpio_command_ext(
+ self.sl, _PI_CMD_SPIX, handle, 0, len(data), [data]), False)
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def serial_open(self, tty, ser_baud, ser_flags=0):
"""
# process read data
...
"""
- bytes = _u2i(_pigpio_command(self.sl, _PI_CMD_SERR, handle, count))
-
+ # Don't raise exception. Must release lock.
+ bytes = u2i(
+ _pigpio_command(self.sl, _PI_CMD_SERR, handle, count, False))
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def serial_write(self, handle, data):
"""
level for pulse_len microseconds and then reset to not level.
user_gpio:= 0-31
- pulse_len:= 1-50
+ pulse_len:= 1-100
level:= 0-1
...
(s, pars) = pi.script_status(sid)
...
"""
- status = _u2i(_pigpio_command(self.sl, _PI_CMD_PROCP, script_id, 0))
- if status > 0:
- params = struct.unpack('I10i', self.sl.s.recv(44))
- return params[0], params[1:]
- return status, ()
+ # Don't raise exception. Must release lock.
+ bytes = u2i(
+ _pigpio_command(self.sl, _PI_CMD_PROCP, script_id, 0, False))
+ if bytes > 0:
+ data = self._rxbuf(bytes)
+ pars = struct.unpack('11i', data)
+ status = pars[0]
+ params = pars[1:]
+ else:
+ status = bytes
+ params = ()
+ self.sl.l.release()
+ return status, params
def stop_script(self, script_id):
"""
(count, data) = pi.bb_serial_read(4)
...
"""
- bytes = _u2i(_pigpio_command(self.sl, _PI_CMD_SLR, user_gpio, 10000))
+ # Don't raise exception. Must release lock.
+ bytes = u2i(
+ _pigpio_command(self.sl, _PI_CMD_SLR, user_gpio, 10000, False))
if bytes > 0:
- return bytes, self._rxbuf(bytes)
- return bytes, ""
+ data = self._rxbuf(bytes)
+ else:
+ data = ""
+ self.sl.l.release()
+ return bytes, data
def bb_serial_read_close(self, user_gpio):
PUD_OFF = 0
PUD_UP = 2
- pulse_len: 1-50
- A whole number.
+ pulse_len: 1-100
+ The length of the trigger pulse in microseconds.
pulses:
A list of class pulse objects defining the characteristics of a