Configurations to run benchmarks #2

Closed · wants to merge 3 commits
3 changes: 0 additions & 3 deletions Cargo.toml
@@ -24,16 +24,13 @@ prometheus = "0.13.3"
actix-web = "4.4.0"

[[bin]]
doc = false
name = "controller"
path = "src/main.rs"

[[bin]]
doc = false
name = "crdgen"
path = "src/crdgen.rs"

[lib]
name = "controller"
path = "src/lib.rs"

1 change: 1 addition & 0 deletions benchmark/.gitignore
@@ -0,0 +1 @@
.env
5 changes: 5 additions & 0 deletions benchmark/Dockerfile
@@ -0,0 +1,5 @@
FROM alpine:latest

RUN apk --no-cache add postgresql-contrib

ENTRYPOINT [ "pgbench" ]
58 changes: 58 additions & 0 deletions benchmark/README.md
@@ -0,0 +1,58 @@
# Benchmark

This folder contains the configuration to run benchmarks against the db-sync Postgres database. Docker is used to run the pgbench tool, and `bench.sql` holds a set of common queries.

## Build the Docker image

The image must be built before it can be used:

```bash
docker build -t pgbench .
```
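
If the build succeeds, a quick sanity check is to print the pgbench version through the image entrypoint (`pgbench:latest` is the tag built above):

```bash
docker run --rm pgbench:latest --version
```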

## Environment

pgbench needs some environment variables to connect to the database. Create a `.env` file and set the variables below:

| Key | Value |
| ---------- | ----- |
| PGDATABASE | |
| PGHOST | |
| PGPORT | |
| PGUSER | |
| PGPASSWORD | |
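
These are the standard libpq connection variables. As a reference, a filled-in `.env` could look like the sketch below (every value is a placeholder, not a real credential):

```
PGDATABASE=dbsync
PGHOST=localhost
PGPORT=5432
PGUSER=dbsync_user
PGPASSWORD=changeme
```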

## Run benchmark

To run the benchmark, run the Docker image built above, passing the pgbench parameters you need:

```bash
docker run --env-file .env --network host --volume ./bench.sql:/bench.sql pgbench:latest -c 10 -T 5 -n -f /bench.sql
```

- `-c` number of concurrent clients
- `-T` execution time in seconds
- `-n` skip the initial vacuum (needed when running custom scripts against non-pgbench tables)
- `-f` path to the custom script

More parameters are documented at https://www.postgresql.org/docs/devel/pgbench.html.
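
As an illustrative variation, the run below adds the standard pgbench flags `-j` (number of worker threads) and `-P` (progress report interval in seconds):

```bash
docker run --env-file .env --network host --volume ./bench.sql:/bench.sql pgbench:latest -c 4 -j 2 -T 10 -P 5 -n -f /bench.sql
```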

### Metrics example

Sample output when the command finishes:

```
transaction type: /bench.sql
scaling factor: 1
query mode: simple
number of clients: 4
number of threads: 1
maximum number of tries: 1
duration: 10 s
number of transactions actually processed: 16
number of failed transactions: 0 (0.000%)
latency average = 1562.050 ms
initial connection time = 3951.848 ms
tps = 2.560738 (without initial connection time)
```
163 changes: 163 additions & 0 deletions benchmark/bench.sql
@@ -0,0 +1,163 @@
--- Select latest epoch parameters
select
*
from
epoch_param ep
order by
ep.epoch_no desc
limit 1;

--- Select assets by policy and apply filters by metadata
with asset as (
select
ma_tx_mint.tx_id,
encode(multi_asset.name, 'escape') as name,
encode(multi_asset.policy, 'hex') as policy,
multi_asset.fingerprint
from
multi_asset
inner join
ma_tx_mint on
ma_tx_mint.ident = multi_asset.id
where
multi_asset.policy = '\x8f80ebfaf62a8c33ae2adf047572604c74db8bc1daba2b43f9a65635'
),metadata as (
select
tx_metadata.tx_id,
tx_metadata.json as metadata
from
tx_metadata
where
tx_metadata.tx_id in (select tx_id from asset)
)
select
*,
count(*) over () as count
from
asset
inner join metadata on
asset.tx_id = metadata.tx_id
where
jsonb_path_query_array(metadata.metadata,'$.*.*.type') ?| array['Orc']
order by
asset.name asc
limit 20 offset 0;

--- Select total assets by policy from stake address
select
sum(ma_tx_out.quantity) as quantity,
encode(multi_asset.policy, 'hex') as policy
from
utxo_view
inner join
stake_address on stake_address.id = utxo_view.stake_address_id
inner join
ma_tx_out on ma_tx_out.tx_out_id = utxo_view.id
inner join
multi_asset on multi_asset.id = ma_tx_out.ident
where
stake_address."view" = 'stake1u90nkx5yw6qkpas3kxa0wcql93axph8fetw20l0j2ntszucgg4rr2'
and
multi_asset.policy = '\xb7761c472eef3b6e0505441efaf940892bb59c01be96070b0a0a89b3'
group by multi_asset.policy;

--- Select all assets from a stake address
select
ma_tx_out.tx_out_id,
ma_tx_out.quantity,
encode(multi_asset.name, 'escape') as name,
encode(multi_asset.policy, 'hex') as policy,
multi_asset.fingerprint,
tx_metadata.json as metadata,
count(*) over () as count
from
utxo_view
inner join
stake_address on stake_address.id = utxo_view.stake_address_id
inner join
ma_tx_out on ma_tx_out.tx_out_id = utxo_view.id
inner join
multi_asset on multi_asset.id = ma_tx_out.ident
inner join
ma_tx_mint on ma_tx_mint.ident = multi_asset.id
inner join
tx_metadata on tx_metadata.tx_id = ma_tx_mint.tx_id
where
stake_address.view = 'stake1u90nkx5yw6qkpas3kxa0wcql93axph8fetw20l0j2ntszucgg4rr2'
order by
multi_asset.name asc
limit 20 offset 1;

--- Select all utxos from a stake address
select
tx_out.id,
tx.hash,
tx_out.index,
tx_out.address,
tx_out.value
from
tx_out
left join
tx_in on tx_out.tx_id = tx_in.tx_out_id and tx_out.index::smallint = tx_in.tx_out_index::smallint
left join
tx on tx.id = tx_out.tx_id
left join
block on tx.block_id = block.id
inner join
stake_address on stake_address.id = tx_out.stake_address_id
where
tx_in.tx_in_id is null and
block.epoch_no is not null and
stake_address.view = 'stake1u90nkx5yw6qkpas3kxa0wcql93axph8fetw20l0j2ntszucgg4rr2';

--- Select slot number of the most recent block
select
slot_no
from
block
where
block_no is not null
order by
block_no desc
limit 1;

--- Select current valid pools
select
*
from
pool_update
where
registered_tx_id in (select max(registered_tx_id) from pool_update group by hash_id)
and
not exists(
select
*
from
pool_retire
where
pool_retire.hash_id = pool_update.hash_id
and
pool_retire.retiring_epoch <= (select max (epoch_no) from block)
);

--- Select the stake address for a given Shelley address
select
stake_address.id as stake_address_id,
tx_out.address,
stake_address.view as stake_address
from
tx_out
inner join
stake_address on tx_out.stake_address_id = stake_address.id
where
address = 'addr1q8u4wgd8qplhxpt4xm2l8yagy5ng7veurwrns2ysh03zuh2l8vdgga5pvrmprvd67asp7tr6vrwwnjku5l7ly4xhq9esr9h59t';

--- Select transaction outputs for specified transaction hash
select
tx_out.*
from
tx_out
inner join
tx on tx_out.tx_id = tx.id
where
tx.hash = '\xabd21556d9bb817d436e33a5fa32619702633dc809e707a5297566e9d74d57c1';
49 changes: 15 additions & 34 deletions src/controller.rs
@@ -1,7 +1,3 @@
use crate::{
postgres::{self, user_already_exists, user_create, user_disable, user_enable},
Config, Error, Metrics, Result,
};
use futures::StreamExt;
use kube::{
api::{Patch, PatchParams},
@@ -20,6 +16,8 @@ use serde_json::json;
use std::{sync::Arc, time::Duration};
use tracing::error;

use crate::{postgres::Postgres, Config, Error, Metrics, State};

pub static DB_SYNC_PORT_FINALIZER: &str = "dbsyncports.demeter.run";

struct Context {
@@ -36,15 +34,6 @@ impl Context {
}
}
}
#[derive(Clone, Default)]
pub struct State {
registry: prometheus::Registry,
}
impl State {
pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}
}

#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub enum Network {
@@ -75,11 +64,7 @@ impl DbSyncPort {
.unwrap_or(false)
}

async fn reconcile(
&self,
ctx: Arc<Context>,
pg_client: &mut tokio_postgres::Client,
) -> Result<Action> {
async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
let client = ctx.client.clone();
let ns = self.namespace().unwrap();
let name = self.name_any();
@@ -89,9 +74,9 @@ impl DbSyncPort {
let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

if !self.was_executed() {
match user_already_exists(pg_client, &username).await? {
true => user_enable(pg_client, &username, &password).await?,
false => user_create(pg_client, &username, &password).await?,
match pg.user_already_exists(&username).await? {
true => pg.user_enable(&username, &password).await?,
false => pg.user_create(&username, &password).await?,
};

let new_status = Patch::Apply(json!({
@@ -114,19 +99,15 @@ impl DbSyncPort {
Ok(Action::requeue(Duration::from_secs(5 * 60)))
}

async fn cleanup(
&self,
ctx: Arc<Context>,
pg_client: &mut tokio_postgres::Client,
) -> Result<Action> {
async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
let username = self.status.as_ref().unwrap().username.clone();
user_disable(pg_client, &username).await?;
pg.user_disable(&username).await?;
ctx.metrics.count_user_deactivated(&username);
Ok(Action::await_change())
}
}

async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action, Error> {
let url = match crd.spec.network {
Network::Mainnet => &ctx.config.db_url_mainnet,
Network::Preprod => &ctx.config.db_url_preprod,
@@ -136,12 +117,12 @@ async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
let ns = crd.namespace().unwrap();
let crds: Api<DbSyncPort> = Api::namespaced(ctx.client.clone(), &ns);

let mut pg_client = postgres::connect(url).await?;
let mut postgres = Postgres::new(url).await?;

finalizer(&crds, DB_SYNC_PORT_FINALIZER, crd, |event| async {
match event {
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut pg_client).await,
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut pg_client).await,
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut postgres).await,
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut postgres).await,
}
})
.await
@@ -154,11 +135,11 @@ fn error_policy(crd: Arc<DbSyncPort>, err: &Error, ctx: Arc<Context>) -> Action
Action::requeue(Duration::from_secs(5))
}

pub async fn run(state: State, config: Config) -> Result<(), Error> {
pub async fn run(state: Arc<State>, config: Config) -> Result<(), Error> {
let client = Client::try_default().await?;
let crds = Api::<DbSyncPort>::all(client.clone());
let metrics = Metrics::default().register(&state.registry).unwrap();
let ctx = Context::new(client, metrics, config);

let ctx = Context::new(client, state.metrics.clone(), config);

Controller::new(crds, WatcherConfig::default().any_semantic())
.shutdown_on_signal()
1 change: 1 addition & 0 deletions src/crdgen.rs
@@ -1,3 +1,4 @@
use ext_cardano_dbsync::controller;
use kube::CustomResourceExt;

fn main() {