Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ingress client #42

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

patrickariel
Copy link

@patrickariel patrickariel commented Feb 14, 2025

An initial implementation for the ingress client (#33). I'll do the macro in a separate PR, once the current implementation is finalized.

A few usage examples:

MyService
use reqwest::Url;
use restate_sdk::{
    context::RequestTarget,
    ingress::{
        request::IngressRequest,
        result::{IngressResult, ResultTarget},
        IngressClient, IntoServiceIngress, IntoServiceResult,
    },
};
use tracing::info;

pub struct MyServiceIngress<'a> {
    client: &'a IngressClient,
}

impl<'a> IntoServiceIngress<'a> for MyServiceIngress<'a> {
    fn create_ingress(client: &'a IngressClient) -> Self {
        Self { client }
    }
}

impl<'a> MyServiceIngress<'a> {
    pub fn my_handler(&self, req: String) -> IngressRequest<'a, String, String> {
        self.client
            .request(RequestTarget::service("MyService", "my_handler"), req)
    }
}

pub struct MyServiceResult<'a> {
    client: &'a IngressClient,
}

impl<'a> IntoServiceResult<'a> for MyServiceResult<'a> {
    fn create_result(client: &'a IngressClient) -> Self {
        Self { client }
    }
}

impl<'a> MyServiceResult<'a> {
    pub fn my_handler(&self, idempotency_key: impl Into<String>) -> IngressResult<'a, String> {
        self.client.result(ResultTarget::service(
            "MyService",
            "my_handler",
            idempotency_key,
        ))
    }
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    tracing_subscriber::fmt::init();

    let client = IngressClient::new(Url::parse("http://localhost:8080").unwrap());

    let my_service = client.service_ingress::<MyServiceIngress>();

    let res = my_service.my_handler("hello world".into()).call().await?;
    info!("{res:?}");

    let res = my_service
        .my_handler("hello other world".into())
        .idempotency_key("lorem_ipsum".parse().unwrap())
        .send()
        .await?;
    info!("{res:?}");

    let res = client
        .service_result::<MyServiceResult>()
        .my_handler("lorem_ipsum")
        .attach()
        .await?;
    info!("{res:?}");

    Ok(())
}
MyVirtualObject
use reqwest::Url;
use restate_sdk::{
    context::RequestTarget,
    ingress::{
        request::IngressRequest,
        result::{IngressResult, ResultTarget},
        IngressClient, IntoObjectIngress, IntoObjectResult,
    },
};
use tracing::info;

pub struct MyVirtualObjectIngress<'a> {
    client: &'a IngressClient,
    key: String,
}

impl<'a> IntoObjectIngress<'a> for MyVirtualObjectIngress<'a> {
    fn create_ingress(client: &'a IngressClient, key: String) -> Self {
        Self { client, key }
    }
}

impl<'a> MyVirtualObjectIngress<'a> {
    pub fn my_handler(&self, req: String) -> IngressRequest<'a, String, String> {
        self.client.request(
            RequestTarget::object("MyVirtualObject", &self.key, "my_handler"),
            req,
        )
    }

    pub fn my_concurrent_handler(&self, req: String) -> IngressRequest<'a, String, String> {
        self.client.request(
            RequestTarget::object("MyVirtualObject", &self.key, "my_concurrent_handler"),
            req,
        )
    }
}

pub struct MyVirtualObjectResult<'a> {
    client: &'a IngressClient,
    key: String,
}

impl<'a> IntoObjectResult<'a> for MyVirtualObjectResult<'a> {
    fn create_result(client: &'a IngressClient, key: String) -> Self {
        Self { client, key }
    }
}

impl<'a> MyVirtualObjectResult<'a> {
    pub fn my_handler(&self, idempotency_key: impl Into<String>) -> IngressResult<'a, String> {
        self.client.result(ResultTarget::object(
            "MyVirtualObject",
            &self.key,
            "my_handler",
            idempotency_key,
        ))
    }

    pub fn my_concurrent_handler(
        &self,
        idempotency_key: impl Into<String>,
    ) -> IngressResult<'a, String> {
        self.client.result(ResultTarget::object(
            "MyVirtualObject",
            &self.key,
            "my_concurrent_handler",
            idempotency_key,
        ))
    }
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    tracing_subscriber::fmt::init();

    let client = IngressClient::new(Url::parse("http://localhost:8080").unwrap());

    let res = client
        .object_ingress::<MyVirtualObjectIngress>("Jane")
        .my_handler("Mary".into())
        .call()
        .await?;
    info!("{res:?}");

    let res = client
        .object_ingress::<MyVirtualObjectIngress>("Smith")
        .my_handler("John".into())
        .idempotency_key("lorem_ipsum".parse().unwrap())
        .send()
        .await?;
    info!("{res:?}");

    let res = client
        .invocation_result::<String>(res.unwrap().invocation_id)
        .attach()
        .await?;
    info!("{res:?}");

    let res = client
        .object_result::<MyVirtualObjectResult>("Smith")
        .my_handler("lorem_ipsum")
        .attach()
        .await?;
    info!("{res:?}");

    Ok(())
}
MyWorkflow
use reqwest::Url;
use restate_sdk::{
    context::RequestTarget,
    ingress::{
        request::IngressRequest,
        result::{IngressResult, ResultTarget},
        IngressClient, IntoWorkflowIngress, IntoWorkflowResult,
    },
    prelude::*,
};
use tracing::info;

