Skip to content

Commit 1703528

Browse files
wip
1 parent ef5f8c9 commit 1703528

File tree

2 files changed

+69
-145
lines changed

2 files changed

+69
-145
lines changed

backend/src/backend/primary/user_session_proxy.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
class _RedisUserJobs:
1818
def __init__(self) -> None:
1919
# redis.Redis does not yet have namespace support - https://github.com/redis/redis-py/issues/12 - need to prefix manually.
20-
self._redis_client = redis.Redis.from_url(config.REDIS_USER_SESSION_URL, decode_responses=True)
20+
self._redis_client = redis.Redis.from_url(
21+
config.REDIS_USER_SESSION_URL, decode_responses=True
22+
)
2123

2224
def get_job_name(self, user_id: str) -> Optional[str]:
2325
return self._redis_client.get("user-job-name:" + user_id)
@@ -51,7 +53,9 @@ async def _active_running_job(self, user_id: str) -> bool:
5153
return True
5254

5355
async with httpx.AsyncClient() as client:
54-
res = await client.get(f"http://{self._name}:{self._port}/api/v1/jobs/{existing_job_name}")
56+
res = await client.get(
57+
f"http://{self._name}:{self._port}/api/v1/jobs/{existing_job_name}"
58+
)
5559

5660
job = res.json()
5761

@@ -88,8 +92,8 @@ async def _create_new_job(self, user_id: str) -> None:
8892
# these could be dynamic based on e.g. the selected ensemble sizess by the user.
8993
json={
9094
"resources": {
91-
"limits": {"memory": "64GiB", "cpu": "4"},
92-
"requests": {"memory": "32GiB", "cpu": "2"},
95+
"limits": {"memory": "128GiB", "cpu": "16"},
96+
"requests": {"memory": "32GiB", "cpu": "4"},
9397
}
9498
},
9599
)
@@ -115,7 +119,9 @@ async def get_base_url(self, user_id: str) -> str:
115119
RADIX_JOB_SCHEDULER_INSTANCE = RadixJobScheduler("backend-user-session", 8000)
116120

117121

118-
async def proxy_to_user_session(request: Request, authenticated_user: AuthenticatedUser) -> Any:
122+
async def proxy_to_user_session(
123+
request: Request, authenticated_user: AuthenticatedUser
124+
) -> Any:
119125
# Ideally this function should probably be a starlette/FastAPI middleware, but it appears that
120126
# it is not yet possible to put middleware on single routes through decorator like in express.js.
121127

backend/src/backend/user_session/routers/surface/router.py

+58-140
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,15 @@
1414
from fastapi import APIRouter, Depends, Request, Body
1515

1616
from src.backend.auth.auth_helper import AuthenticatedUser, AuthHelper
17-
18-
from src.services.sumo_access.surface_access import SurfaceAccess
1917
import asyncio
2018
from src.backend.primary.routers.surface import schemas
21-
from .test_async import async_get_cached_surf
22-
from .test_go import go_get_surface_blobs
19+
2320
from sumo.wrapper import SumoClient
24-
from fmu.sumo.explorer.objects import CaseCollection, Case, SurfaceCollection
21+
from fmu.sumo.explorer.objects import CaseCollection
2522
from src.services.utils.perf_timer import PerfTimer
26-
import base64
23+
2724
from io import BytesIO
28-
from azure.storage.blob.aio import BlobServiceClient, BlobClient, ContainerClient
25+
from azure.storage.blob.aio import BlobClient, ContainerClient
2926
import requests
3027

