24
24
from io import BytesIO
25
25
from azure .storage .blob .aio import BlobClient , ContainerClient
26
26
import requests
27
+ from xtgeo .surface ._regsurf_import import _import_irap_binary_purepy
28
+ from xtgeo import _XTGeoFile
29
+ from struct import unpack
30
+ from xtgeo .common .constants import UNDEF_MAP_IRAPA , UNDEF_MAP_IRAPB
27
31
28
32
LOGGER = logging .getLogger (__name__ )
29
33
router = APIRouter ()
@@ -55,43 +59,56 @@ async def well_intersection_reals_from_user_session(
55
59
realization_surface_set_spec ,
56
60
authenticated_user .get_sumo_access_token (),
57
61
)
58
- base_uri , auth_token = get_base_uri_and_auth_token_for_case (
59
- case_uuid , "prod" , authenticated_user .get_sumo_access_token ()
60
- )
61
62
62
- async with ContainerClient .from_container_url (
63
- container_url = base_uri , credential = auth_token
64
- ) as container_client :
65
- coro_array = []
66
- timer = PerfTimer ()
63
+ res_array = await download_surface_blobs (
64
+ case_uuid , authenticated_user .get_sumo_access_token (), uuids
65
+ )
67
66
68
- for uuid in uuids :
69
- coro_array .append (
70
- download_blob (container_client = container_client , sumo_surf_uuid = uuid )
71
- )
72
- res_array = await asyncio .gather (* coro_array )
73
- elapsed_download = timer .lap_s ()
67
+ elapsed_download = timer .lap_s ()
74
68
75
- tot_mb = 0
76
- for res in res_array :
77
- tot_mb += len (res ) / (1024 * 1024 )
69
+ tot_mb = 0
70
+ for res in res_array :
71
+ tot_mb += len (res ) / (1024 * 1024 )
72
+ bytesios = [BytesIO (bytestr ) for bytestr in res_array ]
73
+ xtgeofiles = [_XTGeoFile (bytestr ) for bytestr in bytesios ]
74
+ test = [BytesIO (bytestr ).read () for bytestr in res_array ]
75
+ elapsed_bytesio = timer .lap_s ()
78
76
79
- surfaces = await load_xtgeo (res_array )
77
+ surfaces = [
78
+ xtgeo .surface_from_file (bytestr , fformat = "irap_binary" ) for bytestr in bytesios
79
+ ]
80
80
elapsed_xtgeo = timer .lap_s ()
81
81
82
+ # surfaces2 = [
83
+ # _import_irap_binary_purepy(xtgeofile, values=True) for xtgeofile in xtgeofiles
84
+ # ]
85
+
86
+ for idx , byteio in enumerate (bytesios ):
87
+ byteio .seek (0 )
88
+ buf = byteio .read ()
89
+ header = read_header (buf )
90
+ values = read_values_optimized (header , buf )
91
+ surfaces [idx ].values = values
92
+ del buf
93
+
94
+ elapsed_xtgeo2 = timer .lap_s ()
82
95
intersections = await make_intersections (surfaces , surface_fence_spec )
83
96
elapsed_intersect = timer .lap_s ()
84
97
85
98
LOGGER .info (
86
99
f"Got intersected surface set from Sumo: { timer .elapsed_ms ()} ms ("
87
100
f"download={ elapsed_download } ms, "
101
+ f"bytesio={ elapsed_bytesio } ms, "
88
102
f"xtgeo={ elapsed_xtgeo } ms, "
103
+ f"xtgeo2={ elapsed_xtgeo2 } ms, "
89
104
f"intersect={ elapsed_intersect } ms, "
90
105
f"size={ tot_mb :.2f} MB, "
91
106
f"speed={ tot_mb / elapsed_download :.2f} MB/s)" ,
92
107
extra = {
93
108
"download" : elapsed_download ,
109
+ "bytesio" : elapsed_bytesio ,
94
110
"xtgeo" : elapsed_xtgeo ,
111
+ "xtgeo2" : elapsed_xtgeo2 ,
95
112
"intersect" : elapsed_intersect ,
96
113
"size" : tot_mb ,
97
114
"speed" : tot_mb / elapsed_download ,
@@ -100,6 +117,93 @@ async def well_intersection_reals_from_user_session(
100
117
return ORJSONResponse ([section .dict () for section in intersections ])
101
118
102
119
120
+ def read_header (buf ):
121
+ # unpack header with big-endian format string
122
+ hed = unpack (">3i6f3i3f10i" , buf [:100 ])
123
+
124
+ args = {}
125
+ args ["nrow" ] = hed [2 ]
126
+ args ["xori" ] = hed [3 ]
127
+ args ["yori" ] = hed [5 ]
128
+ args ["xinc" ] = hed [7 ]
129
+ args ["yinc" ] = hed [8 ]
130
+ args ["ncol" ] = hed [11 ]
131
+ args ["rotation" ] = hed [12 ]
132
+
133
+ args ["yflip" ] = 1
134
+ if args ["yinc" ] < 0.0 :
135
+ args ["yinc" ] *= - 1
136
+ args ["yflip" ] = - 1
137
+
138
+ return args
139
+
140
+
141
+ def read_values_optimized (header , buf ):
142
+ stv = 100
143
+ n_blocks = (len (buf ) - stv ) // (
144
+ header ["ncol" ] * 4 + 8
145
+ ) # approximate number of blocks
146
+ datav = np .empty (
147
+ (header ["ncol" ] * n_blocks ,), dtype = np .float32
148
+ ) # preallocated array
149
+
150
+ idx = 0
151
+ while stv < len (buf ):
152
+ blockv = unpack (">i" , buf [stv : stv + 4 ])[0 ]
153
+ stv += 4
154
+ datav [idx : idx + blockv // 4 ] = np .frombuffer (
155
+ buf [stv : blockv + stv ], dtype = ">f4"
156
+ )
157
+ idx += blockv // 4
158
+ stv += blockv + 4
159
+
160
+ values = np .reshape (datav [:idx ], (header ["ncol" ], header ["nrow" ]), order = "F" )
161
+ return np .ma .masked_greater_equal (values , UNDEF_MAP_IRAPB )
162
+
163
+
164
+ def read_values (header , buf ):
165
+ # Values: traverse through data blocks
166
+ stv = 100 # Starting byte
167
+ datav = []
168
+
169
+ while True :
170
+ # start block integer - number of bytes of floats in following block
171
+ blockv = unpack (">i" , buf [stv : stv + 4 ])[0 ]
172
+ stv += 4
173
+ # floats
174
+ datav .append (
175
+ np .array (unpack (">" + str (int (blockv / 4 )) + "f" , buf [stv : blockv + stv ]))
176
+ )
177
+ stv += blockv
178
+ # end block integer not needed really
179
+ _ = unpack (">i" , buf [stv : stv + 4 ])[0 ]
180
+ stv += 4
181
+ if stv == len (buf ):
182
+ break
183
+
184
+ values = np .hstack (datav )
185
+ values = np .reshape (values , (header ["ncol" ], header ["nrow" ]), order = "F" )
186
+ values = np .array (values , order = "C" )
187
+ values = np .ma .masked_greater_equal (values , UNDEF_MAP_IRAPB )
188
+ return np .ma .masked_invalid (values )
189
+
190
+
191
+ async def download_surface_blobs (case_uuid , access_token , uuids ):
192
+ base_uri , auth_token = get_base_uri_and_auth_token_for_case (
193
+ case_uuid , "prod" , access_token
194
+ )
195
+ async with ContainerClient .from_container_url (
196
+ container_url = base_uri , credential = auth_token
197
+ ) as container_client :
198
+ coro_array = []
199
+ for uuid in uuids :
200
+ coro_array .append (
201
+ download_blob (container_client = container_client , sumo_surf_uuid = uuid )
202
+ )
203
+ res_array = await asyncio .gather (* coro_array )
204
+ return res_array
205
+
206
+
103
207
async def download_blob (container_client : ContainerClient , sumo_surf_uuid ):
104
208
blob_client : BlobClient = container_client .get_blob_client (blob = sumo_surf_uuid )
105
209
download_stream = await blob_client .download_blob (
@@ -139,7 +243,7 @@ async def make_intersections(surfaces, surface_fence_spec):
139
243
140
244
141
245
def load_surf (bytestr ) -> xtgeo .RegularSurface :
142
- return xtgeo .surface_from_file (BytesIO ( bytestr ) , fformat = "irap_binary" )
246
+ return xtgeo .surface_from_file (bytestr , fformat = "irap_binary" )
143
247
144
248
145
249
async def load_xtgeo (res_array ):
@@ -149,7 +253,7 @@ async def load_xtgeo(res_array):
149
253
# loop.run_in_executor(executor, load_surf, bytestr) for bytestr in res_array
150
254
# ]
151
255
# surfaces = await asyncio.gather(*tasks)
152
- surfaces = [load_surf (bytestr ) for bytestr in res_array ]
256
+ surfaces = [load_surf (BytesIO ( bytestr ) ) for bytestr in res_array ]
153
257
return surfaces
154
258
155
259
0 commit comments