Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add memory in state to API service #420

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion backend-rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ccdscan-api --help
Example:

```
cargo run --bin ccdscan-api
cargo run --bin ccdscan-api -- --node https://grpc.testnet.concordium.com:20000
```

<!-- TODO When service become stable: add documentation of arguments and environment variables. -->
Expand Down
8 changes: 0 additions & 8 deletions backend-rust/migrations/0001_initialize.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ CREATE TYPE module_reference_contract_link_action AS ENUM (
'Removed'
);

-- Consensus status.
CREATE TABLE current_consensus_status(
-- Duration of an epoch in milliseconds of the current consensus algorithm.
epoch_duration
BIGINT
NOT NULL
);

-- Every block on chain.
CREATE TABLE blocks(
-- The absolute height of the block.
Expand Down
100 changes: 89 additions & 11 deletions backend-rust/src/bin/ccdscan-api.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use anyhow::Context;
use async_graphql::SDLExportOptions;
use clap::Parser;
use concordium_scan::{graphql_api, router};
use concordium_rust_sdk::{
types::ProtocolVersion,
v2::{self, BlockIdentifier, ChainParameters},
};
use concordium_scan::{
graphql_api::{self, MemoryState, ServiceConfig},
router,
};
use prometheus_client::{
metrics::{family::Family, gauge::Gauge},
registry::Registry,
};
use sqlx::postgres::PgPoolOptions;
use std::{net::SocketAddr, path::PathBuf};
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
Expand All @@ -19,40 +26,108 @@ struct Cli {
/// Use an environment variable when the connection contains a password, as
/// command line arguments are visible across OS processes.
#[arg(long, env = "DATABASE_URL")]
database_url: String,
database_url: String,
#[arg(long, env = "DATABASE_RETRY_DELAY_SECS", default_value_t = 5)]
database_retry_delay_secs: u64,
/// Minimum number of connections in the pool.
#[arg(long, env = "DATABASE_MIN_CONNECTIONS", default_value_t = 5)]
min_connections: u32,
min_connections: u32,
/// Maximum number of connections in the pool.
#[arg(long, env = "DATABASE_MAX_CONNECTIONS", default_value_t = 10)]
max_connections: u32,
max_connections: u32,
/// Output the GraphQL Schema for the API to this path.
#[arg(long)]
schema_out: Option<PathBuf>,
schema_out: Option<PathBuf>,
/// Address to listen to for API requests.
#[arg(long, env = "CCDSCAN_API_ADDRESS", default_value = "127.0.0.1:8000")]
listen: SocketAddr,
listen: SocketAddr,
/// Address to listen for monitoring related requests
#[arg(long, env = "CCDSCAN_API_MONITORING_ADDRESS", default_value = "127.0.0.1:8003")]
monitoring_listen: SocketAddr,
monitoring_listen: SocketAddr,
#[command(flatten, next_help_heading = "Configuration")]
api_config: graphql_api::ApiServiceConfig,
api_config: graphql_api::ApiServiceConfig,
#[arg(
long = "log-level",
default_value = "info",
help = "The maximum log level. Possible values are: `trace`, `debug`, `info`, `warn`, and \
`error`.",
env = "LOG_LEVEL"
)]
log_level: tracing_subscriber::filter::LevelFilter,
log_level: tracing_subscriber::filter::LevelFilter,
/// Concordium node URL to a caught-up node.
#[arg(long, env = "CCDSCAN_API_GRPC_ENDPOINT", default_value = "http://localhost:20000")]
node: v2::Endpoint,
/// Request timeout in seconds when querying a Concordium Node.
#[arg(long, env = "CCDSCAN_API_NODE_REQUEST_TIMEOUT", default_value = "60")]
node_request_timeout: u64,
/// Connection timeout in seconds when connecting a Concordium Node.
#[arg(long, env = "CCDSCAN_API_NODE_CONNECT_TIMEOUT", default_value = "10")]
node_connect_timeout: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _ = dotenvy::dotenv();
let cli = Cli::parse();

let endpoint = if cli
.node
.uri()
.scheme()
.map_or(false, |x| x == &concordium_rust_sdk::v2::Scheme::HTTPS)
{
cli.node
.clone()
.tls_config(tonic::transport::ClientTlsConfig::new())
.context("Unable to construct TLS configuration for the Concordium node.")?
} else {
cli.node.clone()
};
let endpoint: v2::Endpoint = endpoint
.timeout(Duration::from_secs(cli.node_request_timeout))
.connect_timeout(Duration::from_secs(cli.node_connect_timeout));

