Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added initial version of search APIs #25

Merged
merged 10 commits into from
Apr 5, 2024
3 changes: 3 additions & 0 deletions server/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ target/

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

# These will be created by the sqlx CLI
*.sqlx
44 changes: 44 additions & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ reqwest = { version = "0.12.1", features = ["json"] }
serde = "1.0.197"
serde_json = "1.0.114"
serde_urlencoded = "0.7.1"
sqlx = { version = "0.7.4", features = ["postgres", "runtime-tokio", "tls-rustls", "migrate", "uuid", "time"] }
sqlx = { version = "0.7.4", features = [
"postgres",
"runtime-tokio",
"tls-rustls",
"migrate",
"uuid",
"time",
] }
thiserror = "1.0.58"
tokio = { version = "1.36.0", features = ["full"] }
tower-http = { version = "0.5.2", features = ["trace", "cors"] }
Expand All @@ -36,6 +43,11 @@ tower = "0.4.13"
tracing-error = "0.2.0"
tracing-log = "0.2.0"
tracing-logfmt = "0.3.4"
tracing-subscriber = { version = "0.3.18", features = ["json", "registry", "env-filter"] }
tracing-subscriber = { version = "0.3.18", features = [
"json",
"registry",
"env-filter",
] }
uuid = { version = "1.8.0", features = ["serde"] }
log = "0.4.21"
redis = { version = "0.25.2", features = ["tokio-comp", "json"] }
1 change: 1 addition & 0 deletions server/config/dev.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
db = "postgresql://postgres:postgres@localhost/curieo"
cache = "redis://127.0.0.1/"

[log]
level = "info"
Expand Down
11 changes: 11 additions & 0 deletions server/migrations/20240403115016_search_history.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- One row per executed search: the query text, the generated answer,
-- and the list of cited sources.
create table search_history (
-- NOTE(review): v1mc UUIDs are time-based with a random MAC — presumably
-- chosen over v4 for index locality; confirm the extension providing
-- uuid_generate_v1mc() is installed in an earlier migration.
search_history_id uuid primary key default uuid_generate_v1mc(),
search_text text not null,
response_text text not null,
-- Source URLs for the response, stored as a Postgres text array.
response_sources text[] not null,
created_at timestamptz not null default now(),
-- Kept current by the trigger installed below.
updated_at timestamptz not null default now()
);

-- And applying our `updated_at` trigger is as easy as this.
-- (trigger_updated_at is assumed to be defined in an earlier migration — verify.)
SELECT trigger_updated_at('search_history');
10 changes: 10 additions & 0 deletions server/src/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum AppError {
UnprocessableEntity(ErrorMap),
Sqlx(sqlx::Error),
GenericError(color_eyre::eyre::Error),
Redis(redis::RedisError),
}

impl AppError {
Expand All @@ -22,6 +23,7 @@ impl AppError {
Self::Unauthorized => StatusCode::UNAUTHORIZED,
Self::UnprocessableEntity { .. } => StatusCode::UNPROCESSABLE_ENTITY,
Self::Sqlx(_) | Self::GenericError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::Redis(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
/// Convenient constructor for `Error::UnprocessableEntity`.
Expand All @@ -47,13 +49,20 @@ impl From<sqlx::Error> for AppError {
}
}

/// Allow `?` to lift Redis failures into the application error type.
impl From<redis::RedisError> for AppError {
    fn from(err: redis::RedisError) -> Self {
        Self::Redis(err)
    }
}

/// Human-readable rendering of the error, used by logging and responses.
impl Display for AppError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unauthorized => write!(f, "Unauthorized"),
            Self::UnprocessableEntity(errors) => write!(f, "{:?}", errors),
            Self::Sqlx(err) => write!(f, "{}", err),
            Self::Redis(err) => write!(f, "{}", err),
            Self::GenericError(err) => write!(f, "{}", err),
        }
    }
}
Expand Down Expand Up @@ -105,6 +114,7 @@ impl IntoResponse for AppError {
}
AppError::Sqlx(ref e) => error!("SQLx error: {:?}", e),
AppError::GenericError(ref e) => error!("Generic error: {:?}", e),
AppError::Redis(ref e) => error!("Redis error: {:?}", e),
};

