1
1
import asyncio
2
2
import os
3
- from httpx import AsyncClient , Limits , ReadTimeout , Client
3
+ from typing import Optional , cast
4
+ from httpx import AsyncClient , Limits , ReadTimeout , Client , ConnectError , Response
4
5
import json
5
6
import pandas as pd
6
7
import random
18
19
from final_check import RENAME_DF , START_ID
19
20
20
21
21
- async def upload_task (files_path_chunk : list [str ], wait_seconds : int ) -> list [dict ]:
22
+ async def upload_task (
23
+ files_path_chunk : list [str ], wait_seconds : int
24
+ ) -> Optional [list [dict ]]:
22
25
"""
23
26
upload task for asyncio, a task process 10 files
24
27
@@ -39,10 +42,12 @@ async def upload_task(files_path_chunk: list[str], wait_seconds: int) -> list[di
39
42
for file_path in files_path_chunk
40
43
]
41
44
result = await asyncio .gather (* tasks )
45
+ if all (map (lambda i : i is None , result )):
46
+ return None
42
47
return result
43
48
44
49
45
- async def upload_single_async (client : AsyncClient , file_path : str ) -> dict :
50
+ async def upload_single_async (client : AsyncClient , file_path : str ) -> Optional [ dict ] :
46
51
"""
47
52
upload folder to ipfs
48
53
@@ -56,26 +61,36 @@ async def upload_single_async(client: AsyncClient, file_path: str) -> dict:
56
61
retry = 0
57
62
while retry < 5 :
58
63
try :
59
- response = await client .post (
64
+ response : Response = await client .post (
60
65
f"https://ipfs.infura.io:5001/api/v0/add" ,
61
66
params = {
62
67
"pin" : "true" if PIN_FILES else "false"
63
68
}, # pin=true if want to pin files
64
- auth = (PROJECT_ID , PROJECT_SECRET ),
69
+ auth = (PROJECT_ID , PROJECT_SECRET ), # type: ignore
65
70
files = {"file" : open (file_path , "rb" )},
66
71
)
67
- res_json = response .json ()
72
+ if response .status_code == 401 :
73
+ print ("Project ID and scecret is invalid" )
74
+ exit ()
75
+ res_json : dict = response .json ()
68
76
if res_json ["Name" ] != "" :
69
77
return res_json
70
78
except Exception as e :
71
79
if isinstance (e , ReadTimeout ):
72
80
print (f"upload { file_path .split ('-' )[0 ]} timeout, retry { retry } " )
81
+ elif isinstance (e , ConnectError ):
82
+ print (f"can't connect to ipfs, please check network or proxy setting" )
83
+ exit ()
84
+ else :
85
+ print (f"upload { file_path .split ('-' )[0 ]} error, exit" )
86
+ exit ()
73
87
retry += 1
88
+ return None
74
89
75
90
76
91
def upload_folder (
77
92
folder_name : str , content_type : str = "image/png"
78
- ) -> tuple [str , list [dict ]]:
93
+ ) -> tuple [Optional [ str ], Optional [ list [dict ] ]]:
79
94
"""
80
95
upload folder to ipfs
81
96
@@ -84,10 +99,10 @@ def upload_folder(
84
99
content_type (str, optional): mime file type. Defaults to "image/png".
85
100
86
101
Returns:
87
- tuple[str, list[dict]]: (folder_hash, images_dict_list)
102
+ tuple[Optional[ str], Optional[ list[dict] ]]: (folder_hash, images_dict_list)
88
103
"""
89
104
files = []
90
- extension = content_type .split (os . sep )[- 1 ]
105
+ extension = content_type .split ("/" )[- 1 ]
91
106
92
107
files = [
93
108
(file , open (os .path .join (folder_name , file ), "rb" ))
@@ -105,14 +120,15 @@ def upload_folder(
105
120
"wrap-with-directory" : "true" ,
106
121
},
107
122
files = files , # files should be List[filename, bytes]
108
- auth = (PROJECT_ID , PROJECT_SECRET ),
123
+ auth = (PROJECT_ID , PROJECT_SECRET ), # type: ignore
109
124
)
110
125
upload_folder_res_list = response .text .strip ().split ("\n " )
111
126
if (
112
127
upload_folder_res_list [0 ]
113
128
== "basic auth failure: invalid project id or project secret"
114
129
):
115
130
assert False , "invalid project id or project secret"
131
+ folder_hash : Optional [str ] = ""
116
132
try :
117
133
folder_hash = json .loads (
118
134
[i for i in upload_folder_res_list if json .loads (i )["Name" ] == "" ][0 ]
@@ -136,7 +152,7 @@ def upload_files(folder_name: str, content_type: str = "image/png") -> list[dict
136
152
Returns:
137
153
list[dict]: ipfs info list, example: [{ 'Name': str, 'Hash': str, 'Size': str }]
138
154
"""
139
- extension = content_type .split (os . sep )[- 1 ]
155
+ extension = content_type .split ("/" )[- 1 ]
140
156
file_paths = [
141
157
os .path .join (folder_name , file_path )
142
158
for file_path in list (
@@ -151,10 +167,16 @@ def upload_files(folder_name: str, content_type: str = "image/png") -> list[dict
151
167
152
168
def complete_batch_callback (images_ipfs_data ):
153
169
results .append (images_ipfs_data .result ())
170
+ if results [0 ] == None :
171
+ print ("No upload info return" )
172
+ exit ()
154
173
print (f"complete { len (results )/ len (chunks ):.2%} " )
155
174
156
175
loop = asyncio .get_event_loop ()
157
- print (f"Total { len (file_count )} files to upload, estimate time: { len (chunks )+ 10 } s" )
176
+ if file_count == 0 :
177
+ print (f"no any images in folder { IMAGES } " )
178
+ exit ()
179
+ print (f"Total { file_count } files to upload, estimate time: { len (chunks )+ 10 } s" )
158
180
for epoch , path_chunk in enumerate (chunks ):
159
181
task = loop .create_task (upload_task (path_chunk , epoch ))
160
182
tasks .append (task )
@@ -185,8 +207,10 @@ def generate_metadata(
185
207
Returns:
186
208
tuple[int, int]: (start_id, end_id)
187
209
"""
210
+ index : Optional [int ]
188
211
for idx , row in df .iterrows ():
189
212
path = row ["path" ]
213
+ idx = cast (int , idx )
190
214
index = idx + start_id
191
215
image_dict = next (
192
216
filter (
@@ -227,7 +251,7 @@ def read_images_from_local() -> list[dict]:
227
251
list[dict]: images ipfs info
228
252
"""
229
253
with open ("image_ipfs_data.backup" , "r" ) as f :
230
- result = json .loads (f .read ())
254
+ result : list [ dict ] = json .loads (f .read ())
231
255
print (f"read { len (result )} ipfs data from local" )
232
256
return result
233
257
0 commit comments