let mut client = v2::Client::new(endpoint).await?;
// Get the current block.
let current_block = client.get_block_info(BlockIdentifier::LastFinal).await?.response;
// We ensure that the connected node has caught up with the current protocol
// version 8. This ensures that the parameters `current_epoch_duration` and
// `current_reward_period_length` are available.
if current_block.protocol_version < ProtocolVersion::P8 {
anyhow::bail!(
"Ensure the connected node has caught up with the current protocol version 8.
This ensures that the `current_epoch_duration` and `current_reward_period_length` are \
available to be queried"
);
}

// Get the current `epoch_duration` value.
let current_epoch_duration = client.get_consensus_info().await?.epoch_duration;

// Get the current `reward_period_length` value.
let current_chain_parmeters =
client.get_block_chain_parameters(BlockIdentifier::LastFinal).await?.response;
let current_reward_period_length = match current_chain_parmeters {
ChainParameters::V3(chain_parameters_v3) => {
chain_parameters_v3.time_parameters.reward_period_length
}
ChainParameters::V2(chain_parameters_v2) => {
chain_parameters_v2.time_parameters.reward_period_length
}
ChainParameters::V1(chain_parameters_v1) => {
chain_parameters_v1.time_parameters.reward_period_length
}
_ => todo!(
"Expect the chain to have caught up enought for the `reward_period_length` value \
being available."
),
};

let memory_state = MemoryState {
current_epoch_duration,
current_reward_period_length,
};

