Skip to content

Commit

Permalink
change(get): Change get.py to match idf_tools.py more closely
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasssvaz committed Jan 14, 2025
1 parent 1a8400f commit 3f4504b
Showing 1 changed file with 153 additions and 91 deletions.
244 changes: 153 additions & 91 deletions tools/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,74 @@
import re
import time
import argparse
import ssl
import contextlib

from urllib.request import urlretrieve
from urllib.request import urlopen
from urllib.response import addinfourl
from typing import IO, Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
from ssl import SSLContext

unicode = lambda s: str(s) # noqa: E731

# the older "DigiCert Global Root CA" certificate used with github.com
DIGICERT_ROOT_CA_CERT = """
-----BEGIN CERTIFICATE-----
MIIDrzCCApegAwIBAgIQCDvgVpBCRrGhdWrJWZHHSjANBgkqhkiG9w0BAQUFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBD
QTAeFw0wNjExMTAwMDAwMDBaFw0zMTExMTAwMDAwMDBaMGExCzAJBgNVBAYTAlVT
MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j
b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IENBMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4jvhEXLeqKTTo1eqUKKPC3eQyaKl7hLOllsB
CSDMAZOnTjC3U/dDxGkAV53ijSLdhwZAAIEJzs4bg7/fzTtxRuLWZscFs3YnFo97
nh6Vfe63SKMI2tavegw5BmV/Sl0fvBf4q77uKNd0f3p4mVmFaG5cIzJLv07A6Fpt
43C/dxC//AH2hdmoRBBYMql1GNXRor5H4idq9Joz+EkIYIvUX7Q6hL+hqkpMfT7P
T19sdl6gSzeRntwi5m3OFBqOasv+zbMUZBfHWymeMr/y7vrTC0LUq7dBMtoM1O/4
gdW7jVg/tRvoSSiicNoxBN33shbyTApOB6jtSj1etX+jkMOvJwIDAQABo2MwYTAO
BgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUA95QNVbR
TLtm8KPiGxvDl7I90VUwHwYDVR0jBBgwFoAUA95QNVbRTLtm8KPiGxvDl7I90VUw
DQYJKoZIhvcNAQEFBQADggEBAMucN6pIExIK+t1EnE9SsPTfrgT1eXkIoyQY/Esr
hMAtudXH/vTBH1jLuG2cenTnmCmrEbXjcKChzUyImZOMkXDiqw8cvpOp/2PV5Adg
06O/nVsJ8dWO41P0jmP6P6fbtGbfYmbW0W5BjfIttep3Sp+dWOIrWcBAI+0tKIJF
PnlUkiaY4IBIqDfv8NZ5YBberOgOzW6sRBc4L0na4UU+Krk2U886UAb3LujEV0ls
YSEY1QSteDwsOoBrp+uvFRTp2InBuThs4pFsiv9kuXclVzDAGySj4dzp30d8tbQk
CAUw7C29C79Fv1C5qfPrmAESrciIxpg0X40KPMbp1ZWVbd4=
-----END CERTIFICATE-----
"""

# Initialize start_time globally
start_time = -1

