Source code for pyo.lib.listener

from ._core import *
import time
import threading


class MidiListener(threading.Thread):
    """
    Self-contained midi listener thread.

    This object allows to setup a Midi server that is independent
    of the audio server (mainly to be able to receive Midi data even
    when the audio server is stopped). Although it runs in a separated
    thread, the same device can't be used by this object and the audio
    server at the same time. It is advised to call the deactivateMidi()
    method on the audio server to avoid conflicts.

    :Parent: threading.Thread

    :Args:

        function: Python function (can't be a list)
            Function that will be called when a new midi event is available.
            This function is called with the incoming midi data as
            arguments. The signature of the function must be:

            def myfunc(status, data1, data2)

        mididev: int or list (or tuple) of ints, optional
            Sets the midi input device (see `pm_list_devices()` for the
            available devices). The default, -1, means the system default
            device. A number greater than the highest portmidi device index
            will open all available input devices. Specific devices can be
            set with a list of integers.
        reportdevice: boolean, optional
            If True, the device ID will be reported as a fourth argument to
            the callback. The signature will then be:

            def myfunc(status, data1, data2, id)

            Available at initialization only. Defaults to False.

    .. note::

        This object is available only if pyo is built with portmidi support
        (see withPortmidi function).

    >>> s = Server()
    >>> s.deactivateMidi()
    >>> s.boot()
    >>> def midicall(status, data1, data2):
    ...     print(status, data1, data2)
    >>> listen = MidiListener(midicall, 5)
    >>> listen.start()

    """

    def __init__(self, function, mididev=-1, reportdevice=False):
        threading.Thread.__init__(self)
        self.daemon = True
        # WeakMethod comes from the package core (star-import at file top).
        self._function = WeakMethod(function)
        self._mididev = mididev
        # The underlying listener is always handed a list of device indexes;
        # a tuple is accepted as a convenience and converted.
        if isinstance(mididev, list):
            devices = mididev
        elif isinstance(mididev, tuple):
            devices = list(mididev)
        else:
            devices = [mididev]
        self._reportdevice = reportdevice
        # Set by stop() so that run() can return instead of idling forever.
        self._stop_requested = threading.Event()
        self._listener = MidiListener_base(self._function, devices, self._reportdevice)

    def run(self):
        """
        Starts the process. The thread runs as daemon, so there is no
        need to stop it explicitly; it ends when `stop()` is called or
        when the interpreter exits.

        """
        self._listener.play()
        # Sleep on the event rather than waking up every millisecond: the
        # thread has nothing to do on its own and only has to stay alive
        # until stop() is requested.
        self._stop_requested.wait()

    def stop(self):
        """
        Stops the listener and properly close the midi ports.

        This also lets the thread started with `start()` terminate.

        """
        try:
            self._listener.stop()
        finally:
            # Release run() even if closing the ports raised.
            self._stop_requested.set()

    @staticmethod
    def _parseDeviceInfo(info):
        """
        Convert one "key: value, key: value" description string to a dict.

        Only the first ": " of a field separates the key from the value, and
        a fragment without any ": " is treated as the continuation of the
        previous value, so device names containing ": " or ", " are kept
        intact. The "id" entry is converted to an int.

        Raises ValueError if the string does not start with a "key: value"
        field, and KeyError if no "id" field is present.

        """
        fields = {}
        last_key = None
        for chunk in info.split(", "):
            key, sep, value = chunk.partition(": ")
            if sep:
                fields[key] = value
                last_key = key
            elif last_key is not None:
                # The previous value itself contained ", ": glue it back.
                fields[last_key] += ", " + chunk
            else:
                raise ValueError("malformed midi device description: %r" % (info,))
        fields["id"] = int(fields["id"])
        return fields

    def getDeviceInfos(self):
        """
        Returns infos about connected midi devices.

        This method returns a list of dictionaries, one per device.

        Dictionary format is:

        {"id": device_id (int), "name": device_name (str), "interface": interface (str)}

        An empty list is returned when the listener reports no device.

        """
        infos = self._listener.getDeviceInfos()
        if not infos:
            return []
        return [self._parseDeviceInfo(info) for info in infos]