3128
LOGGER = logging.getLogger(__name__)
@@ -42,169 +39,73 @@ async def well_intersection_reals_from_user_session(
4239
) -> List[schemas.SurfaceIntersectionPoints]:
4340
body = await request.json()
4441
ensemble_ident = schemas.EnsembleIdent(**body.get("ensemble_ident"))
45-
realization_surface_set_spec = schemas.RealizationsSurfaceSetSpec(**body.get("realizations_surface_set_spec"))
42+
realization_surface_set_spec = schemas.RealizationsSurfaceSetSpec(
43+
**body.get("realizations_surface_set_spec")
44+
)
4645
surface_fence_spec = schemas.SurfaceFenceSpec(**body.get("surface_fence_spec"))
4746
timer = PerfTimer()
4847
# Config
4948
case_uuid = ensemble_ident.case_uuid
50-
snames = realization_surface_set_spec.surface_names
51-
sattr = realization_surface_set_spec.surface_attribute
5249
ensemble_name = ensemble_ident.ensemble_name
53-
realization_nums = realization_surface_set_spec.realization_nums
5450
intersections = []
5551

56-
uuids = get_uuids(
57-
case_uuid, ensemble_name, realization_nums, snames, sattr, authenticated_user.get_sumo_access_token()
52+
uuids = get_surface_set_uuids(
53+
case_uuid,
54+
ensemble_name,
55+
realization_surface_set_spec,
56+
authenticated_user.get_sumo_access_token(),
5857
)
5958
base_uri, auth_token = get_base_uri_and_auth_token_for_case(
6059
case_uuid, "prod", authenticated_user.get_sumo_access_token()
6160
)
6261

63-
async with ContainerClient.from_container_url(container_url=base_uri, credential=auth_token) as container_client:
62+
async with ContainerClient.from_container_url(
63+
container_url=base_uri, credential=auth_token
64+
) as container_client:
6465
coro_array = []
6566
timer = PerfTimer()
6667

6768
for uuid in uuids:
68-
coro_array.append(my_download_to_file(container_client=container_client, sumo_surf_uuid=uuid))
69+
coro_array.append(
70+
download_blob(container_client=container_client, sumo_surf_uuid=uuid)
71+
)
6972
res_array = await asyncio.gather(*coro_array)
70-
dl_time_s = timer.lap_s()
71-
print(f"download surfs: {dl_time_s:.2f}s", flush=True)
72-
LOGGER.info(f"download surfs: {dl_time_s:.2f}s", extra={"download_surfs": dl_time_s})
73+
elapsed_download = timer.lap_s()
7374

7475
tot_mb = 0
7576
for res in res_array:
7677
tot_mb += len(res) / (1024 * 1024)
77-
print(f"Total MB downloaded: {tot_mb:.2f}MB => {tot_mb/dl_time_s:.2f}MB/s", flush=True)
78-
LOGGER.info(
79-
f"Total MB downloaded: {tot_mb:.2f}MB => {tot_mb/dl_time_s:.2f}MB/s",
80-
extra={"total_mb": tot_mb, "mb_per_s": tot_mb / dl_time_s},
81-
)
8278

83-
# surfs = [xtgeo.surface_from_file(BytesIO(bytestr), fformat="irap_binary") for bytestr in res_array]
8479
surfaces = await load_xtgeo(res_array)
85-
print(f"convert surfs: {timer.lap_s():.2f}s", flush=True)
86-
fence_arr = np.array(
87-
[
88-
surface_fence_spec.x_points,
89-
surface_fence_spec.y_points,
90-
np.zeros(len(surface_fence_spec.y_points)),
91-
surface_fence_spec.cum_length,
92-
]
93-
).T
94-
intersections = await make_intersections(surfaces, fence_arr)
95-
96-
print(f"intersect surfs: {timer.lap_s():.2f}s", flush=True)
97-
return ORJSONResponse([section.dict() for section in intersections])
98-
80+
elapsed_xtgeo = timer.lap_s()
9981

100-
async def my_download_to_file(container_client: ContainerClient, sumo_surf_uuid):
101-
blob_client: BlobClient = container_client.get_blob_client(blob=sumo_surf_uuid)
102-
download_stream = await blob_client.download_blob(
103-
max_concurrency=4,
104-
)
105-
return await download_stream.readall()
106-
107-
108-
@router.post("/well_intersection_reals_from_user_session")
109-
async def well_intersection_reals_from_user_session2(
110-
request: Request,
111-
authenticated_user: AuthenticatedUser = Depends(AuthHelper.get_authenticated_user),
112-
) -> List[schemas.SurfaceIntersectionPoints]:
113-
body = await request.json()
114-
115-
ensemble_ident = schemas.EnsembleIdent(**body.get("ensemble_ident"))
116-
realization_surface_set_spec = schemas.RealizationsSurfaceSetSpec(**body.get("realizations_surface_set_spec"))
117-
surface_fence_spec = schemas.SurfaceFenceSpec(**body.get("surface_fence_spec"))
118-
timer = PerfTimer()
119-
# Config
120-
case_uuid = ensemble_ident.case_uuid
121-
snames = realization_surface_set_spec.surface_names
122-
sattr = realization_surface_set_spec.surface_attribute
123-
ensemble_name = ensemble_ident.ensemble_name
124-
realization_nums = realization_surface_set_spec.realization_nums
125-
# Get uuids
126-
uuids = get_uuids(
127-
case_uuid, ensemble_name, realization_nums, snames, sattr, authenticated_user.get_sumo_access_token()
128-
)
129-
elapsed_meta = timer.lap_ms()
130-
131-
# Check if cached
132-
uuids_to_download = []
133-
surfaces = []
134-
for uuid in uuids:
135-
possible_surface = await cache.get(f"{authenticated_user._user_id}-{uuid}")
136-
if possible_surface is None:
137-
uuids_to_download.append(uuid)
138-
else:
139-
surfaces.append(possible_surface)
140-
elapsed_cache = timer.lap_ms()
141-
if uuids_to_download:
142-
# Download remaining
143-
data_map_b64 = go_get_surface_blobs(authenticated_user.get_sumo_access_token(), case_uuid, uuids_to_download)
144-
elapsed_download = timer.lap_ms()
145-
146-
# Convert to xtgeo
147-
downloaded_surface_dict = await b64_to_xtgeo(data_map_b64)
148-
elapsed_xtgeo = timer.lap_ms()
149-
150-
# Add to cache
151-
for uuid, surface in downloaded_surface_dict.items():
152-
await cache.set(f"{authenticated_user._user_id}-{uuid}", surface)
153-
surfaces.append(surface)
154-
else:
155-
elapsed_download = 0
156-
elapsed_xtgeo = 0
157-
# Intersect
158-
fence_arr = np.array(
159-
[
160-
surface_fence_spec.x_points,
161-
surface_fence_spec.y_points,
162-
np.zeros(len(surface_fence_spec.y_points)),
163-
surface_fence_spec.cum_length,
164-
]
165-
).T
166-
intersections = await make_intersections(surfaces, fence_arr)
167-
elapsed_intersect = timer.lap_ms()
82+
intersections = await make_intersections(surfaces, surface_fence_spec)
83+
elapsed_intersect = timer.lap_s()
16884

169-
result = [intersection.dict() for intersection in intersections]
170-
elapsed_response_format = timer.lap_ms()
17185
LOGGER.info(
17286
f"Got intersected surface set from Sumo: {timer.elapsed_ms()}ms ("
173-
f"meta={elapsed_meta}ms, "
174-
f"cache={elapsed_cache}ms, "
17587
f"download={elapsed_download}ms, "
17688
f"xtgeo={elapsed_xtgeo}ms, "
17789
f"intersect={elapsed_intersect}ms, "
178-
f"response_format={elapsed_response_format}ms) ",
90+
f"size={tot_mb:.2f}MB, "
91+
f"speed={tot_mb/elapsed_download:.2f}MB/s)",
17992
extra={
180-
"meta": elapsed_meta,
181-
"cache": elapsed_cache,
18293
"download": elapsed_download,
18394
"xtgeo": elapsed_xtgeo,
18495
"intersect": elapsed_intersect,
185-
"response_format": elapsed_response_format,
96+
"size": tot_mb,
97+
"speed": tot_mb / elapsed_download,
18698
},
18799
)
188-
return ORJSONResponse(result)
189-
100+
return ORJSONResponse([section.dict() for section in intersections])
190101

191-
async def b64_to_xtgeo(data_map_b64):
192-
def to_xtgeo(object_id, b64_blob):
193-
bytestr = base64.b64decode(b64_blob)
194-
xtgeo_surface = xtgeo.surface_from_file(BytesIO(bytestr), fformat="irap_binary")
195-
return {object_id: xtgeo_surface}
196102

197-
loop = asyncio.get_running_loop()
198-
with ThreadPoolExecutor() as executor:
199-
tasks = [
200-
loop.run_in_executor(executor, to_xtgeo, object_id, b64_blob)
201-
for object_id, b64_blob in data_map_b64.items()
202-
]
203-
results = await asyncio.gather(*tasks)
204-
downloaded_surface_dict = {}
205-
for result in results:
206-
downloaded_surface_dict.update(result)
207-
return downloaded_surface_dict
103+
async def download_blob(container_client: ContainerClient, sumo_surf_uuid):
104+
blob_client: BlobClient = container_client.get_blob_client(blob=sumo_surf_uuid)
105+
download_stream = await blob_client.download_blob(
106+
max_concurrency=4,
107+
)
108+
return await download_stream.readall()
208109

209110

210111
def make_intersection(surf, fence_arr):
@@ -217,11 +118,21 @@ def make_intersection(surf, fence_arr):
217118
return intersection
218119

219120

220-
async def make_intersections(surfaces, fence_arr):
121+
async def make_intersections(surfaces, surface_fence_spec):
221122
loop = asyncio.get_running_loop()
222-
123+
fence_arr = np.array(
124+
[
125+
surface_fence_spec.x_points,
126+
surface_fence_spec.y_points,
127+
np.zeros(len(surface_fence_spec.y_points)),
128+
surface_fence_spec.cum_length,
129+
]
130+
).T
223131
with ProcessPoolExecutor() as executor:
224-
tasks = [loop.run_in_executor(executor, make_intersection, surf, fence_arr) for surf in surfaces]
132+
tasks = [
133+
loop.run_in_executor(executor, make_intersection, surf, fence_arr)
134+
for surf in surfaces
135+
]
225136
intersections = await asyncio.gather(*tasks)
226137
return intersections
227138

@@ -233,20 +144,27 @@ def load_surf(bytestr) -> xtgeo.RegularSurface:
233144
async def load_xtgeo(res_array):
234145
loop = asyncio.get_running_loop()
235146
with ProcessPoolExecutor() as executor:
236-
tasks = [loop.run_in_executor(executor, load_surf, bytestr) for bytestr in res_array]
147+
tasks = [
148+
loop.run_in_executor(executor, load_surf, bytestr) for bytestr in res_array
149+
]
237150
surfaces = await asyncio.gather(*tasks)
238151
return surfaces
239152

240153

241-
def get_uuids(case_uuid, ensemble_name, realization_nums, snames, sattr, bearer_token):
154+
def get_surface_set_uuids(
155+
case_uuid,
156+
ensemble_name,
157+
realization_surface_set_spec: schemas.RealizationsSurfaceSetSpec,
158+
bearer_token,
159+
):
242160
sumo_client = SumoClient(env="prod", token=bearer_token, interactive=False)
243161
case_collection = CaseCollection(sumo_client).filter(uuid=case_uuid)
244162
case = case_collection[0]
245163
surface_collection = case.surfaces.filter(
246164
iteration=ensemble_name,
247-
name=snames,
248-
tagname=sattr,
249-
realization=realization_nums,
165+
name=realization_surface_set_spec.surface_names,
166+
tagname=realization_surface_set_spec.surface_attribute,
167+
realization=realization_surface_set_spec.realization_nums,
250168
)
251169
return [surf.uuid for surf in surface_collection]
252170

0 commit comments

Comments
 (0)