picossa module

Classes for communicating with a Pico device running a picod daemon.

The device must be physically connected via USB 2.0. The software connection is then established via pyserial and a device file under /dev/tty*.

Classes for serial communication

class gpiosvr.picossa.EdgeCallback(loop, pico, gpio, edge=0, callable_=None)

A wrapper for a callable that is invoked whenever specific GPIO edges occur.

BOTH = 2
FALLING = 1
RISING = 0
__init__(loop, pico, gpio, edge=0, callable_=None)
Parameters:
  • loop (asyncio.EventLoop) – The asyncio event loop of the thread waiting for the callback.

  • pico (PicoSerialAdapter) – The Pico adapter to use for cancelling the callback.

  • gpio (int) – The GPIO number to monitor.

  • edge (int) – The type of edges to monitor, i.e. RISING, FALLING or BOTH.

  • callable_ (callable) – The function to call whenever the specified GPIO edges occur.

call(*args)

Invokes the callback function.

If a loop was provided in the constructor, the function is scheduled in the loop in a thread-safe way.

Parameters:

args (list) – The arguments to the callback function.
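
The thread-safe scheduling described for call() can be sketched with asyncio's loop.call_soon_threadsafe(). This is a minimal illustration, not the module's actual code: the helper call_threadsafe, the callback on_edge and its (gpio, level) arguments are hypothetical names chosen for the example.

```python
import asyncio
import threading


def call_threadsafe(loop, func, *args):
    """Run func(*args) in the loop's thread if a loop is given, else directly."""
    if loop is None:
        func(*args)
    else:
        # Hands the call over to the thread that runs the event loop.
        loop.call_soon_threadsafe(func, *args)


async def demo():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    received = []
    done = asyncio.Event()

    def on_edge(gpio, level):  # hypothetical callback signature
        # Record whether the callback really ran in the loop's thread.
        received.append((gpio, level, threading.get_ident() == loop_thread))
        done.set()

    # A separate thread stands in for the receiver reporting an edge on GPIO 5.
    worker = threading.Thread(target=call_threadsafe, args=(loop, on_edge, 5, 1))
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=2)
    worker.join()
    return received


result = asyncio.run(demo())
```

Although the call originates in the worker thread, the callback itself runs in the thread of the event loop, so it can touch loop-bound objects safely.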

async cancel()

Cancels and deregisters the callback.

class gpiosvr.picossa.ResponseFuture(loop, requestType, channel)

Bases: Future

A synchronization primitive for notifying a Pico modem of the response to a request.

Represents a thread-safe variant of asyncio.Future.

__init__(loop, requestType, channel)
Parameters:
  • loop (asyncio.EventLoop) – The asyncio event loop of the thread waiting for the response.

  • requestType (RequestType) – The request type for which the response is expected.

  • channel (int) – The channel of the picod daemon on which the response is expected. May be an int from 0 to PICOD_CHANNEL_COUNT - 1.

set_exception(exception)

Marks the future done and sets an exception.

If a loop was provided in the constructor, the exception is set in a thread-safe way.

Raises:

InvalidStateError if the future is already done.

set_result(result)

Marks the future done and sets its result.

If a loop was provided in the constructor, the result is set in a thread-safe way.

Raises:

InvalidStateError if the future is already done.
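
One way to obtain a future whose result can be set from another thread is to route set_result() through loop.call_soon_threadsafe(). The class below is a hypothetical stand-in written for illustration; it does not reproduce ResponseFuture and omits requestType and channel.

```python
import asyncio
import threading


class ThreadSafeFuture(asyncio.Future):
    """Hypothetical stand-in: a Future whose result may be set from any thread."""

    def __init__(self, loop=None):
        super().__init__(loop=loop)
        self._owner_loop = loop

    def set_result(self, result):
        if self._owner_loop is None:
            super().set_result(result)
        else:
            # Defer the actual state change to the loop's own thread.
            # In this sketch an InvalidStateError would surface in the loop,
            # not in the calling thread.
            self._owner_loop.call_soon_threadsafe(super().set_result, result)


async def demo():
    loop = asyncio.get_running_loop()
    future = ThreadSafeFuture(loop)
    # A separate thread stands in for the receiver delivering a response.
    worker = threading.Thread(target=future.set_result, args=(b"\x01\x02",))
    worker.start()
    payload = await asyncio.wait_for(future, timeout=2)
    worker.join()
    return payload


payload = asyncio.run(demo())
```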

class gpiosvr.picossa.PicoModem(device, loop, responseTimeout_ms=1500, serialReadBufferBytes=256, cancelCheckingInterval_ms=1500, msgBuffer=None)

Sends commands to a Pico via a serial USB connection, and receives responses and asynchronous edge callbacks.

__init__(device, loop, responseTimeout_ms=1500, serialReadBufferBytes=256, cancelCheckingInterval_ms=1500, msgBuffer=None)

Initializes a Pico modem along with a receiver thread of class PicoReceiverThread.

Parameters:
  • loop (asyncio.EventLoop) – The asyncio event loop of the thread waiting for responses and edge callbacks.

  • device (str) – The path to the Pico device file, e.g. /dev/ttyACM1.

  • responseTimeout_ms (int) – The maximum time span in milliseconds to wait for the response to a request. Corresponds to the sendRequest() method. Defaults to SERIAL_RESPONSE_TIMEOUT_DEFAULT_MS.

baud

The baud rate is set to a fixed value because it is not honored anyway when communicating with a Pico device.

responseFutures

A fixed-length list for futures holding the responses to synchronous requests.

The list is initialized with a length of PICOD_CHANNEL_COUNT and is used in a round-robin style. That is, if the next “free” slot in the list is responseFutures[1], it will hold the response to the next request being issued. The slot responseFutures[2] will in turn hold the response to the request after that, and so on, until the list wraps around to responseFutures[0]. This avoids race conditions when synchronous responses are received out of sequence.

The indices correspond to the response channels of the picod daemon, e.g. index 0 stands for response channel 0 of the picod daemon.
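
The round-robin use of a fixed-length list can be sketched as follows. The class RoundRobinSlots and its claim() method are hypothetical names for this example; plain strings stand in for the futures.

```python
CHANNEL_COUNT = 4  # mirrors PICOD_CHANNEL_COUNT in this module


class RoundRobinSlots:
    """Fixed-length list whose slots are handed out in a wrapping sequence."""

    def __init__(self, size=CHANNEL_COUNT):
        self.slots = [None] * size
        self._next = 0

    def claim(self, item):
        """Store item in the next slot and return the slot index (the channel)."""
        channel = self._next
        self.slots[channel] = item
        # Wrap around to index 0 after the last slot.
        self._next = (channel + 1) % len(self.slots)
        return channel


slots = RoundRobinSlots()
channels = [slots.claim(f"request-{i}") for i in range(6)]
```

Six consecutive requests are assigned to channels 0, 1, 2, 3, 0, 1, so a late response on one channel cannot be confused with the response to the request issued immediately afterwards.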

appendEdgeCallback(callback_)

Registers a callback for specific GPIO edges.

Parameters:

callback_ (EdgeCallback) – The callback to register.

removeEdgeCallback(callback_)

Deregisters a callback for specific GPIO edges.

Parameters:

callback_ (EdgeCallback) – The callback to deregister.

async sendRequest(requestType, requestData=bytearray(b''), reply=Expected.NOW, flush=True)

Sends a request of a pre-defined type to the Pico device and waits for the response synchronously if reply is NOW.

Returns:

A tuple of the processing status and the response (payload) from the picod daemon. If the request does not complete within responseTimeout_ms milliseconds, a TIMED_OUT status is returned.

Return type:

tuple(Status, bytes)
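
The timeout behaviour can be illustrated with asyncio.wait_for(). The sketch below is an assumption about the general pattern, not the actual sendRequest() code: SketchStatus is a hypothetical stand-in for Status, and returning an empty payload on timeout is a choice made for this example.

```python
import asyncio
import enum


class SketchStatus(enum.Enum):
    # Hypothetical stand-in for Status; the real members and values may differ.
    OK = "ok"
    TIMED_OUT = "timed out"


async def wait_for_response(future, timeout_ms):
    """Await a response future and map a timeout to a status value."""
    try:
        payload = await asyncio.wait_for(future, timeout=timeout_ms / 1000)
    except asyncio.TimeoutError:
        return SketchStatus.TIMED_OUT, b""
    return SketchStatus.OK, payload


async def demo():
    loop = asyncio.get_running_loop()
    answered = loop.create_future()
    loop.call_later(0.01, answered.set_result, b"\x2a")  # response arrives in time
    unanswered = loop.create_future()                     # response never arrives
    return (
        await wait_for_response(answered, timeout_ms=1500),
        await wait_for_response(unanswered, timeout_ms=50),
    )


ok_result, timeout_result = asyncio.run(demo())
```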

async getUsingProcessIds()

Determines the PIDs of processes currently accessing the Pico device file.

Uses the host system’s lsof executable which must be in the search path of the current Python interpreter.
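
A comparable lookup can be sketched with lsof's terse mode (-t), which prints one PID per line. The function names pids_using and parse_lsof_terse are hypothetical, and the actual method may invoke lsof with different options.

```python
import asyncio
import shutil


def parse_lsof_terse(output):
    """Parse the output of `lsof -t`: one PID per line."""
    return [int(token) for token in output.split() if token.isdigit()]


async def pids_using(path):
    """Return the PIDs of processes that currently have path open."""
    if shutil.which("lsof") is None:
        raise FileNotFoundError("lsof is not on the search path")
    proc = await asyncio.create_subprocess_exec(
        "lsof", "-t", path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    stdout, _ = await proc.communicate()
    return parse_lsof_terse(stdout.decode())
```

On a host with lsof installed, asyncio.run(pids_using("/dev/ttyACM0")) would return a list of PIDs, which is empty if no process has the file open.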

class gpiosvr.picossa.PicoReceiverThread(picoSerial, edgeCallbacks=None, responseFutures=None, serialReadBufferBytes=256, cancelCheckingInterval_ms=1500, msgBuffer=None, excepthook=None)

Bases: Thread

Daemon thread receiving bytes from a Pico via its serial connection.

The data is communicated back via instances of one of two classes: EdgeCallback for GPIO edge events and ResponseFuture for responses to requests.

The thread executes its processBytes() method.

__init__(picoSerial, edgeCallbacks=None, responseFutures=None, serialReadBufferBytes=256, cancelCheckingInterval_ms=1500, msgBuffer=None, excepthook=None)
Parameters:
  • edgeCallbacks (list[EdgeCallback] | tuple[EdgeCallback]) – A sequence of callbacks for specific GPIO edges. The callbacks are processed in sequence, i.e. edgeCallbacks[0] is processed first and edgeCallbacks[-1] last.

  • responseFutures (list[ResponseFuture] | tuple[ResponseFuture]) – A sequence of futures for responses to requests.

  • serialReadBufferBytes (int, optional) – Maximum number of bytes read at once when the serial connection has signalled new data. Defaults to SERIAL_READ_BUFFER_BYTES_DEFAULT.

  • cancelCheckingInterval_ms (int) – When waiting for data from the serial connection, the interval in milliseconds in which a stop request is checked. In other terms, the maximum delay after which a call to stop() will take effect. Defaults to CANCEL_CHECKING_INTERVAL_MS_DEFAULT.

  • excepthook (callable) – A callable to invoke when an exception has occurred in processBytes(). The callable needs to follow the signature of threading.excepthook.
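
For reference, a callable following the signature of threading.excepthook takes a single argument with the attributes exc_type, exc_value, exc_traceback and thread. The sketch below installs such a hook globally for a plain thread to show the shape of that argument; record_exception and the thread name are hypothetical.

```python
import threading

captured = []


def record_exception(args):
    """Hook with the threading.excepthook signature: one args object."""
    captured.append((args.exc_type, str(args.exc_value), args.thread.name))


previous_hook = threading.excepthook
threading.excepthook = record_exception
try:
    # The worker fails on purpose so that the hook is invoked.
    worker = threading.Thread(target=lambda: 1 / 0, name="receiver-sketch")
    worker.start()
    worker.join()
finally:
    threading.excepthook = previous_hook
```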

processBytes()

Executes a loop that continuously processes bytes received from the serial connection.

The loop is terminated by calling stop().
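
The interplay of a blocking read, a checking interval and stop() can be sketched like this. It is a simplified model under stated assumptions: a queue.Queue stands in for the serial connection, and StoppableReader is a hypothetical class, not PicoReceiverThread.

```python
import queue
import threading


class StoppableReader(threading.Thread):
    """Reads chunks from a source and re-checks a stop flag at a fixed interval."""

    def __init__(self, source, check_interval_ms=1500):
        super().__init__(daemon=True)
        self._source = source
        self._interval_s = check_interval_ms / 1000
        self._stop_requested = threading.Event()
        self.received = bytearray()

    def run(self):
        while not self._stop_requested.is_set():
            try:
                # Block for at most one interval, then re-check the stop flag.
                chunk = self._source.get(timeout=self._interval_s)
            except queue.Empty:
                continue
            self.received += chunk
            self._source.task_done()  # lets the demo wait deterministically

    def stop(self):
        self._stop_requested.set()


source = queue.Queue()
reader = StoppableReader(source, check_interval_ms=20)
reader.start()
source.put(b"\x01\x02")
source.put(b"\x03")
source.join()            # wait until both chunks have been processed
reader.stop()
reader.join(timeout=2)   # takes effect within roughly one checking interval
```

A shorter interval makes stop() take effect sooner at the cost of more frequent wake-ups while the connection is idle.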

stop()

Stops the thread gracefully if it is already running.

class gpiosvr.picossa.PicoSerialAdapter(taskMonitor, device='/dev/ttyACM0', responseTimeout_ms=1500, msgBuffer=None)

Bases: PicodApi, PigpioMinimalApi, PigpioCallbackApi

Main class for establishing and operating a serial connection with a Pico device.

Methods starting with GPIO_ (uppercase) operate on multiple GPIOs. They are adapted from picod.

__init__(taskMonitor, device='/dev/ttyACM0', responseTimeout_ms=1500, msgBuffer=None)

Initializes a Pico adapter backed by a Pico modem.

Parameters:
  • device (str) – The path to the Pico device file, e.g. /dev/ttyACM1.

  • responseTimeout_ms (int) – See the responseTimeout_ms parameter of PicoModem.

async static fromDiscovery(targetUid, taskMonitor, msgBuffer=None, maxRetryCount=10, retryInterval_ms=500)

Convenience method for creating a Pico adapter from a discovered Pico device.

Parameters:
  • targetUid (int | str, optional) – UID of the Pico to connect to. See uid(). If not specified, the first unconnected /dev/ttyACM* device will be taken.

  • maxRetryCount (int) – The maximum number of retries for connecting to a specific device if the connection fails at the first attempt. If the device does not show up in the device tree after this maximum number of retries, the next possible device is checked. Useful for initialization scenarios in which it is not known in advance whether the device or the fromDiscovery() method will be started first.

  • retryInterval_ms (int) – The wait period for one retry cycle in milliseconds.

Returns:

A Pico adapter discovered from the given values.

Return type:

PicoSerialAdapter
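
The roles of maxRetryCount and retryInterval_ms can be illustrated with a generic retry helper. This is a hedged sketch of the retry pattern only; wait_until and its predicate argument are hypothetical and do not reflect how fromDiscovery() probes devices.

```python
import asyncio


async def wait_until(predicate, max_retry_count=10, retry_interval_ms=500):
    """Return True once predicate() is truthy, False after the retries run out."""
    for attempt in range(max_retry_count + 1):  # first attempt plus retries
        if predicate():
            return True
        if attempt < max_retry_count:
            await asyncio.sleep(retry_interval_ms / 1000)
    return False


calls = []


def appears_on_third_check():
    # Simulates a device file that shows up after two failed checks.
    calls.append(len(calls))
    return len(calls) >= 3


found = asyncio.run(wait_until(appears_on_third_check, retry_interval_ms=10))
never_found = asyncio.run(
    wait_until(lambda: False, max_retry_count=2, retry_interval_ms=10)
)
```

In a real scenario the predicate might be something like lambda: os.path.exists("/dev/ttyACM0").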

static gpioToBitmask(gpio)

Helper method for converting a sequence of GPIO numbers to a bit mask for communicating with the picod daemon.

Parameters:

gpio (int | str | list[int] | tuple[int]) – One or more GPIO numbers to combine in one bit mask.

Returns:

A bit mask representing all the given GPIO numbers.

Return type:

int
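
Such a conversion might look like the sketch below. It assumes that bit n of the mask represents GPIO n and that a str argument is a decimal GPIO number; neither assumption is confirmed by this documentation, and gpios_to_bitmask is a hypothetical name.

```python
def gpios_to_bitmask(gpio):
    """Combine one or more GPIO numbers into a bit mask (bit n = GPIO n)."""
    numbers = gpio if isinstance(gpio, (list, tuple)) else [gpio]
    mask = 0
    for number in numbers:
        # OR-ing makes duplicates harmless: a bit is set at most once.
        mask |= 1 << int(number)
    return mask


single = gpios_to_bitmask(3)
several = gpios_to_bitmask([0, 2, 2])
from_text = gpios_to_bitmask("4")
```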

static gpioToGpioPullBitmask(gpio, pull: GpioPull)

Helper method for creating a bit mask specifying the pull values for a sequence of GPIOs. Needed for communicating with the picod daemon.

Parameters:
  • gpio (int | str | list[int] | tuple[int]) – One or more GPIO numbers to incorporate in the bit mask.

  • pull (GpioPull) – The pull value for all the given GPIOs.

Returns:

A bit mask representing all the given GPIO numbers with the given pull value.

Return type:

int

stop()

A non-async variant of close() for compatibility with PigpioMinimalApi.

async GPIO_get_functions(reply=Expected.NOW, flush=True)

Returns the functions of all GPIOs.

Returns:

A tuple of status, function bit mask and a set of unique functions. In the bit mask, the function for GPIO x is indicated by bits 4x+3, 4x+2, 4x+1 and 4x.

Return type:

tuple(Status, int, set[GpioFunction])
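
Given the layout above, the 4-bit function code of GPIO x can be extracted by shifting the mask right by 4x bits and masking with 0xF. The helper decode_functions and the sample codes are hypothetical; the default of 30 GPIOs is an assumption for the example.

```python
def decode_functions(mask, gpio_count=30):
    """Split a function bit mask into one 4-bit code per GPIO."""
    return [(mask >> (4 * gpio)) & 0xF for gpio in range(gpio_count)]


# Sample mask: GPIO 0 -> code 5, GPIO 1 -> code 2, GPIO 2 -> code 0, GPIO 3 -> code 5
sample_mask = (5 << 0) | (2 << 4) | (5 << 12)
codes = decode_functions(sample_mask, gpio_count=4)
unique_codes = set(codes)
```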

async cancelCallback(callback_)

Cancels and deregisters a previously registered edge callback.

Parameters:

callback_ (EdgeCallback) – The callback to remove.

async openInput(gpio)

Opens one or more GPIOs as inputs on the picod daemon and marks them as opened in this Pico adapter.

Performs a de-duplication, i.e. if one GPIO number is given more than once it is applied only once.

Parameters:

gpio (int | str | list[int] | tuple[int]) – The GPIO number(s) to open as inputs.

async closeInput(gpio)

Closes one or more GPIOs as inputs on the picod daemon and unmarks them as opened in this Pico adapter.

Parameters:

gpio (int | str | list[int] | tuple[int]) – The GPIO number(s) to close.

async setDirection(gpio, direction, levelOrGpioPull=0)

Batch operation for setting one or more GPIOs as input/output GPIOs with a specific initial level or pull.

Parameters:
  • gpio (int | str | list[int] | tuple[int]) – The GPIO number(s) to set.

  • direction (int | GpioDirection) – The GPIO direction, i.e. input or output.

  • levelOrGpioPull (GpioPull | GpioLevel) – The pull to set if direction == GpioDirection.ip or the level to set if direction == GpioDirection.op.

async close()

Closes this Pico adapter by closing all opened GPIOs and by stopping the backing Pico modem.

Constants for the picod daemon

The following constants are used in C structs for the serial communication with a Pico device via the picod daemon running on that device.

class gpiosvr.picossa.MessageId(value)

Constants for message headers and types.

class gpiosvr.picossa.GpioFunction(value)

int values representing the functions of specific GPIOs.

class gpiosvr.picossa.RequestType(value)

int values representing the available request types.

class gpiosvr.picossa.Expected(value)

int values for the expected response (timing) when issuing a request.

class gpiosvr.picossa.Status(value)

int values representing the processing status of a request.

class gpiosvr.picossa.Uart(value)

Constants for the initialization of UART channels.

class gpiosvr.picossa.Pwm(value)

Constants for the initialization of pulse width modulation (PWM).

Internal constants and defaults

gpiosvr.picossa.PICOD_CHANNEL_COUNT = 4
gpiosvr.picossa.SERIAL_RESPONSE_TIMEOUT_DEFAULT_MS = 1500
gpiosvr.picossa.SERIAL_RESPONSE_CHECKING_INTERVAL_μS_DEFAULT = 30
gpiosvr.picossa.SERIAL_READ_BUFFER_BYTES_DEFAULT = 256
gpiosvr.picossa.CANCEL_CHECKING_INTERVAL_MS_DEFAULT = 1500