class MidiDispatcher(threading.Thread):
    """
    Self-contained midi dispatcher thread.

    This object allows to setup a Midi server that is independent
    of the audio server (mainly to be able to send Midi data even
    when the audio server is stopped). Although it runs in a separated
    thread, the same device can't be used by this object and the audio
    server at the same time. It is advised to call the deactivateMidi()
    method on the audio server to avoid conflicts.

    Use the `send` method to send midi event to connected devices.

    Use the `sendx` method to send sysex event to connected devices.

    :Parent: threading.Thread

    :Args:

        mididev: int or list (or tuple) of ints, optional
            Sets the midi output device (see `pm_list_devices()` for the
            available devices). The default, -1, means the system default
            device. A number greater than the highest portmidi device index
            will open all available output devices. Specific devices can be
            set with a list of integers.

    .. note::

        This object is available only if pyo is built with portmidi support
        (see withPortmidi function).

    >>> s = Server()
    >>> s.deactivateMidi()
    >>> s.boot()
    >>> dispatch = MidiDispatcher(5)
    >>> dispatch.start()
    >>> dispatch.send(144, 60, 127)

    """

    def __init__(self, mididev=-1):
        threading.Thread.__init__(self)
        self.daemon = True
        self._mididev = mididev
        # The underlying dispatcher is always handed a list of device
        # indexes; a tuple is accepted as a convenience and converted.
        if isinstance(mididev, list):
            devices = mididev
        elif isinstance(mididev, tuple):
            devices = list(mididev)
        else:
            devices = [mididev]
        self._dispatcher = MidiDispatcher_base(devices)

    def run(self):
        """
        Starts the process. The thread runs as daemon, so no need to stop it.

        """
        self._dispatcher.play()
        while True:
            try:
                time.sleep(0.001)
            except Exception:
                # Best effort keep-alive loop. Presumably this guards against
                # errors raised while the interpreter shuts down (TODO confirm).
                pass

    def send(self, status, data1, data2=0, timestamp=0, device=-1):
        """
        Send a MIDI message to the selected midi output device.

        Arguments can be list of values to generate multiple events
        in one call.

        :Args:

            status: int
                Status byte.
            data1: int
                First data byte.
            data2: int, optional
                Second data byte. Defaults to 0.
            timestamp: int, optional
                The delay time, in milliseconds, before the note
                is sent on the portmidi stream. A value of 0 means
                to play the note now. Defaults to 0.
            device: int, optional
                The index of the device to which the message will
                be sent. The default (-1) means all devices. See
                `getDeviceInfos()` to retrieve device indexes.

        """
        status, data1, data2, timestamp, device, lmax = convertArgsToLists(
            status, data1, data2, timestamp, device
        )
        # One event per index; `wrap` picks the i-th value of each argument
        # list (helper imported from the package core).
        for i in range(lmax):
            self._dispatcher.send(
                wrap(status, i), wrap(data1, i), wrap(data2, i), wrap(timestamp, i), wrap(device, i)
            )

    def sendx(self, msg, timestamp=0, device=-1):
        """
        Send a MIDI system exclusive message to the selected midi output device.

        Arguments can be list of values to generate multiple events
        in one call.

        :Args:

            msg: str
                A valid system exclusive message as a string. The first byte
                must be 0xf0 and the last one must be 0xf7.
            timestamp: int, optional
                The delay time, in milliseconds, before the note
                is sent on the portmidi stream. A value of 0 means
                to play the note now. Defaults to 0.
            device: int, optional
                The index of the device to which the message will
                be sent. The default (-1) means all devices. See
                `getDeviceInfos()` to retrieve device indexes.

        """
        msg, timestamp, device, lmax = convertArgsToLists(msg, timestamp, device)
        for i in range(lmax):
            self._dispatcher.sendx(wrap(msg, i), wrap(timestamp, i), wrap(device, i))

    @staticmethod
    def _parseDeviceInfo(info):
        """
        Convert one "key: value, key: value" description string to a dict.

        Only the first ": " of a field separates the key from the value, and
        a fragment without any ": " is treated as the continuation of the
        previous value, so device names containing ": " or ", " are kept
        intact. The "id" entry is converted to an int.

        Raises ValueError if the string does not start with a "key: value"
        field, and KeyError if no "id" field is present.

        """
        fields = {}
        last_key = None
        for chunk in info.split(", "):
            key, sep, value = chunk.partition(": ")
            if sep:
                fields[key] = value
                last_key = key
            elif last_key is not None:
                # The previous value itself contained ", ": glue it back.
                fields[last_key] += ", " + chunk
            else:
                raise ValueError("malformed midi device description: %r" % (info,))
        fields["id"] = int(fields["id"])
        return fields

    def getDeviceInfos(self):
        """
        Returns infos about connected midi devices.

        This method returns a list of dictionaries, one per device.

        Dictionary format is:

        {"id": device_id (int), "name": device_name (str), "interface": interface (str)}

        An empty list is returned when the dispatcher reports no device.

        """
        infos = self._dispatcher.getDeviceInfos()
        if not infos:
            return []
        return [self._parseDeviceInfo(info) for info in infos]
# Module-level lock shared by all OscListener instances: OscListener._oscrecv
# holds it while invoking the user callback, so callbacks are serialized.
OscListenerLock = threading.Lock()
class OscListener(threading.Thread):
    """
    Self-contained OSC listener thread.

    This object allows to setup an OSC server that is independent
    of the audio server (mainly to be able to receive OSC data even
    when the audio server is stopped).

    :Parent: threading.Thread

    :Args:

        function: Python function (can't be a list)
            Function that will be called when a new OSC event is available.
            This function is called with the incoming address and values as
            arguments. The signature of the function must be::

                def myfunc(address, *args)

        port: int, optional
            The OSC port on which the values are received. Defaults to 9000.

    >>> s = Server().boot()
    >>> def call(address, *args):
    ...     print(address, args)
    >>> listen = OscListener(call, 9901)
    >>> listen.start()

    """

    def __init__(self, function, port=9000):
        threading.Thread.__init__(self)
        self.daemon = True
        # WeakMethod comes from the package core (star-import at file top).
        self._function = WeakMethod(function)
        self._port = port
        # The base listener is given `_oscrecv`, not the user function, so
        # that every callback goes through the module-level lock.
        self._listener = OscListener_base(self._oscrecv, self._port)

    def _oscrecv(self, address, *args):
        # Forward an incoming message to the user callback while holding the
        # module-level OscListenerLock.
        with OscListenerLock:
            self._function(address, *args)

    def run(self):
        """
        Starts the process. The thread runs as daemon, so no need to stop it.

        """
        while True:
            # Poll the underlying listener, then yield for a millisecond.
            self._listener.get()
            try:
                time.sleep(0.001)
            except Exception:
                # Best effort polling loop. Presumably this guards against
                # errors raised while the interpreter shuts down (TODO confirm).
                pass