// Return a http status code and json body with error message.
Expand Down
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod auth;
mod err;
mod health_check;
pub mod routing;
pub mod search;
pub mod secrets;
pub mod settings;
pub mod startup;
Expand Down
3 changes: 2 additions & 1 deletion server/src/routing/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::Level;

use crate::auth::models::{OAuth2Clients, PostgresBackend};
use crate::startup::AppState;
use crate::{auth, health_check, users};
use crate::{auth, health_check, search, users};

pub fn router(state: AppState) -> color_eyre::Result<Router> {
// Session layer.
Expand All @@ -33,6 +33,7 @@ pub fn router(state: AppState) -> color_eyre::Result<Router> {
let api_routes = Router::new()
//.nest("/search", search::routes())
//.layer(middleware::from_fn(some_auth_middleware))
.nest("/search", search::routes())
.nest("/users", users::routes())
.route_layer(login_required!(
PostgresBackend,
Expand Down
7 changes: 7 additions & 0 deletions server/src/search/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Search feature: request/response models, HTTP routes, and the
//! cache/database services backing them.

pub use models::*;
pub use routes::*;
pub use services::*;

pub mod models;
pub mod routes;
pub mod services;
33 changes: 33 additions & 0 deletions server/src/search/models.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use serde::{Deserialize, Serialize};
use sqlx::types::time;
use sqlx::FromRow;
use std::fmt::Debug;

/// Query-string parameters for `GET /search`.
#[derive(Serialize, Deserialize, Debug)]
pub struct SearchQueryRequest {
    // Free-text search query; also used verbatim as the Redis cache key.
    pub query: String,
}

/// Pagination parameters for `GET /search/history`.
#[derive(Serialize, Deserialize, Debug)]
pub struct SearchHistoryRequest {
    // Maximum rows to return; the handler defaults this to 10.
    pub limit: Option<u8>,
    // Rows to skip; the handler defaults this to 0.
    // NOTE(review): `u8` caps the offset at 255, so only the first ~265
    // history rows are reachable — consider a wider type for deep paging.
    pub offset: Option<u8>,
}

/// A search answer: the generated text plus the sources it cites.
/// Cached in Redis as a JSON string and persisted into `search_history`.
#[derive(Serialize, Deserialize, Debug)]
pub struct SearchResponse {
    pub response_text: String,
    pub response_sources: Vec<String>,
}

/// A persisted row of the `search_history` table (see the
/// 20240403115016_search_history.sql migration).
#[derive(FromRow, Serialize, Deserialize, Clone, Debug)]
pub struct SearchHistory {
    pub search_history_id: uuid::Uuid,
    // TODO: link each history row to its owning user once auth is wired in.
    // pub user_id: uuid::Uuid,
    pub search_text: String,
    pub response_text: String,
    pub response_sources: Vec<String>,

    // Set by column defaults / the updated_at trigger on the table.
    pub created_at: time::OffsetDateTime,
    pub updated_at: time::OffsetDateTime,
}
53 changes: 53 additions & 0 deletions server/src/search/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use crate::err::AppError;
use crate::search::services;
use crate::search::{SearchHistory, SearchHistoryRequest, SearchQueryRequest};
use crate::startup::AppState;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Json, Router};
use redis::Client as RedisClient;
use sqlx::PgPool;

#[tracing::instrument(level = "debug", skip_all, ret, err(Debug))]
async fn get_search_handler(
State(pool): State<PgPool>,
State(cache): State<RedisClient>,
Query(search_query): Query<SearchQueryRequest>,
) -> crate::Result<impl IntoResponse> {
let mut connection = cache
.get_multiplexed_async_connection()
.await
.map_err(|e| AppError::from(e))?;

let search_response = services::search(&mut connection, &search_query).await?;
services::insert_search_history(&pool, &mut connection, &search_query, &search_response)
.await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know the performance characteristics of the two insertions (cache and pg). They are both async, so it shouldn't be a problem, but if we see something in the future this could be a good candidate for the spawn_blocking_with_tracing util in telemetry.rs.


Ok((StatusCode::OK, Json(search_response)))
}

/// `GET /search/history` — page through past searches, newest first.
///
/// `limit` defaults to 10 and `offset` to 0 when absent from the query
/// string. Returns `200 OK` with a JSON array of `SearchHistory` rows.
#[tracing::instrument(level = "debug", skip_all, ret, err(Debug))]
async fn get_search_history_handler(
    State(pool): State<PgPool>,
    Query(search_history_request): Query<SearchHistoryRequest>,
) -> crate::Result<impl IntoResponse> {
    // Lossless u8 -> i64 widening (Postgres bigint parameters); clearer than
    // a silent `as` cast.
    let limit = i64::from(search_history_request.limit.unwrap_or(10));
    let offset = i64::from(search_history_request.offset.unwrap_or(0));

    // `?` converts `sqlx::Error` into `AppError` via its `From` impl.
    let search_history = sqlx::query_as!(
        SearchHistory,
        "select * from search_history order by created_at desc limit $1 offset $2",
        limit,
        offset
    )
    .fetch_all(&pool)
    .await?;

    Ok((StatusCode::OK, Json(search_history)))
}

/// Router for the search feature: `GET /` runs a search and `GET /history`
/// pages through past searches.
pub fn routes() -> Router<AppState> {
    let router = Router::new().route("/history", get(get_search_history_handler));
    router.route("/", get(get_search_handler))
}
70 changes: 70 additions & 0 deletions server/src/search/services.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::err::AppError;
use crate::search::{SearchHistory, SearchQueryRequest, SearchResponse};
use color_eyre::eyre::eyre;
use redis::aio::MultiplexedConnection;
use redis::AsyncCommands;
use sqlx::PgPool;

/// Execute a search, serving from the Redis cache when possible.
///
/// Cache entries are keyed by the raw query string and hold a JSON-encoded
/// `SearchResponse`. A missing or undeserializable entry is treated as a
/// cache miss and falls through to the (currently stubbed) backend search.
#[tracing::instrument(level = "debug", ret, err)]
pub async fn search(
    cache: &mut MultiplexedConnection,
    search_query: &SearchQueryRequest,
) -> crate::Result<SearchResponse> {
    // Borrow the key instead of cloning it; `?` converts `redis::RedisError`
    // into `AppError` via its `From` impl.
    let cached: Option<String> = cache.get(&search_query.query).await?;

    // A corrupt/unparseable cache entry is deliberately ignored (treated as
    // a miss) rather than surfaced as an error.
    if let Some(response) = cached
        .as_deref()
        .and_then(|raw| serde_json::from_str::<SearchResponse>(raw).ok())
    {
        return Ok(response);
    }

    // sleep for 3 seconds to simulate a slow search
    // TODO: replace this with actual search logic using GRPC calls with backend services
    tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

    Ok(SearchResponse {
        response_text: "sample response".to_string(),
        response_sources: vec!["www.source1.com".to_string(), "www.source2.com".to_string()],
    })
}

#[tracing::instrument(level = "debug", ret, err)]
pub async fn insert_search_history(
pool: &PgPool,
cache: &mut MultiplexedConnection,
search_query: &SearchQueryRequest,
search_response: &SearchResponse,
) -> crate::Result<SearchHistory> {
cache
.set(
&search_query.query,
serde_json::to_string(&search_response)
.map_err(|_| eyre!("unable to convert string to json"))?,
)
.await
.map_err(|e| AppError::from(e))?;
Comment on lines +70 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noice!
We'll find an approach to do this as simple as cache.set(key, value).await? later 👌


let search_history = sqlx::query_as!(
SearchHistory,
"insert into search_history (search_text, response_text, response_sources) values ($1, $2, $3) returning *",
&search_query.query,
&search_response.response_text,
&search_response.response_sources
)
.fetch_one(pool)
.await
.map_err(|e| AppError::from(e))?;

return Ok(search_history);
}
1 change: 1 addition & 0 deletions server/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct Settings {
pub host: String,
pub port: u16,
pub db: Secret<String>,
pub cache: Secret<String>,
}

impl Settings {
Expand Down
Loading