pub struct MyWorkflowIngress<'a> {
    client: &'a IngressClient,
    id: String,
}

impl<'a> IntoWorkflowIngress<'a> for MyWorkflowIngress<'a> {
    fn create_ingress(client: &'a IngressClient, id: String) -> Self {
        Self { client, id }
    }
}

impl<'a> MyWorkflowIngress<'a> {
    pub fn run(&self, req: String) -> IngressRequest<'a, String, String> {
        self.client
            .request(RequestTarget::workflow("MyWorkflow", &self.id, "run"), req)
    }

    pub fn interact_with_workflow(&self) -> IngressRequest<'a, (), ()> {
        self.client.request(
            RequestTarget::workflow("MyWorkflow", &self.id, "interact_with_workflow"),
            (),
        )
    }
}

pub struct MyWorkflowResult<'a> {
    result: IngressResult<'a, String>,
}

impl<'a> IntoWorkflowResult<'a> for MyWorkflowResult<'a> {
    fn create_result(client: &'a IngressClient, id: String) -> Self {
        Self {
            result: client.result(ResultTarget::workflow("MyWorkflow", id)),
        }
    }
}

impl<'a> MyWorkflowResult<'a> {
    pub async fn attach(self) -> Result<Result<String, TerminalError>, reqwest::Error> {
        self.result.attach().await
    }

    pub async fn output(self) -> Result<Result<String, TerminalError>, reqwest::Error> {
        self.result.output().await
    }
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    tracing_subscriber::fmt::init();

    let client = IngressClient::new(Url::parse("http://localhost:8080").unwrap());

    let res = client
        .workflow_ingress::<MyWorkflowIngress>("Me")
        .run("foobar".into())
        .send()
        .await?;
    info!("{res:?}");

    let res = client
        .workflow_result::<MyWorkflowResult>("Me")
        .attach()
        .await?;
    info!("{res:?}");

    Ok(())
}

There was an issue with idempotency keys and virtual objects, but it looks like that it got fixed on 1.2.0.

Copy link
Collaborator

@slinkydeveloper slinkydeveloper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the direction! I left few comments here and there, but overall looks good!

Would it be ok to just add the macro code on top of this PR? This way I could have a full understanding of all the moving parts here.

}

/// This struct encapsulates the parameters for retrieving a result of an invocation or workflow.
pub struct IngressResult<'a, Res = ()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When reading the code, i've found this name confusing, Result usually has a very specific meaning in Rust. I think it should be something like AttachInvocationRequest or something like that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file name is super confusing too :)

}

/// Create a new [`IngressResult`].
pub fn result<Res>(&self, target: ResultTarget) -> IngressResult<Res> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the naming result should be replaced by attach

Copy link
Author

@patrickariel patrickariel Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that the method returns a type that the user will have to either call .attach() or .output() on. So calling it attach will make the chain look a bit weird (e.g. .attach().attach()).

The actual usage will look something like this (see my first post for more examples):

let res = client
    .service_result::<MyServiceResult>()
    .my_handler("lorem_ipsum") // <- this is the idempotency key
    .attach() // or .output()
    .await?;

let res = client
    .workflow_result::<MyWorkflowResult>("Me")
    .attach() // or .output()
    .await?;

The term "result" was taken from this part of the docs:

Restate allows you to retrieve the result of workflows and invocations with an idempotency key.

Do you have any suggestions for a better term? Some ideas: response, answer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhhh got it, i think then the name should be workflow_handle/invocation_handle, see here https://docs.restate.dev/javadocs/dev/restate/sdk/client/Client

@patrickariel
Copy link
Author

Thanks for the review!

@slinkydeveloper
Copy link
Collaborator

@patrickariel I'm not sure feature gating what the macro generates is a good idea, it might be counter-intuitive. The way I'm thinking about it is that the macro should always generate a client and link to some "trait" that is always available in restate_sdk, no matter whether you add the feature ingress-client or not. Then, depending on whether you specify the ingress-client feature, you get the concrete implementation of that trait that contains the client. This would also let people implement a client with the http client of their choice.

Let me think this through though before asking u to make any further changes to this PR...

@patrickariel
Copy link
Author

patrickariel commented Feb 20, 2025

Whoops sorry about that, pushed a few commits before I had a chance to see your comment.

This would also let people implement a client with the http client of their choice.

This is also something that I've been thinking. Maybe we can copy a pattern often seen in other Rust libraries, where a generic "executor" is passed to the domain client as late as possible. This way the user can simply implement the trait if they want to use their own custom executor. For example, in oauth2:

let client = BasicClient::new(github_client_id)
    .set_client_secret(github_client_secret)
    .set_auth_uri(auth_url)
    .set_token_uri(token_url)
    .set_redirect_uri(
        RedirectUrl::new("http://localhost:8080".to_string()).expect("Invalid redirect URL"),
    );

let http_client = reqwest::ClientBuilder::new()
    .redirect(reqwest::redirect::Policy::none())
    .build()
    .expect("Client should build");

let token_res = client.exchange_code(code).request_async(&http_client).await;

sqlx also uses a similar pattern:

async fn notify(pool: &PgPool, s: &str) -> Result<(), sqlx::Error> {
    sqlx::query(
        r#"
SELECT pg_notify(chan, payload)
FROM (VALUES ('chan0', $1)) v(chan, payload)
"#,
    )
    .bind(s)
    .execute(pool)
    .await?;

    Ok(())
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants