-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
118 lines (97 loc) · 3.34 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import csv
import os
import shutil
import sys
from distutils.util import strtobool
from pathlib import Path
from zipfile import ZipFile
import boto3
from dotenv import load_dotenv
load_dotenv()
USE_AWS = strtobool(os.getenv("USE_AWS", "False"))
AWS_AD_IMAGES_BUCKET = os.getenv("AWS_AD_IMAGES_BUCKET")
AWS_DATA_BUCKET = os.getenv("AWS_DATA_BUCKET")
AWS_IMAGES_BUCKET = os.getenv("AWS_IMAGES_BUCKET")
AWS_BUCKETS = (AWS_AD_IMAGES_BUCKET, AWS_DATA_BUCKET, AWS_IMAGES_BUCKET)
def create_path(path_name):
"""
Create path_name path if it does not exist
"""
cwd = Path.cwd()
dirname = cwd / path_name
if not dirname.exists():
print(f"Creating {dirname}")
os.makedirs(dirname, exist_ok=True)
return dirname
def rename_xml_files():
"""Align xml filename with image file name"""
with open('datafolder/metadaten.csv') as csv_file:
csv_reader = csv.reader(csv_file)
# Skip the first line
next(csv_reader)
for row in csv_reader:
name = Path(row[5])
target = 'datafolder/xml' / name.with_suffix('.xml')
source = Path(f'datafolder/xml/{row[6]}')
# XXX Add logging here!
shutil.move(source, target)
def zip_output():
"""Put result of the pipeline into a zip file"""
# zip file name
file = "output"
directory = "output"
# zipping the directory
shutil.make_archive(file, "zip", directory)
# print("Contents of the zip file:")
# with ZipFile(f"{file}.zip", 'r') as zip:
# zip.printdir()
def get_existing_buckets(s3):
"""Getting existing buckets from s3"""
response = s3.list_buckets()
existing_buckets = [bucket["Name"] for bucket in response["Buckets"]]
return existing_buckets
def create_s3_bucket(s3, bucket_name):
"""Create a bucket on s3"""
location = {"LocationConstraint": "eu-central-1"}
s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration=location)
def check_s3():
"""Check if all buckets exist on s3 or create them"""
s3 = boto3.client("s3")
existing_buckets = get_existing_buckets(s3)
for bucket_name in AWS_BUCKETS:
if bucket_name not in existing_buckets:
create_s3_bucket(s3, bucket_name)
def delete_objects_in_bucket(s3bucket):
"""Deletes all the objects in a s3 bucket"""
for s3_object in s3bucket.objects.all():
s3_object.delete()
def delete_s3():
"""Deleting s3 buckets and content"""
for bucket_name in AWS_BUCKETS:
s3bucket = get_s3_bucket(bucket_name)
delete_objects_in_bucket(s3bucket)
s3bucket.delete()
def list_s3():
"""Show content for configured s3 buckets"""
for bucket_name in AWS_BUCKETS:
s3bucket = get_s3_bucket(bucket_name)
print(f"Bucket: {bucket_name}")
for obj in s3bucket.objects.all():
print(f"-- {obj.key}")
def get_s3_bucket(bucket_name):
"""Connect to a s3 bucket for further operations"""
s3 = boto3.resource("s3")
return s3.Bucket(bucket_name)
if __name__ == "__main__":
if len(sys.argv) == 2:
param = sys.argv[1]
if param == "rename_xml_files":
rename_xml_files()
if param == "zip_output":
zip_output()
if USE_AWS and param == "check_s3":
check_s3()
if USE_AWS and param == "list_s3":
list_s3()
if USE_AWS and param == "delete_s3":
delete_s3()