Skip to content

Commit cf78257

Browse files
committed
Adds a instrumented HttpShardHandlerFactory
- Reports metrics for dead shards and http destination request queues - Makes heavy use of reflection (and is thus not ready for upstream yet)
1 parent ed2bf22 commit cf78257

File tree

2 files changed

+235
-0
lines changed

2 files changed

+235
-0
lines changed

gradle/solr/solr-forbidden-apis.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,8 @@ configure(project(":solr:core")) {
2222
tasks.matching { it.name == "forbiddenApisMain" || it.name == "forbiddenApisTest" }.all {
2323
exclude "org/apache/solr/internal/**"
2424
exclude "org/apache/hadoop/**"
25+
26+
// uses reflection for now, needs revision
27+
exclude "org/apache/solr/handler/component/InstrumentedHtttpShardHandlerFactory*"
2528
}
2629
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.solr.handler.component;
18+
19+
import com.codahale.metrics.Gauge;
20+
import java.lang.invoke.MethodHandles;
21+
import java.lang.reflect.Field;
22+
import java.lang.reflect.InvocationTargetException;
23+
import java.lang.reflect.Method;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.Queue;
27+
import java.util.concurrent.ExecutorService;
28+
import org.apache.solr.client.solrj.impl.Http2SolrClient;
29+
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
30+
import org.apache.solr.client.solrj.impl.LBSolrClient;
31+
import org.apache.solr.common.util.ExecutorUtil;
32+
import org.apache.solr.common.util.SolrNamedThreadFactory;
33+
import org.apache.solr.core.SolrInfoBean;
34+
import org.apache.solr.metrics.SolrMetricsContext;
35+
import org.eclipse.jetty.client.HttpClient;
36+
import org.eclipse.jetty.client.HttpDestination;
37+
import org.eclipse.jetty.client.HttpExchange;
38+
import org.eclipse.jetty.client.api.Destination;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
/**
43+
* A shard handler factory that instruments the internal jetty http2 client to emit size metrics per
44+
* http destincation queue. This helps in identifying troublesome nodes in your Solr cloud setup.
45+
*
46+
* <p>It furthermore registers metrics for alive and zombie members of the shard ensemble reflecting
47+
* the current nodes view.
48+
*
49+
* <p>This code uses reflection to break up closed internal APIs.
50+
*/
51+
public class InstrumentedHtttpShardHandlerFactory extends HttpShardHandlerFactory {
52+
53+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
54+
55+
private ExecutorService scheduler;
56+
57+
// store gauges here
58+
private final Map<String, HttpDestinationGauge> destinationGauges = new HashMap<>();
59+
60+
@Override
61+
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
62+
super.initializeMetrics(parentContext, scope);
63+
64+
// a scheduler to check available HttpDestinations
65+
this.scheduler =
66+
ExecutorUtil.newMDCAwareSingleThreadExecutor(
67+
new SolrNamedThreadFactory("httpshardhandler-destination-checker-"));
68+
69+
registerDeadServerMetricGauges();
70+
registerHttpDestinationGauges(defaultClient);
71+
}
72+
73+
void registerDeadServerMetricGauges() {
74+
try {
75+
// break up loadbalancer in super class
76+
final Field loadbalancer = HttpShardHandlerFactory.class.getDeclaredField("loadbalancer");
77+
loadbalancer.setAccessible(true);
78+
79+
// break up zombie servers in LBHttp2SolrClient
80+
final Field zombieServers = LBSolrClient.class.getDeclaredField("zombieServers");
81+
zombieServers.setAccessible(true);
82+
83+
// break up zombie servers in LBHttp2SolrClient
84+
final Field aliveServers = LBSolrClient.class.getDeclaredField("aliveServers");
85+
aliveServers.setAccessible(true);
86+
87+
// get instance
88+
final LBHttp2SolrClient loadbalancerSolrClient = (LBHttp2SolrClient) loadbalancer.get(this);
89+
@SuppressWarnings("unchecked")
90+
final Map<String, Object> zombies =
91+
(Map<String, Object>) zombieServers.get(loadbalancerSolrClient);
92+
@SuppressWarnings("unchecked")
93+
final Map<String, Object> alives =
94+
(Map<String, Object>) aliveServers.get(loadbalancerSolrClient);
95+
96+
// register the gauges
97+
getSolrMetricsContext()
98+
.gauge(
99+
() -> zombies.size(),
100+
true,
101+
"count",
102+
SolrInfoBean.Category.QUERY.name(),
103+
"httpShardHandler",
104+
"zombieServers");
105+
getSolrMetricsContext()
106+
.gauge(
107+
() -> alives.size(),
108+
true,
109+
"count",
110+
SolrInfoBean.Category.QUERY.name(),
111+
"httpShardHandler",
112+
"aliveServers");
113+
114+
} catch (NoSuchFieldException
115+
| SecurityException
116+
| IllegalArgumentException
117+
| IllegalAccessException e) {
118+
if (log.isWarnEnabled()) {
119+
log.warn("Could not attach gauge to zombie server list", e);
120+
}
121+
}
122+
}
123+
124+
void registerHttpDestinationGauges(Http2SolrClient solrClient) {
125+
try {
126+
// extract internal Jetty HttpClient
127+
Method getHttpClient = Http2SolrClient.class.getDeclaredMethod("getHttpClient");
128+
getHttpClient.setAccessible(true);
129+
final HttpClient httpClient = (HttpClient) getHttpClient.invoke(solrClient);
130+
131+
// submit a thread to the single scheduler to poll
132+
// for new destinations every second
133+
scheduler.submit(
134+
() -> {
135+
while (!scheduler.isShutdown()) {
136+
// wait a second
137+
try {
138+
Thread.sleep(1000);
139+
} catch (InterruptedException e) {
140+
if (log.isInfoEnabled()) {
141+
log.info("Got interrupted while checking for new destinations to monitor ...");
142+
}
143+
}
144+
145+
// check for new destinations
146+
try {
147+
checkAvailableHttpDestinations(httpClient);
148+
} catch (Exception e) {
149+
if (log.isWarnEnabled()) {
150+
log.warn("Could not check available destinations", e);
151+
}
152+
}
153+
}
154+
});
155+
} catch (NoSuchMethodException
156+
| SecurityException
157+
| IllegalAccessException
158+
| IllegalArgumentException
159+
| InvocationTargetException e) {
160+
if (log.isWarnEnabled()) {
161+
log.warn("Could not attach gauge to HttpDestination queue", e);
162+
}
163+
}
164+
}
165+
166+
void checkAvailableHttpDestinations(HttpClient httpClient) {
167+
for (Destination destination : httpClient.getDestinations()) {
168+
169+
// build a finite destination name
170+
final String destinationName =
171+
String.format(
172+
"%s://%s:%s", destination.getScheme(), destination.getHost(), destination.getPort());
173+
174+
// new destination appeared. Create Gauge
175+
if (!destinationGauges.containsKey(destinationName) && getSolrMetricsContext() != null) {
176+
try {
177+
final HttpDestinationGauge destinationGauge =
178+
new HttpDestinationGauge((HttpDestination) destination);
179+
getSolrMetricsContext()
180+
.gauge(
181+
destinationGauge,
182+
true,
183+
"count",
184+
SolrInfoBean.Category.QUERY.name(),
185+
"httpShardHandler",
186+
"shardRequestQueue",
187+
destinationName);
188+
destinationGauges.put(destinationName, destinationGauge);
189+
} catch (NoSuchFieldException | SecurityException e) {
190+
if (log.isWarnEnabled()) {
191+
log.warn("Could not inspect HttpDestination queue size", e);
192+
}
193+
}
194+
}
195+
}
196+
}
197+
198+
@Override
199+
public void close() {
200+
scheduler.shutdownNow();
201+
destinationGauges.clear();
202+
}
203+
204+
public static class HttpDestinationGauge implements Gauge<Integer> {
205+
206+
private final Field exchanges;
207+
private final HttpDestination destination;
208+
209+
public HttpDestinationGauge(HttpDestination destination)
210+
throws NoSuchFieldException, SecurityException {
211+
this.destination = destination;
212+
213+
// break up implementation
214+
this.exchanges = HttpDestination.class.getDeclaredField("exchanges");
215+
this.exchanges.setAccessible(true);
216+
}
217+
218+
@SuppressWarnings("unchecked")
219+
@Override
220+
public Integer getValue() {
221+
try {
222+
return ((Queue<HttpExchange>) exchanges.get(destination)).size();
223+
} catch (IllegalArgumentException | IllegalAccessException e) {
224+
if (log.isInfoEnabled()) {
225+
log.info("Could not pull http request queue size", e);
226+
}
227+
228+
return 0;
229+
}
230+
}
231+
}
232+
}

0 commit comments

Comments
 (0)