22
22
from .test_go import go_get_surface_blobs
23
23
from sumo .wrapper import SumoClient
24
24
from fmu .sumo .explorer .objects import CaseCollection , Case , SurfaceCollection
25
+ from src .services .utils .perf_timer import PerfTimer
26
+ import base64
27
+ from io import BytesIO
25
28
26
- router = APIRouter ()
27
29
LOGGER = logging .getLogger (__name__ )
30
+ router = APIRouter ()
31
+
28
32
29
33
cache = Cache (Cache .MEMORY , ttl = 3600 )
30
34
# @router.post("/well_intersection_reals_from_user_session")
@@ -68,22 +72,17 @@ async def well_intersection_reals_from_user_session(
68
72
) -> List [schemas .SurfaceIntersectionPoints ]:
69
73
body = await request .json ()
70
74
polyline = schemas .FencePolyline (** body .get ("polyline" ))
71
-
75
+ timer = PerfTimer ()
72
76
# Config
73
77
case_uuid = polyline .case_uuid
74
78
snames = polyline .names
75
79
sattr = polyline .attribute
76
80
ensemble_name = polyline .ensemble_name
77
- sumo_client = SumoClient (env = "prod" , token = authenticated_user .get_sumo_access_token (), interactive = False )
78
- case_collection = CaseCollection (sumo_client ).filter (uuid = case_uuid )
79
- case = case_collection [0 ]
80
- surface_collection = case .surfaces .filter (
81
- iteration = ensemble_name ,
82
- name = snames ,
83
- tagname = sattr ,
84
- realization = polyline .realization_nums ,
85
- )
86
- uuids = [surf .uuid for surf in surface_collection ]
81
+ # Get uuids
82
+ uuids = get_uuids (case_uuid , ensemble_name , polyline , snames , sattr , authenticated_user .get_sumo_access_token ())
83
+ elapsed_meta = timer .lap_ms ()
84
+
85
+ # Check if cached
87
86
uuids_to_download = []
88
87
surfaces = []
89
88
for uuid in uuids :
@@ -92,13 +91,22 @@ async def well_intersection_reals_from_user_session(
92
91
uuids_to_download .append (uuid )
93
92
else :
94
93
surfaces .append (possible_surface )
95
- downloaded_surface_dict = go_get_surface_blobs (
96
- authenticated_user .get_sumo_access_token (), case_uuid , uuids_to_download
97
- )
94
+ elapsed_cache = timer .lap_ms ()
95
+
96
+ # Download remaining
97
+ data_map_b64 = go_get_surface_blobs (authenticated_user .get_sumo_access_token (), case_uuid , uuids_to_download )
98
+ elapsed_download = timer .lap_ms ()
99
+
100
+ # Convert to xtgeo
101
+ downloaded_surface_dict = await b64_to_xtgeo (data_map_b64 )
102
+ elapsed_xtgeo = timer .lap_ms ()
103
+
104
+ # Add to cache
98
105
for uuid , surface in downloaded_surface_dict .items ():
99
106
# await cache.set(f"{authenticated_user._user_id}-{uuid}", surface)
100
107
surfaces .append (surface )
101
108
109
+ # Intersect
102
110
fence_arr = np .array (
103
111
[
104
112
polyline .x_points ,
@@ -107,8 +115,52 @@ async def well_intersection_reals_from_user_session(
107
115
polyline .cum_length ,
108
116
]
109
117
).T
118
+ intersections = await make_intersections (surfaces , fence_arr )
119
+ elapsed_intersect = timer .lap_ms ()
110
120
111
- def worker (surf ):
121
+ result = [intersection .dict () for intersection in intersections ]
122
+ elapsed_response_format = timer .lap_ms ()
123
+ LOGGER .info (
124
+ f"Got intersected surface set from Sumo: { timer .elapsed_ms ()} ms ("
125
+ f"meta={ elapsed_meta } ms, "
126
+ f"cache={ elapsed_cache } ms, "
127
+ f"download={ elapsed_download } ms, "
128
+ f"xtgeo={ elapsed_xtgeo } ms, "
129
+ f"intersect={ elapsed_intersect } ms, "
130
+ f"response_format={ elapsed_response_format } ms) " ,
131
+ extra = {
132
+ "meta" : elapsed_meta ,
133
+ "cache" : elapsed_cache ,
134
+ "download" : elapsed_download ,
135
+ "xtgeo" : elapsed_xtgeo ,
136
+ "intersect" : elapsed_intersect ,
137
+ "response_format" : elapsed_response_format ,
138
+ },
139
+ )
140
+ return ORJSONResponse (result )
141
+
142
+
143
+ async def b64_to_xtgeo (data_map_b64 ):
144
+ def to_xtgeo (object_id , b64_blob ):
145
+ bytestr = base64 .b64decode (b64_blob )
146
+ xtgeo_surface = xtgeo .surface_from_file (BytesIO (bytestr ), fformat = "irap_binary" )
147
+ return {object_id : xtgeo_surface }
148
+
149
+ loop = asyncio .get_running_loop ()
150
+ with ThreadPoolExecutor () as executor :
151
+ tasks = [
152
+ loop .run_in_executor (executor , to_xtgeo , object_id , b64_blob )
153
+ for object_id , b64_blob in data_map_b64 .items ()
154
+ ]
155
+ results = await asyncio .gather (* tasks )
156
+ downloaded_surface_dict = {}
157
+ for result in results :
158
+ downloaded_surface_dict .update (result )
159
+ return downloaded_surface_dict
160
+
161
+
162
+ async def make_intersections (surfaces , fence_arr ):
163
+ def make_intersection (surf ):
112
164
line = surf .get_randomline (fence_arr )
113
165
intersection = schemas .SurfaceIntersectionPoints (
114
166
name = f"{ surf .name } " ,
@@ -120,8 +172,19 @@ def worker(surf):
120
172
loop = asyncio .get_running_loop ()
121
173
122
174
with ThreadPoolExecutor () as executor :
123
- tasks = [loop .run_in_executor (executor , worker , surf ) for surf in surfaces ]
175
+ tasks = [loop .run_in_executor (executor , make_intersection , surf ) for surf in surfaces ]
124
176
intersections = await asyncio .gather (* tasks )
177
+ return intersections
125
178
126
- result = [intersection .dict () for intersection in intersections ]
127
- return ORJSONResponse (result )
179
+
180
+ def get_uuids (case_uuid , ensemble_name , polyline , snames , sattr , bearer_token ):
181
+ sumo_client = SumoClient (env = "prod" , token = bearer_token , interactive = False )
182
+ case_collection = CaseCollection (sumo_client ).filter (uuid = case_uuid )
183
+ case = case_collection [0 ]
184
+ surface_collection = case .surfaces .filter (
185
+ iteration = ensemble_name ,
186
+ name = snames ,
187
+ tagname = sattr ,
188
+ realization = polyline .realization_nums ,
189
+ )
190
+ return [surf .uuid for surf in surface_collection ]
0 commit comments