Skip to content

Commit 0df0038

Browse files
committed
libmidi_io: backends: Add ALSA MIDI backend
Change-Id: I13212437fc19738c17dec763d581b1ea00a0ddf2
1 parent 817489c commit 0df0038

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
#
2+
# Copyright (C) 2022 Sebastiano Barezzi
3+
#
4+
# SPDX-License-Identifier: LGPL-3.0-or-later
5+
#
6+
"""MIDI I/O ALSA MIDI backend."""
7+
8+
from alsa_midi import MidiBytesEvent, PortCaps, PortType, SequencerClient
9+
from alsa_midi.mido_backend import _find_port
10+
from libmidi.types.message import message_from_bytes
11+
from libmidi.types.messages.common import BaseMessage
12+
from threading import Thread
13+
from weakref import WeakValueDictionary
14+
15+
from libmidi_io.types.device import Device
16+
from libmidi_io.types.port import BasePort
17+
18+
class _Client:
19+
instance: '_Client' = None
20+
21+
def __init__(self):
22+
name = "libmidi_io"
23+
self.ports: WeakValueDictionary[int, Port] = WeakValueDictionary()
24+
self.client = SequencerClient(name)
25+
self.closing = False
26+
self.in_thread = Thread(name="ALSA seq input", target=self._input_loop, daemon=True)
27+
self.in_thread.start()
28+
29+
def __del__(self):
30+
self.close()
31+
32+
def close(self):
33+
self.closing = True
34+
self.client.close()
35+
36+
@classmethod
37+
def get_instance(cls):
38+
if cls.instance is not None:
39+
return cls.instance
40+
cls.instance = cls()
41+
return cls.instance
42+
43+
def _input_loop(self):
44+
try:
45+
while not self.closing:
46+
event = self.client.event_input(timeout=1, prefer_bytes=True)
47+
if event is None:
48+
continue
49+
if not isinstance(event, MidiBytesEvent):
50+
continue
51+
52+
assert event.dest is not None
53+
54+
libmidi_io_port = self.ports.get(event.dest.port_id)
55+
if libmidi_io_port is None:
56+
continue
57+
if not libmidi_io_port.is_input():
58+
continue
59+
60+
libmidi_io_port._handle_input_bytes(event.midi_bytes)
61+
except Exception as e:
62+
print(f"Error in libmidi_io.backend.alsamidi input loop: {e}")
63+
64+
def get_devices():
65+
devices = []
66+
67+
client = _Client.get_instance().client
68+
69+
for port in client.list_ports():
70+
devices.append(Device(
71+
f"{port.client_name}:{port.name} {port.client_id}:{port.port_id}",
72+
Port, port.capability & PortCaps.READ,
73+
port.capability & PortCaps.WRITE
74+
))
75+
76+
return devices
77+
78+
class Port(BasePort):
79+
_last_num = 0
80+
_name_prefix = "inout"
81+
82+
def __init__(self,
83+
port_caps: PortCaps = PortCaps.READ | PortCaps.WRITE,
84+
port_type: PortType = PortType.MIDI_GENERIC,
85+
**kwargs):
86+
super().__init__(**kwargs)
87+
88+
self.port_caps = port_caps
89+
self.port_type = port_type
90+
91+
client = _Client.get_instance()
92+
93+
name = self._generate_alsa_port_name()
94+
95+
ports = client.client.list_ports()
96+
if not ports:
97+
raise IOError("no ports available")
98+
99+
self._dest_port = ports[0] if self.name is None else _find_port(ports, self.name)
100+
self._port = client.client.create_port(name, caps=self.port_caps, type=self.port_type)
101+
102+
if self._dest_port is not None:
103+
if self.is_input():
104+
self._port.connect_from(self._dest_port)
105+
if self.is_output():
106+
self._port.connect_to(self._dest_port)
107+
108+
client.ports[self._port.port_id] = self
109+
110+
def _close(self):
111+
if self._port is not None:
112+
self._port.close()
113+
self._port = None
114+
115+
def is_input(self):
116+
return self._dest_port.capability & PortCaps.READ
117+
118+
def is_output(self):
119+
return self._dest_port.capability & PortCaps.WRITE
120+
121+
def _receive(self, block: bool):
122+
return None
123+
124+
def _send(self, message: BaseMessage):
125+
client = _Client.get_instance().client
126+
event = MidiBytesEvent(message.to_bytes())
127+
client.event_output(event, port=self._port, dest=self._dest_port)
128+
client.drain_output()
129+
130+
@classmethod
131+
def _generate_alsa_port_name(cls) -> str:
132+
num = cls._last_num + 1
133+
cls._last_num = num
134+
return f"{cls._name_prefix}{num}"
135+
136+
def _handle_input_bytes(self, midi_bytes: bytes):
137+
self._messages.append(message_from_bytes(midi_bytes))

0 commit comments

Comments
 (0)