Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/dashboard/dev/cluster_essential.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,26 @@ def _(outer_panels: Panels):
"remote storage error {{type}}: {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
# We add a small constant 0.05 to make sure that the counter jumps from null to not-null,
# the line will be flat at y=0.05 instead of disappearing.
panels.target(
f"sum(irate({metric('user_compute_error_cnt')}[$__rate_interval])) by (error_type, executor_name, fragment_id) or "
+ f"sum({metric('user_compute_error_cnt')}) by (error_type, executor_name, fragment_id) * 0 + 0.05 "
+ f"unless on({COMPONENT_LABEL}, {NODE_LABEL}) ((absent_over_time({metric('user_compute_error_cnt')}[20s])) > 0)",
"{{error_type}} @ {{executor_name}} (fragment_id={{fragment_id}})",
),
panels.target(
f"sum(irate({metric('user_source_error_cnt')}[$__rate_interval])) by (error_type, source_id, source_name, fragment_id) or "
+ f"sum({metric('user_source_error_cnt')}) by (error_type, source_id, source_name, fragment_id) * 0 + 0.05 "
+ f"unless on({COMPONENT_LABEL}, {NODE_LABEL}) ((absent_over_time({metric('user_source_error_cnt')}[20s])) > 0)",
"{{error_type}} @ {{source_name}} (source_id={{source_id}} fragment_id={{fragment_id}})",
),
panels.target(
f"sum(irate({metric('user_sink_error_cnt')}[$__rate_interval])) by (error_type, sink_id, sink_name, fragment_id) or "
+ f"sum({metric('user_sink_error_cnt')}) by (error_type, sink_id, sink_name, fragment_id) * 0 + 0.05 "
+ f"unless on({COMPONENT_LABEL}, {NODE_LABEL}) ((absent_over_time({metric('user_sink_error_cnt')}[20s])) > 0)",
"{{error_type}} @ {{sink_name}} (sink_id={{sink_id}} fragment_id={{fragment_id}})",
),
],
),
panels.subheader("User Streaming Errors"),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion src/common/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ itertools = { workspace = true }
parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
prometheus = { version = "0.14" }
rw_iter_util = { workspace = true }
rw_resource_util = { workspace = true }
serde = { version = "1", features = ["derive"] }
thiserror-ext = { workspace = true }
Expand Down
138 changes: 26 additions & 112 deletions src/common/metrics/src/error_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,150 +12,64 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::sync::LazyLock;

use itertools::Itertools;
use parking_lot::Mutex;
use prometheus::Registry;
use prometheus::core::{Collector, Desc};
use prometheus::proto::{Gauge, LabelPair, Metric, MetricFamily};
use rw_iter_util::ZipEqFast;
use prometheus::{IntCounterVec, Registry, register_int_counter_vec_with_registry};

use crate::monitor::GLOBAL_METRICS_REGISTRY;

#[derive(Clone)]
pub struct ErrorMetric<const N: usize> {
payload: Arc<Mutex<HashMap<[String; N], u32>>>,
desc: Desc,
inner: IntCounterVec,
}

impl<const N: usize> ErrorMetric<N> {
pub fn new(name: &str, help: &str, label_names: &[&str; N]) -> Self {
pub fn new(name: &str, help: &str, label_names: &[&str; N], registry: &Registry) -> Self {
Self {
payload: Default::default(),
desc: Desc::new(
name.to_owned(),
help.to_owned(),
label_names.iter().map(|l| l.to_string()).collect_vec(),
Default::default(),
)
.unwrap(),
inner: register_int_counter_vec_with_registry!(name, help, label_names, registry)
.unwrap(),
}
}

pub fn report(&self, labels: [String; N]) {
let mut m = self.payload.lock();
let v = m.entry(labels).or_default();
*v += 1;
}

fn collect(&self) -> MetricFamily {
let mut m = MetricFamily::default();
m.set_name(self.desc.fq_name.clone());
m.set_help(self.desc.help.clone());
m.set_field_type(prometheus::proto::MetricType::GAUGE);

let payload = self.payload.lock().drain().collect_vec();
let mut metrics = Vec::with_capacity(payload.len());
for (labels, count) in payload {
let mut label_pairs = Vec::with_capacity(self.desc.variable_labels.len());
for (name, label) in self.desc.variable_labels.iter().zip_eq_fast(labels) {
let mut label_pair = LabelPair::default();
label_pair.set_name(name.clone());
label_pair.set_value(label);
label_pairs.push(label_pair);
}

let mut metric = Metric::new();
metric.set_label(label_pairs);
let mut gauge = Gauge::default();
gauge.set_value(count as f64);
metric.set_gauge(gauge);
metrics.push(metric);
}
m.set_metric(metrics);
m
self.inner.with_label_values(&labels).inc();
}
}

pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>;

/// Metrics for counting errors in the system.
/// The detailed error messages are not supposed to be stored in the metrics, but in the logs.
///
/// Please avoid adding new error metrics here. Instead, introduce new `error_type` for new errors.
#[derive(Clone)]
pub struct ErrorMetrics {
pub user_sink_error: ErrorMetricRef<4>,
pub user_compute_error: ErrorMetricRef<3>,
pub user_source_error: ErrorMetricRef<4>,
pub user_sink_error: ErrorMetric<4>,
pub user_compute_error: ErrorMetric<3>,
pub user_source_error: ErrorMetric<4>,
}

impl ErrorMetrics {
pub fn new() -> Self {
pub fn new(registry: &Registry) -> Self {
Self {
user_sink_error: Arc::new(ErrorMetric::new(
"user_sink_error",
user_sink_error: ErrorMetric::new(
"user_sink_error_cnt",
"Sink errors in the system, queryable by tags",
&["error_type", "sink_id", "sink_name", "fragment_id"],
)),
user_compute_error: Arc::new(ErrorMetric::new(
"user_compute_error",
registry,
),
user_compute_error: ErrorMetric::new(
"user_compute_error_cnt",
"Compute errors in the system, queryable by tags",
&["error_type", "executor_name", "fragment_id"],
)),
user_source_error: Arc::new(ErrorMetric::new(
"user_source_error",
registry,
),
user_source_error: ErrorMetric::new(
"user_source_error_cnt",
"Source errors in the system, queryable by tags",
&["error_type", "source_id", "source_name", "fragment_id"],
)),
registry,
),
}
}

fn desc(&self) -> Vec<&Desc> {
vec![
&self.user_sink_error.desc,
&self.user_compute_error.desc,
&self.user_source_error.desc,
]
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
vec![
self.user_sink_error.collect(),
self.user_compute_error.collect(),
self.user_source_error.collect(),
]
}
}

impl Default for ErrorMetrics {
fn default() -> Self {
ErrorMetrics::new()
}
}

pub struct ErrorMetricsCollector {
metrics: ErrorMetrics,
}

impl Collector for ErrorMetricsCollector {
fn desc(&self) -> Vec<&Desc> {
self.metrics.desc()
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
self.metrics.collect()
}
}

pub fn monitor_errors(registry: &Registry, metrics: ErrorMetrics) {
let ec = ErrorMetricsCollector { metrics };
registry.register(Box::new(ec)).unwrap()
}

pub static GLOBAL_ERROR_METRICS: LazyLock<ErrorMetrics> = LazyLock::new(|| {
let e = ErrorMetrics::new();
monitor_errors(&GLOBAL_METRICS_REGISTRY, e.clone());
e
});
pub static GLOBAL_ERROR_METRICS: LazyLock<ErrorMetrics> =
LazyLock::new(|| ErrorMetrics::new(&GLOBAL_METRICS_REGISTRY));