Skip to content

Commit d831a56

Browse files
authored
Merge pull request #34157 from vespa-engine/vekterli/content-node-message-stats
Track, aggregate and report content node response (network) errors
2 parents f227f0d + 3d6e156 commit d831a56

53 files changed

Lines changed: 1671 additions & 101 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

clustercontroller-apps/src/main/java/com/yahoo/vespa/clustercontroller/apps/clustercontroller/ClusterControllerClusterConfigurer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ private static void configure(FleetControllerOptions.Builder builder, Fleetcontr
9999
builder.setClusterFeedBlockNoiseLevel(config.cluster_feed_block_noise_level());
100100
builder.setMaxNumberOfGroupsAllowedToBeDown(config.max_number_of_groups_allowed_to_be_down());
101101
builder.setIncludeDistributionConfigInClusterStateBundles(config.include_distribution_config_in_cluster_state_bundle());
102+
builder.setAggregateContentNodeErrorReportsFromDistributors(config.aggregate_content_node_error_reports_from_distributors());
102103
}
103104

104105
private static void configure(FleetControllerOptions.Builder builder, SlobroksConfig config) {

clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AggregatedClusterStats.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public interface AggregatedClusterStats {
1010

1111
ContentClusterStats getStats();
1212

13+
ContentClusterErrorStats getErrorStats();
14+
1315
ContentNodeStats getGlobalStats();
1416

1517
}

clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ static Set<Integer> getIndicesOfUpNodes(ClusterState clusterState, NodeType type
7171

7272
public ClusterState getClusterState() { return clusterState; }
7373

74-
public void handleUpdatedHostInfo(NodeInfo node, HostInfo hostInfo) {
74+
public void handleUpdatedHostInfo(NodeInfo node, HostInfo hostInfo, boolean aggregateErrorReports) {
7575
if ( ! node.isDistributor()) return;
7676

7777
final int hostVersion;
@@ -85,6 +85,17 @@ public void handleUpdatedHostInfo(NodeInfo node, HostInfo hostInfo) {
8585
}
8686
int currentStateVersion = clusterState.getVersion();
8787

88+
if (aggregateErrorReports) {
89+
var errorStats = StorageNodeStatsBridge.generateErrors(node.getNodeIndex(), hostInfo.getDistributor());
90+
// Error statistics are always updated, even if the node has not ACKed the latest cluster
91+
// state version. Otherwise, we will never be able to observe errors that prevent convergence!
92+
statsAggregator.updateErrorStatsFromDistributor(node.getNodeIndex(), errorStats);
93+
} else {
94+
// If there's an edge for disabling aggregation, clear out any existing statistics.
95+
// No-op if already disabled.
96+
statsAggregator.clearAllErrorStatsFromDistributors();
97+
}
98+
8899
if (hostVersion != currentStateVersion) {
89100
// The distributor may be old (null), or the distributor may not have updated
90101
// to the latest state version just yet. We log here with fine, because it may
@@ -95,8 +106,12 @@ public void handleUpdatedHostInfo(NodeInfo node, HostInfo hostInfo) {
95106
return;
96107
}
97108

98-
statsAggregator.updateForDistributor(node.getNodeIndex(),
99-
StorageNodeStatsBridge.generate(hostInfo.getDistributor()));
109+
var stats = StorageNodeStatsBridge.generate(hostInfo.getDistributor());
110+
statsAggregator.updateForDistributor(node.getNodeIndex(), stats);
111+
}
112+
113+
public void handleUpdatedHostInfo(NodeInfo node, HostInfo hostInfo) {
114+
handleUpdatedHostInfo(node, hostInfo, false);
100115
}
101116

102117
public ClusterStatsAggregator getStatsAggregator() {

clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStatsAggregator.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,18 @@ public class ClusterStatsAggregator {
3333
// Maps the distributor node index to a map of content node index to the
3434
// content node's stats.
3535
private final Map<Integer, ContentClusterStats> distributorToStats = new HashMap<>();
36+
private final Map<Integer, ContentClusterErrorStats> distributorToErrorStats = new HashMap<>();
3637

3738
// This is only needed as an optimization. Is just the sum of distributorToStats' ContentClusterStats.
3839
// Maps the content node index to the content node stats for that node.
3940
// This MUST be kept up-to-date with distributorToStats;
4041
private final ContentClusterStats aggregatedStats;
42+
// Aggregate of all error reports received from all distributors for all content nodes.
43+
// Makes it cheap to determine which distributors, if any, have observed errors towards
44+
// any given content node in the current cluster state. This conceptually mirrors
45+
// `aggregatedStats`, but with an extra internal indirection that is expected to be
46+
// very sparse in a healthy system.
47+
private final ContentClusterErrorStats aggregatedErrorStats;
4148
// This is the aggregate of aggregates across content nodes, allowing a reader to
4249
// get a O(1) view of all merges pending in the cluster.
4350
private final ContentNodeStats globallyAggregatedNodeStats = new ContentNodeStats(-1);
@@ -49,6 +56,7 @@ public class ClusterStatsAggregator {
4956
this.distributors = distributors;
5057
nonUpdatedDistributors = new HashSet<>(distributors);
5158
aggregatedStats = new ContentClusterStats(storageNodes);
59+
aggregatedErrorStats = new ContentClusterErrorStats(storageNodes);
5260
}
5361

5462
public AggregatedClusterStats getAggregatedStats() {
@@ -64,6 +72,11 @@ public ContentClusterStats getStats() {
6472
return aggregatedStats;
6573
}
6674

75+
@Override
76+
public ContentClusterErrorStats getErrorStats() {
77+
return aggregatedErrorStats;
78+
}
79+
6780
@Override
6881
public ContentNodeStats getGlobalStats() {
6982
return globallyAggregatedNodeStats;
@@ -105,6 +118,37 @@ void updateForDistributor(int distributorIndex, ContentClusterStats clusterStats
105118
addStatsFromDistributor(distributorIndex, clusterStats);
106119
}
107120

121+
void updateErrorStatsFromDistributor(int distributorIndex, ContentClusterErrorStats updatedErrorStats) {
122+
if (!distributors.contains(distributorIndex)) {
123+
return;
124+
}
125+
// This does _not_ remove the distributor from `nonUpdatedDistributors` since this
126+
// method will be called even if the distributor has not yet converged to the newest
127+
// cluster state.
128+
// It also does not implicitly put an entry into the `distributorToStats` mapping; error
129+
// stats are kept entirely separate.
130+
ContentClusterErrorStats prevErrorStats = distributorToErrorStats.put(distributorIndex, updatedErrorStats);
131+
for (var contentNodeEntry : aggregatedErrorStats.contentNodeErrorStats().entrySet()) {
132+
var nodeIndex = contentNodeEntry.getKey();
133+
var contentNode = contentNodeEntry.getValue();
134+
ContentNodeErrorStats statsToAdd = updatedErrorStats.getNodeErrorStats(nodeIndex);
135+
if (statsToAdd != null) {
136+
contentNode.addErrorStatsFrom(distributorIndex, statsToAdd);
137+
}
138+
if (prevErrorStats != null) {
139+
ContentNodeErrorStats statsToSubtract = prevErrorStats.getNodeErrorStats(nodeIndex);
140+
if (statsToSubtract != null) {
141+
contentNode.subtractErrorStatsFrom(distributorIndex, statsToSubtract);
142+
}
143+
}
144+
}
145+
}
146+
147+
void clearAllErrorStatsFromDistributors() {
148+
distributorToErrorStats.clear();
149+
aggregatedErrorStats.clearAllStats();
150+
}
151+
108152
private void addStatsFromDistributor(int distributorIndex, ContentClusterStats clusterStats) {
109153
ContentClusterStats prevClusterStats = distributorToStats.put(distributorIndex, clusterStats);
110154

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
2+
package com.yahoo.vespa.clustercontroller.core;
3+
4+
import java.util.Arrays;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import java.util.Set;
9+
10+
/**
11+
* Stores, for each content node, the aggregate error reports from all distributors in
12+
* a content cluster, for a particular cluster state version (or attempt at reaching
13+
* a state version in the case the cluster does not converge). The per-node statistics
14+
* are sparse, so in a healthy cluster this is expected to be a lightweight mapping
15+
* from all nodes -> empty ContentNodeErrorStats instances.
16+
*
17+
* @author vekterli
18+
*/
19+
public record ContentClusterErrorStats(Map<Integer, ContentNodeErrorStats> contentNodeErrorStats) {
20+
21+
private static Map<Integer, ContentNodeErrorStats> toEmptyErrorStats(Set<Integer> storageNodes) {
22+
Map<Integer, ContentNodeErrorStats> contentNodeErrorStats = new HashMap<>(storageNodes.size());
23+
for (Integer index : storageNodes) {
24+
contentNodeErrorStats.put(index, new ContentNodeErrorStats(index));
25+
}
26+
return contentNodeErrorStats;
27+
}
28+
29+
public ContentClusterErrorStats(Set<Integer> storageNodes) {
30+
this(toEmptyErrorStats(storageNodes));
31+
}
32+
33+
public ContentNodeErrorStats getNodeErrorStats(Integer index) {
34+
return contentNodeErrorStats.get(index);
35+
}
36+
37+
public void clearAllStats() {
38+
contentNodeErrorStats.clear();
39+
}
40+
41+
}

clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ContentClusterStats.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,26 @@
1313
*
1414
* @author hakonhall
1515
*/
16-
public class ContentClusterStats implements Iterable<ContentNodeStats> {
17-
18-
private final long documentCountTotal;
19-
private final long bytesTotal;
20-
// Maps a content node index to the content node's stats.
21-
private final Map<Integer, ContentNodeStats> mapToNodeStats;
22-
23-
public ContentClusterStats(long documentCountTotal, long bytesTotal, Set<Integer> storageNodes) {
24-
this.documentCountTotal = documentCountTotal;
25-
this.bytesTotal = bytesTotal;
26-
mapToNodeStats = new HashMap<>(storageNodes.size());
16+
public record ContentClusterStats(long documentCountTotal,
17+
long bytesTotal,
18+
// Maps a content node index to the content node's stats.
19+
Map<Integer, ContentNodeStats> mapToNodeStats)
20+
implements Iterable<ContentNodeStats> {
21+
22+
private static Map<Integer, ContentNodeStats> toEmptyNodeStats(Set<Integer> storageNodes) {
23+
Map<Integer, ContentNodeStats> mapToNodeStats = new HashMap<>(storageNodes.size());
2724
for (Integer index : storageNodes) {
2825
mapToNodeStats.put(index, new ContentNodeStats(index));
2926
}
27+
return mapToNodeStats;
3028
}
3129

32-
public ContentClusterStats(Set<Integer> storageNodes) {
33-
this(0, 0, storageNodes);
30+
public ContentClusterStats(long documentCountTotal, long bytesTotal, Set<Integer> storageNodes) {
31+
this(documentCountTotal, bytesTotal, toEmptyNodeStats(storageNodes));
3432
}
3533

36-
public ContentClusterStats(long documentCountTotal, long bytesTotal, Map<Integer, ContentNodeStats> mapToNodeStats) {
37-
this.documentCountTotal = documentCountTotal;
38-
this.bytesTotal = bytesTotal;
39-
this.mapToNodeStats = mapToNodeStats;
34+
public ContentClusterStats(Set<Integer> storageNodes) {
35+
this(0, 0, storageNodes);
4036
}
4137

4238
public ContentClusterStats(Map<Integer, ContentNodeStats> mapToNodeStats) {
@@ -57,21 +53,4 @@ public int size() {
5753
return mapToNodeStats.size();
5854
}
5955

60-
@Override
61-
public boolean equals(Object o) {
62-
if (this == o) return true;
63-
if (o == null || getClass() != o.getClass()) return false;
64-
ContentClusterStats that = (ContentClusterStats) o;
65-
return Objects.equals(mapToNodeStats, that.mapToNodeStats);
66-
}
67-
68-
@Override
69-
public int hashCode() {
70-
return Objects.hash(mapToNodeStats);
71-
}
72-
73-
@Override
74-
public String toString() {
75-
return String.format("{mapToNodeStats=[%s]}", Arrays.toString(mapToNodeStats.entrySet().toArray()));
76-
}
7756
}

0 commit comments

Comments
 (0)