Skip to content

Commit a60571b

Browse files
fix concurrency issue when sending multiple Peer-to-peer messages (CMDT)
close juergenH87#31
1 parent ff8e048 commit a60571b

File tree

5 files changed

+229
-13
lines changed

5 files changed

+229
-13
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import logging
2+
import time
3+
import can
4+
import j1939
5+
from hexdump import hexdump
6+
7+
logging.getLogger('j1939').setLevel(logging.DEBUG)
8+
logging.getLogger('can').setLevel(logging.DEBUG)
9+
10+
MY_ADDR = 1
11+
# compose the name descriptor for the new ca
12+
name = j1939.Name(
13+
arbitrary_address_capable=0,
14+
industry_group=j1939.Name.IndustryGroup.Industrial,
15+
vehicle_system_instance=1,
16+
vehicle_system=1,
17+
function=1,
18+
function_instance=1,
19+
ecu_instance=1,
20+
manufacturer_code=666,
21+
identity_number=1234567
22+
)
23+
24+
# create the ControllerApplications
25+
ca = j1939.ControllerApplication(name, MY_ADDR)
26+
27+
def on_message(priority, pgn, sa, timestamp, data):
28+
"""Receive incoming messages from the bus
29+
30+
:param int priority:
31+
Priority of the message
32+
:param int pgn:
33+
Parameter Group Number of the message
34+
:param int sa:
35+
Source Address of the message
36+
:param int timestamp:
37+
Timestamp of the message
38+
:param bytearray data:
39+
Data of the PDU
40+
"""
41+
print(f"PGN {pgn} length {len(data)} source {hex(sa)} time {timestamp} my_addr {hex(MY_ADDR)}")
42+
# print(hexdump(data))
43+
44+
def main():
45+
print("Initializing")
46+
47+
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
48+
ecu = j1939.ElectronicControlUnit()
49+
50+
# Connect to the CAN bus
51+
# Arguments are passed to python-can's can.interface.Bus() constructor
52+
# (see https://python-can.readthedocs.io/en/stable/bus.html).
53+
ecu.connect(bustype='socketcan', channel='can0')
54+
# ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
55+
# ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
56+
# ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
57+
# ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
58+
# ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)
59+
60+
# add CA to the ECU
61+
ecu.add_ca(controller_application=ca)
62+
ca.subscribe(on_message)
63+
ca.start()
64+
65+
print("Initialized")
66+
67+
time.sleep(300)
68+
69+
print("Deinitializing")
70+
ca.stop()
71+
ecu.disconnect()
72+
73+
if __name__ == '__main__':
74+
main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import logging
2+
import time
3+
import can
4+
import j1939
5+
import os
6+
from hexdump import hexdump
7+
8+
logging.getLogger('j1939').setLevel(logging.DEBUG)
9+
logging.getLogger('can').setLevel(logging.DEBUG)
10+
11+
MY_ADDR = 0x03
12+
MAX_PACKET_SIZE = 1785
13+
14+
# compose the name descriptor for the new ca
15+
name = j1939.Name(
16+
arbitrary_address_capable=1,
17+
industry_group=j1939.Name.IndustryGroup.Industrial,
18+
vehicle_system_instance=1,
19+
vehicle_system=1,
20+
function=1,
21+
function_instance=1,
22+
ecu_instance=1,
23+
manufacturer_code=666,
24+
identity_number=1234567
25+
)
26+
27+
# create the ControllerApplications
28+
ca = j1939.ControllerApplication(name, MY_ADDR)
29+
30+
def ca_receive(priority, pgn, source, timestamp, data):
31+
"""Feed incoming message to this CA.
32+
(OVERLOADED function)
33+
:param int priority:
34+
Priority of the message
35+
:param int pgn:
36+
Parameter Group Number of the message
37+
:param intsa:
38+
Source Address of the message
39+
:param int timestamp:
40+
Timestamp of the message
41+
:param bytearray data:
42+
Data of the PDU
43+
"""
44+
print(f"PGN {pgn} length {len(data)} source {source} time {timestamp} my_addr {hex(MY_ADDR)}")
45+
print(hexdump(data))
46+
47+
def ca_send_broadcast_pgn(size=100):
48+
# wait until we have our device_address
49+
while ca.state != j1939.ControllerApplication.State.NORMAL:
50+
time.sleep(1)
51+
continue
52+
53+
print(f"sending {size} bytes")
54+
# create custom length data
55+
# data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * size
56+
data = [0x01] * size
57+
58+
# sending normal broadcast message
59+
ca.send_pgn(0, 0xFD, 0xED, 6, data)
60+
print(f"sent {size} bytes to broadcast")
61+
62+
return True
63+
64+
def ca_send_direct_pgn(dest, size=100):
65+
# wait until we have our device_address
66+
while ca.state != j1939.ControllerApplication.State.NORMAL:
67+
time.sleep(1)
68+
continue
69+
70+
# create custom length data
71+
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * size
72+
73+
# sending normal peer-to-peer message
74+
ca.send_pgn(0, 0xE0, dest, 6, data)
75+
print(f"sent {size} bytes to {hex(dest)}")
76+
return True
77+
78+
def ca_timer_callback1(cookie):
79+
"""Callback for sending messages
80+
81+
This callback is registered at the ECU timer event mechanism to be
82+
executed every 500ms.
83+
84+
:param cookie:
85+
A cookie registered at 'add_timer'. May be None.
86+
"""
87+
# wait until we have our device_address
88+
if ca.state != j1939.ControllerApplication.State.NORMAL:
89+
# returning true keeps the timer event active
90+
return True
91+
92+
ca_send_direct_pgn(0x1, 100)
93+
ca_send_direct_pgn(0x2, 100)
94+
95+
# returning true keeps the timer event active
96+
return True
97+
98+
def main():
99+
print("Initializing")
100+
101+
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
102+
ecu = j1939.ElectronicControlUnit()
103+
104+
# Connect to the CAN bus
105+
# Arguments are passed to python-can's can.interface.Bus() constructor
106+
# (see https://python-can.readthedocs.io/en/stable/bus.html).
107+
ecu.connect(bustype='socketcan', channel='can0')
108+
# ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
109+
# ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
110+
# ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
111+
# ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
112+
# ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)
113+
114+
# add CA to the ECU
115+
ecu.add_ca(controller_application=ca)
116+
ca.subscribe(ca_receive)
117+
118+
# setup periodic message callbacks
119+
ca.add_timer(2, ca_timer_callback1)
120+
121+
# by starting the CA it starts the address claiming procedure on the bus
122+
ca.start()
123+
print("waiting for addr ...")
124+
125+
time.sleep(120)
126+
127+
print("Deinitializing")
128+
ca.stop()
129+
ecu.disconnect()
130+
131+
if __name__ == '__main__':
132+
main()

j1939/j1939_21.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ class Timeout:
3333
Tb = 0.050
3434

3535
class SendBufferState:
36-
WAITING_CTS = 0 # waiting for CTS
37-
SENDING_IN_CTS = 1 # sending packages (temporary state)
38-
SENDING_BM = 2 # sending broadcast packages
36+
WAITING_CTS = 0 # waiting for CTS
37+
SENDING_IN_CTS = 1 # sending packages (temporary state)
38+
SENDING_BM = 2 # sending broadcast packages
39+
TRANSMISSION_FINISHED = 3 # finished, remove buffer
3940

4041
def __init__(self, send_message, job_thread_wakeup, notify_subscribers, max_cmdt_packets, minimum_tp_rts_cts_dt_interval, minimum_tp_bam_dt_interval, ecu_is_message_acceptable):
4142
# Receive buffers
@@ -237,6 +238,8 @@ def async_job_thread(self, now):
237238
else:
238239
# done
239240
del self._snd_buffer[bufid]
241+
elif buf['state'] == self.SendBufferState.TRANSMISSION_FINISHED:
242+
del self._snd_buffer[bufid]
240243
else:
241244
logger.critical("unknown SendBufferState %d", buf['state'])
242245
del self._snd_buffer[bufid]
@@ -326,7 +329,8 @@ def _process_tp_cm(self, mid, dest_address, data, timestamp):
326329
self.__send_tp_abort(dest_address, src_address, self.ConnectionAbortReason.RESOURCES, pgn)
327330
return
328331
# TODO: should we inform the application about the successful transmission?
329-
del self._snd_buffer[buffer_hash]
332+
self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.TRANSMISSION_FINISHED
333+
self._snd_buffer[buffer_hash]['deadline'] = time.time()
330334
self.__job_thread_wakeup()
331335
elif control_byte == self.ConnectionMode.BAM:
332336
message_size = data[1] | (data[2] << 8)
@@ -354,7 +358,8 @@ def _process_tp_cm(self, mid, dest_address, data, timestamp):
354358
# if abort received before transmission established -> cancel transmission
355359
buffer_hash = self._buffer_hash(dest_address, src_address)
356360
if buffer_hash in self._snd_buffer and self._snd_buffer[buffer_hash]['state'] == self.SendBufferState.WAITING_CTS:
357-
del self._snd_buffer[buffer_hash] # cancel transmission
361+
self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.TRANSMISSION_FINISHED
362+
self._snd_buffer[buffer_hash]['deadline'] = time.time()
358363
# TODO: any more abort responses?
359364
pass
360365
else:

j1939/j1939_22.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@ class Timeout:
4444
T5 = 3.000 # Maximum time, for originator, to receive EOMA after sending EOMS
4545

4646
class SendBufferState:
47-
WAITING_CTS = 0 # waiting for CTS
48-
SENDING_RTS_CTS = 1 # sending rts/cts packages
49-
SENDING_BAM = 2 # sending broadcast packages
50-
SENDING_EOM_STATUS = 3 # sending end of message
51-
WAITING_EOM_ACK = 4 # waiting for end of message acknowledge
52-
EOM_ACK_RECEIVED = 5 # eom acknowledge received successfully
47+
WAITING_CTS = 0 # waiting for CTS
48+
SENDING_RTS_CTS = 1 # sending rts/cts packages
49+
SENDING_BAM = 2 # sending broadcast packages
50+
SENDING_EOM_STATUS = 3 # sending end of message
51+
WAITING_EOM_ACK = 4 # waiting for end of message acknowledge
52+
EOM_ACK_RECEIVED = 5 # eom acknowledge received successfully
53+
TRANSMISSION_FINISHED = 6 # finished, remove buffer
5354

5455
class Acknowledgement:
5556
ACK = 0
@@ -428,6 +429,8 @@ def async_job_thread(self, now):
428429
buf['message_size'], buf['num_segments'], buf['pgn'])
429430
del self._snd_buffer[bufid]
430431
self.__put_bam_session(buf['session'])
432+
elif buf['state'] == self.SendBufferState.TRANSMISSION_FINISHED:
433+
del self._snd_buffer[bufid]
431434
else:
432435
logger.critical('unknown SendBufferState %d', buf['state'])
433436
del self._snd_buffer[bufid]
@@ -578,7 +581,9 @@ def _process_tp_cm(self, mid, dest_address, data, timestamp):
578581
# if abort received before transmission established -> cancel transmission
579582
buffer_hash = self._buffer_hash(session_num, dest_address, src_address)
580583
if buffer_hash in self._snd_buffer and self._snd_buffer[buffer_hash]['state'] == self.SendBufferState.WAITING_CTS:
581-
del self._snd_buffer[buffer_hash] # cancel transmission
584+
# cancel transmission
585+
self._snd_buffer[buffer_hash]['state'] = self.SendBufferState.TRANSMISSION_FINISHED
586+
self._snd_buffer[buffer_hash]['deadline'] = time.time()
582587
# TODO: any more abort responses?
583588
else:
584589
raise RuntimeError('Received TP.CM with unknown control_byte %d', control_byte)

j1939/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "2.0.7"
1+
__version__ = "2.0.8"

0 commit comments

Comments
 (0)