3
3
4
4
from typing import List , Tuple
5
5
import logging
6
- from concurrent .futures import ThreadPoolExecutor
6
+ from concurrent .futures import ProcessPoolExecutor
7
7
from aiocache import Cache
8
8
9
9
import numpy as np
@@ -80,7 +80,9 @@ async def well_intersection_reals_from_user_session(
80
80
extra = {"total_mb" : tot_mb , "mb_per_s" : tot_mb / dl_time_s },
81
81
)
82
82
83
- surfs = [xtgeo .surface_from_file (BytesIO (bytestr ), fformat = "irap_binary" ) for bytestr in res_array ]
83
+ # surfs = [xtgeo.surface_from_file(BytesIO(bytestr), fformat="irap_binary") for bytestr in res_array]
84
+ surfaces = await load_xtgeo (res_array )
85
+ print (f"convert surfs: { timer .lap_s ():.2f} s" , flush = True )
84
86
fence_arr = np .array (
85
87
[
86
88
surface_fence_spec .x_points ,
@@ -89,16 +91,9 @@ async def well_intersection_reals_from_user_session(
89
91
surface_fence_spec .cum_length ,
90
92
]
91
93
).T
94
+ intersections = await make_intersections (surfaces , fence_arr )
92
95
93
- for surf in surfs :
94
- line = surf .get_randomline (fence_arr )
95
- intersection = schemas .SurfaceIntersectionPoints (
96
- name = f"{ surf .name } " ,
97
- cum_length = line [:, 0 ].tolist (),
98
- z_array = line [:, 1 ].tolist (),
99
- )
100
- intersections .append (intersection )
101
-
96
+ print (f"intersect surfs: { timer .lap_s ():.2f} s" , flush = True )
102
97
return ORJSONResponse ([section .dict () for section in intersections ])
103
98
104
99
@@ -212,24 +207,37 @@ def to_xtgeo(object_id, b64_blob):
212
207
return downloaded_surface_dict
213
208
214
209
215
- async def make_intersections ( surfaces , fence_arr ):
216
- def make_intersection ( surf ):
217
- line = surf . get_randomline ( fence_arr )
218
- intersection = schemas . SurfaceIntersectionPoints (
219
- name = f" { surf . name } " ,
220
- cum_length = line [:, 0 ].tolist (),
221
- z_array = line [:, 1 ]. tolist (),
222
- )
223
- return intersection
210
+ def make_intersection ( surf , fence_arr ):
211
+ line = surf . get_randomline ( fence_arr )
212
+ intersection = schemas . SurfaceIntersectionPoints (
213
+ name = f" { surf . name } " ,
214
+ cum_length = line [:, 0 ]. tolist () ,
215
+ z_array = line [:, 1 ].tolist (),
216
+ )
217
+ return intersection
218
+
224
219
220
+ async def make_intersections (surfaces , fence_arr ):
225
221
loop = asyncio .get_running_loop ()
226
222
227
- with ThreadPoolExecutor () as executor :
228
- tasks = [loop .run_in_executor (executor , make_intersection , surf ) for surf in surfaces ]
223
+ with ProcessPoolExecutor () as executor :
224
+ tasks = [loop .run_in_executor (executor , make_intersection , surf , fence_arr ) for surf in surfaces ]
229
225
intersections = await asyncio .gather (* tasks )
230
226
return intersections
231
227
232
228
229
+ def load_surf (bytestr ) -> xtgeo .RegularSurface :
230
+ return xtgeo .surface_from_file (BytesIO (bytestr ), fformat = "irap_binary" )
231
+
232
+
233
+ async def load_xtgeo (res_array ):
234
+ loop = asyncio .get_running_loop ()
235
+ with ProcessPoolExecutor () as executor :
236
+ tasks = [loop .run_in_executor (executor , load_surf , bytestr ) for bytestr in res_array ]
237
+ surfaces = await asyncio .gather (* tasks )
238
+ return surfaces
239
+
240
+
233
241
def get_uuids (case_uuid , ensemble_name , realization_nums , snames , sattr , bearer_token ):
234
242
sumo_client = SumoClient (env = "prod" , token = bearer_token , interactive = False )
235
243
case_collection = CaseCollection (sumo_client ).filter (uuid = case_uuid )
0 commit comments