if sys.version_info[0] == 3:
from urllib.request import urlretrieve
from urllib.request import urlopen
# the newer "DigiCert Global Root G2" certificate used with dl.espressif.com
DIGICERT_ROOT_G2_CERT = """
-----BEGIN CERTIFICATE-----
MIIDjjCCAnagAwIBAgIQAzrx5qcRqaC7KGSxHQn65TANBgkqhkiG9w0BAQsFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH
MjAeFw0xMzA4MDExMjAwMDBaFw0zODAxMTUxMjAwMDBaMGExCzAJBgNVBAYTAlVT
MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j
b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IEcyMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuzfNNNx7a8myaJCtSnX/RrohCgiN9RlUyfuI
2/Ou8jqJkTx65qsGGmvPrC3oXgkkRLpimn7Wo6h+4FR1IAWsULecYxpsMNzaHxmx
1x7e/dfgy5SDN67sH0NO3Xss0r0upS/kqbitOtSZpLYl6ZtrAGCSYP9PIUkY92eQ
q2EGnI/yuum06ZIya7XzV+hdG82MHauVBJVJ8zUtluNJbd134/tJS7SsVQepj5Wz
tCO7TG1F8PapspUwtP1MVYwnSlcUfIKdzXOS0xZKBgyMUNGPHgm+F6HmIcr9g+UQ
vIOlCsRnKPZzFBQ9RnbDhxSJITRNrw9FDKZJobq7nMWxM4MphQIDAQABo0IwQDAP
BgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB/wQEAwIBhjAdBgNVHQ4EFgQUTiJUIBiV
5uNu5g/6+rkS7QYXjzkwDQYJKoZIhvcNAQELBQADggEBAGBnKJRvDkhj6zHd6mcY
1Yl9PMWLSn/pvtsrF9+wX3N3KjITOYFnQoQj8kVnNeyIv/iPsGEMNKSuIEyExtv4
NeF22d+mQrvHRAiGfzZ0JFrabA0UWTW98kndth/Jsw1HKj2ZL7tcu7XUIOGZX1NG
Fdtom/DzMNU+MeKNhJ7jitralj41E6Vf8PlwUHBHQRFXGU7Aj64GxJUTFy8bJZ91
8rGOmaFvE7FBcf6IKshPECBV1/MUReXgRPTqh5Uykw7+U0b6LJ3/iyK5S9kJRaTe
pLiaWN0bfVKfjllDiIGknibVb63dDcY3fe0Dkhvld1927jyNxF1WW6LZZm6zNTfl
MrY=
-----END CERTIFICATE-----
"""

unicode = lambda s: str(s) # noqa: E731
else:
# Not Python 3 - today, it is most likely to be Python 2
from urllib import urlretrieve
from urllib import urlopen
DL_CERT_DICT = {'dl.espressif.com': DIGICERT_ROOT_G2_CERT,
'github.com': DIGICERT_ROOT_CA_CERT}

if "Windows" in platform.system():
import requests
# Initialize start_time globally
start_time = -1

# determine if application is a script file or frozen exe
if getattr(sys, "frozen", False):
Expand Down Expand Up @@ -71,20 +123,18 @@ def format_time(seconds):
return "{:02}:{:05.2f}".format(int(minutes), seconds)


def report_progress(block_count, block_size, total_size, start_time):
def report_progress(block_count, block_size, total_size):
downloaded_size = block_count * block_size
time_elapsed = time.time() - start_time
current_speed = downloaded_size / (time_elapsed)

if sys.stdout.isatty():
if total_size > 0:
percent_complete = min((downloaded_size / total_size) * 100, 100)
sys.stdout.write(
f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501
f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded" # noqa: E501
)
else:
sys.stdout.write(
f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501
f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded" # noqa: E501
)
sys.stdout.flush()

Expand Down Expand Up @@ -243,18 +293,25 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
if filename.endswith("tar.gz"):
if not cfile:
cfile = tarfile.open(filename, "r:gz")
cfile.extractall(destination, filter="tar")
cfile.extractall(destination, filter="fully_trusted")
elif filename.endswith("tar.xz"):
if not cfile:
cfile = tarfile.open(filename, "r:xz")
cfile.extractall(destination, filter="tar")
cfile.extractall(destination, filter="fully_trusted")
elif filename.endswith("zip"):
if not cfile:
cfile = zipfile.ZipFile(filename)
cfile.extractall(destination)
else:
raise NotImplementedError("Unsupported archive type")

if sys.platform != 'win32' and filename.endswith('zip') and isinstance(cfile, ZipFile):
for file_info in cfile.infolist():
extracted_file = os.path.join(destination, file_info.filename)
extracted_permissions = file_info.external_attr >> 16 & 0o777 # Extract Unix permissions
if os.path.exists(extracted_file):
os.chmod(extracted_file, extracted_permissions)

if rename_to != dirname:
print("Renaming {0} to {1} ...".format(dirname, rename_to))
shutil.move(dirname, rename_to)
Expand All @@ -275,55 +332,67 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
return False


