In the following examples, we will request a Routine Start with the RoutineControl service in 4 different ways. We will start by crafting a binary payload manually, then we will add a layer of interpretation, making the code more comprehensible each time.
# Way 1: hand-crafted binary payloads, with no interpretation layer.
my_connection.send(b'\x31\x01\x12\x34')  # Sends RoutineControl, with ControlType=1, Routine ID=0x1234
payload = my_connection.wait_frame(timeout=1)
# Expected reply: 0x71 followed by the echoed ControlType and Routine ID.
if payload == b'\x71\x01\x12\x34':
    print('Success!')
else:
    print('Start of routine 0x1234 failed')
# Way 2: build the request and parse the response with the Request/Response objects.
req = Request(services.RoutineControl, control_type=1, routine_id=0x1234)  # control_type=1 --> StartRoutine
my_connection.send(req.get_payload())
payload = my_connection.wait_frame(timeout=1)
response = Response.from_payload(payload)
# The response data still has to be compared against raw bytes at this level.
if (response.service == services.RoutineControl
        and response.code == Response.Code.PositiveResponse
        and response.data == b'\x01\x12\x34'):
    print('Success!')
else:
    print('Start of routine 0x1234 failed')
# Way 3: let the service class build the request and interpret the response.
req = services.RoutineControl.make_request(
    control_type=services.RoutineControl.ControlType.startRoutine,
    routine_id=0x1234,
)
my_connection.send(req.get_payload())
payload = my_connection.wait_frame(timeout=1)
response = Response.from_payload(payload)
# Interpret with the same service that built the request; the echo fields
# checked below are read from response.service_data.
services.RoutineControl.interpret_response(response)
if (response.service == services.RoutineControl
        and response.code == Response.Code.PositiveResponse
        and response.service_data.control_type_echo == 1
        and response.service_data.routine_id_echo == 0x1234):
    print('Success!')
else:
    print('Start of routine 0x1234 failed')
# Way 4: the client does everything; any failure is reported through an exception.
try:
    response = client.start_routine(routine_id=0x1234)  # control_type_echo and routine_id_echo are validated by the client.
except Exception:
    print('Start of routine 0x1234 failed')
else:
    # Only reached when start_routine() returned without raising.
    print('Success!')
In this example, we show how to use :class:`PythonIsoTpConnection<udsoncan.connections.PythonIsoTpConnection>` with a fictitious Vector interface.
Note that, in order to run this code, both python-can and can-isotp must be installed.
import can
from can.interfaces.vector import VectorBus
from udsoncan.connections import PythonIsoTpConnection
from udsoncan.client import Client
import udsoncan.configs
import isotp
# Refer to isotp documentation for full details about parameters
isotp_params = {
    'stmin': 32,                            # Will request the sender to wait 32ms between consecutive frames. 0-127ms or 100-900us with values from 0xF1-0xF9
    'blocksize': 8,                         # Request the sender to send 8 consecutive frames before sending a new flow control message
    'wftmax': 0,                            # Number of wait frames allowed before triggering an error
    'tx_data_length': 8,                    # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
    # Minimum length of CAN messages. When different from None, messages are padded to meet this length. Works with CAN 2.0 and CAN FD.
    'tx_data_min_length': None,
    'tx_padding': 0,                        # Will pad all transmitted CAN messages with byte 0x00.
    'rx_flowcontrol_timeout': 1000,         # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
    'rx_consecutive_frame_timeout': 1000,   # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
    'override_receiver_stmin': None,        # When sending, respect the stmin requirement of the receiver. Could be set to a float value in seconds.
    'max_frame_size': 4095,                 # Limit the size of received frames.
    'can_fd': False,                        # Does not set the can_fd flag on the output CAN messages
    'bitrate_switch': False,                # Does not set the bitrate_switch flag on the output CAN messages
    'rate_limit_enable': False,             # Disable the rate limiter
    'rate_limit_max_bitrate': 1000000,      # Ignored when rate_limit_enable=False. Sets the max bitrate when rate_limit_enable=True
    'rate_limit_window_size': 0.2,          # Ignored when rate_limit_enable=False. Sets the averaging window size for bitrate calculation when rate_limit_enable=True
    'listen_mode': False,                   # Does not use the listen_mode which prevents transmission.
}

uds_config = udsoncan.configs.default_client_config.copy()

bus = VectorBus(channel=0, bitrate=500000)                                          # Link Layer (CAN protocol)
notifier = can.Notifier(bus, [can.Printer()])                                       # Add a debug listener that prints all messages
tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=0x123, rxid=0x456) # Network layer addressing scheme
#stack = isotp.CanStack(bus=bus, address=tp_addr, params=isotp_params)              # isotp v1.x has no notifier support
stack = isotp.NotifierBasedCanStack(bus=bus, notifier=notifier, address=tp_addr, params=isotp_params)  # Network/Transport layer (IsoTP protocol). Registers a new listener
conn = PythonIsoTpConnection(stack)                                                 # interface between Application and Transport layer
with Client(conn, config=uds_config) as client:                                     # Application layer (UDS protocol)
    client.change_session(1)
    # ...
In this example, we show how to use :class:`SyncAioIsotpConnection<udsoncan.connections.SyncAioIsotpConnection>` with a virtual CAN interface.
Note that, in order to run this code, both python-can and aioisotp must be installed.
from udsoncan.connections import SyncAioIsotpConnection
from udsoncan.client import Client
import logging
# Show debug-level log output while the example runs.
logging.basicConfig(level=logging.DEBUG)

conn = SyncAioIsotpConnection(interface="virtual", channel=0, bitrate=500000, rx_id=0x123, tx_id=0x456)
with Client(conn) as client:
    # Requests sent inside this inner context use the client's
    # suppress_positive_response context manager.
    with client.suppress_positive_response:
        client.change_session(3)
        # ...
In this example, we show how the :ref:`Client<Client>` uses the memory location format configurations.
# Default formats used by the client when a MemoryLocation does not specify its own.
client.config['server_address_format'] = 16
client.config['server_memorysize_format'] = 8
# Explicit declaration. Client will use these values
memloc1 = MemoryLocation(address=0x1234, memorysize=0x10, address_format=16, memorysize_format=8)
# No explicit declaration. Client will use the default values in the configuration
memloc2 = MemoryLocation(address=0x1234, memorysize=0x10)
response = client.read_memory_by_address(memloc1)
response = client.read_memory_by_address(memloc2)
The following example shows how to define a security algorithm in the client configuration. The algorithm XORs the seed with a pre-shared key passed as a parameter.
def myalgo(level, seed, params):
    """
    Builds the security key to unlock a security level. Returns the seed xor'ed with pre-shared key.

    :param level: Security level being unlocked. Not used by this algorithm.
    :param seed: Seed bytes; every byte is combined with one byte of the key.
    :param params: Mapping that holds the pre-shared key under ``'xorkey'``.
    :return: ``bytes`` object of the same length as ``seed``.
    """
    xorkey = bytearray(params['xorkey'])
    key_len = len(xorkey)
    # The key is reused cyclically when it is shorter than the seed.
    return bytes(seed_byte ^ xorkey[i % key_len] for i, seed_byte in enumerate(seed))
# Register the algorithm, then the parameters stored alongside it in the
# client configuration (here, the pre-shared XOR key).
client.config['security_algo'] = myalgo
client.config['security_algo_params'] = {'xorkey': b'\x12\x34\x56\x78'}
Warning
This algorithm is not secure and is given as an example only because of its simple implementation. XOR encryption is weak on many levels; it is vulnerable to known-plaintext attacks, relatively weak against replay attacks and does not provide enough diffusion (pattern recognition is possible). If you are an ECU programmer, please do not implement this.
This example shows how to configure the client with a DID configuration and query the server with ReadDataByIdentifier.
import udsoncan
from udsoncan.connections import IsoTPSocketConnection
from udsoncan.client import Client
import udsoncan.configs
import struct
class MyCustomCodecThatShiftBy4(udsoncan.DidCodec):
    """Example codec: the value is shifted left by 4 bits and stored as a 32-bit little-endian integer."""

    def encode(self, val):
        """Return the 4-byte payload for ``val``. Bits shifted beyond bit 31 are discarded by the mask."""
        val = (val << 4) & 0xFFFFFFFF   # Do some stuff
        return struct.pack('<L', val)   # Little endian, 32 bit value

    def decode(self, payload):
        """Return the integer stored in ``payload``, undoing the shift applied by :meth:`encode`."""
        val = struct.unpack('<L', payload)[0]   # decode the 32 bits value
        return val >> 4                         # Do some stuff (reversed)

    def __len__(self):
        """Return the size of the encoded payload, in bytes."""
        return 4    # encoded payload is 4 bytes long.
# Work on a copy so the library-wide default configuration is left untouched.
config = dict(udsoncan.configs.default_client_config)
config['data_identifiers'] = {
    'default': '>H',                        # Default codec is a struct.pack/unpack string. 16 bits big endian
    0x1234: MyCustomCodecThatShiftBy4,      # Uses own custom defined codec. Giving the class is ok
    0x1235: MyCustomCodecThatShiftBy4(),    # Same as 0x1234, giving an instance is good also
    0xF190: udsoncan.AsciiCodec(17)         # Codec that reads ASCII string. We must tell the length of the string
}

# IsoTPSocketConnection only works with SocketCAN under Linux. Use another connection if needed.
conn = IsoTPSocketConnection('vcan0', rxid=0x123, txid=0x456)
with Client(conn, request_timeout=2, config=config) as client:
    response = client.read_data_by_identifier([0xF190])
    print(response.service_data.values[0xF190])  # This is a dict of DID:Value

    # Or, if a single DID is expected, a shortcut to read the value of the first DID
    vin = client.read_data_by_identifier_first(0xF190)
    print(vin)  # 'ABCDEFG0123456789' (17 chars)
