Skip to content

Commit a48182b

Browse files
jordanhunt22Convex, Inc.
authored andcommitted
[Http Actions -> Funrun] Add new function to run http actions in FunctionRunner (#29849)
GitOrigin-RevId: e73e4e3a9b937efd7f51f02ab23ddbd1d881d8e5
1 parent 70f4cfc commit a48182b

File tree

5 files changed

+315
-50
lines changed

5 files changed

+315
-50
lines changed

crates/function_runner/src/isolate_worker.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ use isolate::{
2929
service_request_timer,
3030
RequestStatus,
3131
},
32+
HttpActionResult,
3233
IsolateConfig,
3334
};
35+
use sync_types::CanonicalizedUdfPath;
3436
#[derive(Clone)]
3537
pub(crate) struct FunctionRunnerIsolateWorker<RT: Runtime> {
3638
rt: RT,
@@ -174,6 +176,56 @@ impl<RT: Runtime> IsolateWorker<RT> for FunctionRunnerIsolateWorker<RT> {
174176
let _ = response.send(r);
175177
format!("Action: {udf_path:?}")
176178
},
179+
RequestType::HttpAction {
180+
request,
181+
environment_data,
182+
mut response,
183+
queue_timer,
184+
action_callbacks,
185+
fetch_client,
186+
log_line_sender,
187+
http_response_streamer,
188+
} => {
189+
drop(queue_timer);
190+
let timer = service_request_timer(&UdfType::HttpAction);
191+
let udf_path: CanonicalizedUdfPath =
192+
request.http_module_path.path().udf_path.clone();
193+
let environment = ActionEnvironment::new(
194+
self.rt.clone(),
195+
request.http_module_path.path().component,
196+
environment_data,
197+
request.identity,
198+
request.transaction,
199+
action_callbacks,
200+
fetch_client,
201+
log_line_sender,
202+
Some(http_response_streamer),
203+
heap_stats.clone(),
204+
request.context,
205+
);
206+
let r = environment
207+
.run_http_action(
208+
client_id,
209+
isolate,
210+
isolate_clean,
211+
request.http_module_path,
212+
request.routed_path,
213+
request.http_request,
214+
response.cancellation().boxed(),
215+
)
216+
.await;
217+
let status = match &r {
218+
Ok(outcome) => match outcome.result {
219+
// Note that the stream could potentially encounter errors later
220+
HttpActionResult::Streamed => RequestStatus::Success,
221+
HttpActionResult::Error(_) => RequestStatus::DeveloperError,
222+
},
223+
Err(_) => RequestStatus::SystemError,
224+
};
225+
finish_service_request_timer(timer, status);
226+
let _ = response.send(r);
227+
format!("Http: {udf_path:?}")
228+
},
177229
_ => {
178230
report_error(&mut anyhow::anyhow!(
179231
"Unsupported request sent to funrun isolate",

crates/function_runner/src/server.rs

Lines changed: 166 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use common::{
1616
CoDelQueueSender,
1717
},
1818
execution_context::ExecutionContext,
19-
http::fetch::FetchClient,
19+
http::{
20+
fetch::FetchClient,
21+
RoutedHttpPath,
22+
},
2023
knobs::{
2124
FUNRUN_ISOLATE_ACTIVE_THREADS,
2225
ISOLATE_QUEUE_SIZE,
@@ -60,6 +63,7 @@ use isolate::{
6063
client::{
6164
initialize_v8,
6265
EnvironmentData,
66+
HttpActionRequest,
6367
IsolateWorkerHandle,
6468
Request as IsolateRequest,
6569
RequestType as IsolateRequestType,
@@ -75,8 +79,11 @@ use isolate::{
7579
ActionRequestParams,
7680
ConcurrencyLimiter,
7781
FunctionOutcome,
82+
HttpActionRequest as HttpActionRequestInner,
83+
HttpActionResponseStreamer,
7884
IsolateConfig,
7985
UdfCallback,
86+
ValidatedHttpPath,
8087
ValidatedPathAndArgs,
8188
};
8289
use keybroker::{
@@ -121,6 +128,38 @@ const MAX_ISOLATE_WORKERS: usize = 128;
121128
// active permits more frequently than that.
122129
const ACTIVE_CONCURRENCY_PERMITS_LOG_FREQUENCY: Duration = Duration::from_secs(10);
123130

131+
pub struct RunRequestArgs {
132+
pub instance_name: String,
133+
pub instance_secret: InstanceSecret,
134+
pub reader: Arc<dyn PersistenceReader>,
135+
pub convex_origin: ConvexOrigin,
136+
pub bootstrap_metadata: BootstrapMetadata,
137+
pub table_count_snapshot: Arc<dyn TableCountSnapshot>,
138+
pub text_index_snapshot: Arc<dyn TransactionTextSnapshot>,
139+
pub action_callbacks: Arc<dyn ActionCallbacks>,
140+
pub fetch_client: Arc<dyn FetchClient>,
141+
pub log_line_sender: Option<mpsc::UnboundedSender<LogLine>>,
142+
pub udf_type: UdfType,
143+
pub identity: Identity,
144+
pub ts: RepeatableTimestamp,
145+
pub existing_writes: FunctionWrites,
146+
pub system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
147+
pub in_memory_index_last_modified: BTreeMap<IndexId, Timestamp>,
148+
pub context: ExecutionContext,
149+
}
150+
151+
pub struct FunctionMetadata {
152+
pub path_and_args: ValidatedPathAndArgs,
153+
pub journal: QueryJournal,
154+
}
155+
156+
pub struct HttpActionMetadata {
157+
pub http_response_streamer: HttpActionResponseStreamer,
158+
pub http_module_path: ValidatedHttpPath,
159+
pub routed_path: RoutedHttpPath,
160+
pub http_request: HttpActionRequestInner,
161+
}
162+
124163
#[async_trait]
125164
pub trait StorageForInstance<RT: Runtime>: Debug + Clone + Send + Sync + 'static {
126165
/// Gets a storage impl for a instance. Agnostic to what kind of storage -
@@ -336,25 +375,64 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
336375
#[minitrace::trace]
337376
pub async fn run_function_no_retention_check(
338377
&self,
339-
instance_name: String,
340-
instance_secret: InstanceSecret,
341-
reader: Arc<dyn PersistenceReader>,
342-
convex_origin: ConvexOrigin,
343-
bootstrap_metadata: BootstrapMetadata,
344-
table_count_snapshot: Arc<dyn TableCountSnapshot>,
345-
text_index_snapshot: Arc<dyn TransactionTextSnapshot>,
346-
action_callbacks: Arc<dyn ActionCallbacks>,
347-
fetch_client: Arc<dyn FetchClient>,
348-
log_line_sender: Option<mpsc::UnboundedSender<LogLine>>,
349-
path_and_args: ValidatedPathAndArgs,
350-
udf_type: UdfType,
351-
identity: Identity,
352-
ts: RepeatableTimestamp,
353-
existing_writes: FunctionWrites,
354-
journal: QueryJournal,
355-
system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
356-
in_memory_index_last_modified: BTreeMap<IndexId, Timestamp>,
357-
context: ExecutionContext,
378+
run_request_args: RunRequestArgs,
379+
function_metadata: FunctionMetadata,
380+
) -> anyhow::Result<(
381+
Option<FunctionFinalTransaction>,
382+
FunctionOutcome,
383+
FunctionUsageStats,
384+
)> {
385+
if run_request_args.udf_type == UdfType::HttpAction {
386+
anyhow::bail!("You can't run http actions from this method");
387+
}
388+
self.run_function_no_retention_check_inner(run_request_args, Some(function_metadata), None)
389+
.await
390+
}
391+
392+
#[minitrace::trace]
393+
pub async fn run_http_action_no_retention_check(
394+
&self,
395+
run_request_args: RunRequestArgs,
396+
http_action_metadata: HttpActionMetadata,
397+
) -> anyhow::Result<(FunctionOutcome, FunctionUsageStats)> {
398+
if run_request_args.udf_type != UdfType::HttpAction {
399+
anyhow::bail!("You can only run http actions with this method");
400+
}
401+
let (_, outcome, stats) = self
402+
.run_function_no_retention_check_inner(
403+
run_request_args,
404+
None,
405+
Some(http_action_metadata),
406+
)
407+
.await?;
408+
409+
Ok((outcome, stats))
410+
}
411+
412+
#[minitrace::trace]
413+
pub async fn run_function_no_retention_check_inner(
414+
&self,
415+
RunRequestArgs {
416+
instance_name,
417+
instance_secret,
418+
reader,
419+
convex_origin,
420+
bootstrap_metadata,
421+
table_count_snapshot,
422+
text_index_snapshot,
423+
action_callbacks,
424+
fetch_client,
425+
log_line_sender,
426+
udf_type,
427+
identity,
428+
ts,
429+
existing_writes,
430+
system_env_vars,
431+
in_memory_index_last_modified,
432+
context,
433+
}: RunRequestArgs,
434+
function_metadata: Option<FunctionMetadata>,
435+
http_action_metadata: Option<HttpActionMetadata>,
358436
) -> anyhow::Result<(
359437
Option<FunctionFinalTransaction>,
360438
FunctionOutcome,
@@ -410,6 +488,10 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
410488

411489
match udf_type {
412490
UdfType::Query | UdfType::Mutation => {
491+
let FunctionMetadata {
492+
path_and_args,
493+
journal,
494+
} = function_metadata.context("Missing function metadata for query or mutation")?;
413495
let (tx, rx) = oneshot::channel();
414496
let request = IsolateRequest::new(
415497
instance_name,
@@ -439,6 +521,8 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
439521
))
440522
},
441523
UdfType::Action => {
524+
let FunctionMetadata { path_and_args, .. } =
525+
function_metadata.context("Missing function metadata for action")?;
442526
let (tx, rx) = oneshot::channel();
443527
let log_line_sender =
444528
log_line_sender.context("Missing log line sender for action")?;
@@ -469,7 +553,43 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
469553
))
470554
},
471555
UdfType::HttpAction => {
472-
anyhow::bail!("Funrun does not support http actions yet")
556+
let HttpActionMetadata {
557+
http_response_streamer,
558+
http_module_path,
559+
routed_path,
560+
http_request,
561+
} = http_action_metadata.context("Missing http action metadata")?;
562+
let (tx, rx) = oneshot::channel();
563+
let log_line_sender =
564+
log_line_sender.context("Missing log line sender for http action")?;
565+
let request = IsolateRequest::new(
566+
instance_name,
567+
IsolateRequestType::HttpAction {
568+
request: HttpActionRequest {
569+
http_module_path,
570+
routed_path,
571+
http_request,
572+
identity,
573+
transaction,
574+
context,
575+
},
576+
response: tx,
577+
queue_timer: queue_timer(),
578+
action_callbacks,
579+
fetch_client,
580+
log_line_sender,
581+
http_response_streamer,
582+
environment_data,
583+
},
584+
EncodedSpan::from_parent(),
585+
);
586+
self.send_request(request)?;
587+
let outcome = Self::receive_response(rx).await??;
588+
Ok((
589+
None,
590+
FunctionOutcome::HttpAction(outcome),
591+
usage_tracker.gather_user_stats(),
592+
))
473593
},
474594
}
475595
}
@@ -593,31 +713,37 @@ impl<RT: Runtime> FunctionRunner<RT> for InProcessFunctionRunner<RT> {
593713
.upgrade()
594714
.context(shutdown_error())?;
595715

716+
let request_metadata = RunRequestArgs {
717+
instance_name: self.instance_name.clone(),
718+
instance_secret: self.instance_secret,
719+
reader: self.persistence_reader.clone(),
720+
convex_origin: self.convex_origin.clone(),
721+
bootstrap_metadata: self.database.bootstrap_metadata.clone(),
722+
table_count_snapshot,
723+
text_index_snapshot,
724+
action_callbacks,
725+
fetch_client: self.fetch_client.clone(),
726+
log_line_sender,
727+
udf_type,
728+
identity,
729+
ts,
730+
existing_writes,
731+
system_env_vars,
732+
in_memory_index_last_modified,
733+
context,
734+
};
735+
596736
// NOTE: We run the function without checking retention until after the
597737
// function execution. It is important that we do not surface any errors
598738
// or results until after we call `validate_run_function_result` below.
599739
let result = self
600740
.server
601741
.run_function_no_retention_check(
602-
self.instance_name.clone(),
603-
self.instance_secret,
604-
self.persistence_reader.clone(),
605-
self.convex_origin.clone(),
606-
self.database.bootstrap_metadata.clone(),
607-
table_count_snapshot,
608-
text_index_snapshot,
609-
action_callbacks,
610-
self.fetch_client.clone(),
611-
log_line_sender,
612-
path_and_args,
613-
udf_type,
614-
identity,
615-
ts,
616-
existing_writes,
617-
journal,
618-
system_env_vars,
619-
in_memory_index_last_modified,
620-
context,
742+
request_metadata,
743+
FunctionMetadata {
744+
path_and_args,
745+
journal,
746+
},
621747
)
622748
.await;
623749
validate_run_function_result(udf_type, *ts, self.database.retention_validator()).await?;

crates/isolate/src/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -387,12 +387,12 @@ pub struct UdfRequest<RT: Runtime> {
387387
}
388388

389389
pub struct HttpActionRequest<RT: Runtime> {
390-
http_module_path: ValidatedHttpPath,
391-
routed_path: RoutedHttpPath,
392-
http_request: http_action::HttpActionRequest,
393-
transaction: Transaction<RT>,
394-
identity: Identity,
395-
context: ExecutionContext,
390+
pub http_module_path: ValidatedHttpPath,
391+
pub routed_path: RoutedHttpPath,
392+
pub http_request: http_action::HttpActionRequest,
393+
pub transaction: Transaction<RT>,
394+
pub identity: Identity,
395+
pub context: ExecutionContext,
396396
}
397397

398398
pub struct ActionRequest<RT: Runtime> {

0 commit comments

Comments
 (0)