def download_file_with_progress(url, filename, start_time):
import ssl
import contextlib

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with contextlib.closing(urlopen(url, context=ctx)) as fp:
total_size = int(fp.getheader("Content-Length", fp.getheader("Content-length", "0")))
block_count = 0
block_size = 1024 * 8
block = fp.read(block_size)
if block:
with open(filename, "wb") as out_file:
out_file.write(block)
block_count += 1
report_progress(block_count, block_size, total_size, start_time)
while True:
block = fp.read(block_size)
if not block:
break
out_file.write(block)
block_count += 1
report_progress(block_count, block_size, total_size, start_time)
else:
raise Exception("Non-existing file or connection error")


def download_file(url, filename):
import ssl
import contextlib

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with contextlib.closing(urlopen(url, context=ctx)) as fp:
block_size = 1024 * 8
block = fp.read(block_size)
if block:
with open(filename, "wb") as out_file:
out_file.write(block)
while True:
block = fp.read(block_size)
if not block:
break
out_file.write(block)
else:
raise Exception("Non-existing file or connection error")

def splittype(url: str) -> Tuple[Optional[str], str]:
"""
Splits given url into its type (e.g. https, file) and the rest.
"""
match = re.match('([^/:]+):(.*)', url, re.DOTALL)
if match:
scheme, data = match.groups()
return scheme.lower(), data
return None, url

def urlretrieve_ctx(url: str,
filename: str,
reporthook: Optional[Callable[[int, int, int], None]]=None,
data: Optional[bytes]=None,
context: Optional[SSLContext]=None) -> Tuple[str, addinfourl]:
"""
Retrieve data from given URL. An alternative version of urlretrieve which takes SSL context as an argument.
"""
url_type, path = splittype(url)

# urlopen doesn't have context argument in Python <=2.7.9
extra_urlopen_args = {}
if context:
extra_urlopen_args['context'] = context
with contextlib.closing(urlopen(url, data, **extra_urlopen_args)) as fp: # type: ignore
headers = fp.info()

# Just return the local path and the "headers" for file://
# URLs. No sense in performing a copy unless requested.
if url_type == 'file' and not filename:
return os.path.normpath(path), headers

# Handle temporary file setup.
tfp = open(filename, 'wb')

with tfp:
result = filename, headers
bs = 1024 * 8
size = int(headers.get('content-length', -1))
read = 0
blocknum = 0

if reporthook:
reporthook(blocknum, bs, size)

while True:
block = fp.read(bs)
if not block:
break
read += len(block)
tfp.write(block)
blocknum += 1
if reporthook:
reporthook(blocknum, bs, size)

if size >= 0 and read < size:
raise ContentTooShortError(
'retrieval incomplete: got only %i out of %i bytes'
% (read, size), result)

return result

def get_tool(tool, force_download, force_extract):
sys_name = platform.system()
Expand All @@ -339,29 +408,22 @@ def get_tool(tool, force_download, force_extract):
else:
print("Downloading '" + archive_name + "' ...")
sys.stdout.flush()
if "CYGWIN_NT" in sys_name:
import ssl

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
urlretrieve(url, local_path, report_progress, context=ctx)
elif "Windows" in sys_name:
r = requests.get(url)
f = open(local_path, "wb")
f.write(r.content)
f.close()
else:
is_ci = os.environ.get("GITHUB_WORKSPACE")
if is_ci:
download_file(url, local_path)

try:
for site, cert in DL_CERT_DICT.items():
if site in url:
ctx = ssl.create_default_context()
ctx.load_verify_locations(cadata=cert)
break
else:
try:
urlretrieve(url, local_path, report_progress)
except: # noqa: E722
download_file_with_progress(url, local_path, start_time)
sys.stdout.write(" - Done\n")
sys.stdout.flush()
ctx = None

urlretrieve_ctx(url, local_path, report_progress, context=ctx)
except Exception as e:
print(f"Failed to download {archive_name}: {e}")
return False
finally:
sys.stdout.flush()
else:
print("Tool {0} already downloaded".format(archive_name))
sys.stdout.flush()
Expand Down

0 comments on commit 3f4504b

Please sign in to comment.