From 770f9c909f2c2bdce852cefacf0477113d7766f0 Mon Sep 17 00:00:00 2001 From: Martin-Molinero Date: Mon, 7 Mar 2022 13:20:32 -0300 Subject: [PATCH] Stream downloads - Stream data downloads. Skip on tar error --- lean/components/api/api_client.py | 14 ++++++++++---- lean/components/api/data_client.py | 19 ++++++++++++++----- lean/components/cloud/data_downloader.py | 10 ++++------ 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/lean/components/api/api_client.py b/lean/components/api/api_client.py index 0eb42b62..c3a02ec8 100644 --- a/lean/components/api/api_client.py +++ b/lean/components/api/api_client.py @@ -123,12 +123,18 @@ def _request(self, method: str, endpoint: str, options: Dict[str, Any] = {}, ret timestamp = str(int(time())) password = sha256(f"{self._api_token}:{timestamp}".encode("utf-8")).hexdigest() + headers = { + "Timestamp": timestamp + } + + version = lean.__version__ + if lean.__version__ == 'dev': + version = 99999999 + headers["User-Agent"] = f"Lean CLI {version}" + response = self._http_client.request(method, full_url, - headers={ - "Timestamp": timestamp, - "User-Agent": f"Lean CLI {lean.__version__}" - }, + headers=headers, auth=(self._user_id, password), raise_for_status=False, **options) diff --git a/lean/components/api/data_client.py b/lean/components/api/data_client.py index 38655f5c..3f43eb23 100644 --- a/lean/components/api/data_client.py +++ b/lean/components/api/data_client.py @@ -13,6 +13,8 @@ from typing import List +from shutil import move +from tempfile import NamedTemporaryFile from lean.components.api.api_client import * from lean.models.api import QCDataInformation @@ -31,20 +33,27 @@ def __init__(self, api_client: 'APIClient', http_client: 'HTTPClient') -> None: self._api = api_client self._http_client = http_client - def download_file(self, file_path: str, organization_id: str) -> bytes: + def download_file(self, relative_file_path: str, organization_id: str, local_filename: str) -> None: """Downloads the 
content of a downloadable data file. - :param file_path: the path of the data file + :param relative_file_path: the relative path of the data file :param organization_id: the id of the organization that should be billed - :return: the content of the data file + :param local_filename: the final local path where the data file will be stored """ data = self._api.post("data/read", { "format": "link", - "filePath": file_path, + "filePath": relative_file_path, "organizationId": organization_id }) - return self._http_client.get(data["link"]).content + # we stream the data into a temporary file and later move it to its final location + with self._http_client.get(data["link"], stream=True) as r: + r.raise_for_status() + with NamedTemporaryFile(delete=False) as f: + temp_file_name = f.name + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + move(temp_file_name, local_filename) def download_public_file(self, data_endpoint: str) -> bytes: """Downloads the content of a downloadable public file. 
diff --git a/lean/components/cloud/data_downloader.py b/lean/components/cloud/data_downloader.py index 921f64ad..c2dac517 100644 --- a/lean/components/cloud/data_downloader.py +++ b/lean/components/cloud/data_downloader.py @@ -68,7 +68,7 @@ def update_database_files(self): if "not found" in str(e): pass else: - self._logger.error(str(e)) + self._logger.error(str(e)) except Exception as e: self._logger.error(str(e)) @@ -100,7 +100,7 @@ def download_files(self, data_files: List[Any], overwrite: bool, organization_id self._lean_config_manager.set_properties({ "factor-file-provider": "QuantConnect.Data.Auxiliary.LocalZipFactorFileProvider" }) - + progress.stop() except KeyboardInterrupt as e: progress.stop() @@ -108,6 +108,7 @@ def download_files(self, data_files: List[Any], overwrite: bool, organization_id def _process_bulk(self, file: Path, destination: Path): tar = tarfile.open(file) + tar.errorlevel = 0 tar.extractall(destination) tar.close() @@ -138,16 +139,13 @@ def _download_file(self, callback() return - try: - file_content = self._api_client.data.download_file(relative_file, organization_id) + self._api_client.data.download_file(relative_file, organization_id, local_path) except RequestFailedError as error: self._logger.warn(f"{local_path}: {error}\nYou have not been charged for this file") callback() return - _store_local_file(file_content, local_path) - # Special case: bulk files need unpacked if "setup/" in relative_file and relative_file.endswith(".tar"): self._process_bulk(local_path, data_directory)