12
12
from pathlib import Path
13
13
from typing import Union , Optional
14
14
import re
15
+ import warnings
15
16
import json
16
17
from collections import OrderedDict
17
18
from packaging .version import Version , parse
@@ -1262,12 +1263,14 @@ def read_openephys(
1262
1263
oe_version = parse (info_chain .find ("VERSION" ).text )
1263
1264
signal_chain = root .find ("SIGNALCHAIN" )
1264
1265
neuropix_pxi = None
1266
+ record_node = None
1265
1267
for processor in signal_chain :
1266
1268
if "PROCESSOR" == processor .tag :
1267
1269
name = processor .attrib ["name" ]
1268
1270
if "Neuropix-PXI" in name :
1269
1271
neuropix_pxi = processor
1270
- break
1272
+ if "Record Node" in name :
1273
+ record_node = processor
1271
1274
1272
1275
if neuropix_pxi is None :
1273
1276
if raise_error :
@@ -1468,8 +1471,10 @@ def read_openephys(
1468
1471
else :
1469
1472
# in case of a single probe, make sure it is consistent with optional
1470
1473
# stream_name, probe_name, or serial number
1474
+ available_probe_name = np_probes_info [0 ]['name' ]
1475
+ available_serial_number = np_probes_info [0 ]['serial_number' ]
1476
+
1471
1477
if stream_name :
1472
- available_probe_name = np_probes_info [0 ]['name' ]
1473
1478
if available_probe_name not in stream_name :
1474
1479
if raise_error :
1475
1480
raise Exception (
@@ -1478,7 +1483,6 @@ def read_openephys(
1478
1483
)
1479
1484
return None
1480
1485
if probe_name :
1481
- available_probe_name = np_probes_info [0 ]['name' ]
1482
1486
if probe_name != available_probe_name :
1483
1487
if raise_error :
1484
1488
raise Exception (
@@ -1487,7 +1491,6 @@ def read_openephys(
1487
1491
)
1488
1492
return None
1489
1493
if serial_number :
1490
- available_serial_number = np_probes_info [0 ]['serial_number' ]
1491
1494
if str (serial_number ) != available_serial_number :
1492
1495
if raise_error :
1493
1496
raise Exception (
@@ -1511,6 +1514,19 @@ def read_openephys(
1511
1514
contact_width = 12
1512
1515
shank_pitch = 250
1513
1516
1517
+ contact_ids = np_probe_info ['contact_ids' ] if np_probe_info ['contact_ids' ] is not None else None
1518
+
1519
+ # check if subset of channels
1520
+ chans_saved = get_saved_channel_indices_from_openephys_settings (settings_file , stream_name = stream_name )
1521
+
1522
+ # if a recording state is found, slice probe
1523
+ if chans_saved is not None :
1524
+ positions = positions [chans_saved ]
1525
+ if shank_ids is not None :
1526
+ shank_ids = np .array (shank_ids )[chans_saved ]
1527
+ if contact_ids is not None :
1528
+ contact_ids = np .array (contact_ids )[chans_saved ]
1529
+
1514
1530
probe = Probe (ndim = 2 , si_units = "um" )
1515
1531
probe .set_contacts (
1516
1532
positions = positions ,
@@ -1526,8 +1542,8 @@ def read_openephys(
1526
1542
probe_serial_number = np_probe .attrib ["probe_serial_number" ],
1527
1543
)
1528
1544
1529
- if np_probe_info [ ' contact_ids' ] is not None :
1530
- probe .set_contact_ids (np_probe_info [ ' contact_ids' ] )
1545
+ if contact_ids is not None :
1546
+ probe .set_contact_ids (contact_ids )
1531
1547
1532
1548
polygon = polygon_description ["default" ]
1533
1549
if shank_ids is None :
@@ -1547,6 +1563,69 @@ def read_openephys(
1547
1563
return probe
1548
1564
1549
1565
1566
+ def get_saved_channel_indices_from_openephys_settings (settings_file , stream_name ):
1567
+ """
1568
+ Returns an array with the subset of saved channels indices (if used)
1569
+
1570
+ Parameters
1571
+ ----------
1572
+ settings_file : str or Path
1573
+ The path to the settings file
1574
+ stream_name : str
1575
+ The stream name that contains the probe name
1576
+ For example, "Record Node 100#ProbeA-AP" will select the AP stream of ProbeA.
1577
+
1578
+ Returns
1579
+ -------
1580
+ chans_saved
1581
+ np.array of saved channel indices or None
1582
+ """
1583
+ # check if subset of channels
1584
+ ET = import_safely ("xml.etree.ElementTree" )
1585
+ # parse xml
1586
+ tree = ET .parse (str (settings_file ))
1587
+ root = tree .getroot ()
1588
+
1589
+ signal_chain = root .find ("SIGNALCHAIN" )
1590
+ record_node = None
1591
+ for processor in signal_chain :
1592
+ if "PROCESSOR" == processor .tag :
1593
+ name = processor .attrib ["name" ]
1594
+ if "Record Node" in name :
1595
+ record_node = processor
1596
+ break
1597
+ chans_saved = None
1598
+ if record_node is not None :
1599
+ custom_params = record_node .find ("CUSTOM_PARAMETERS" )
1600
+ if custom_params is not None :
1601
+ custom_streams = custom_params .findall ("STREAM" )
1602
+ custom_stream_names = [stream .attrib ["name" ] for stream in custom_streams ]
1603
+ if len (custom_streams ) > 0 :
1604
+ recording_states = [stream .attrib .get ("recording_state" , None ) for stream in custom_streams ]
1605
+ has_custom_states = False
1606
+ for rs in recording_states :
1607
+ if rs is not None and rs != "ALL" :
1608
+ has_custom_states = True
1609
+ if has_custom_states :
1610
+ if len (custom_streams ) > 1 :
1611
+ assert stream_name is not None , \
1612
+ (f"More than one stream found with custom parameters: { custom_stream_names } . "
1613
+ f"Use the `stream_name` argument to choose the correct stream" )
1614
+ possible_custom_streams = [stream for stream in custom_streams
1615
+ if stream .attrib ["name" ] in stream_name ]
1616
+ if len (possible_custom_streams ) > 1 :
1617
+ warnings .warn (f"More than one custom parameters associated to { stream_name } "
1618
+ f"found. Using fisrt one" )
1619
+ custom_stream = possible_custom_streams [0 ]
1620
+ else :
1621
+ custom_stream = custom_streams [0 ]
1622
+ recording_state = custom_stream .attrib .get ("recording_state" , None )
1623
+ if recording_state is not None :
1624
+ if recording_state != "ALL" :
1625
+ chans_saved = np .array ([chan for chan , r in enumerate (recording_state ) if int (r ) == 1 ])
1626
+ return chans_saved
1627
+
1628
+
1550
1629
def read_mearec (file : Union [str , Path ]) -> Probe :
1551
1630
"""
1552
1631
Read probe position, and contact shape from a MEArec file.
0 commit comments