-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathnode.py
1001 lines (853 loc) · 42.2 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Objectdict Node class containting the object dictionary."""
#
# Copyright (C) 2022-2024 Svein Seldal, Laerdal Medical AS
# Copyright (C): Edouard TISSERANT, Francis DUPIN and Laurent BESSARD
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
# USA
from __future__ import annotations
import copy
import logging
from pathlib import Path
from typing import Any, Generator, Iterable, Iterator
# The following import needs care when importing node
from objdictgen import eds_utils, gen_cfile, jsonod, maps, nosis
from objdictgen.maps import OD, ODMapping, ODMappingList
from objdictgen.typing import (NodeProtocol, TIndexEntry, TODObj, TODSubObj,
TODValue, TParamEntry, TPath, TProfileMenu)
log = logging.getLogger('objdictgen')
# ------------------------------------------------------------------------------
# Definition of Node Object
# ------------------------------------------------------------------------------
class Node(NodeProtocol):
"""
A Object Dictionary representation of a CAN node.
"""
Name: str
"""Name of the node"""
Type: str
"""Type of the node. Should be 'slave' or 'master'"""
ID: int
"""Node ID"""
Description: str
"""Description of the node"""
Dictionary: dict[int, TODValue|list[TODValue]]
"""Object dictionary of the node. The key is the index and the value is the
literal value. For objects that have multiple subindexes, the object
is a list of values."""
ParamsDictionary: dict[int, TParamEntry|dict[int, TParamEntry]]
"""Dictionary of parameters for the node. The key is the index and the value
contains the parameter for the index object. It can be a dict of subindexes.
"""
# FIXME: The type definition on ParamsDictionary is not precisely accurate.
# When self.Dictionary is not a list, ParamsDictionary is TParamEntry.
# When self.Dictionary is a list, ParamsDictionary is a dict with
# int subindexes as keys and "TParamEntryN" (a type without callback) as
# values. The subindex dict also may contain the "callback" key.
Profile: ODMapping
"""Profile object dictionary mapping"""
DS302: ODMapping
"""DS-302 object dictionary mapping"""
UserMapping: ODMapping
"""Custom user object dictionary mapping"""
ProfileName: str
"""Name of the loaded profile. If no profile is loaded, it should be 'None'
"""
SpecificMenu: TProfileMenu
"""Specific menu for the profile"""
IndexOrder: list[int]
"""Order of the indexes in the object dictionary to preserve the order"""
DefaultStringSize: int = 10
"""Default string size for the node"""
def __init__(
self, name: str = "", type: str = "slave", id: int = 0,
description: str = "", profilename: str = "None",
profile: ODMapping | None = None, specificmenu: TProfileMenu | None = None,
):
self.Name: str = name
self.Type: str = type
self.ID: int = id
self.Description: str = description
self.ProfileName: str = profilename
self.Profile: ODMapping = profile or ODMapping()
self.SpecificMenu: TProfileMenu = specificmenu or []
self.Dictionary: dict[int, TODValue|list[TODValue]] = {}
self.ParamsDictionary: dict[int, TParamEntry|dict[int, TParamEntry]] = {}
self.DS302: ODMapping = ODMapping()
self.UserMapping: ODMapping = ODMapping()
self.IndexOrder: list[int] = []
# --------------------------------------------------------------------------
# Dunders
# --------------------------------------------------------------------------
def __iter__(self) -> Iterator[int]:
"""Iterate over all indexes in the dictionary"""
return iter(sorted(self.Dictionary))
def __setattr__(self, name: str, value: Any):
"""Ensure that that internal attrs are of the right datatype."""
if name in ("Profile", "DS302", "UserMapping"):
if not isinstance(value, ODMapping):
value = ODMapping(value)
super().__setattr__(name, value)
# --------------------------------------------------------------------------
# Legacy access methods
# --------------------------------------------------------------------------
def GetNodeName(self) -> str:
"""Get the name of the node"""
return self.Name
def GetNodeID(self) -> int:
"""Get the ID of the node"""
return self.ID
def GetNodeType(self) -> str:
"""Get the type of the node"""
return self.Type
def GetNodeDescription(self) -> str:
"""Get the description of the node"""
return self.Description
def GetDefaultStringSize(self) -> int:
"""Get the default string size"""
return self.DefaultStringSize
def GetIndexes(self) -> list[int]:
""" Return a sorted list of indexes in Object Dictionary """
return list(self)
# --------------------------------------------------------------------------
# Node Input/Output
# --------------------------------------------------------------------------
@staticmethod
def isXml(filepath: TPath) -> bool:
"""Check if the file is an XML file"""
with open(filepath, 'r', encoding="utf-8") as f:
header = f.read(5)
return header == "<?xml"
@staticmethod
def isEds(filepath: TPath) -> bool:
"""Check if the file is an EDS file"""
with open(filepath, 'r', encoding="utf-8") as f:
header = f.readline().rstrip()
return header == "[FileInfo]"
@staticmethod
def LoadFile(filepath: TPath, **kwargs) -> Node:
""" Open a file and create a new node """
if Node.isXml(filepath):
log.debug("Loading XML OD '%s'", filepath)
with open(filepath, "r", encoding="utf-8") as f:
return nosis.xmlload(f)
if Node.isEds(filepath):
log.debug("Loading EDS '%s'", filepath)
return eds_utils.generate_node(filepath)
log.debug("Loading JSON OD '%s'", filepath)
with open(filepath, "r", encoding="utf-8") as f:
return Node.LoadJson(f.read(), **kwargs)
@staticmethod
def LoadJson(contents: str, validate=True) -> Node:
""" Import a new Node from a JSON string """
return jsonod.generate_node(contents, validate=validate)
def DumpFile(self, filepath: TPath, filetype: str|None = "jsonc", **kwargs):
""" Save node into file """
# Attempt to determine the filetype from the filepath
if not filetype:
filetype = Path(filepath).suffix[1:]
if not filetype:
filetype = "jsonc"
if filetype == 'od':
log.debug("Writing XML OD '%s'", filepath)
with open(filepath, "w", encoding="utf-8") as f:
# Never generate an od with IndexOrder in it
nosis.xmldump(f, self, omit=('IndexOrder', ))
return
if filetype == 'eds':
log.debug("Writing EDS '%s'", filepath)
content = eds_utils.generate_eds_content(self, filepath)
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
return
if filetype in ('json', 'jsonc'):
log.debug("Writing JSON OD '%s'", filepath)
kw = kwargs.copy()
kw['jsonc'] = filetype == 'jsonc'
jdata = self.DumpJson(**kw)
with open(filepath, "w", encoding="utf-8") as f:
f.write(jdata)
return
if filetype == 'c':
log.debug("Writing C files '%s'", filepath)
# Convert filepath to str because it might be used with legacy code
gen_cfile.GenerateFile(str(filepath), self)
return
raise ValueError("Unknown file suffix, unable to write file")
def DumpJson(self, compact=False, sort=False, internal=False, validate=True, jsonc=True) -> str:
""" Dump the node into a JSON string """
return jsonod.generate_jsonc(
self, compact=compact, sort=sort, internal=internal, validate=validate, jsonc=jsonc
)
def asdict(self) -> dict[str, Any]:
""" Return the class data as a dict """
return copy.deepcopy(self.__dict__)
def copy(self) -> Node:
"""
Return a copy of the node
"""
return copy.deepcopy(self)
# --------------------------------------------------------------------------
# Node Informations Functions
# --------------------------------------------------------------------------
def GetMappings(self, userdefinedtoo: bool=True, withmapping=False) -> ODMappingList:
"""Return the different Mappings available for this node"""
mapping = ODMappingList([self.Profile, self.DS302])
if userdefinedtoo:
mapping.append(self.UserMapping)
if withmapping:
mapping.append(maps.MAPPING_DICTIONARY)
return mapping
def GetEntry(self, index: int, subindex: int|None = None, compute=True, aslist=False) -> list[TODValue]|TODValue:
"""
Returns the value of the entry specified by the index and subindex. If
subindex is None, it will return the value or the list of values of the
entire index. If aslist is True, it will always return a list.
"""
if index not in self.Dictionary:
raise KeyError(f"Index 0x{index:04x} does not exist")
dictval = self.Dictionary[index]
# Variables needed by the eval_value function
base = self.GetBaseIndexNumber(index)
nodeid = self.ID
if subindex is None:
if isinstance(dictval, list):
out: list[TODValue] = [len(dictval)]
out.extend(
maps.eval_value(value, base, nodeid, compute)
for value in dictval
)
return out # Type is list[TValue]
result = maps.eval_value(dictval, base, nodeid, compute)
# This option ensures that the function consistently returns a list
if aslist:
return [result] # Type is list[TValue]
return result # Type is TValue
if isinstance(dictval, list):
if subindex == 0:
return len(dictval) # Type is int
if 0 < subindex <= len(dictval):
# Type is TValue
return maps.eval_value(dictval[subindex - 1], base, nodeid, compute)
raise ValueError(f"Invalid subindex {subindex} for index 0x{index:04x}")
# Special case: If the dictionary value is not a list, subindex 0
# can be used to retrieve the entry.
if subindex == 0:
return maps.eval_value(dictval, base, nodeid, compute)
raise ValueError(f"Invalid subindex {subindex} for index 0x{index:04x} for a non-list entry")
def GetParamsEntry(self, index: int, subindex: int|None = None,
aslist: bool = False) -> TParamEntry|list[TParamEntry]:
"""
Returns the value of the entry asked. If the entry has the value "count", it
returns the number of subindex in the entry except the first.
"""
if index not in self.Dictionary:
raise KeyError(f"Index 0x{index:04x} does not exist")
dictval = self.Dictionary[index]
params = self.ParamsDictionary.get(index)
def _get_param(v: TParamEntry|None) -> TParamEntry:
params = maps.DEFAULT_PARAMS.copy()
if v is not None:
params.update(v)
return params
if subindex is None:
if isinstance(dictval, list):
# FIXME: An interesting difference beween GetParamsEntry() and GetEntry() is that
# the latter returns the number of subindexes in index 0, while the former does not
# FIXME: There is a programmed assumption here: It checks dictval for a
# list but it assumes then that param_value is a dict. This is not always the case.
params = params or {}
return [_get_param(params.get(i)) for i in range(len(dictval) + 1)] # type: ignore[call-overload]
# Dictionary value is not a list
result = _get_param(params) # type: ignore[arg-type]
# This option ensures that the function consistently returns a list
if aslist:
return [result]
return result
if isinstance(dictval, list):
if 0 <= subindex <= len(dictval):
params = params or {}
return _get_param(params.get(subindex)) # type: ignore[call-overload]
raise ValueError(f"Invalid subindex {subindex} for index 0x{index:04x}")
# Special case: If the dictionary value is not a list, subindex 0
# will fetch the parameter.
if subindex == 0:
return _get_param(params)# type: ignore[arg-type]
raise ValueError(f"Invalid subindex {subindex} for index 0x{index:04x}")
def GetIndexEntry(self, index: int, withbase: bool = False) -> TIndexEntry:
""" Return a full and raw representation of the index """
def _mapping_for_index(index: int) -> Generator[tuple[str, TODObj], None, None]:
for name, o in (
('profile', self.Profile),
('ds302', self.DS302),
('user', self.UserMapping),
('built-in', maps.MAPPING_DICTIONARY),
):
if index in o:
yield name, o[index]
objmaps = list(_mapping_for_index(index))
firstobj: TODObj = objmaps[0][1] if objmaps else {}
obj: TIndexEntry = {
"index": index,
"groups": list(n for n, _ in objmaps),
}
if firstobj: # Safe to assume False here is not just an empty ODObj
obj['object'] = firstobj
if index in self.Dictionary:
obj['dictionary'] = self.Dictionary[index]
if index in self.ParamsDictionary:
obj['params'] = self.ParamsDictionary[index]
baseindex = self.GetBaseIndex(index)
if index != baseindex:
obj['base'] = baseindex
_, baseobject = next(_mapping_for_index(baseindex))
obj['basestruct'] = baseobject["struct"]
if withbase:
# If "object" is not present, add the base object
obj.setdefault("object", baseobject)
# FIXME: Add the ability to evaluate the names and the values
# with the "compute" flag
# Ensure that the object is safe to mutate
return copy.deepcopy(obj)
def GetSubentryLength(self, index: int) -> int:
""" Return the length of the subindex """
val = self.Dictionary.get(index, [])
if not isinstance(val, list):
return 0
return len(val)
def GetBaseIndex(self, index: int) -> int:
""" Return the index number of the base object """
return self.GetMappings(withmapping=True).FindBaseIndex(index)
def GetBaseIndexNumber(self, index: int) -> int:
""" Return the index number from the base object """
return self.GetMappings(withmapping=True).FindBaseIndexNumber(index)
def GetCustomisedTypeValues(self, index: int) -> tuple[list[TODValue], int]:
"""Return the customization struct type from the index. It returns
a tuple containing the entry value and the int of the type of the object.
0 indicates numerical value, 1 indicates string value."""
values = self.GetEntry(index)
if not isinstance(values, list):
raise ValueError(f"Index 0x{index:04x} is not an entry with subobjects")
customisabletypes = self.GetCustomisableTypes()
# values[1] contains the object type index
return values, customisabletypes[values[1]][1] # type: ignore[index]
def GetEntryName(self, index: int, compute=True) -> str:
"""Return the entry name for the given index"""
return self.GetMappings(withmapping=True).FindEntryName(index, compute)
def GetEntryInfos(self, index: int, compute=True) -> TODObj:
"""Return the entry infos for the given index"""
# FIXME: Add flags. Add the ability to determine the mapping source
result = self.GetMappings(withmapping=True).FindEntryInfos(index, compute)
try:
# If present in built-in dictionary, use the built-in values
# and update with the user provided values
r301 = maps.MAPPING_DICTIONARY.FindEntryInfos(index, compute)
r301.update(result)
return r301
except ValueError:
pass
return result
def GetSubentryInfos(self, index: int, subindex: int, compute: bool = True) -> TODSubObj:
"""Return the subentry infos for the given index and subindex"""
# FIXME: Add flags. Add the ability to determine the mapping source
result = self.GetMappings(withmapping=True).FindSubentryInfos(index, subindex, compute)
# FIXME: This will alter objects in the mapping store. This is probably not intended
result["user_defined"] = index in self.UserMapping
try:
r301 = maps.MAPPING_DICTIONARY.FindSubentryInfos(index, subindex, compute)
r301.update(result)
return r301
except ValueError:
pass
return result
def GetTypeIndex(self, typename: str) -> int:
"""Return the type index for the given type name."""
return self.GetMappings(withmapping=True).FindTypeIndex(typename)
def GetTypeName(self, index: int) -> str:
"""Return the type name for the given type index."""
return self.GetMappings(withmapping=True).FindTypeName(index)
def GetTypeDefaultValue(self, index: int) -> TODValue:
"""Return the default value for the given type index."""
return self.GetMappings(withmapping=True).FindTypeDefaultValue(index)
def GetMapVariableList(self, compute=True) -> list[tuple[int, int, int, str]]:
"""Return a list of all objects and subobjects available for mapping into
pdos. Returns a list of tuples with the index, subindex, size and name of the object."""
return list(sorted(self.GetMappings(withmapping=True).FindMapVariableList(self, compute)))
def GetMandatoryIndexes(self) -> list[int]:
"""Return the mandatory indexes for the node."""
# FIXME: Old code listed MAPPING_DIRECTORY first, this is last. Important?
return self.GetMappings(withmapping=True).FindMandatoryIndexes()
def GetCustomisableTypes(self) -> dict[int, tuple[str, int]]:
""" Return the customisable types. It returns a dict by the index number.
The value is a tuple with the type name and the size of the type."""
return {
index: (self.GetTypeName(index), valuetype)
for index, valuetype in maps.CUSTOMISABLE_TYPES
}
def GetTypeList(self) -> list[str]:
"""Return a list of all object types available for the current node"""
# FIXME: Old code listed MAPPING_DIRECTORY first, this puts it last. Important?
return self.GetMappings(withmapping=True).FindTypeList()
@staticmethod
def GenerateMapName(name: str, index: int, subindex: int) -> str:
"""Return how a mapping object should be named in UI"""
return f"{name} (0x{index:04X})"
def GetMapValue(self, mapname: str) -> int:
"""Return the mapping value from the given printable name"""
if mapname == "None":
return 0
def _get_buffer_size(index: int, subindex: int, size: int, name: str) -> int:
try:
params: TParamEntry = self.ParamsDictionary[index][subindex] # type: ignore[literal-required]
bs = params["buffer_size"]
if bs <= 8:
return (index << 16) + (subindex << 8) + size * bs
raise ValueError(f"String size of '{name}' too big to fit in a PDO")
except KeyError:
raise ValueError(
"No string length found and default string size too big to fit in a PDO"
) from None
varlist = self.GetMapVariableList()
for index, subindex, size, name in varlist:
if mapname == self.GenerateMapName(name, index, subindex):
# array type, only look at subindex 1 in UserMapping
if self.UserMapping[index]["struct"] == OD.ARRAY:
if self.IsStringType(self.UserMapping[index]["values"][1]["type"]):
return _get_buffer_size(index, subindex, size, mapname)
else:
if self.IsStringType(self.UserMapping[index]["values"][subindex]["type"]):
return _get_buffer_size(index, subindex, size, mapname)
return (index << 16) + (subindex << 8) + size
raise ValueError(f"Mapping '{mapname}' not found")
@staticmethod
def GetMapIndex(value: int) -> tuple[int, int, int]:
"""Return the index, subindex, size from a map value"""
if value:
index = value >> 16
subindex = (value >> 8) % (1 << 8)
size = (value) % (1 << 8)
return index, subindex, size
return 0, 0, 0
def GetMapName(self, value: int) -> str:
"""Return the printable name for the given map value."""
index, subindex, _ = self.GetMapIndex(value)
if value:
result = self.GetSubentryInfos(index, subindex)
# FIXME: Removed a "if result" check here
return self.GenerateMapName(result["name"], index, subindex)
return "None"
def GetMapList(self) -> list[str]:
"""
Return the list of variables that can be mapped into pdos for the current node
"""
return ["None"] + [
self.GenerateMapName(name, index, subindex)
for index, subindex, size, name in self.GetMapVariableList()
]
def GetAllIndices(self, sort=False) -> list[int]:
""" Get a list of all indices. If node maintains a sort order,
it will be used. Otherwise if sort is False, the order
will be arbitrary. If sort is True they will be sorted.
"""
order = list(self.UserMapping)
order += [k for k in self.Dictionary if k not in order]
order += [k for k in self.ParamsDictionary if k not in order]
if self.Profile:
order += [k for k in self.Profile if k not in order]
if self.DS302:
order += [k for k in self.DS302 if k not in order]
if sort:
order = sorted(order)
# Is there a recorded order that should supersede the above sequence?
# Node might not contain IndexOrder if read from legacy od file
elif hasattr(self, 'IndexOrder'):
# Pick k from IndexOrder which is present in order
keys = [k for k in self.IndexOrder if k in order]
# Append any missing k from order that is not in IndexOrder
keys += (k for k in order if k not in keys)
order = keys
return order
def GetUnusedParameters(self):
""" Return a list of all unused parameter indexes """
return [
k for k in self.GetAllIndices()
if k not in self.Dictionary
]
# --------------------------------------------------------------------------
# Type helper functions
# --------------------------------------------------------------------------
def IsStringType(self, index: int) -> bool:
"""Is the object index a string type?"""
if index in (0x9, 0xA, 0xB, 0xF): # VISIBLE_STRING, OCTET_STRING, UNICODE_STRING, DOMAIN
return True
if 0xA0 <= index < 0x100: # Custom types
result = self.GetEntry(index, 1)
if result in (0x9, 0xA, 0xB):
return True
return False
def IsRealType(self, index: int) -> bool:
"""Is the object index a real (float) type?"""
if index in (0x8, 0x11): # REAL32, REAL64
return True
if 0xA0 <= index < 0x100: # Custom types
result = self.GetEntry(index, 1)
if result in (0x8, 0x11):
return True
return False
def IsMappingEntry(self, index: int) -> bool:
"""
Check if an entry exists in the User Mapping Dictionary and returns the answer.
"""
# FIXME: Is usermapping only used when defining custom objects?
# Come back to this and test if this is the case. If it is the function
# should probably be renamed to "IsUserEntry" or somesuch
return index in self.UserMapping
def IsEntry(self, index: int, subindex: int=0) -> bool:
"""
Check if an entry exists in the Object Dictionary
"""
if index in self.Dictionary:
if not subindex:
return True
dictval = self.Dictionary[index]
return isinstance(dictval, list) and subindex <= len(dictval)
return False
def HasEntryCallbacks(self, index: int) -> bool:
"""Check if entry has the callback flag defined."""
entry_infos = self.GetEntryInfos(index)
if entry_infos and "callback" in entry_infos:
return entry_infos["callback"]
if index in self.Dictionary and index in self.ParamsDictionary:
params = self.ParamsDictionary[index]
return params.get("callback", False) # type: ignore[call-overload, return-value]
return False
# --------------------------------------------------------------------------
# Node mutuation functions
# --------------------------------------------------------------------------
def AddEntry(self, index: int, subindex: int|None = None, value: TODValue|list[TODValue]|None = None):
"""
Add a new entry in the Object Dictionary
"""
# FIXME: It need a value, but the order of fn arguments is placed after an optional arg
assert value is not None
if index not in self.Dictionary:
if not subindex:
self.Dictionary[index] = value
return
if subindex == 1:
# FIXME: When specifying a subindex, the value should never be a list
assert not isinstance(value, list)
self.Dictionary[index] = [value]
return
raise ValueError(f"Invalid subindex {subindex} when 0x{index:04x} is not in the dictionary")
dictval = self.Dictionary[index]
if subindex and isinstance(dictval, list) and subindex == len(dictval) + 1:
# FIXME: When specifying a subindex, the value should never be a list
assert not isinstance(value, list)
dictval.append(value)
return
raise ValueError(f"Unable to add entry 0x{index:04x} subindex {subindex}")
def SetEntry(self, index: int, subindex: int|None = None, value: TODValue|None = None):
"""Modify an existing entry in the Object Dictionary"""
# FIXME: Is it permissible to have value as None? The code seems to suggest that it is
assert value is not None
if index not in self.Dictionary:
raise ValueError(f"Index 0x{index:04x} does not exist")
dictval = self.Dictionary[index]
if not subindex:
# if value is not None: # FIXME: Can this be None?
self.Dictionary[index] = value
return
if isinstance(dictval, list) and 0 < subindex <= len(dictval):
# if value is not None: # FIXME: Can this be None?
dictval[subindex - 1] = value
return
raise ValueError(f"Failed to set entry 0x{index:04x} subindex {subindex}")
def SetParamsEntry(self, index: int, subindex: int|None = None, params: TParamEntry|None = None):
"""Set parameter values for an entry in the Object Dictionary."""
if index not in self.Dictionary:
raise ValueError(f"Index 0x{index:04x} does not exist")
if not params:
raise ValueError("No parameters to set for index 0x{index:04x}")
dictval = self.Dictionary[index]
pardict = self.ParamsDictionary.setdefault(index, {})
if subindex is None or (not isinstance(dictval, list) and subindex == 0):
pardict.update(params) # type: ignore[arg-type]
return
if isinstance(dictval, list) and 0 <= subindex <= len(dictval):
subparam: TParamEntry = pardict.setdefault(subindex, {}) # type: ignore[typeddict-item,misc]
subparam.update(params)
return
raise ValueError(f"Failed to set params entry 0x{index:04x} subindex {subindex}")
def RemoveEntry(self, index: int, subindex: int|None = None):
"""
Removes an existing entry in the Object Dictionary. If a subindex is specified
it will remove this subindex only if it's the last of the index. If no subindex
is specified it removes the whole index and subIndexes from the Object Dictionary.
"""
if index not in self.Dictionary:
raise ValueError(f"Index 0x{index:04x} does not exist")
if subindex is None:
self.Dictionary.pop(index)
self.ParamsDictionary.pop(index, None)
return
dictval = self.Dictionary[index]
if isinstance(dictval, list) and subindex == len(dictval):
dictval.pop(subindex - 1)
if index in self.ParamsDictionary:
self.ParamsDictionary[index].pop(subindex, None) # type: ignore[typeddict-item,misc]
if len(self.ParamsDictionary[index]) == 0:
self.ParamsDictionary.pop(index)
if len(dictval) == 0:
self.Dictionary.pop(index)
self.ParamsDictionary.pop(index, None)
return
raise ValueError(f"Failed to remove entry 0x{index:04x} subindex {subindex}")
def AddMappingEntry(self, index: int, entry: TODObj):
"""
Add a new entry in the User Mapping Dictionary
"""
if index in self.UserMapping:
raise ValueError(f"Index 0x{index:04x} already exists in UserMapping")
if not entry:
raise ValueError("No entry to set for index 0x{index:04x}")
if index not in self.UserMapping:
entry.setdefault("values", [])
self.UserMapping[index] = entry
return
raise ValueError(f"Failed to add mapping entry 0x{index:04x}")
def AddMappingSubEntry(self, index: int, subindex: int, values: TODSubObj):
"""
Add a new subentry in the User Mapping Dictionary
"""
if not values:
raise ValueError("No values to set for index 0x{index:04x} subindex {subindex}")
if index not in self.UserMapping:
raise ValueError(f"Index 0x{index:04x} does not exist in User Mapping")
if subindex == len(self.UserMapping[index]["values"]):
self.UserMapping[index]["values"].append(values)
return
raise ValueError(f"Failed to add mapping entry 0x{index:04x} subindex {subindex}")
def SetMappingEntry(self, index: int, entry: TODObj):
"""
Modify an existing entry in the User Mapping Dictionary
"""
if index not in self.UserMapping:
raise ValueError(f"Index 0x{index:04x} does not exist in User Mapping")
if not entry:
raise ValueError("No entry to set for index 0x{index:04x}")
usermap = self.UserMapping[index]
if "name" in entry:
name = entry["name"]
if usermap["struct"] & OD.IdenticalSubindexes:
usermap["values"][1]["name"] = name + " %d[(sub)]"
elif not usermap["struct"] & OD.MultipleSubindexes:
usermap["values"][0]["name"] = name
usermap.update(entry)
def SetMappingSubEntry(self, index: int, subindex: int, values: TODSubObj):
"""
Modify an existing subentry in the User Mapping Dictionary
"""
if index not in self.UserMapping:
raise ValueError(f"Index 0x{index:04x} subindex {subindex} does not exist in User Mapping")
if not values:
raise ValueError(f"No values to set for index 0x{index:04x} subindex {subindex}")
usermap = self.UserMapping[index]
if subindex >= len(usermap["values"]):
raise ValueError(f"Subindex {subindex} for index 0x{index:04x} does not exist in User Mapping")
submap = usermap["values"][subindex]
if "type" in values:
if usermap["struct"] & OD.IdenticalSubindexes:
if self.IsStringType(submap["type"]):
if self.IsRealType(values["type"]):
for i in range(len(self.Dictionary[index])): # type: ignore[arg-type]
self.SetEntry(index, i + 1, 0.)
elif not self.IsStringType(values["type"]):
for i in range(len(self.Dictionary[index])): # type: ignore[arg-type]
self.SetEntry(index, i + 1, 0)
elif self.IsRealType(submap["type"]):
if self.IsStringType(values["type"]):
for i in range(len(self.Dictionary[index])): # type: ignore[arg-type]
self.SetEntry(index, i + 1, "")
elif not self.IsRealType(values["type"]):
for i in range(len(self.Dictionary[index])): # type: ignore[arg-type]
self.SetEntry(index, i + 1, 0)
elif self.IsStringType(values["type"]):
for i in range(len(self.Dictionary[index])): # type: ignore[arg-type]
self.SetEntry(index, i + 1, "")
elif self.IsRealType(values["type"]):
for i in range(len(self.Dictionary[index])): # type: ignore[arg-type]
self.SetEntry(index, i + 1, 0.)
else:
if self.IsStringType(submap["type"]):
if self.IsRealType(values["type"]):
self.SetEntry(index, subindex, 0.)
elif not self.IsStringType(values["type"]):
self.SetEntry(index, subindex, 0)
elif self.IsRealType(submap["type"]):
if self.IsStringType(values["type"]):
self.SetEntry(index, subindex, "")
elif not self.IsRealType(values["type"]):
self.SetEntry(index, subindex, 0)
elif self.IsStringType(values["type"]):
self.SetEntry(index, subindex, "")
elif self.IsRealType(values["type"]):
self.SetEntry(index, subindex, 0.)
submap.update(values)
def RemoveMappingEntry(self, index: int, subindex: int|None = None):
"""
Removes an existing entry in the User Mapping Dictionary. If a subindex is specified
it will remove this subindex only if it's the last of the index. If no subindex
is specified it removes the whole index and subIndexes from the User Mapping Dictionary.
"""
if index not in self.UserMapping:
raise ValueError(f"Index 0x{index:04x} does not exist in User Mapping")
if subindex is None:
self.UserMapping.pop(index)
return
obj = self.UserMapping[index]
if subindex == len(obj["values"]) - 1:
obj["values"].pop(subindex)
return
if obj['struct'] & OD.IdenticalSubindexes:
return
raise ValueError(f"Invalid subindex {subindex} for index 0x{index:04x}")
def RemoveMapVariable(self, index: int, subindex: int = 0):
"""
Remove all PDO mappings references to the specificed index and subindex.
"""
model = index << 16
mask = 0xFFFF << 16
if subindex:
model += subindex << 8
mask += 0xFF << 8
# Iterate over all RPDO and TPDO mappings and remove the reference to this variable
for i, dictval in self.Dictionary.items():
if 0x1600 <= i <= 0x17FF or 0x1A00 <= i <= 0x1BFF:
# FIXME: Assumes that PDO mappings are records
assert isinstance(dictval, list)
for j, value in enumerate(dictval):
# FIXME: Assumes that the data in the records are ints
assert isinstance(value, int)
if (value & mask) == model:
dictval[j] = 0
def UpdateMapVariable(self, index: int, subindex: int, size: int):
"""
Update the PDO mappings references to the specificed index and subindex
and set the size value.
"""
model = index << 16
mask = 0xFFFF << 16
if subindex:
model += subindex << 8
mask = 0xFF << 8
for i, dictval in self.Dictionary.items():
if 0x1600 <= i <= 0x17FF or 0x1A00 <= i <= 0x1BFF:
# FIXME: Assumes that PDO mappings are records
assert isinstance(dictval, list)
for j, value in enumerate(dictval):
# FIXME: Assumes that the data in the records are ints
assert isinstance(value, int)
if (value & mask) == model:
dictval[j] = model + size
def RemoveLine(self, index: int, maxval: int, incr: int = 1):
""" Remove the given index and shift all the following indexes """
# FIXME: This function is called from NodeManager.RemoveCurrentVariable()
# but uncertain on how it is used.
i = index
while i < maxval and self.IsEntry(i + incr):
self.Dictionary[i] = self.Dictionary[i + incr]
i += incr
self.Dictionary.pop(i)
def RemoveIndex(self, index: int|Iterable[int]) -> None:
""" Remove the given index or indexes """
if isinstance(index, int):
index = [index]
for i in index:
self.UserMapping.pop(i, None)
self.Dictionary.pop(i, None)
self.ParamsDictionary.pop(i, None)
if self.DS302:
self.DS302.pop(i, None)
if self.Profile:
self.Profile.pop(i, None)
if not self.Profile:
self.ProfileName = "None"
# --------------------------------------------------------------------------
# Validator
# --------------------------------------------------------------------------
def Validate(self, fix=False):
""" Verify any inconsistencies when loading an OD. The function will
attempt to fix the data if the correct flag is enabled.
"""
def _warn(text: str):
name = self.GetEntryName(index)
log.warning("WARNING: 0x%04x (%d) '%s': %s", index, index, name, text)
# Iterate over all the values and user parameters
for index in set(self.Dictionary) | set(self.ParamsDictionary):
#
# Test if ParamDictionary exists without Dictionary
#
if index not in self.Dictionary:
_warn("Parameter value without any dictionary entry")
if fix:
del self.ParamsDictionary[index]
_warn("FIX: Deleting ParamDictionary entry")
continue
base = self.GetEntryInfos(index)
is_var = base["struct"] in (OD.VAR, OD.NVAR)
# FIXME: This probably needs a revisit. Is this checking that the
# dimensions of Dictionary and ParamsDictionary match?
#
# Test if ParamDictionary matches Dictionary
#
# Complile a list of all subindexes
dictlen = 1 if is_var else len(self.Dictionary.get(index, [])) # type: ignore[arg-type]
params = {
k: v
# This assumes that ParamsDictionary is always a dict with or without subindexes
for k, v in self.ParamsDictionary.get(index, {}).items()
if isinstance(k, int) # Any other key is not a subindex
}
excessive_params = {k for k in params if k > dictlen}
if excessive_params:
log.debug("Excessive params: %s", excessive_params)
_warn(
f"Excessive user parameters ({len(excessive_params)}) "
f"or too few dictionary values ({dictlen})"
)
if index in self.Dictionary:
for idx in excessive_params:
del self.ParamsDictionary[index][idx] # type: ignore[typeddict-item,misc]
del params[idx]
t_p = ", ".join(str(k) for k in excessive_params)
_warn(f"FIX: Deleting ParamDictionary entries {t_p}")
# If params have been emptied because of this, remove it altogether
if not params:
del self.ParamsDictionary[index]
_warn("FIX: Deleting ParamDictionary entry")
# Iterate over all user mappings
for index in set(self.UserMapping):
for idx, subvals in enumerate(self.UserMapping[index]['values']):
#
# Test that subindexi have a name
#
if not subvals["name"]:
_warn(f"Sub index {idx}: Missing name")
if fix:
subvals["name"] = f"Subindex {idx}"
_warn(f"FIX: Set name to '{subvals['name']}'")
# Register node with gnosis