From 1ed3db26dade7d92ba5a6237a51265cc8f442861 Mon Sep 17 00:00:00 2001 From: Dima Wittmann Date: Fri, 24 Jan 2020 14:24:34 +0200 Subject: [PATCH] Stabilize BLE Security Implementation (#941) * Refactor BLE broadcast encryption flags The original spec defines the flags that are used to indicate the keying material for BLE broadcasts: ... kBroadcastDataIsEncrypted = 1 << 3, kBroadcastKeyIsDeviceKey = 1 << 4, kBroadcastKeyIsUserKey = 1 << 5, ... Using this implementation only three states can be encoded: no encryption, encrypted with device key and encrypted with a user key. New implementation treats this three bytes like enum instead of seperate bools and introduce a new encryption state (encrypted with null key): kNoEncryption = 0, kEncryptedWithNullKey = 1, kEncryptedWithUserKey = 2, kEncryptedWithDeviceKey = 3, * Consolidate auth characteristics The BLE authentication flow is completely synchronous so there is no need for two separate characteristics for the client and the server. This commit unifies two authentication characteristics into one. * Modify Security Level 0 For security level 0, the root key is considered to be 32-bytes of zeros and authentication proceeds exactly the same as level 1 and 2 given that root key. * Stabilize flags/info ble characteristic Info characteristic holds 20 bytes 1 byte: version 2 byte: security flags 3-4 bytes: high flags 5-20 bytes: reserved flags Added RPC IN PROGRESS flag for tracking if the device handles a RPC and cannot handle another one * Distinguish password-based user-key and random data Introduce a 3rd AuthMethod and key: PasswordBasedAuthentication and PasswordBasedUserKey The primary benefit is that it distinguishes between when we are looking for a password that a user could enter from when we are looking for something that has to be stored in a server or key store somewhere. * Update comments in test_signed_list_report.py --- iotilecore/RELEASE.md | 1 + .../iotile/core/hw/auth/auth_provider.py | 23 ++++- .../iotile/core/hw/auth/cli_auth_provider.py | 33 +++---- .../iotile/core/hw/auth/env_auth_provider.py | 19 ++-- .../iotile/core/hw/auth/null_auth_provider.py | 2 +- .../test_reports/test_signed_list_report.py | 28 ++++-- transport_plugins/bled112/RELEASE.md | 10 +++ .../iotile_transport_bled112/bled112.py | 50 +++++++---- .../iotile_transport_bled112/bled112_auth.py | 88 ++++++++++++------- .../iotile_transport_bled112/bled112_cmd.py | 36 +++++--- .../iotile_transport_bled112/tilebus.py | 5 +- 11 files changed, 193 insertions(+), 102 deletions(-) diff --git a/iotilecore/RELEASE.md b/iotilecore/RELEASE.md index 2dc9bf0a5..df5449335 100644 --- a/iotilecore/RELEASE.md +++ b/iotilecore/RELEASE.md @@ -10,6 +10,7 @@ All major changes in each released version of `iotile-core` are listed here. - Add shim to make `iotile-core` compatible with Python 3.8.0 on Windows. There is a bug in that python version that breaks background event loops only on Windows. It is fixed in python 3.8.1. +- Add new BLE broadcast encryption method: Encrypted with NullKey ## 5.0.11 diff --git a/iotilecore/iotile/core/hw/auth/auth_provider.py b/iotilecore/iotile/core/hw/auth/auth_provider.py index 6f6ce8adc..fee825d16 100644 --- a/iotilecore/iotile/core/hw/auth/auth_provider.py +++ b/iotilecore/iotile/core/hw/auth/auth_provider.py @@ -16,13 +16,17 @@ class AuthProvider: ReportKeyMagic = 0x00000002 NoKey = 0 - UserKey = 1 - DeviceKey = 2 + NullKey = 1 + UserKey = 2 + DeviceKey = 3 + PasswordBasedKey = 4 KnownKeyRoots = { NoKey: 'no_key', + NullKey: 'null_key', UserKey: 'user_key', - DeviceKey: 'device_key' + DeviceKey: 'device_key', + PasswordBasedKey: 'pasword_based_userkey' } def __init__(self, args=None): @@ -48,6 +52,19 @@ def verify_key(self, root_key_type): if root_key_type not in self.supported_keys: raise NotFoundError("Not supported key type", key=root_key_type) + @classmethod + def DeriveRebootKeyFromPassword(cls, password): + """Derive the root key from the user password + TODO hashlib.pbkdf2_hmac arguments needs to be revised, + current values are not proved to be secure + + Args: + password (str): user password + + Returns: + bytes: derived key + """ + return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), b'salt', 100000) @classmethod def DeriveReportKey(cls, root_key, report_id, sent_timestamp): diff --git a/iotilecore/iotile/core/hw/auth/cli_auth_provider.py b/iotilecore/iotile/core/hw/auth/cli_auth_provider.py index ab2dc7690..2e514cf4e 100644 --- a/iotilecore/iotile/core/hw/auth/cli_auth_provider.py +++ b/iotilecore/iotile/core/hw/auth/cli_auth_provider.py @@ -6,6 +6,8 @@ import hashlib from iotile.core.exceptions import NotFoundError from .rootkey_auth_provider import RootKeyAuthProvider +from .env_auth_provider import EnvAuthProvider +from .auth_provider import AuthProvider import getpass @@ -18,36 +20,29 @@ def __init__(self, args=None): if args is None: args = {} - args['supported_keys'] = [self.UserKey] + args['supported_keys'] = [self.PasswordBasedKey] super().__init__(args) - @classmethod - def derive_key(cls, password): - """Derive the root key from user password - TODO hashlib.pbkdf2_hmac arguments needs to be revised, - current values are not proved to be secure - - Args: - password (str): user password - - Returns: - bytes: derived key - """ - return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), b'salt', 100000) - def get_root_key(self, key_type, device_id): - """Prompt user for the password and derive root key from it + """Prompt a user for the password and derive the root key from it Args: key_type (int): see KnownKeyRoots - device_id (int): uuid of the device + device_id (int): the uuid of the device Returns: bytes: the root key """ self.verify_key(key_type) - password = getpass.getpass("Please, input user password for device {} :".format(device_id)) + password = getpass.getpass("Please input the user password for the device {} :".format(device_id)) + userkey = AuthProvider.DeriveRebootKeyFromPassword(password) + + if device_id: + answer = input("Would you like to save the user-key until the end of the current session? (y/n)") + if answer and answer[0].lower() == 'y': + variable_name = EnvAuthProvider.construct_var_name(device_id) + os.environ[variable_name] = userkey.hex() - return CliAuthProvider.derive_key(password) + return userkey diff --git a/iotilecore/iotile/core/hw/auth/env_auth_provider.py b/iotilecore/iotile/core/hw/auth/env_auth_provider.py index fbe53a383..c0ac17018 100644 --- a/iotilecore/iotile/core/hw/auth/env_auth_provider.py +++ b/iotilecore/iotile/core/hw/auth/env_auth_provider.py @@ -18,6 +18,19 @@ def __init__(self, args=None): super().__init__(args) + @classmethod + def construct_var_name(cls, device_id): + """Build name of environment variable used to store userkey + + Returns: + str: variable name + """ + if isinstance(device_id, str): + var_name = "USER_KEY_{}".format(device_id) + else: + var_name = "USER_KEY_{0:08X}".format(device_id) + return var_name + def get_root_key(self, key_type, device_id): """Attempt to get a user key from an environment variable @@ -30,10 +43,7 @@ def get_root_key(self, key_type, device_id): """ self.verify_key(key_type) - if isinstance(device_id, str): - var_name = "USER_KEY_{}".format(device_id) - else: - var_name = "USER_KEY_{0:08X}".format(device_id) + var_name = EnvAuthProvider.construct_var_name(device_id) if var_name not in os.environ: raise NotFoundError("No key could be found for devices", device_id=device_id, @@ -43,7 +53,6 @@ def get_root_key(self, key_type, device_id): if len(key_var) != 64: raise NotFoundError("Key in variable is not the correct length, should be 64 hex characters", device_id=device_id, key_value=key_var) - try: key = binascii.unhexlify(key_var) except ValueError as exc: diff --git a/iotilecore/iotile/core/hw/auth/null_auth_provider.py b/iotilecore/iotile/core/hw/auth/null_auth_provider.py index 547de72e9..346bbf7f0 100644 --- a/iotilecore/iotile/core/hw/auth/null_auth_provider.py +++ b/iotilecore/iotile/core/hw/auth/null_auth_provider.py @@ -10,7 +10,7 @@ def __init__(self, args=None): if args is None: args = {} - args['supported_keys'] = [self.NoKey] + args['supported_keys'] = [self.NullKey] super().__init__(args) diff --git a/iotilecore/test/test_reports/test_signed_list_report.py b/iotilecore/test/test_reports/test_signed_list_report.py index 56f29b34c..6543beca0 100644 --- a/iotilecore/test/test_reports/test_signed_list_report.py +++ b/iotilecore/test/test_reports/test_signed_list_report.py @@ -5,9 +5,21 @@ from iotile.core.hw.reports.signed_list_format import SignedListReport from iotile.core.hw.reports.report import IOTileReading from iotile.core.hw.auth.env_auth_provider import EnvAuthProvider +from iotile.core.hw.auth.auth_provider import AuthProvider -def make_sequential(iotile_id, stream, num_readings, give_ids=False, root_key=0, signer=None): +def make_sequential(iotile_id, stream, num_readings, give_ids=False, root_key=AuthProvider.NoKey, signer=None): + """Create sequaltial report from reading + + Args: + iotile_id (int): The uuid of the device that this report came from + stream (int): The stream that these readings are part of + num_readings(int): amount of readings + give_ids(bool): whether to set sequantial id for every reading + root_key(int): type of root key to sign the report + signer (AuthProvider): An optional preconfigured AuthProvider that should be used to sign this + report. If no AuthProvider is provided, the default ChainedAuthProvider is used. + """ readings = [] for i in range(0, num_readings): @@ -22,8 +34,7 @@ def make_sequential(iotile_id, stream, num_readings, give_ids=False, root_key=0, return report def test_basic_parsing(): - """Make sure we can decode a signed report - """ + """Make sure we can decode a signed report""" report = make_sequential(1, 0x1000, 10) encoded = report.encode() @@ -41,8 +52,7 @@ def test_basic_parsing(): assert report.signature_flags == 0 def test_footer_calculation(): - """ - """ + """Test if make_sesuentail set properly ids""" report1 = make_sequential(1, 0x1000, 10, give_ids=False) report2 = make_sequential(1, 0x1000, 10, give_ids=True) @@ -60,15 +70,15 @@ def test_userkey_signing(monkeypatch): signer = EnvAuthProvider() with pytest.raises(ExternalError): - report1 = make_sequential(1, 0x1000, 10, give_ids=True, root_key=1, signer=signer) + report1 = make_sequential(1, 0x1000, 10, give_ids=True, root_key=AuthProvider.UserKey, signer=signer) - report1 = make_sequential(2, 0x1000, 10, give_ids=True, root_key=1, signer=signer) + report1 = make_sequential(2, 0x1000, 10, give_ids=True, root_key=AuthProvider.UserKey, signer=signer) encoded = report1.encode() report2 = SignedListReport(encoded) - assert report1.signature_flags == 1 - assert report2.signature_flags == 1 + assert report1.signature_flags == AuthProvider.UserKey + assert report2.signature_flags == AuthProvider.UserKey assert report1.verified assert report1.encrypted assert report2.verified diff --git a/transport_plugins/bled112/RELEASE.md b/transport_plugins/bled112/RELEASE.md index ef865edfe..6596464d5 100644 --- a/transport_plugins/bled112/RELEASE.md +++ b/transport_plugins/bled112/RELEASE.md @@ -2,6 +2,16 @@ All major changes in each released version of the bled112 transport plugin are listed here. +## HEAD + +- Refactor BLE broadcast encryption flags: three bits are treated as an enumeration +- Consilidate two authentication characteristics into one +- Update info characteristic +- Remove separate logic for NullKey encryption temp key generation, the temp key is + generated the same way it is done for user key +- Add the password-based authentication method +- Update the authentication flow + ## 3.0.3 - Fix bled112_auth error handling diff --git a/transport_plugins/bled112/iotile_transport_bled112/bled112.py b/transport_plugins/bled112/iotile_transport_bled112/bled112.py index 1bb5037f1..8bc7c4ee7 100644 --- a/transport_plugins/bled112/iotile_transport_bled112/bled112.py +++ b/transport_plugins/bled112/iotile_transport_bled112/bled112.py @@ -311,6 +311,10 @@ def send_rpc_async(self, conn_id, address, rpc_id, payload, timeout, callback): services = self._connections[found_handle]['services'] + if self.check_is_rpc_in_progress(found_handle, services): + callback(conn_id, self.id, False, 'RPC still in progress', None, None) + return + self._command_task.async_command(['_send_rpc', found_handle, services, address, rpc_id, payload, timeout], self._send_rpc_finished, {'connection_id': conn_id, 'handle': found_handle, 'callback': callback}) @@ -664,17 +668,13 @@ def _parse_v2_advertisement(self, rssi, sender, data): # bit 0: Has pending data to stream # bit 1: Low voltage indication # bit 2: User connected - # bit 3: Broadcast data is encrypted - # bit 4: Encryption key is device key - # bit 5: Encryption key is user key + # bit 3 - 5: Broadcast encryption key type # bit 6: broadcast data is time synchronized to avoid leaking # information about when it changes is_pending_data = bool(flags & (1 << 0)) is_low_voltage = bool(flags & (1 << 1)) is_user_connected = bool(flags & (1 << 2)) - is_encrypted = bool(flags & (1 << 3)) - is_device_key = bool(flags & (1 << 4)) - is_user_key = bool(flags & (1 << 5)) + broadcast_encryption_key_type = (flags >> 3) & 7 self._device_scan_counts.setdefault(device_id, {'v1': 0, 'v2': 0})['v2'] += 1 @@ -691,24 +691,17 @@ def _parse_v2_advertisement(self, rssi, sender, data): 'battery': battery / 32.0, 'advertising_version':2} - key_type = AuthProvider.NoKey - if is_encrypted: - if is_device_key: - key_type = AuthProvider.DeviceKey - elif is_user_key: - key_type = AuthProvider.UserKey - - if is_encrypted: + if broadcast_encryption_key_type: if not _HAS_CRYPTO: return info, timestamp, None, None, None, None, None try: - key = self._key_provider.get_rotated_key(key_type, device_id, + key = self._key_provider.get_rotated_key(broadcast_encryption_key_type, device_id, reboot_counter=reboots, rotation_interval_power=EPHEMERAL_KEY_CYCLE_POWER, current_timestamp=timestamp) except NotFoundError: - self._logger.warning("Key type {} is not found".format(key_type), exc_info=True) + self._logger.warning("Key type {} is not found".format(broadcast_encryption_key_type), exc_info=True) return info, timestamp, None, None, None, None, None nonce = generate_nonce(device_id, timestamp, reboot_low, reboot_high_packed, counter_packed) @@ -870,6 +863,27 @@ def check_authentication(self, uuid, conn_id, handle, services): 'handle': handle, 'services': services}) + def check_is_rpc_in_progress(self, handle, services): + """ Discover if the device handles RPC at the moment + + Another RPC should not be sent to the device if handling of previous + is not finished + + Args: + handle (int): a handle to the connection on the BLED112 dongle + services (dict): A dictionary of GATT services produced by probe_services() + + Returns: + bool: True if RPC is being handled at the moment + """ + RPC_IN_PROGRESS_FLAG = 0x0001 + try: + value = self._command_task.sync_command(["get_info_flags", handle, services]) + version, _, high_flags = struct.unpack("BBH16x", value['data']) + return (high_flags & RPC_IN_PROGRESS_FLAG) == 0x01 + except HardwareError: + return False + def initialize_system_sync(self): """Remove all active connections and query the maximum number of supported connections """ @@ -1125,8 +1139,8 @@ def _on_authentication_check_response(self, result): context = result['context'] if result['result']: - flags, = struct.unpack("B", result['return_value']['data']) - if flags == 0x01: + version, security_flags, _ = struct.unpack("BBH16x", result['return_value']['data']) + if security_flags == 0x01: self._logger.debug("Authentication is required") self.authenticate(context['uuid'], context['connection_id'], diff --git a/transport_plugins/bled112/iotile_transport_bled112/bled112_auth.py b/transport_plugins/bled112/iotile_transport_bled112/bled112_auth.py index 6d60c5386..6b081a150 100644 --- a/transport_plugins/bled112/iotile_transport_bled112/bled112_auth.py +++ b/transport_plugins/bled112/iotile_transport_bled112/bled112_auth.py @@ -13,11 +13,13 @@ import hashlib import hmac from iotile.core.hw.auth.auth_chain import ChainedAuthProvider +from iotile.core.exceptions import NotFoundError class AuthType(enum.Enum): AUTH_METHOD_0 = 1 << 0 # Null Key AUTH_METHOD_1 = 1 << 1 # User Key AUTH_METHOD_2 = 1 << 2 # Device Key + AUTH_METHOD_3 = 1 << 3 # Password based user key class State(enum.Enum): NOT_AUTHENTICATED = 0 @@ -33,8 +35,7 @@ class State(enum.Enum): class BLED112AuthManager: - def __init__(self, supported_auth, permissions, token_generation): - self._supported_auth = supported_auth + def __init__(self, permissions, token_generation): self._permissions = permissions self._token_generation = token_generation @@ -51,27 +52,30 @@ def __init__(self, supported_auth, permissions, token_generation): self._state = State.NOT_AUTHENTICATED - def _send_client_hello(self, command_processor, *command_processor_args): + def _send_client_hello(self, supported_auth, command_processor, *command_processor_args): """Initiate the authentication process Tell the server what authentication methods it supports and nonce """ - self._client_nonce = bytearray(b"\x02" * 16) #bytearray.fromhex(os.urandom(16)) - self._client_hello = struct.pack("xxxB16s", self._supported_auth, self._client_nonce) + self._client_nonce = bytearray(os.urandom(16)) + self._client_hello = struct.pack("xxxB16s", supported_auth, self._client_nonce) return command_processor.send_auth_client_request(self._client_hello, *command_processor_args) def _send_client_verify(self, session_key, auth_type, command_processor, *command_processor_args): """Verify session key between the client and the device""" - client_verify = hmac.new(session_key, self._client_hello + self._server_hello, hashlib.sha256).digest()[0:16] - self._client_verify = struct.pack("BBH16s", auth_type, self._permissions, self._token_generation, client_verify) + verify_data = self._client_hello + self._server_hello + client_verify = hmac.new(session_key, verify_data, hashlib.sha256).digest()[0:16] + + self._client_verify = struct.pack( + "BBH16s", auth_type.value, self._permissions, self._token_generation, client_verify) server_verify = command_processor.send_auth_client_request(self._client_verify, *command_processor_args) err, granted_permissions, server_verify = struct.unpack("BBxx16s", server_verify) return err, granted_permissions, server_verify - def _compute_session_key_security_level_1(self, root_key=None, user_token=None, scoped_token=None): + def _compute_session_key(self, root_key=None, user_token=None, scoped_token=None): if not scoped_token and not user_token and not root_key: return None @@ -80,21 +84,48 @@ def _compute_session_key_security_level_1(self, root_key=None, user_token=None, user_token = hmac.new(root_key, data, hashlib.sha256).digest() if not scoped_token: - data = struct.pack("I", self._permissions) + data = struct.pack("I", self._permissions) scoped_token = hmac.new(user_token, data, hashlib.sha256).digest() session_key = hmac.new(scoped_token, self._client_nonce + self._device_nonce, hashlib.sha256).digest() return session_key - def _compute_session_key_security_level_0(self, uuid=b'\x00'): - scoped_token = uuid + b'\x00' * (32 - len(uuid)) #FIXME uuid - return self._compute_session_key_security_level_1(scoped_token=scoped_token) - def _compute_server_verify(self, session_key): data = self._client_hello + self._server_hello + self._client_verify return hmac.new(session_key, data, hashlib.sha256).digest()[0:16] - def authenticate(self, uuid, auth_type, command_processor, *command_processor_args): + def _get_root_key(self, server_supported_auth, client_supported_auth, device_uuid): + """Find a root key to authenticate connection to the POD device + + Look up is based to supported method by both client and server + in order from DeviceKey to NullKey + """ + supported_auth = server_supported_auth & client_supported_auth + + def _try_to_find_key(auth_method, key_type): + _root_key = None + if supported_auth & auth_method.value: + try: + _root_key = self._key_provider.get_root_key(key_type=key_type, device_id=device_uuid) + except NotFoundError: + pass + return _root_key + + method_key_type_list = [ # order shows look up priority + (AuthType.AUTH_METHOD_2, ChainedAuthProvider.DeviceKey), + (AuthType.AUTH_METHOD_1, ChainedAuthProvider.UserKey), + (AuthType.AUTH_METHOD_3, ChainedAuthProvider.PasswordBasedKey), + (AuthType.AUTH_METHOD_0, ChainedAuthProvider.NullKey) + ] + + for (method, key) in method_key_type_list: + root_key = _try_to_find_key(method, key) + if root_key: + return root_key, method + + return None, None + + def authenticate(self, uuid, client_supported_auth, command_processor, *command_processor_args): """ Perform authentication @@ -107,14 +138,17 @@ def authenticate(self, uuid, auth_type, command_processor, *command_processor_ar The routine is expected to send a request to server, used to send both hello and verify messages, The device expects that messages are sent one after another unless server hello convey an error Returns: - (bool, bytearray/dict): tuple of success flag, sessin key or dict with reason of failure + (bool, bytearray/dict): tuple of success flag, session key or dict with reason of failure """ required_method = getattr(command_processor, "send_auth_client_request", None) if not required_method or not callable(required_method): return False, {"reason": "command_processor should implement send_auth_client_request"} - self._server_hello = self._send_client_hello(command_processor, *command_processor_args) - generation, err, server_supported_auth, self._device_nonce = struct.unpack("HBB16s", self._server_hello) + self._server_hello = self._send_client_hello( + client_supported_auth, command_processor, *command_processor_args) + + generation, err, server_supported_auth, self._device_nonce = \ + struct.unpack("HBB16s", self._server_hello) if err: return False, {"reason": "Server hello contains error code {}".format(err)} @@ -122,21 +156,11 @@ def authenticate(self, uuid, auth_type, command_processor, *command_processor_ar if generation > self._token_generation: return False, {"reason": "Server support only newer generation tokens {}".format(generation)} - try: - auth_method = AuthType(auth_type) - except ValueError: - return False, {"reason": "Auth type {} does not exist".format(auth_method)} - - if server_supported_auth & auth_type == 0: - return False, {"reason": "{} is not supported, bitmap of supported: {}".format(auth_method, bin(server_supported_auth))} - - if auth_method == AuthType.AUTH_METHOD_0: - self._session_key = self._compute_session_key_security_level_0() - elif auth_method == AuthType.AUTH_METHOD_1: - root_key = self._key_provider.get_root_key(key_type=ChainedAuthProvider.UserKey, device_id=uuid) - self._session_key = self._compute_session_key_security_level_1(root_key=root_key) - else: - return False, {"reason": "Auth type is not implemented {}".format(auth_method)} + root_key, auth_type = self._get_root_key(server_supported_auth, client_supported_auth, uuid) + if root_key is None: + return False, {"reason": "Root key is not found"} + + self._session_key = self._compute_session_key(root_key=root_key) err, granted_permissions, received_server_verify = \ self._send_client_verify(self._session_key, auth_type, command_processor, *command_processor_args) diff --git a/transport_plugins/bled112/iotile_transport_bled112/bled112_cmd.py b/transport_plugins/bled112/iotile_transport_bled112/bled112_cmd.py index 7b7ebc13f..0cf3aec41 100644 --- a/transport_plugins/bled112/iotile_transport_bled112/bled112_cmd.py +++ b/transport_plugins/bled112/iotile_transport_bled112/bled112_cmd.py @@ -11,7 +11,7 @@ from .bgapi_structures import process_gatt_service, process_attribute, process_read_handle, process_notification from .bgapi_structures import parse_characteristic_declaration from .async_packet import InternalTimeoutError, DeviceNotConfiguredError -from .bled112_auth import BLED112AuthManager +from .bled112_auth import AuthType, BLED112AuthManager BGAPIPacket = namedtuple("BGAPIPacket", ["is_event", "command_class", "command", "payload"]) @@ -262,17 +262,21 @@ def _probe_characteristics(self, conn, services, timeout=5.0): return True, {'services': services} - def _authenticate_async(self, uuid, conn_handle, services): - manager = BLED112AuthManager(0x01, 0x01, 0x01) - success, data = manager.authenticate(uuid, 0x02, self, conn_handle, services) + def _authenticate_async(self, device_uuid, conn_handle, services): + supported_auth = AuthType.AUTH_METHOD_0.value | AuthType.AUTH_METHOD_1.value \ + | AuthType.AUTH_METHOD_2.value | AuthType.AUTH_METHOD_3.value + permisions = 0x00 + token_gen = 0x01 + + manager = BLED112AuthManager(permisions, token_gen) + success, data = manager.authenticate(device_uuid, supported_auth, self, conn_handle, services) return success, data def send_auth_client_request(self, payload, conn_handle, services, timeout=1.0): - auth_server_write_handle = services[TileBusService]['characteristics'][TileBusAuthServerWriteCharacteristic]['handle'] - auth_client_write_handle = services[TileBusService]['characteristics'][TileBusAuthClientWriteCharacteristic]['handle'] + auth_handle = services[TileBusService]['characteristics'][TileBusAuthCharacteristic]['handle'] - result, value = self._write_handle(conn_handle, auth_client_write_handle, False, bytes(payload)) + result, value = self._write_handle(conn_handle, auth_handle, False, bytes(payload)) if not result: return result, value @@ -280,7 +284,7 @@ def send_auth_client_request(self, payload, conn_handle, services, timeout=1.0): def notified_payload(event): if event.command_class == 4 and event.command == 5: event_handle, att_handle = unpack("