diff --git a/.gitignore b/.gitignore index 18cb9dec..5eabca97 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .tox/* __pycache__/* htmlcov/* +.coverage .coverage.* coverage.xml *.pyc diff --git a/README.rst b/README.rst index f99e40e7..7e87da57 100644 --- a/README.rst +++ b/README.rst @@ -45,59 +45,86 @@ This library was built with the intention of allowing easy communication with Bl Quick Start ============= -The simplest way to use this package from a terminal is to call ``Blink.start()`` which will prompt for your Blink username and password and then log you in. Alternatively, you can instantiate the Blink class with a username and password, and call ``Blink.start()`` to login and setup without prompt, as shown below. In addition, http requests are throttled internally via use of the ``Blink.refresh_rate`` variable, which can be set at initialization and defaults to 30 seconds. +The simplest way to use this package from a terminal is to call ``Blink.start()`` which will prompt for your Blink username and password and then log you in. In addition, http requests are throttled internally via use of the ``Blink.refresh_rate`` variable, which can be set at initialization and defaults to 30 seconds. .. code:: python - from blinkpy import blinkpy - blink = blinkpy.Blink(username='YOUR USER NAME', password='YOUR PASSWORD', refresh_rate=30) + from blinkpy.blinkpy import Blink + + blink = Blink() blink.start() -At startup, you may be prompted for a verification key. Just enter this in the command-line prompt. If you just receive a verification email asking to validate access for your device, enter nothing at this prompt. To avoid any command-line interaction, call the ``Blink`` class with the ``no_prompt=True`` flag. Instead, once you receive the verification email, call the following functions: + +This flow will prompt you for your username and password. Once entered, if you likely will need to send a 2FA key to the blink servers (this pin is sent to your email address). When you receive this pin, enter at the prompt and the Blink library will proceed with setup. + +Starting blink without a prompt +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +In some cases, having an interactive command-line session is not desired. In this case, you will need to set the ``Blink.auth.no_prompt`` value to ``True``. In addition, since you will not be prompted with a username and password, you must supply the login data to the blink authentication handler. This is best done by instantiating your own auth handler with a dictionary containing at least your username and password. .. code:: python - blink.login_handler.send_auth_key(blink, VERIFICATION_KEY) + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + + blink = Blink() + # Can set no_prompt when initializing auth handler + auth = Auth({"username": , "password": }, no_prompt=True) + blink.auth = auth + blink.start() + + +Since you will not be prompted for any 2FA pin, you must call the ``blink.auth.send_auth_key`` function. There are two required parameters: the ``blink`` object as well as the ``key`` you received from Blink for 2FA: + +.. code:: python + + auth.send_auth_key(blink, ) blink.setup_post_verify() -In addition, you can also save your credentials in a json file and initialize Blink with the credential file as follows: + +Supplying credentials from file +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Other use cases may involved loading credentials from a file. This file must be ``json`` formatted and contain a minimum of ``username`` and ``password``. A built in function in the ``blinkpy.helpers.util`` module can aid in loading this file. Note, if ``no_prompt`` is desired, a similar flow can be followed as above. .. code:: python - from blinkpy import blinkpy - blink = blinkpy.Blink(cred_file="path/to/credentials.json") + from blinkpy.blinkpy import Blink + from blinkpy.auth import Auth + from blinkpy.helpers.util import json_load + + blink = Blink() + auth = Auth(json_load("")) + blink.auth = auth blink.start() -The credential file must be json formatted with a ``username`` and ``password`` key like follows: -.. code:: json +Saving credentials +~~~~~~~~~~~~~~~~~~~ +This library also allows you to save your credentials to use in future sessions. Saved information includes authentication tokens as well as unique ids which should allow for a more streamlined experience and limits the frequency of login requests. This data can be saved as follows (it can then be loaded by following the instructions above for supplying credentials from a file): + +.. code:: python + + blink.save("") - { - "username": "YOUR USER NAME", - "password": "YOUR PASSWORD" - } +Getting cameras +~~~~~~~~~~~~~~~~ Cameras are instantiated as individual ``BlinkCamera`` classes within a ``BlinkSyncModule`` instance. All of your sync modules are stored within the ``Blink.sync`` dictionary and can be accessed using the name of the sync module as the key (this is the name of your sync module in the Blink App). The below code will display cameras and their available attributes: .. code:: python - from blinkpy import blinkpy - - blink = blinkpy.Blink(username='YOUR USER NAME', password='YOUR PASSWORD') - blink.start() - for name, camera in blink.cameras.items(): print(name) # Name of the camera print(camera.attributes) # Print available attributes of camera -The most recent images and videos can be accessed as a bytes-object via internal variables. These can be updated with calls to ``Blink.refresh()`` but will only make a request if motion has been detected or other changes have been found. This can be overridden with the ``force_cache`` flag, but this should be used for debugging only since it overrides the internal request throttling. + +The most recent images and videos can be accessed as a bytes-object via internal variables. These can be updated with calls to ``Blink.refresh()`` but will only make a request if motion has been detected or other changes have been found. This can be overridden with the ``force`` flag, but this should be used for debugging only since it overrides the internal request throttling. .. code:: python camera = blink.cameras['SOME CAMERA NAME'] - blink.refresh(force_cache=True) # force a cache update USE WITH CAUTION + blink.refresh(force=True) # force a cache update USE WITH CAUTION camera.image_from_cache.raw # bytes-like image object (jpg) camera.video_from_cache.raw # bytes-like video object (mp4) @@ -110,15 +137,15 @@ The ``blinkpy`` api also allows for saving images and videos to a file and snapp blink.refresh() # Get new information from server camera.image_to_file('/local/path/for/image.jpg') camera.video_to_file('/local/path/for/video.mp4') - + +Download videos +~~~~~~~~~~~~~~~~ You can also use this library to download all videos from the server. In order to do this, you must specify a ``path``. You may also specifiy a how far back in time to go to retrieve videos via the ``since=`` variable (a simple string such as ``"2017/09/21"`` is sufficient), as well as how many pages to traverse via the ``page=`` variable. Note that by default, the library will search the first ten pages which is sufficient in most use cases. Additionally, you can specidy one or more cameras via the ``camera=`` property. This can be a single string indicating the name of the camera, or a list of camera names. By default, it is set to the string ``'all'`` to grab videos from all cameras. Example usage, which downloads all videos recorded since July 4th, 2018 at 9:34am to the ``/home/blink`` directory: .. code:: python - blink = blinkpy.Blink(username="YOUR USER NAME", password="YOUR PASSWORD") - blink.start() blink.download_videos('/home/blink', since='2018/07/04 09:34') diff --git a/blinkpy/api.py b/blinkpy/api.py index c850a2b9..2df91274 100644 --- a/blinkpy/api.py +++ b/blinkpy/api.py @@ -2,8 +2,7 @@ import logging from json import dumps -import blinkpy.helpers.errors as ERROR -from blinkpy.helpers.util import http_req, get_time, BlinkException, Throttle +from blinkpy.helpers.util import get_time, Throttle from blinkpy.helpers.constants import DEFAULT_URL _LOGGER = logging.getLogger(__name__) @@ -12,45 +11,31 @@ def request_login( - blink, - url, - username, - password, - notification_key, - uid, - is_retry=False, - device_id="Blinkpy", + auth, url, login_data, is_retry=False, ): """ Login request. - :param blink: Blink instance. + :param auth: Auth instance. :param url: Login url. - :param username: Blink username. - :param password: Blink password. - :param notification_key: Randomly genereated key. - :param uid: Randomly generated unique id key. - :param is_retry: Is this part of a re-authorization attempt? - :param device_id: Name of application to send at login. + :login_data: Dictionary containing blink login data. """ headers = {"Host": DEFAULT_URL, "Content-Type": "application/json"} data = dumps( { - "email": username, - "password": password, - "notification_key": notification_key, - "unique_id": uid, + "email": login_data["username"], + "password": login_data["password"], + "notification_key": login_data["notification_key"], + "unique_id": login_data["uid"], "app_version": "6.0.7 (520300) #afb0be72a", + "device_identifier": login_data["device_id"], "client_name": "Computer", "client_type": "android", - "device_identifier": device_id, - "device_name": "Blinkpy", "os_version": "5.1.1", "reauth": "true", } ) - return http_req( - blink, + return auth.query( url=url, headers=headers, data=data, @@ -60,19 +45,14 @@ def request_login( ) -def request_verify(blink, verify_key): +def request_verify(auth, blink, verify_key): """Send verification key to blink servers.""" url = "{}/api/v4/account/{}/client/{}/pin/verify".format( blink.urls.base_url, blink.account_id, blink.client_id ) data = dumps({"pin": verify_key}) - return http_req( - blink, - url=url, - headers=blink.auth_header, - data=data, - json_resp=False, - reqtype="post", + return auth.query( + url=url, headers=auth.header, data=data, json_resp=False, reqtype="post", ) @@ -288,13 +268,10 @@ def http_get(blink, url, stream=False, json=True, is_retry=False): :param json: Return json response? TRUE/False :param is_retry: Is this part of a re-auth attempt? """ - if blink.auth_header is None: - raise BlinkException(ERROR.AUTH_TOKEN) _LOGGER.debug("Making GET request to %s", url) - return http_req( - blink, + return blink.auth.query( url=url, - headers=blink.auth_header, + headers=blink.auth.header, reqtype="get", stream=stream, json_resp=json, @@ -309,9 +286,7 @@ def http_post(blink, url, is_retry=False): :param url: URL to perfom post request. :param is_retry: Is this part of a re-auth attempt? """ - if blink.auth_header is None: - raise BlinkException(ERROR.AUTH_TOKEN) _LOGGER.debug("Making POST request to %s", url) - return http_req( - blink, url=url, headers=blink.auth_header, reqtype="post", is_retry=is_retry + return blink.auth.query( + url=url, headers=blink.auth.header, reqtype="post", is_retry=is_retry ) diff --git a/blinkpy/auth.py b/blinkpy/auth.py new file mode 100644 index 00000000..778daf35 --- /dev/null +++ b/blinkpy/auth.py @@ -0,0 +1,194 @@ +"""Login handler for blink.""" +import logging +from functools import partial +from requests import Request, Session, exceptions +from blinkpy import api +from blinkpy.helpers import util +from blinkpy.helpers.constants import BLINK_URL, LOGIN_ENDPOINT +from blinkpy.helpers import errors as ERROR + +_LOGGER = logging.getLogger(__name__) + + +class Auth: + """Class to handle login communication.""" + + def __init__(self, login_data=None, no_prompt=False): + """ + Initialize auth handler. + + :param login_data: dictionary for login data + must contain the following: + - username + - password + :param no_prompt: Should any user input prompts + be supressed? True/FALSE + """ + if login_data is None: + login_data = {} + self.data = login_data + self.token = login_data.get("token", None) + self.host = login_data.get("host", None) + self.region_id = login_data.get("region_id", None) + self.client_id = login_data.get("client_id", None) + self.account_id = login_data.get("account_id", None) + self.login_response = None + self.no_prompt = no_prompt + self.session = self.create_session() + + @property + def login_attributes(self): + """Return a dictionary of login attributes.""" + self.data["token"] = self.token + self.data["host"] = self.host + self.data["region_id"] = self.region_id + self.data["client_id"] = self.client_id + self.data["account_id"] = self.account_id + return self.data + + @property + def header(self): + """Return authorization header.""" + if self.token is None: + return None + return {"Host": self.host, "TOKEN_AUTH": self.token} + + def create_session(self): + """Create a session for blink communication.""" + sess = Session() + sess.get = partial(sess.get, timeout=10) + return sess + + def prepare_request(self, url, headers, data, reqtype): + """Prepare a request.""" + req = Request(reqtype.upper(), url, headers=headers, data=data) + return req.prepare() + + def validate_login(self): + """Check login information and prompt if not available.""" + self.data["username"] = self.data.get("username", None) + self.data["password"] = self.data.get("password", None) + if not self.no_prompt: + self.data = util.prompt_login_data(self.data) + + self.data = util.validate_login_data(self.data) + + def login(self, login_url=LOGIN_ENDPOINT): + """Attempt login to blink servers.""" + self.validate_login() + _LOGGER.info("Attempting login with %s", login_url) + response = api.request_login(self, login_url, self.data, is_retry=False,) + try: + if response.status_code == 200: + return response.json() + raise LoginError + except AttributeError: + raise LoginError + + def refresh_token(self): + """Refresh auth token.""" + try: + _LOGGER.info("Token expired, attempting automatic refresh.") + self.login_response = self.login() + self.region_id = self.login_response["region"]["tier"] + self.host = f"{self.region_id}.{BLINK_URL}" + self.token = self.login_response["authtoken"]["authtoken"] + self.client_id = self.login_response["client"]["id"] + self.account_id = self.login_response["account"]["id"] + except KeyError: + _LOGGER.error("Malformed login response: %s", self.login_response) + raise TokenRefreshFailed + return True + + def startup(self): + """Initialize tokens for communication.""" + self.validate_login() + if None in self.login_attributes.values(): + self.refresh_token() + + def validate_response(self, response, json_resp): + """Check for valid response.""" + if not json_resp: + return response + + json_data = response.json() + try: + if json_data["code"] in ERROR.BLINK_ERRORS: + raise exceptions.ConnectionError + except KeyError: + pass + + return json_data + + def query( + self, + url=None, + data=None, + headers=None, + reqtype="get", + stream=False, + json_resp=True, + is_retry=False, + ): + """ + Perform server requests. + + :param url: URL to perform request + :param data: Data to send + :param headers: Headers to send + :param reqtype: Can be 'get' or 'post' (default: 'get') + :param stream: Stream response? True/FALSE + :param json_resp: Return JSON response? TRUE/False + :param is_retry: Is this a retry attempt? True/FALSE + """ + req = self.prepare_request(url, headers, data, reqtype) + try: + response = self.session.send(req, stream=stream) + return self.validate_response(response, json_resp) + + except (exceptions.ConnectionError, exceptions.Timeout, TokenRefreshFailed): + try: + if not is_retry: + self.refresh_token() + return self.query( + url=url, + data=data, + headers=headers, + reqtype=reqtype, + stream=stream, + json_resp=json_resp, + is_retry=True, + ) + except (TokenRefreshFailed, LoginError): + _LOGGER.error("Endpoint %s failed. Unable to refresh login tokens", url) + _LOGGER.error("Endpoint %s failed", url) + return None + + def send_auth_key(self, blink, key): + """Send 2FA key to blink servers.""" + if key is not None: + response = api.request_verify(self, blink, key) + try: + json_resp = response.json() + blink.available = json_resp["valid"] + except (KeyError, TypeError): + _LOGGER.error("Did not receive valid response from server.") + return False + return True + + def check_key_required(self): + """Check if 2FA key is required.""" + try: + if self.login_response["client"]["verification_required"]: + return True + except (KeyError, TypeError): + pass + return False + + +class TokenRefreshFailed(Exception): + """Class to throw failed refresh exception.""" + + +class LoginError(Exception): + """Class to throw failed login exception.""" diff --git a/blinkpy/blinkpy.py b/blinkpy/blinkpy.py index 48091140..ae4fc737 100644 --- a/blinkpy/blinkpy.py +++ b/blinkpy/blinkpy.py @@ -24,22 +24,14 @@ from blinkpy import api from blinkpy.sync_module import BlinkSyncModule -from blinkpy.helpers.util import ( - create_session, - merge_dicts, - get_time, - BlinkURLHandler, - Throttle, -) +from blinkpy.helpers import util from blinkpy.helpers.constants import ( - BLINK_URL, DEFAULT_MOTION_INTERVAL, DEFAULT_REFRESH, MIN_THROTTLE_TIME, - LOGIN_URLS, ) from blinkpy.helpers.constants import __version__ -from blinkpy.login_handler import LoginHandler +from blinkpy.auth import Auth, TokenRefreshFailed, LoginError _LOGGER = logging.getLogger(__name__) @@ -49,202 +41,161 @@ class Blink: """Class to initialize communication.""" def __init__( - self, - username=None, - password=None, - cred_file=None, - refresh_rate=DEFAULT_REFRESH, - motion_interval=DEFAULT_MOTION_INTERVAL, - legacy_subdomain=False, - no_prompt=False, - persist_key=None, - device_id="Blinkpy", + self, refresh_rate=DEFAULT_REFRESH, motion_interval=DEFAULT_MOTION_INTERVAL, ): """ Initialize Blink system. - :param username: Blink username (usually email address) - :param password: Blink password - :param cred_file: JSON formatted file to store credentials. - If username and password are given, file - is ignored. Otherwise, username and password - are loaded from file. :param refresh_rate: Refresh rate of blink information. Defaults to 15 (seconds) :param motion_interval: How far back to register motion in minutes. Defaults to last refresh time. Useful for preventing motion_detected property from de-asserting too quickly. - :param legacy_subdomain: Set to TRUE to use old 'rest.region' - endpoints (only use if you are having - api issues). - :param no_prompt: Set to TRUE if using an implementation that needs to - suppress command-line output. - :param persist_key: Location of persistant identifier. - :param device_id: Identifier for the application. Default is 'Blinkpy'. - This is used when logging in and should be changed to - fit the implementation (ie. "Home Assistant" in a - Home Assistant integration). """ - self.login_handler = LoginHandler( - username=username, - password=password, - cred_file=cred_file, - persist_key=persist_key, - device_id=device_id, - ) - self._token = None - self._auth_header = None - self._host = None + self.auth = Auth() self.account_id = None self.client_id = None self.network_ids = [] self.urls = None self.sync = CaseInsensitiveDict({}) - self.region = None - self.region_id = None self.last_refresh = None self.refresh_rate = refresh_rate - self.session = create_session() self.networks = [] self.cameras = CaseInsensitiveDict({}) self.video_list = CaseInsensitiveDict({}) - self.login_url = LOGIN_URLS[0] - self.login_urls = [] self.motion_interval = motion_interval self.version = __version__ - self.legacy = legacy_subdomain - self.no_prompt = no_prompt self.available = False self.key_required = False - self.login_response = {} - - @property - def auth_header(self): - """Return the authentication header.""" - return self._auth_header - def start(self): + @util.Throttle(seconds=MIN_THROTTLE_TIME) + def refresh(self, force=False): """ - Perform full system setup. + Perform a system refresh. - Method logs in and sets auth token, urls, and ids for future requests. - Essentially this is just a wrapper function for ease of use. + :param force: Force an update of the camera data """ - if not self.available: - self.get_auth_token() - - if self.key_required and not self.no_prompt: - email = self.login_handler.data["username"] - key = input("Enter code sent to {}: ".format(email)) - result = self.login_handler.send_auth_key(self, key) - self.key_required = not result - self.setup_post_verify() - elif not self.key_required: - self.setup_post_verify() + if self.check_if_ok_to_update() or force: + if not self.available: + self.auth.refresh_token() + self.setup_post_verify() - def setup_post_verify(self): - """Initialize blink system after verification.""" - camera_list = self.get_cameras() - networks = self.get_ids() - for network_name, network_id in networks.items(): - if network_id not in camera_list.keys(): - camera_list[network_id] = {} - _LOGGER.warning("No cameras found for %s", network_name) - sync_module = BlinkSyncModule( - self, network_name, network_id, camera_list[network_id] - ) - sync_module.start() - self.sync[network_name] = sync_module - self.cameras = self.merge_cameras() - self.available = self.refresh() - self.key_required = False + for sync_name, sync_module in self.sync.items(): + _LOGGER.debug("Attempting refresh of sync %s", sync_name) + sync_module.refresh(force_cache=force) + if not force: + # Prevents rapid clearing of motion detect property + self.last_refresh = int(time.time()) + return True + return False - def login(self): - """Perform server login. DEPRECATED.""" - _LOGGER.warning( - "Method is deprecated and will be removed in a future version. Please use the LoginHandler.login() method instead." - ) - return self.login_handler.login(self) - - def get_auth_token(self, is_retry=False): - """Retrieve the authentication token from Blink.""" - self.login_response = self.login_handler.login(self) - if not self.login_response: + def start(self): + """Perform full system setup.""" + try: + self.auth.startup() + self.setup_login_ids() + self.setup_urls() + except (LoginError, TokenRefreshFailed, BlinkSetupError): + _LOGGER.error("Cannot setup Blink platform.") self.available = False return False - self.setup_params(self.login_response) - if self.login_handler.check_key_required(self): - self.key_required = True - return self._auth_header - - def setup_params(self, response): - """Retrieve blink parameters from login response.""" - self.login_url = self.login_handler.login_url - ((self.region_id, self.region),) = response["region"].items() - self._host = "{}.{}".format(self.region_id, BLINK_URL) - self._token = response["authtoken"]["authtoken"] - self._auth_header = {"Host": self._host, "TOKEN_AUTH": self._token} - self.urls = BlinkURLHandler(self.region_id, legacy=self.legacy) - self.networks = self.get_networks() - self.client_id = response["client"]["id"] - self.account_id = response["account"]["id"] - - def get_networks(self): - """Get network information.""" - response = api.request_networks(self) + + self.key_required = self.auth.check_key_required() + if self.key_required: + if self.auth.no_prompt: + return True + self.setup_prompt_2fa() + return self.setup_post_verify() + + def setup_prompt_2fa(self): + """Prompt for 2FA.""" + email = self.auth.data["username"] + pin = input(f"Enter code sent to {email}: ") + result = self.auth.send_auth_key(self, pin) + self.key_required = not result + + def setup_post_verify(self): + """Initialize blink system after verification.""" try: - return response["summary"] - except KeyError: - return None + self.setup_networks() + networks = self.setup_network_ids() + cameras = self.setup_camera_list() + except BlinkSetupError: + self.available = False + return False - def get_ids(self): - """Set the network ID and Account ID.""" - all_networks = [] - network_dict = {} - for network, status in self.networks.items(): - if status["onboarded"]: - all_networks.append("{}".format(network)) - network_dict[status["name"]] = network + for name, network_id in networks.items(): + sync_cameras = cameras.get(network_id, {}) + self.setup_sync_module(name, network_id, sync_cameras) + self.cameras = self.merge_cameras() - self.network_ids = all_networks - return network_dict + self.available = True + self.key_required = False + return True + + def setup_sync_module(self, name, network_id, cameras): + """Initialize a sync module.""" + self.sync[name] = BlinkSyncModule(self, name, network_id, cameras) + self.sync[name].start() - def get_cameras(self): - """Retrieve a camera list for each onboarded network.""" + def setup_camera_list(self): + """Create camera list for onboarded networks.""" + all_cameras = {} response = api.request_homescreen(self) try: - all_cameras = {} for camera in response["cameras"]: camera_network = str(camera["network_id"]) - camera_name = camera["name"] - camera_id = camera["id"] - camera_info = {"name": camera_name, "id": camera_id} if camera_network not in all_cameras: all_cameras[camera_network] = [] - - all_cameras[camera_network].append(camera_info) + all_cameras[camera_network].append( + {"name": camera["name"], "id": camera["id"]} + ) return all_cameras except KeyError: - _LOGGER.error("Initialization failue. Could not retrieve cameras.") - return {} + _LOGGER.error("Unable to retrieve cameras from response %s", response) + raise BlinkSetupError - @Throttle(seconds=MIN_THROTTLE_TIME) - def refresh(self, force_cache=False): - """ - Perform a system refresh. + def setup_login_ids(self): + """Retrieve login id numbers from login response.""" + self.client_id = self.auth.client_id + self.account_id = self.auth.account_id - :param force_cache: Force an update of the camera cache - """ - if self.check_if_ok_to_update() or force_cache: - for sync_name, sync_module in self.sync.items(): - _LOGGER.debug("Attempting refresh of sync %s", sync_name) - sync_module.refresh(force_cache=force_cache) - if not force_cache: - # Prevents rapid clearing of motion detect property - self.last_refresh = int(time.time()) - return True - return False + def setup_urls(self): + """Create urls for api.""" + try: + self.urls = util.BlinkURLHandler(self.auth.region_id) + except TypeError: + _LOGGER.error( + "Unable to extract region is from response %s", self.auth.login_response + ) + raise BlinkSetupError + + def setup_networks(self): + """Get network information.""" + response = api.request_networks(self) + try: + self.networks = response["summary"] + except KeyError: + raise BlinkSetupError + + def setup_network_ids(self): + """Create the network ids for onboarded networks.""" + all_networks = [] + network_dict = {} + try: + for network, status in self.networks.items(): + if status["onboarded"]: + all_networks.append(f"{network}") + network_dict[status["name"]] = network + except AttributeError: + _LOGGER.error( + "Unable to retrieve network information from %s", self.networks + ) + raise BlinkSetupError + + self.network_ids = all_networks + return network_dict def check_if_ok_to_update(self): """Check if it is ok to perform an http request.""" @@ -260,9 +211,13 @@ def merge_cameras(self): """Merge all sync camera dicts into one.""" combined = CaseInsensitiveDict({}) for sync in self.sync: - combined = merge_dicts(combined, self.sync[sync].cameras) + combined = util.merge_dicts(combined, self.sync[sync].cameras) return combined + def save(self, file_name): + """Save login data to file.""" + util.json_save(self.auth.login_attributes, file_name) + def download_videos(self, path, since=None, camera="all", stop=10, debug=False): """ Download all videos from server since specified time. @@ -283,7 +238,7 @@ def download_videos(self, path, since=None, camera="all", stop=10, debug=False): parsed_datetime = parse(since, fuzzy=True) since_epochs = parsed_datetime.timestamp() - formatted_date = get_time(time_to_convert=since_epochs) + formatted_date = util.get_time(time_to_convert=since_epochs) _LOGGER.info("Retrieving videos since %s", formatted_date) if not isinstance(camera, list): @@ -343,3 +298,7 @@ def _parse_downloaded_items(self, result, camera, path, debug): camera_name, created_at, address, filename ) ) + + +class BlinkSetupError(Exception): + """Class to handle setup errors.""" diff --git a/blinkpy/camera.py b/blinkpy/camera.py index a2f05bed..de97c5a3 100644 --- a/blinkpy/camera.py +++ b/blinkpy/camera.py @@ -110,7 +110,7 @@ def update(self, config, force_cache=False, **kwargs): ) try: self.temperature_calibrated = resp["temp"] - except KeyError: + except (TypeError, KeyError): self.temperature_calibrated = self.temperature _LOGGER.warning("Could not retrieve calibrated temperature.") diff --git a/blinkpy/helpers/constants.py b/blinkpy/helpers/constants.py index 097cd35c..e42c1e6f 100644 --- a/blinkpy/helpers/constants.py +++ b/blinkpy/helpers/constants.py @@ -48,10 +48,10 @@ BLINK_URL = "immedia-semi.com" DEFAULT_URL = "{}.{}".format("rest-prod", BLINK_URL) BASE_URL = "https://{}".format(DEFAULT_URL) +LOGIN_ENDPOINT = "{}/api/v4/account/login".format(BASE_URL) LOGIN_URLS = [ - "{}/api/v4/login".format(BASE_URL), + "{}/api/v4/account/login".format(BASE_URL), "{}/api/v3/login".format(BASE_URL), - "{}/api/v2/login".format(BASE_URL), ] """ @@ -62,6 +62,7 @@ """ OTHER """ +DEVICE_ID = "Blinkpy" TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S%z" DEFAULT_MOTION_INTERVAL = 1 DEFAULT_REFRESH = 30 diff --git a/blinkpy/helpers/util.py b/blinkpy/helpers/util.py index 9079e3d3..3bc7380f 100644 --- a/blinkpy/helpers/util.py +++ b/blinkpy/helpers/util.py @@ -1,19 +1,38 @@ """Useful functions for blinkpy.""" +import json import logging import time import secrets from calendar import timegm -from functools import partial, wraps -from requests import Request, Session, exceptions +from functools import wraps +from getpass import getpass import dateutil.parser -from blinkpy.helpers.constants import BLINK_URL, TIMESTAMP_FORMAT -import blinkpy.helpers.errors as ERROR +from blinkpy.helpers import constants as const _LOGGER = logging.getLogger(__name__) +def json_load(file_name): + """Load json credentials from file.""" + try: + with open(file_name, "r") as json_file: + data = json.load(json_file) + return data + except FileNotFoundError: + _LOGGER.error("Could not find %s", file_name) + except json.decoder.JSONDecodeError: + _LOGGER.error("File %s has improperly formatted json", file_name) + return None + + +def json_save(data, file_name): + """Save data to file location.""" + with open(file_name, "w") as json_file: + json.dump(data, json_file, indent=4) + + def gen_uid(size): """Create a random sring.""" full_token = secrets.token_hex(size) @@ -34,7 +53,7 @@ def get_time(time_to_convert=None): """Create blink-compatible timestamp.""" if time_to_convert is None: time_to_convert = time.time() - return time.strftime(TIMESTAMP_FORMAT, time.gmtime(time_to_convert)) + return time.strftime(const.TIMESTAMP_FORMAT, time.gmtime(time_to_convert)) def merge_dicts(dict_a, dict_b): @@ -48,103 +67,25 @@ def merge_dicts(dict_a, dict_b): return {**dict_a, **dict_b} -def create_session(): - """ - Create a session for blink communication. - - From @ericfrederich via - https://github.com/kennethreitz/requests/issues/2011 - """ - sess = Session() - sess.get = partial(sess.get, timeout=5) - return sess - - -def attempt_reauthorization(blink): - """Attempt to refresh auth token and links.""" - _LOGGER.info("Auth token expired, attempting reauthorization.") - headers = blink.get_auth_token(is_retry=True) - return headers - - -def http_req( - blink, - url="http://example.com", - data=None, - headers=None, - reqtype="get", - stream=False, - json_resp=True, - is_retry=False, -): - """ - Perform server requests and check if reauthorization neccessary. - - :param blink: Blink instance - :param url: URL to perform request - :param data: Data to send (default: None) - :param headers: Headers to send (default: None) - :param reqtype: Can be 'get' or 'post' (default: 'get') - :param stream: Stream response? True/FALSE - :param json_resp: Return JSON response? TRUE/False - :param is_retry: Is this a retry attempt? True/FALSE - """ - if reqtype == "post": - req = Request("POST", url, headers=headers, data=data) - elif reqtype == "get": - req = Request("GET", url, headers=headers) - else: - _LOGGER.error("Invalid request type: %s", reqtype) - raise BlinkException(ERROR.REQUEST) - - prepped = req.prepare() +def prompt_login_data(data): + """Prompt user for username and password.""" + if data["username"] is None: + data["username"] = input("Username:") + if data["password"] is None: + data["password"] = getpass("Password:") - try: - response = blink.session.send(prepped, stream=stream) - if json_resp and "code" in response.json(): - resp_dict = response.json() - code = resp_dict["code"] - message = resp_dict["message"] - if is_retry and code in ERROR.BLINK_ERRORS: - _LOGGER.error("Cannot obtain new token for server auth.") - return None - elif code in ERROR.BLINK_ERRORS: - headers = attempt_reauthorization(blink) - if not headers: - raise exceptions.ConnectionError - return http_req( - blink, - url=url, - data=data, - headers=headers, - reqtype=reqtype, - stream=stream, - json_resp=json_resp, - is_retry=True, - ) - _LOGGER.warning("Response from server: %s - %s", code, message) - - except (exceptions.ConnectionError, exceptions.Timeout): - _LOGGER.info("Cannot connect to server with url %s.", url) - if not is_retry: - headers = attempt_reauthorization(blink) - return http_req( - blink, - url=url, - data=data, - headers=headers, - reqtype=reqtype, - stream=stream, - json_resp=json_resp, - is_retry=True, - ) - _LOGGER.error("Endpoint %s failed. Possible issue with Blink servers.", url) - return None - - if json_resp: - return response.json() - - return response + return data + + +def validate_login_data(data): + """Check for missing keys.""" + data["uid"] = data.get("uid", gen_uid(const.SIZE_UID)) + data["notification_key"] = data.get( + "notification_key", gen_uid(const.SIZE_NOTIFICATION_KEY) + ) + data["device_id"] = data.get("device_id", const.DEVICE_ID) + + return data class BlinkException(Exception): @@ -164,17 +105,17 @@ class BlinkAuthenticationException(BlinkException): class BlinkURLHandler: """Class that handles Blink URLS.""" - def __init__(self, region_id, legacy=False): + def __init__(self, region_id): """Initialize the urls.""" - self.subdomain = "rest-{}".format(region_id) - if legacy: - self.subdomain = "rest.{}".format(region_id) - self.base_url = "https://{}.{}".format(self.subdomain, BLINK_URL) - self.home_url = "{}/homescreen".format(self.base_url) - self.event_url = "{}/events/network".format(self.base_url) - self.network_url = "{}/network".format(self.base_url) - self.networks_url = "{}/networks".format(self.base_url) - self.video_url = "{}/api/v2/videos".format(self.base_url) + if region_id is None: + raise TypeError + self.subdomain = f"rest-{region_id}" + self.base_url = f"https://{self.subdomain}.{const.BLINK_URL}" + self.home_url = f"{self.base_url}/homescreen" + self.event_url = f"{self.base_url}/events/network" + self.network_url = f"{self.base_url}/network" + self.networks_url = f"{self.base_url}/networks" + self.video_url = f"{self.base_url}/api/v2/videos" _LOGGER.debug("Setting base url to %s.", self.base_url) diff --git a/blinkpy/login_handler.py b/blinkpy/login_handler.py deleted file mode 100644 index 2e978f75..00000000 --- a/blinkpy/login_handler.py +++ /dev/null @@ -1,154 +0,0 @@ -"""Login handler for blink.""" -import json -import logging -from os.path import isfile -from getpass import getpass -from blinkpy import api -from blinkpy.helpers import util -from blinkpy.helpers import constants as const - -_LOGGER = logging.getLogger(__name__) - - -class LoginHandler: - """Class to handle login communication.""" - - def __init__( - self, - username=None, - password=None, - cred_file=None, - persist_key=None, - device_id="Blinkpy", - ): - """ - Initialize login handler. - - :param username: Blink username - :param password: Blink password - :param cred_file: JSON formatted credential file. - :param persist_key: File location of persistant key. - :param device_id: Name of application to send at login. - """ - self.login_url = None - self.login_urls = const.LOGIN_URLS - self.cred_file = cred_file - self.persist_key = persist_key - self.device_id = device_id - self.data = { - "username": username, - "password": password, - "uid": None, - "notification_key": None, - } - - self.check_keys() - - def check_keys(self): - """Check if uid exists, if not create.""" - uid = util.gen_uid(const.SIZE_UID) - notification_key = util.gen_uid(const.SIZE_NOTIFICATION_KEY) - data = {"uid": uid, "notification_key": notification_key} - if self.persist_key is None: - return data - if not isfile(self.persist_key): - with open(self.persist_key, "w") as json_file: - json.dump(data, json_file) - else: - with open(self.persist_key, "r") as json_file: - data = json.load(json_file) - return data - - def check_cred_file(self): - """Check if credential file supplied and use if so.""" - if isfile(self.cred_file): - try: - with open(self.cred_file, "r") as json_file: - creds = json.load(json_file) - self.data["username"] = creds["username"] - self.data["password"] = creds["password"] - - except ValueError: - _LOGGER.error( - "Improperly formatted json file %s.", self.cred_file, exc_info=True - ) - return False - - except KeyError: - _LOGGER.error("JSON file information incomplete %s.", exc_info=True) - return False - return True - return False - - def check_login(self): - """Check login information and prompt if not available.""" - if self.data["username"] is None: - self.data["username"] = input("Username:") - if self.data["password"] is None: - self.data["password"] = getpass("Password:") - - if self.data["username"] and self.data["password"]: - return True - return False - - def validate_response(self, url, response): - """Validate response from login endpoint.""" - try: - if response.status_code != 200: - return False - except AttributeError: - _LOGGER.error( - "Response for %s did not return a status code. Deprecated endpoint?", - url, - ) - return False - return True - - def login(self, blink): - """Attempt login to blink servers.""" - if self.cred_file is not None: - self.check_cred_file() - if not self.check_login(): - _LOGGER.error("Cannot login with username %s", self.data["username"]) - return False - - for url in self.login_urls: - _LOGGER.info("Attempting login with %s", url) - response = api.request_login( - blink, - url, - self.data["username"], - self.data["password"], - self.data["notification_key"], - self.data["uid"], - is_retry=False, - device_id=self.device_id, - ) - - if self.validate_response(url, response): - self.login_url = url - return response.json() - - _LOGGER.error("Failed to login to Blink servers. Last response: %s", response) - return False - - def send_auth_key(self, blink, key): - """Send 2FA key to blink servers.""" - if key is not None: - response = api.request_verify(blink, key) - try: - json_resp = response.json() - blink.available = json_resp["valid"] - except (KeyError, TypeError): - _LOGGER.error("Did not receive valid response from server.") - return False - return True - - def check_key_required(self, blink): - """Check if 2FA key is required.""" - try: - if blink.login_response["client"]["verification_required"]: - return True - except KeyError: - pass - return False diff --git a/blinkpy/sync_module.py b/blinkpy/sync_module.py index def98104..4d5fe9b7 100644 --- a/blinkpy/sync_module.py +++ b/blinkpy/sync_module.py @@ -21,13 +21,11 @@ def __init__(self, blink, network_name, network_id, camera_list): :param blink: Blink class instantiation """ self.blink = blink - self._auth_header = blink.auth_header self.network_id = network_id - self.region = blink.region - self.region_id = blink.region_id + self.region_id = blink.auth.region_id self.name = network_name self.serial = None - self.status = None + self.status = "offline" self.sync_id = None self.host = None self.summary = None @@ -49,7 +47,6 @@ def attributes(self): "network_id": self.network_id, "serial": self.serial, "status": self.status, - "region": self.region, "region_id": self.region_id, } return attr @@ -62,7 +59,12 @@ def urls(self): @property def online(self): """Return boolean system online status.""" - return ONLINE[self.status] + try: + return ONLINE[self.status] + except KeyError: + _LOGGER.error("Unknown sync module status %s", self.status) + self.available = False + return False @property def arm(self): @@ -70,6 +72,7 @@ def arm(self): try: return self.network_info["network"]["armed"] except (KeyError, TypeError): + self.available = False return None @arm.setter @@ -103,7 +106,7 @@ def start(self): "Could not extract some sync module info: %s", response, exc_info=True ) - self.get_network_info() + is_ok = self.get_network_info() self.check_new_videos() try: for camera_config in self.camera_list: @@ -120,6 +123,8 @@ def start(self): ) return False + if not is_ok: + return False self.available = True return True @@ -153,15 +158,19 @@ def get_network_info(self): if is_errored: self.available = False + return False + return True def refresh(self, force_cache=False): """Get all blink cameras and pulls their most recent status.""" - self.get_network_info() + if not self.get_network_info(): + return self.check_new_videos() for camera_name in self.cameras.keys(): camera_id = self.cameras[camera_name].camera_id camera_info = self.get_camera_info(camera_id) self.cameras[camera_name].update(camera_info, force_cache=force_cache) + self.available = True def check_new_videos(self): """Check if new videos since last refresh.""" diff --git a/tests/mock_responses.py b/tests/mock_responses.py index ea837765..3981926e 100644 --- a/tests/mock_responses.py +++ b/tests/mock_responses.py @@ -1,16 +1,5 @@ """Simple mock responses definitions.""" -from blinkpy.helpers.util import BlinkURLHandler -import blinkpy.helpers.constants as const - -LOGIN_RESPONSE = { - "region": {"mock": "Test"}, - "networks": {"1234": {"name": "test", "onboarded": True}}, - "authtoken": {"authtoken": "foobar123", "message": "auth"}, - "client": {"id": "5678"}, - "account": {"id": "1337"}, -} - class MockResponse: """Class for mock request response.""" @@ -29,41 +18,3 @@ def json(self): def raw(self): """Return raw data from get request.""" return self.raw_data - - -def mocked_session_send(*args, **kwargs): - """Mock session.""" - prepped = args[0] - url = prepped.url - header = prepped.headers - method = prepped.method - if method == "GET": - expected_token = LOGIN_RESPONSE["authtoken"]["authtoken"] - if header["TOKEN_AUTH"] != expected_token: - response = {"message": "Not Authorized", "code": 400} - status = 400 - elif url == "use_bad_response": - response = {"foo": "bar"} - status = 200 - elif url == "reauth": - response = {"message": "REAUTH", "code": 777} - status = 777 - else: - response = {"test": "foo"} - status = 200 - elif method == "POST": - if url in const.LOGIN_URLS: - response = LOGIN_RESPONSE - status = 200 - elif url == "http://wrong.url/" or url is None: - response = {"message": "Error", "code": 404} - status = 404 - else: - response = {"message": "foo", "code": 200} - status = 200 - - return MockResponse(response, status) - - -class MockURLHandler(BlinkURLHandler): - """Mocks URL Handler in blinkpy module.""" diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 00000000..88f9d7de --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,214 @@ +"""Test login handler.""" + +import unittest +from unittest import mock +from requests import exceptions +from blinkpy.auth import Auth, LoginError, TokenRefreshFailed +import blinkpy.helpers.constants as const +import tests.mock_responses as mresp + +USERNAME = "foobar" +PASSWORD = "deadbeef" + + +class TestAuth(unittest.TestCase): + """Test the Auth class in blinkpy.""" + + def setUp(self): + """Set up Login Handler.""" + self.auth = Auth() + + def tearDown(self): + """Clean up after test.""" + self.auth = None + + @mock.patch("blinkpy.helpers.util.gen_uid") + @mock.patch("blinkpy.auth.util.getpass") + def test_empty_init(self, getpwd, genuid): + """Test initialization with no params.""" + auth = Auth() + self.assertDictEqual(auth.data, {}) + getpwd.return_value = "bar" + genuid.return_value = 1234 + with mock.patch("builtins.input", return_value="foo"): + auth.validate_login() + expected_data = { + "username": "foo", + "password": "bar", + "uid": 1234, + "notification_key": 1234, + "device_id": const.DEVICE_ID, + } + self.assertDictEqual(auth.data, expected_data) + + @mock.patch("blinkpy.helpers.util.gen_uid") + @mock.patch("blinkpy.auth.util.getpass") + def test_barebones_init(self, getpwd, genuid): + """Test basebones initialization.""" + login_data = {"username": "foo", "password": "bar"} + auth = Auth(login_data) + self.assertDictEqual(auth.data, login_data) + getpwd.return_value = "bar" + genuid.return_value = 1234 + with mock.patch("builtins.input", return_value="foo"): + auth.validate_login() + expected_data = { + "username": "foo", + "password": "bar", + "uid": 1234, + "notification_key": 1234, + "device_id": const.DEVICE_ID, + } + self.assertDictEqual(auth.data, expected_data) + + def test_full_init(self): + """Test full initialization.""" + login_data = { + "username": "foo", + "password": "bar", + "token": "token", + "host": "host", + "region_id": "region_id", + "client_id": "client_id", + "account_id": "account_id", + "uid": 1234, + "notification_key": 4321, + "device_id": "device_id", + } + auth = Auth(login_data) + self.assertEqual(auth.token, "token") + self.assertEqual(auth.host, "host") + self.assertEqual(auth.region_id, "region_id") + self.assertEqual(auth.client_id, "client_id") + self.assertEqual(auth.account_id, "account_id") + auth.validate_login() + self.assertDictEqual(auth.login_attributes, login_data) + + def test_bad_response_code(self): + """Check bad response code from server.""" + fake_resp = mresp.MockResponse({"code": 404}, 404) + with self.assertRaises(exceptions.ConnectionError): + self.auth.validate_response(fake_resp, True) + + def test_good_response_code(self): + """Check good response code from server.""" + fake_resp = mresp.MockResponse({"foo": "bar"}, 200) + self.assertEqual(self.auth.validate_response(fake_resp, True), {"foo": "bar"}) + + def test_response_not_json(self): + """Check response when not json.""" + fake_resp = "foobar" + self.assertEqual(self.auth.validate_response(fake_resp, False), "foobar") + + def test_header(self): + """Test header data.""" + self.auth.token = "bar" + self.auth.host = "foo" + expected_header = {"Host": "foo", "TOKEN_AUTH": "bar"} + self.assertDictEqual(self.auth.header, expected_header) + + def test_header_no_token(self): + """Test header without token.""" + self.auth.token = None + self.assertEqual(self.auth.header, None) + + @mock.patch("blinkpy.auth.Auth.validate_login", return_value=None) + @mock.patch("blinkpy.auth.api.request_login") + def test_login(self, mock_req, mock_validate): + """Test login handling.""" + fake_resp = mresp.MockResponse({"foo": "bar"}, 200) + mock_req.return_value = fake_resp + self.assertEqual(self.auth.login(), {"foo": "bar"}) + + @mock.patch("blinkpy.auth.Auth.validate_login", return_value=None) + @mock.patch("blinkpy.auth.api.request_login") + def test_login_bad_response(self, mock_req, mock_validate): + """Test login handling when bad response.""" + fake_resp = mresp.MockResponse({"foo": "bar"}, 404) + mock_req.return_value = fake_resp + with self.assertRaises(LoginError): + self.auth.login() + + @mock.patch("blinkpy.auth.Auth.login") + def test_refresh_token(self, mock_login): + """Test refresh token method.""" + mock_login.return_value = { + "region": {"tier": "test"}, + "authtoken": {"authtoken": "foobar"}, + "client": {"id": 1234}, + "account": {"id": 5678}, + } + self.assertTrue(self.auth.refresh_token()) + self.assertEqual(self.auth.region_id, "test") + self.assertEqual(self.auth.token, "foobar") + self.assertEqual(self.auth.client_id, 1234) + self.assertEqual(self.auth.account_id, 5678) + + @mock.patch("blinkpy.auth.Auth.login") + def test_refresh_token_failed(self, mock_login): + """Test refresh token failed.""" + mock_login.return_value = {} + with self.assertRaises(TokenRefreshFailed): + self.auth.refresh_token() + + def test_check_key_required(self): + """Check key required method.""" + self.auth.login_response = {} + self.assertFalse(self.auth.check_key_required()) + + self.auth.login_response = {"client": {"verification_required": False}} + self.assertFalse(self.auth.check_key_required()) + + self.auth.login_response = {"client": {"verification_required": True}} + self.assertTrue(self.auth.check_key_required()) + + @mock.patch("blinkpy.auth.api.request_verify") + def test_send_auth_key(self, mock_req): + """Check sending of auth key.""" + mock_blink = MockBlink(None) + mock_req.return_value = mresp.MockResponse({"valid": True}, 200) + self.assertTrue(self.auth.send_auth_key(mock_blink, 1234)) + self.assertTrue(mock_blink.available) + + mock_req.return_value = mresp.MockResponse(None, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + + mock_req.return_value = mresp.MockResponse({}, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + + self.assertTrue(self.auth.send_auth_key(mock_blink, None)) + + @mock.patch("blinkpy.auth.api.request_verify") + def test_send_auth_key_fail(self, mock_req): + """Check handling of auth key failure.""" + mock_blink = MockBlink(None) + mock_req.return_value = mresp.MockResponse(None, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + mock_req.return_value = mresp.MockResponse({}, 200) + self.assertFalse(self.auth.send_auth_key(mock_blink, 1234)) + + @mock.patch("blinkpy.auth.Auth.validate_response") + @mock.patch("blinkpy.auth.Auth.refresh_token") + def test_query_retry(self, mock_refresh, mock_validate): + """Check handling of request retry.""" + self.auth.session = MockSession() + mock_validate.side_effect = [TokenRefreshFailed, "foobar"] + mock_refresh.return_value = True + self.assertEqual(self.auth.query(url="http://example.com"), "foobar") + + +class MockSession: + """Object to mock a session.""" + + def send(self, *args, **kwargs): + """Mock send function.""" + return None + + +class MockBlink: + """Object to mock basic blink class.""" + + def __init__(self, login_response): + """Initialize mock blink class.""" + self.available = False + self.login_response = login_response diff --git a/tests/test_blink_functions.py b/tests/test_blink_functions.py index cd1c0562..89a45dc6 100644 --- a/tests/test_blink_functions.py +++ b/tests/test_blink_functions.py @@ -5,11 +5,7 @@ from blinkpy import blinkpy from blinkpy.sync_module import BlinkSyncModule -from blinkpy.helpers.util import create_session, get_time -import tests.mock_responses as mresp - -USERNAME = "foobar" -PASSWORD = "deadbeef" +from blinkpy.helpers.util import get_time, BlinkURLHandler class MockSyncModule(BlinkSyncModule): @@ -34,42 +30,19 @@ def http_post(self, url): return self.return_value -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) class TestBlinkFunctions(unittest.TestCase): """Test Blink and BlinkCamera functions in blinkpy.""" def setUp(self): """Set up Blink module.""" - self.blink = blinkpy.Blink(username=USERNAME, password=PASSWORD) - # pylint: disable=protected-access - self.blink._auth_header = {"Host": "test.url.tld", "TOKEN_AUTH": "foobar123"} - self.blink.urls = blinkpy.BlinkURLHandler("test") - self.blink.session = create_session() + self.blink = blinkpy.Blink() + self.blink.urls = BlinkURLHandler("test") def tearDown(self): """Clean up after test.""" self.blink = None - @mock.patch("blinkpy.login_handler.api.request_login") - def test_backup_url(self, req, mock_sess): - """Test backup login method.""" - json_resp = { - "authtoken": {"authtoken": "foobar123"}, - "networks": {"1234": {"name": "foobar", "onboarded": True}}, - } - bad_req = mresp.MockResponse({}, 404) - new_req = mresp.MockResponse(json_resp, 200) - req.side_effect = [bad_req, bad_req, new_req] - self.blink.login_handler.login_urls = ["test1", "test2", "test3"] - self.blink.login_handler.login(self.blink) - self.assertEqual(self.blink.login_handler.login_url, "test3") - - req.side_effect = [bad_req, new_req, bad_req] - self.blink.login_handler.login_urls = ["test1", "test2", "test3"] - self.blink.login_handler.login(self.blink) - self.assertEqual(self.blink.login_handler.login_url, "test2") - - def test_merge_cameras(self, mock_sess): + def test_merge_cameras(self): """Test merge camera functionality.""" first_dict = {"foo": "bar", "test": 123} next_dict = {"foobar": 456, "bar": "foo"} @@ -82,7 +55,7 @@ def test_merge_cameras(self, mock_sess): self.assertEqual(expected, result) @mock.patch("blinkpy.blinkpy.api.request_videos") - def test_download_video_exit(self, mock_req, mock_sess): + def test_download_video_exit(self, mock_req): """Test we exit method when provided bad response.""" blink = blinkpy.Blink() # pylint: disable=protected-access @@ -100,7 +73,7 @@ def test_download_video_exit(self, mock_req, mock_sess): self.assertEqual(dl_log.output, expected_log) @mock.patch("blinkpy.blinkpy.api.request_videos") - def test_parse_downloaded_items(self, mock_req, mock_sess): + def test_parse_downloaded_items(self, mock_req): """Test ability to parse downloaded items list.""" blink = blinkpy.Blink() # pylint: disable=protected-access @@ -125,7 +98,7 @@ def test_parse_downloaded_items(self, mock_req, mock_sess): self.assertEqual(dl_log.output, expected_log) @mock.patch("blinkpy.blinkpy.api.request_videos") - def test_parse_camera_not_in_list(self, mock_req, mock_sess): + def test_parse_camera_not_in_list(self, mock_req): """Test ability to parse downloaded items list.""" blink = blinkpy.Blink() # pylint: disable=protected-access diff --git a/tests/test_blinkpy.py b/tests/test_blinkpy.py index cfda5e74..2d980ff8 100644 --- a/tests/test_blinkpy.py +++ b/tests/test_blinkpy.py @@ -8,96 +8,57 @@ import unittest from unittest import mock -from blinkpy import api -from blinkpy.blinkpy import Blink -from blinkpy.sync_module import BlinkSyncModule -from blinkpy.login_handler import LoginHandler -from blinkpy.helpers.util import ( - http_req, - create_session, - BlinkException, - BlinkURLHandler, -) +from blinkpy.blinkpy import Blink, BlinkSetupError from blinkpy.helpers.constants import __version__ -import tests.mock_responses as mresp -USERNAME = "foobar" -PASSWORD = "deadbeef" - -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) class TestBlinkSetup(unittest.TestCase): """Test the Blink class in blinkpy.""" def setUp(self): - """Set up Blink module.""" - self.blink = Blink(username=USERNAME, password=PASSWORD) - self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", "1234", []) - self.blink.urls = BlinkURLHandler("test") - self.blink.session = create_session() + """Initialize blink test object.""" + self.blink = Blink() + self.blink.available = True def tearDown(self): - """Clean up after test.""" + """Cleanup blink test object.""" self.blink = None - def test_initialization(self, mock_sess): + def test_initialization(self): """Verify we can initialize blink.""" - self.assertEqual(self.blink.version, __version__) - self.assertEqual(self.blink.login_handler.data["username"], USERNAME) - self.assertEqual(self.blink.login_handler.data["password"], PASSWORD) - - def test_bad_request(self, mock_sess): - """Check that we raise an Exception with a bad request.""" - self.blink.session = create_session() - explog = "WARNING:blinkpy.helpers.util:" "Response from server: 200 - foo" - with self.assertRaises(BlinkException): - http_req(self.blink, reqtype="bad") - - with self.assertLogs() as logrecord: - http_req(self.blink, reqtype="post", is_retry=True) - self.assertEqual(logrecord.output, [explog]) - - def test_authentication(self, mock_sess): - """Check that we can authenticate Blink up properly.""" - authtoken = self.blink.get_auth_token()["TOKEN_AUTH"] - expected = mresp.LOGIN_RESPONSE["authtoken"]["authtoken"] - self.assertEqual(authtoken, expected) - - def test_reauthorization_attempt(self, mock_sess): - """Check that we can reauthorize after first unsuccessful attempt.""" - original_header = self.blink.get_auth_token() - # pylint: disable=protected-access - bad_header = {"Host": self.blink._host, "TOKEN_AUTH": "BADTOKEN"} - # pylint: disable=protected-access - self.blink._auth_header = bad_header - self.assertEqual(self.blink.auth_header, bad_header) - api.request_homescreen(self.blink) - self.assertEqual(self.blink.auth_header, original_header) - - def test_multiple_networks(self, mock_sess): + blink = Blink() + self.assertEqual(blink.version, __version__) + + def test_network_id_failure(self): + """Check that with bad network data a setup error is raised.""" + self.blink.networks = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_network_ids() + + def test_multiple_networks(self): """Check that we handle multiple networks appropriately.""" self.blink.networks = { "0000": {"onboarded": False, "name": "foo"}, "5678": {"onboarded": True, "name": "bar"}, "1234": {"onboarded": False, "name": "test"}, } - self.blink.get_ids() + self.blink.setup_network_ids() self.assertTrue("5678" in self.blink.network_ids) - def test_multiple_onboarded_networks(self, mock_sess): + def test_multiple_onboarded_networks(self): """Check that we handle multiple networks appropriately.""" self.blink.networks = { "0000": {"onboarded": False, "name": "foo"}, "5678": {"onboarded": True, "name": "bar"}, "1234": {"onboarded": True, "name": "test"}, } - self.blink.get_ids() + self.blink.setup_network_ids() self.assertTrue("0000" not in self.blink.network_ids) self.assertTrue("5678" in self.blink.network_ids) self.assertTrue("1234" in self.blink.network_ids) @mock.patch("blinkpy.blinkpy.time.time") - def test_throttle(self, mock_time, mock_sess): + def test_throttle(self, mock_time): """Check throttling functionality.""" now = self.blink.refresh_rate + 1 mock_time.return_value = now @@ -113,19 +74,15 @@ def test_throttle(self, mock_time, mock_sess): self.assertEqual(self.blink.check_if_ok_to_update(), False) self.assertEqual(self.blink.last_refresh, now) - def test_sync_case_insensitive_dict(self, mock_sess): + def test_sync_case_insensitive_dict(self): """Check that we can access sync modules ignoring case.""" - self.assertEqual(self.blink.sync["test"].name, "test") - self.assertEqual(self.blink.sync["TEST"].name, "test") - - @mock.patch("blinkpy.api.request_login") - def test_unexpected_login(self, mock_login, mock_sess): - """Check that we appropriately handle unexpected login info.""" - mock_login.return_value = None - self.assertFalse(self.blink.get_auth_token()) + self.blink.sync["test"] = 1234 + self.assertEqual(self.blink.sync["test"], 1234) + self.assertEqual(self.blink.sync["TEST"], 1234) + self.assertEqual(self.blink.sync["tEsT"], 1234) @mock.patch("blinkpy.api.request_homescreen") - def test_get_cameras(self, mock_home, mock_sess): + def test_setup_cameras(self, mock_home): """Check retrieval of camera information.""" mock_home.return_value = { "cameras": [ @@ -134,7 +91,7 @@ def test_get_cameras(self, mock_home, mock_sess): {"name": "test", "network_id": 4321, "id": 0000}, ] } - result = self.blink.get_cameras() + result = self.blink.setup_camera_list() self.assertEqual( result, { @@ -144,28 +101,90 @@ def test_get_cameras(self, mock_home, mock_sess): ) @mock.patch("blinkpy.api.request_homescreen") - def test_get_cameras_failure(self, mock_home, mock_sess): - """Check that on failure we initialize empty info and move on.""" + def test_setup_cameras_failure(self, mock_home): + """Check that on failure we raise a setup error.""" mock_home.return_value = {} - result = self.blink.get_cameras() - self.assertEqual(result, {}) - - @mock.patch.object(LoginHandler, "send_auth_key") - @mock.patch.object(Blink, "setup_post_verify") - def test_startup_prompt(self, mock_send_key, mock_verify, mock_sess): - """Test startup logic with command-line prompt.""" - mock_send_key.return_value = True - mock_verify.return_value = True - self.blink.no_prompt = False + with self.assertRaises(BlinkSetupError): + self.blink.setup_camera_list() + + def test_setup_urls(self): + """Check setup of URLS.""" + self.blink.auth.region_id = "test" + self.blink.setup_urls() + self.assertEqual(self.blink.urls.subdomain, "rest-test") + + def test_setup_urls_failure(self): + """Check that on failure we raise a setup error.""" + self.blink.auth.region_id = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_urls() + + @mock.patch("blinkpy.api.request_networks") + def test_setup_networks(self, mock_networks): + """Check setup of networks.""" + mock_networks.return_value = {"summary": "foobar"} + self.blink.setup_networks() + self.assertEqual(self.blink.networks, "foobar") + + @mock.patch("blinkpy.api.request_networks") + def test_setup_networks_failure(self, mock_networks): + """Check that on failure we raise a setup error.""" + mock_networks.return_value = {} + with self.assertRaises(BlinkSetupError): + self.blink.setup_networks() + + @mock.patch("blinkpy.blinkpy.Auth.send_auth_key") + def test_setup_prompt_2fa(self, mock_key): + """Test setup with 2fa prompt.""" + self.blink.auth.data["username"] = "foobar" self.blink.key_required = True - self.blink.available = True - with mock.patch("builtins.input", return_value="1234"): - self.blink.start() + mock_key.return_value = True + with mock.patch("builtins.input", return_value="foo"): + self.blink.setup_prompt_2fa() self.assertFalse(self.blink.key_required) + mock_key.return_value = False + with mock.patch("builtins.input", return_value="foo"): + self.blink.setup_prompt_2fa() + self.assertTrue(self.blink.key_required) - def test_startup_no_prompt(self, mock_sess): - """Test startup with no_prompt flag set.""" + @mock.patch("blinkpy.blinkpy.Blink.setup_camera_list") + @mock.patch("blinkpy.api.request_networks") + def test_setup_post_verify(self, mock_networks, mock_camera): + """Test setup after verification.""" + self.blink.available = False self.blink.key_required = True - self.blink.no_prompt = True - self.blink.start() - self.assertTrue(self.blink.key_required) + mock_networks.return_value = { + "summary": {"foo": {"onboarded": False, "name": "bar"}} + } + mock_camera.return_value = [] + self.assertTrue(self.blink.setup_post_verify()) + self.assertTrue(self.blink.available) + self.assertFalse(self.blink.key_required) + + @mock.patch("blinkpy.api.request_networks") + def test_setup_post_verify_failure(self, mock_networks): + """Test failed setup after verification.""" + self.blink.available = False + mock_networks.return_value = {} + self.assertFalse(self.blink.setup_post_verify()) + self.assertFalse(self.blink.available) + + def test_merge_cameras(self): + """Test merging of cameras.""" + self.blink.sync = { + "foo": MockSync({"test": 123, "foo": "bar"}), + "bar": MockSync({"fizz": "buzz", "bar": "foo"}), + } + combined = self.blink.merge_cameras() + self.assertEqual(combined["test"], 123) + self.assertEqual(combined["foo"], "bar") + self.assertEqual(combined["fizz"], "buzz") + self.assertEqual(combined["bar"], "foo") + + +class MockSync: + """Mock sync module class.""" + + def __init__(self, cameras): + """Initialize fake class.""" + self.cameras = cameras diff --git a/tests/test_cameras.py b/tests/test_cameras.py index ca2fbbf2..01064bfa 100644 --- a/tests/test_cameras.py +++ b/tests/test_cameras.py @@ -8,14 +8,11 @@ import unittest from unittest import mock -from blinkpy import blinkpy -from blinkpy.helpers.util import create_session, BlinkURLHandler +from blinkpy.blinkpy import Blink +from blinkpy.helpers.util import BlinkURLHandler from blinkpy.sync_module import BlinkSyncModule from blinkpy.camera import BlinkCamera -import tests.mock_responses as mresp -USERNAME = "foobar" -PASSWORD = "deadbeef" CAMERA_CFG = { "camera": [ @@ -29,20 +26,13 @@ } -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) +@mock.patch("blinkpy.auth.Auth.query") class TestBlinkCameraSetup(unittest.TestCase): """Test the Blink class in blinkpy.""" def setUp(self): """Set up Blink module.""" - self.blink = blinkpy.Blink(username=USERNAME, password=PASSWORD) - header = { - "Host": "abc.zxc", - "TOKEN_AUTH": mresp.LOGIN_RESPONSE["authtoken"]["authtoken"], - } - # pylint: disable=protected-access - self.blink._auth_header = header - self.blink.session = create_session() + self.blink = Blink() self.blink.urls = BlinkURLHandler("test") self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", 1234, []) self.camera = BlinkCamera(self.blink.sync["test"]) @@ -52,8 +42,9 @@ def setUp(self): def tearDown(self): """Clean up after test.""" self.blink = None + self.camera = None - def test_camera_update(self, mock_sess): + def test_camera_update(self, mock_resp): """Test that we can properly update camera properties.""" config = { "name": "new", @@ -71,8 +62,8 @@ def test_camera_update(self, mock_sess): self.camera.sync.last_record = { "new": {"clip": "/test.mp4", "time": "1970-01-01T00:00:00"} } - mock_sess.side_effect = [ - mresp.MockResponse({"temp": 71}, 200), + mock_resp.side_effect = [ + {"temp": 71}, "test", "foobar", ] @@ -96,9 +87,9 @@ def test_camera_update(self, mock_sess): self.assertEqual(self.camera.image_from_cache, "test") self.assertEqual(self.camera.video_from_cache, "foobar") - def test_no_thumbnails(self, mock_sess): + def test_no_thumbnails(self, mock_resp): """Tests that thumbnail is 'None' if none found.""" - mock_sess.return_value = "foobar" + mock_resp.return_value = "foobar" self.camera.last_record = ["1"] config = { "name": "new", @@ -133,9 +124,9 @@ def test_no_thumbnails(self, mock_sess): ], ) - def test_no_video_clips(self, mock_sess): + def test_no_video_clips(self, mock_resp): """Tests that we still proceed with camera setup with no videos.""" - mock_sess.return_value = "foobar" + mock_resp.return_value = "foobar" config = { "name": "new", "id": 1234, @@ -152,3 +143,12 @@ def test_no_video_clips(self, mock_sess): self.camera.update(config, force_cache=True) self.assertEqual(self.camera.clip, None) self.assertEqual(self.camera.video_from_cache, None) + + @mock.patch("blinkpy.camera.api.request_motion_detection_enable") + @mock.patch("blinkpy.camera.api.request_motion_detection_disable") + def test_motion_detection_enable_disable(self, mock_dis, mock_en, mock_rep): + """Test setting motion detection enable properly.""" + mock_dis.return_value = "disable" + mock_en.return_value = "enable" + self.assertEqual(self.camera.set_motion_detect(True), "enable") + self.assertEqual(self.camera.set_motion_detect(False), "disable") diff --git a/tests/test_login_handler.py b/tests/test_login_handler.py deleted file mode 100644 index 11bfc16b..00000000 --- a/tests/test_login_handler.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Test login handler.""" - -import unittest -from unittest import mock -from blinkpy.login_handler import LoginHandler -import tests.mock_responses as mresp - -USERNAME = "foobar" -PASSWORD = "deadbeef" - - -@mock.patch("blinkpy.helpers.util.Session.send", side_effect=mresp.mocked_session_send) -class TestLoginHandler(unittest.TestCase): - """Test the LoginHandler class in blinkpy.""" - - def setUp(self): - """Set up Login Handler.""" - self.login_handler = LoginHandler() - - def tearDown(self): - """Clean up after test.""" - self.login_handler = None - - @mock.patch("blinkpy.login_handler.getpass") - def test_manual_login(self, getpwd, mock_sess): - """Check that we can manually use the login() function.""" - getpwd.return_value = PASSWORD - with mock.patch("builtins.input", return_value=USERNAME): - self.assertTrue(self.login_handler.check_login()) - self.assertEqual(self.login_handler.data["username"], USERNAME) - self.assertEqual(self.login_handler.data["password"], PASSWORD) - - def test_no_cred_file(self, mock_sess): - """Check that we return false when cred file doesn't exist.""" - self.login_handler.cred_file = "/tmp/fake.file" - self.assertFalse(self.login_handler.check_cred_file()) - - @mock.patch("blinkpy.login_handler.isfile") - def test_exit_on_missing_json(self, mockisfile, mock_sess): - """Test that we fail on missing json data.""" - self.login_handler.cred_file = "/tmp/fake.file" - mockisfile.return_value = True - with mock.patch("builtins.open", mock.mock_open(read_data="{}")): - self.assertFalse(self.login_handler.check_cred_file()) - - @mock.patch("blinkpy.login_handler.json.load") - @mock.patch("blinkpy.login_handler.isfile") - def test_cred_file(self, mockisfile, mockjson, mock_sess): - """Test that loading credential file works.""" - self.login_handler.cred_file = "/tmp/fake.file" - mockjson.return_value = {"username": "foo", "password": "bar"} - mockisfile.return_value = True - with mock.patch("builtins.open", mock.mock_open(read_data="")): - self.assertTrue(self.login_handler.check_cred_file()) - self.assertEqual(self.login_handler.data["username"], "foo") - self.assertEqual(self.login_handler.data["password"], "bar") - - def test_bad_response(self, mock_sess): - """Check bad response from server.""" - self.assertFalse(self.login_handler.validate_response(None, None)) - - def test_bad_response_code(self, mock_sess): - """Check bad response code from server.""" - fake_resp = mresp.MockResponse(None, 404) - self.assertFalse(self.login_handler.validate_response(None, fake_resp)) - - def test_good_response_code(self, mock_sess): - """Check good response code from server.""" - fake_resp = mresp.MockResponse(None, 200) - self.assertTrue(self.login_handler.validate_response(None, fake_resp)) - - @mock.patch("blinkpy.login_handler.util.gen_uid") - def test_check_keys_no_persist(self, mock_uid, mock_sess): - """Check key generation.""" - uid_value = "abc123" - mock_uid.return_value = "abc123" - self.login_handler.persist_key = None - data = self.login_handler.check_keys() - self.assertEqual(data["uid"], uid_value) - self.assertEqual(data["notification_key"], uid_value) - - @mock.patch("blinkpy.login_handler.util.gen_uid") - @mock.patch("blinkpy.login_handler.json.load") - @mock.patch("blinkpy.login_handler.isfile") - def test_check_keys_persist(self, mockisfile, mockjson, mock_uid, mock_sess): - """Check key load from file.""" - uid_value = "abc123" - mock_file = {"uid": "321cba", "notification_key": "foobar123"} - mock_uid.return_value = uid_value - mockjson.return_value = mock_file - mockisfile.return_value = True - self.login_handler.persist_key = True - data = self.login_handler.check_keys() - self.assertEqual(mock_file["uid"], data["uid"]) - self.assertEqual(mock_file["notification_key"], data["notification_key"]) - - def test_check_key_required(self, mock_sess): - """Check key required method.""" - response_true = {"client": {"verification_required": True}} - response_false = {"client": {"verification_required": False}} - response_nokey = {} - - mock_blink = MockBlink(response_nokey) - self.assertFalse(self.login_handler.check_key_required(mock_blink)) - - mock_blink = MockBlink(response_false) - self.assertFalse(self.login_handler.check_key_required(mock_blink)) - - mock_blink = MockBlink(response_true) - self.assertTrue(self.login_handler.check_key_required(mock_blink)) - - @mock.patch("blinkpy.login_handler.api.request_verify") - def test_send_auth_key(self, mock_req, mock_sess): - """Check sending of auth key.""" - mock_blink = MockBlink(None) - mock_req.return_value = mresp.MockResponse({"valid": True}, 200) - self.assertTrue(self.login_handler.send_auth_key(mock_blink, 1234)) - self.assertTrue(mock_blink.available) - - mock_req.return_value = mresp.MockResponse(None, 200) - self.assertFalse(self.login_handler.send_auth_key(mock_blink, 1234)) - - mock_req.return_value = mresp.MockResponse({}, 200) - self.assertFalse(self.login_handler.send_auth_key(mock_blink, 1234)) - - self.assertTrue(self.login_handler.send_auth_key(mock_blink, None)) - - -class MockBlink: - """Object to mock basic blink class.""" - - def __init__(self, login_response): - """Initialize mock blink class.""" - self.available = False - self.login_response = login_response diff --git a/tests/test_sync_module.py b/tests/test_sync_module.py index 5a8912f8..6dbc04e4 100644 --- a/tests/test_sync_module.py +++ b/tests/test_sync_module.py @@ -2,27 +2,21 @@ import unittest from unittest import mock -from blinkpy import blinkpy +from blinkpy.blinkpy import Blink +from blinkpy.helpers.util import BlinkURLHandler from blinkpy.sync_module import BlinkSyncModule from blinkpy.camera import BlinkCamera -USERNAME = "foobar" -PASSWORD = "deadbeef" - -@mock.patch("blinkpy.api.http_req") +@mock.patch("blinkpy.auth.Auth.query") class TestBlinkSyncModule(unittest.TestCase): """Test BlinkSyncModule functions in blinkpy.""" def setUp(self): """Set up Blink module.""" - self.blink = blinkpy.Blink( - username=USERNAME, password=PASSWORD, motion_interval=0 - ) - # pylint: disable=protected-access - self.blink._auth_header = {"Host": "test.url.tld", "TOKEN_AUTH": "foobar123"} + self.blink = Blink(motion_interval=0) self.blink.last_refresh = 0 - self.blink.urls = blinkpy.BlinkURLHandler("test") + self.blink.urls = BlinkURLHandler("test") self.blink.sync["test"] = BlinkSyncModule(self.blink, "test", "1234", []) self.camera = BlinkCamera(self.blink.sync) self.mock_start = [ @@ -48,16 +42,50 @@ def tearDown(self): self.camera = None self.mock_start = None + def test_bad_status(self, mock_resp): + """Check that we mark module unavaiable on bad status.""" + self.blink.sync["test"].status = None + self.blink.sync["test"].available = True + self.assertFalse(self.blink.sync["test"].online) + self.assertFalse(self.blink.sync["test"].available) + + def test_bad_arm(self, mock_resp): + """Check that we mark module unavaiable if bad arm status.""" + self.blink.sync["test"].network_info = None + self.blink.sync["test"].available = True + self.assertEqual(self.blink.sync["test"].arm, None) + self.assertFalse(self.blink.sync["test"].available) + self.blink.sync["test"].network_info = {} + self.blink.sync["test"].available = True + self.assertEqual(self.blink.sync["test"].arm, None) + self.assertFalse(self.blink.sync["test"].available) + def test_get_events(self, mock_resp): """Test get events function.""" mock_resp.return_value = {"event": True} self.assertEqual(self.blink.sync["test"].get_events(), True) + def test_get_events_fail(self, mock_resp): + """Test handling of failed get events function.""" + mock_resp.return_value = None + self.assertFalse(self.blink.sync["test"].get_events()) + mock_resp.return_value = {} + self.assertFalse(self.blink.sync["test"].get_events()) + def test_get_camera_info(self, mock_resp): """Test get camera info function.""" mock_resp.return_value = {"camera": ["foobar"]} self.assertEqual(self.blink.sync["test"].get_camera_info("1234"), "foobar") + def test_get_camera_info_fail(self, mock_resp): + """Test hadnling of failed get camera info function.""" + mock_resp.return_value = None + self.assertEqual(self.blink.sync["test"].get_camera_info("1"), []) + mock_resp.return_value = {} + self.assertEqual(self.blink.sync["test"].get_camera_info("1"), []) + mock_resp.return_value = {"camera": None} + self.assertEqual(self.blink.sync["test"].get_camera_info("1"), []) + def test_check_new_videos_startup(self, mock_resp): """Test that check_new_videos does not block startup.""" sync_module = self.blink.sync["test"] @@ -224,3 +252,8 @@ def test_missing_camera_info(self, mock_resp): self.mock_start[5] = {} self.blink.sync["test"].start() self.assertEqual(self.blink.sync["test"].cameras, {"foo": None}) + + def test_sync_attributes(self, mock_resp): + """Test sync attributes.""" + self.assertEqual(self.blink.sync["test"].attributes["name"], "test") + self.assertEqual(self.blink.sync["test"].attributes["network_id"], "1234") diff --git a/tests/test_util.py b/tests/test_util.py index f048c35b..e26c1126 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -3,7 +3,7 @@ import unittest from unittest import mock import time -from blinkpy.helpers.util import Throttle, BlinkURLHandler, time_to_seconds +from blinkpy.helpers.util import json_load, Throttle, time_to_seconds class TestUtil(unittest.TestCase): @@ -101,16 +101,15 @@ def test2(self): self.assertEqual(tester.test1(), None) self.assertEqual(tester.test2(), True) - def test_legacy_subdomains(self): - """Test that subdomain can be set to legacy mode.""" - urls = BlinkURLHandler("test") - self.assertEqual(urls.subdomain, "rest-test") - urls = BlinkURLHandler("test", legacy=True) - self.assertEqual(urls.subdomain, "rest.test") - def test_time_to_seconds(self): """Test time to seconds conversion.""" correct_time = "1970-01-01T00:00:05+00:00" wrong_time = "1/1/1970 00:00:03" self.assertEqual(time_to_seconds(correct_time), 5) self.assertFalse(time_to_seconds(wrong_time)) + + def test_json_load_bad_data(self): + """Check that bad file is handled.""" + self.assertEqual(json_load("fake.file"), None) + with mock.patch("builtins.open", mock.mock_open(read_data="")): + self.assertEqual(json_load("fake.file"), None)