tracing_subscriber::fmt().with_max_level(cli.log_level).init();
let pool = PgPoolOptions::new()
.min_connections(cli.min_connections)
Expand Down Expand Up @@ -87,7 +162,10 @@ async fn main() -> anyhow::Result<()> {

let mut queries_task = {
let pool = pool.clone();
let service = graphql_api::Service::new(subscription, &mut registry, pool, cli.api_config);
let service = graphql_api::Service::new(subscription, &mut registry, pool, ServiceConfig {
api_config: cli.api_config,
memory_state,
});
if let Some(schema_file) = cli.schema_out {
info!("Writing schema to {}", schema_file.to_string_lossy());
std::fs::write(
Expand Down
82 changes: 64 additions & 18 deletions backend-rust/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ macro_rules! todo_api {
pub(crate) use todo_api;

use crate::{
scalar_types::{BlockHeight, DateTime, TimeSpan},
scalar_types::{
BlockHeight, DateTime, RewardPeriodLength as ScalarRewardPeriodLength, TimeSpan,
},
transaction_event::smart_contracts::InvalidContractVersionError,
};
use account::Account;
Expand All @@ -36,9 +38,10 @@ use async_graphql::{
};
use async_graphql_axum::GraphQLSubscription;
use block::Block;
use chrono::Duration;
use chrono::{Duration, TimeDelta};
use concordium_rust_sdk::{
base::contracts_common::schema::VersionedSchemaError, id::types as sdk_types,
types::RewardPeriodLength,
};
use derive_more::Display;
use futures::prelude::*;
Expand Down Expand Up @@ -158,6 +161,26 @@ pub struct Query(
token::QueryToken,
);

/// This state is not persisted in the database and will be re-created at
/// start-up of this service.
#[derive(Copy, Clone)]
pub struct MemoryState {
// Duration of an epoch of the current consensus algorithm.
// E.g. This value is 1 hour for testnet in protocol version 8.
pub current_epoch_duration: TimeDelta,
// Number of epochs between reward payout to happen.
// E.g. This value is 24 for testnet in protocol version 8.
// E.g. This means after 24 hours a new payday block is happening on testnet with reward
// payouts.
pub current_reward_period_length: RewardPeriodLength,
}

/// The service config with state in memory and the api configuration.
pub struct ServiceConfig {
pub api_config: ApiServiceConfig,
pub memory_state: MemoryState,
}

pub struct Service {
pub schema: Schema<Query, EmptyMutation, Subscription>,
}
Expand All @@ -166,7 +189,7 @@ impl Service {
subscription: Subscription,
registry: &mut Registry,
pool: PgPool,
config: ApiServiceConfig,
config: ServiceConfig,
) -> Self {
let schema = Schema::build(Query::default(), EmptyMutation, subscription)
.extension(async_graphql::extensions::Tracing)
Expand Down Expand Up @@ -363,10 +386,12 @@ mod monitor {
pub enum ApiError {
#[error("Could not find resource")]
NotFound,
#[error("Internal error: {}", .0.message)]
#[error("Internal error (no NoDatabasePool): {}", .0.message)]
NoDatabasePool(async_graphql::Error),
#[error("Internal error: {}", .0.message)]
#[error("Internal error (no NoServiceConfig): {}", .0.message)]
NoServiceConfig(async_graphql::Error),
#[error("Internal error (no MemoryState): {}", .0.message)]
MemoryState(async_graphql::Error),
#[error("Internal error: {0}")]
FailedDatabaseQuery(Arc<sqlx::Error>),
#[error("Invalid ID format: {0}")]
Expand Down Expand Up @@ -400,7 +425,14 @@ pub fn get_pool<'a>(ctx: &Context<'a>) -> ApiResult<&'a PgPool> {

/// Get service configuration from the context.
pub fn get_config<'a>(ctx: &Context<'a>) -> ApiResult<&'a ApiServiceConfig> {
ctx.data::<ApiServiceConfig>().map_err(ApiError::NoServiceConfig)
let service_config = ctx.data::<ServiceConfig>().map_err(ApiError::NoServiceConfig)?;
Ok(&service_config.api_config)
}

/// Get the memory state from the context.
pub fn get_memory_state<'a>(ctx: &Context<'a>) -> ApiResult<&'a MemoryState> {
let service_config = ctx.data::<ServiceConfig>().map_err(ApiError::NoServiceConfig)?;
Ok(&service_config.memory_state)
}

trait ConnectionCursor {
Expand Down Expand Up @@ -474,23 +506,34 @@ impl BaseQuery {
}

async fn import_state<'a>(&self, ctx: &Context<'a>) -> ApiResult<ImportState> {
let epoch_duration =
sqlx::query_scalar!("SELECT epoch_duration FROM current_consensus_status")
.fetch_optional(get_pool(ctx)?)
.await?
.ok_or(ApiError::NotFound)?;
let memory_state = get_memory_state(ctx)?;

Ok(ImportState {
epoch_duration: TimeSpan(
Duration::try_milliseconds(epoch_duration).ok_or(ApiError::InternalError(
"Epoch duration (epoch_duration) in the database should be a valid duration \
in milliseconds."
.to_string(),
))?,
Duration::try_milliseconds(memory_state.current_epoch_duration.num_milliseconds())
.ok_or(ApiError::InternalError(
"Epoch duration in the memory state should be a valid duration in \
milliseconds."
.to_string(),
))?,
),
})
}

async fn latest_chain_parameters<'a>(
&self,
ctx: &Context<'a>,
) -> ApiResult<LatestChainParameters> {
let memory_state = get_memory_state(ctx)?;

Ok(LatestChainParameters {
reward_period_length: memory_state
.current_reward_period_length
.reward_period_epochs()
.epoch,
})
}

async fn tokens(
&self,
ctx: &Context<'_>,
Expand Down Expand Up @@ -586,8 +629,6 @@ impl BaseQuery {
// poolRewardMetricsForBakerPool(bakerId: ID! period: MetricsPeriod!):
// PoolRewardMetrics! passiveDelegation: PassiveDelegation
// paydayStatus: PaydayStatus
// latestChainParameters: ChainParameters
// importState: ImportState
// nodeStatuses(sortField: NodeSortField! sortDirection: NodeSortDirection!
// "Returns the first _n_ elements from the list." first: Int "Returns the
// elements in the list that come after the specified cursor." after: String
Expand Down Expand Up @@ -749,6 +790,11 @@ pub struct AccountsUpdatedSubscriptionItem {
address: String,
}

#[derive(SimpleObject)]
struct LatestChainParameters {
reward_period_length: ScalarRewardPeriodLength,
}

#[derive(SimpleObject)]
struct ImportState {
epoch_duration: TimeSpan,
Expand Down
11 changes: 0 additions & 11 deletions backend-rust/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,6 @@ async fn save_genesis_data(endpoint: v2::Endpoint, pool: &PgPool) -> anyhow::Res
let mut tx = pool.begin().await.context("Failed to create SQL transaction")?;
let genesis_height = v2::BlockIdentifier::AbsoluteHeight(0.into());
{
let epoch_duration = client.get_consensus_info().await?.epoch_duration.num_milliseconds();

let genesis_block_info = client.get_block_info(genesis_height).await?.response;
let block_hash = genesis_block_info.block_hash.to_string();
let slot_time = genesis_block_info.block_slot_time;
Expand Down Expand Up @@ -746,15 +744,6 @@ async fn save_genesis_data(endpoint: v2::Endpoint, pool: &PgPool) -> anyhow::Res
)
.execute(&mut *tx)
.await?;

sqlx::query!(
"INSERT INTO current_consensus_status (
epoch_duration
) VALUES ($1);",
epoch_duration
)
.execute(&mut *tx)
.await?;
}

let mut genesis_accounts = client.get_account_list(genesis_height).await?.response;
Expand Down
Loading
Loading