This example shows how the InputOutputControlByIdentifier service can be used with a composite data identifier and how to build a proper ioconfig dict, which can be tricky. The example shown below corresponds to a real example provided in the ISO-14229 document.
# Example taken from UDS standard
class MyCompositeDidCodec(DidCodec):
    """Codec for a composite DID packing five fields into 5 bytes using the big-endian layout ``'>BHBB'``."""

    def encode(self, IAC_pintle, rpm, pedalA, pedalB, EGR_duty):
        """Return the 5-byte payload. ``pedalA`` and ``pedalB`` share one byte (high and low nibble)."""
        pedal = (pedalA << 4) | pedalB
        return struct.pack('>BHBB', IAC_pintle, rpm, pedal, EGR_duty)

    def decode(self, payload):
        """Return a dict with the five fields read from ``payload``, keyed by field name."""
        vals = struct.unpack('>BHBB', payload)
        return {
            'IAC_pintle': vals[0],
            'rpm': vals[1],
            'pedalA': (vals[2] >> 4) & 0xF,     # high nibble of the shared byte
            'pedalB': vals[2] & 0xF,            # low nibble of the shared byte
            'EGR_duty': vals[3]
        }

    def __len__(self):
        """Return the size of the encoded payload, in bytes."""
        return 5
# One entry per DID: a codec class, a struct format string, or (for a
# composite DID) a dict giving the codec together with the mask of each field.
ioconfig = {
    0x132: MyDidCodec,
    0x456: '<HH',
    0x155: {
        'codec': MyCompositeDidCodec,
        'mask': {
            'IAC_pintle': 0x80,
            'rpm': 0x40,
            'pedalA': 0x20,
            'pedalB': 0x10,
            'EGR_duty': 0x08
        },
        'mask_size': 2  # Mask encoded over 2 bytes
    }
}

# Field values handed to the codec of DID 0x155; only the fields named in
# ``masks`` are selected in the request below.
values = {
    'IAC_pintle': 0x07,
    'rpm': 0x1234,
    'pedalA': 0x4,
    'pedalB': 0x5,
    'EGR_duty': 0x99
}
req = InputOutputControlByIdentifier.make_request(
    0x155,
    values=values,
    masks=['IAC_pintle', 'pedalA'],
    ioconfig=ioconfig
)
This is an example of how to use :class:`J2534Connection<udsoncan.connections.J2534Connection>`. This connection requires a compatible J2534 PassThru device (such as a Tactrix OpenPort 2.0 cable), with a DLL for said device installed. Note that this connection has been written to plug in where a standard IsoTPSocketConnection had been used (i.e. code ported from Linux to Windows). Functionality, from a high level, is identical.
from udsoncan.connections import J2534Connection
# Raw string: the Windows path contains backslashes that must not be read as escape sequences.
conn = J2534Connection(windll=r'C:\Program Files (x86)\OpenECU\OpenPort 2.0\drivers\openport 2.0\op20pt32.dll',
                       rxid=0x7E8, txid=0x7E0)  # Define the connection using the absolute path to the DLL, rxid and txid's for isotp

conn.send(b'\x22\xf2\x00')      # Mode 22 request for DID F200
response = conn.wait_frame()    # response should = 0x62 F2 00 data data data data

with Client(conn, request_timeout=1) as client:  # Application layer (UDS protocol)
    client.change_session(1)
    # ...
# Example 1) defineByIdentifier - single value
my_def = DynamicDidDefinition(source_did=0x1111, position=1, memorysize=2)
client.dynamically_define_did(0x1234, my_def)

# Example 2) defineByIdentifier - composite value
my_def = DynamicDidDefinition(source_did=0x1111, position=1, memorysize=2)
my_def.add(source_did=0x2222, position=5, memorysize=4)
client.dynamically_define_did(0x1234, my_def)

# Example 3) defineByMemoryAddress - single value
my_memloc = MemoryLocation(
    address=0x1111, memorysize=2, address_format=16, memorysize_format=8
)
client.dynamically_define_did(0x1234, my_memloc)

# Example 4) defineByMemoryAddress - composite value
my_def = DynamicDidDefinition(
    MemoryLocation(address=0x1111, memorysize=2, address_format=16, memorysize_format=8)
)
my_def.add(
    MemoryLocation(address=0x2222, memorysize=4, address_format=16, memorysize_format=8)
)
my_def.add(
    MemoryLocation(address=0x3333, memorysize=1, address_format=16, memorysize_format=8)
)
client.dynamically_define_did(0x1234, my_def)

# Example 5) defineByMemoryAddress - composite value and client default format
# The formats are set once in the client configuration, so the memory
# locations below do not specify them.
client.set_config('server_address_format', 16)
client.set_config('server_memorysize_format', 8)
my_def = DynamicDidDefinition(MemoryLocation(address=0x1111, memorysize=2))
my_def.add(MemoryLocation(address=0x2222, memorysize=4))
my_def.add(MemoryLocation(address=0x3333, memorysize=1))
client.dynamically_define_did(0x1234, my_def)