Skip to content

Commit

Permalink
Consolidate Metrics Query and Metrics Proccessor Services
Browse files Browse the repository at this point in the history
  • Loading branch information
bhardwaj-priyanshu committed Feb 10, 2025
1 parent ea42fd0 commit fb804c6
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.logging.run.LogSaverServiceManager;
import io.cdap.cdap.messaging.distributed.MessagingServiceManager;
import io.cdap.cdap.metrics.runtime.MetricsProcessorStatusServiceManager;
import io.cdap.cdap.metrics.runtime.MetricsServiceManager;
import io.cdap.http.HttpHandler;
import java.util.Map;
Expand Down Expand Up @@ -115,9 +114,6 @@ private void addNonHadoopBindings(Binder binder) {
.toProvider(new NonHadoopMasterServiceManagerProvider(LogSaverServiceManager.class));
mapBinder.addBinding(Constants.Service.TRANSACTION)
.toProvider(new NonHadoopMasterServiceManagerProvider(TransactionServiceManager.class));
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR)
.toProvider(
new NonHadoopMasterServiceManagerProvider(MetricsProcessorStatusServiceManager.class));
mapBinder.addBinding(Constants.Service.METRICS)
.toProvider(new NonHadoopMasterServiceManagerProvider(MetricsServiceManager.class));
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP)
Expand Down Expand Up @@ -152,11 +148,10 @@ private void addHadoopBindings(Binder binder) {
MasterServiceManager.class);
mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverServiceManager.class);
mapBinder.addBinding(Constants.Service.TRANSACTION).to(TransactionServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR)
.to(MetricsProcessorStatusServiceManager.class);
mapBinder.addBinding(Constants.Service.METRICS).to(MetricsServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP).to(AppFabricServiceManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR).to(AppFabricProcessorManager.class);
mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR)
.to(AppFabricProcessorManager.class);
mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR)
.to(DatasetExecutorServiceManager.class);
mapBinder.addBinding(Constants.Service.METADATA_SERVICE).to(MetadataServiceManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testSystemServices() throws Exception {
List<SystemServiceMeta> actual = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()),
Charsets.UTF_8), token);

Assert.assertEquals(10, actual.size());
Assert.assertEquals(9, actual.size());
urlConn.disconnect();
}

Expand All @@ -81,7 +81,7 @@ public void testSystemServicesStatus() throws Exception {

Map<String, String> result = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()),
Charsets.UTF_8), token);
Assert.assertEquals(10, result.size());
Assert.assertEquals(9, result.size());
urlConn.disconnect();
Assert.assertEquals("OK", result.get(Constants.Service.APP_FABRIC_HTTP));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static final class Service {
public static final String LOG_QUERY = "log.query";
public static final String GATEWAY = "gateway";
public static final String MASTER_SERVICES = "master.services";
public static final String METRICS_PROCESSOR = "metrics.processor";
public static final String METRICS_PROCESSOR = "metrics";
public static final String DATASET_MANAGER = "dataset.service";
public static final String DATASET_EXECUTOR = "dataset.executor";
public static final String EXTERNAL_AUTHENTICATION = "external.authentication";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ private RouteDestination getV3RoutingService(String[] uriParts, AllowedMethod re
return LOG_SAVER;
case Constants.Service.TRANSACTION:
return TRANSACTION;
case Constants.Service.METRICS_PROCESSOR:
return METRICS_PROCESSOR;
case Constants.Service.METRICS:
return METRICS;
case Constants.Service.APP_FABRIC_HTTP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.api.metrics.MetricsContext;
Expand All @@ -39,9 +41,8 @@
import io.cdap.cdap.metrics.guice.MetricsStoreModule;
import io.cdap.cdap.metrics.process.MessagingMetricsProcessorServiceFactory;
import io.cdap.cdap.metrics.process.MetricsAdminSubscriberService;
import io.cdap.cdap.metrics.process.MetricsProcessorStatusService;
import io.cdap.cdap.metrics.process.loader.MetricsWriterModule;
import io.cdap.cdap.metrics.query.MetricsQueryService;
import io.cdap.cdap.metrics.service.MetricsService;
import io.cdap.cdap.metrics.store.MetricsCleanUpService;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule;
Expand Down Expand Up @@ -78,7 +79,14 @@ protected List<Module> getServiceModules(MasterEnvironment masterEnv,
new MetricsProcessorStatusServiceModule(),
new MetricsHandlerModule(),
new DFSLocationModule(),
new MetricsWriterModule()
new MetricsWriterModule(),
new PrivateModule() {
@Override
protected void configure() {
bind(MetricsService.class).in(Scopes.SINGLETON);
expose(MetricsService.class);
}
}
);
}

Expand All @@ -98,8 +106,7 @@ protected void addServices(Injector injector, List<? super Service> services,

services.add(injector.getInstance(MessagingMetricsProcessorServiceFactory.class)
.create(topicNumbers, metricsContext, 0));
services.add(injector.getInstance(MetricsProcessorStatusService.class));
services.add(injector.getInstance(MetricsQueryService.class));
services.add(injector.getInstance(MetricsService.class));
services.add(injector.getInstance(MetricsAdminSubscriberService.class));
services.add(injector.getInstance(MetricsCleanUpService.class));
Binding<ZKClientService> zkBinding = injector.getExistingBinding(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class MetricsQueryService extends AbstractIdleService {
private static final Logger LOG = LoggerFactory.getLogger(MetricsQueryService.class);

private final NettyHttpService httpService;
private final Set<HttpHandler> handlers;
private final DiscoveryService discoveryService;
private Cancellable cancelDiscovery;

Expand All @@ -53,6 +54,7 @@ public MetricsQueryService(CConfiguration cConf, SConfiguration sConf,
@Named(Constants.Service.METRICS) Set<HttpHandler> handlers,
DiscoveryService discoveryService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory) {
this.handlers = handlers;
// netty http server config
String address = cConf.get(Constants.Metrics.ADDRESS);
int backlogcnxs = cConf.getInt(Constants.Metrics.BACKLOG_CONNECTIONS, 20000);
Expand Down Expand Up @@ -109,4 +111,11 @@ protected void shutDown() throws Exception {
cancelDiscovery.cancel();
httpService.stop();
}

/**
* Returns the set of HTTP handlers associated.
*/
public Set<HttpHandler> getHandlers() {
return handlers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.metrics.service;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.http.CommonNettyHttpServiceFactory;
import io.cdap.cdap.common.id.Id;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.ServiceLoggingContext;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.metrics.query.MetricsQueryService;
import io.cdap.http.HttpHandler;
import io.cdap.http.NettyHttpService;
import java.util.Set;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Metrics implemented using the common http netty framework.
*/
public class MetricsService extends AbstractIdleService {

private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class);

private final NettyHttpService httpService;
private final DiscoveryService discoveryService;
private final MetricsQueryService metricsQueryService;
private Cancellable cancelDiscovery;

@Inject

Check warning on line 52 in cdap-watchdog/src/main/java/io/cdap/cdap/metrics/service/MetricsService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocMethodCheck

Missing a Javadoc comment.
public MetricsService(CConfiguration cConf, SConfiguration sConf,
MetricsQueryService metricsQueryService,
DiscoveryService discoveryService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory) {
this.metricsQueryService = metricsQueryService;
// netty http server config
String address = cConf.get(Constants.Metrics.ADDRESS);
int backlogcnxs = cConf.getInt(Constants.Metrics.BACKLOG_CONNECTIONS, 20000);
int execthreads = cConf.getInt(Constants.Metrics.EXEC_THREADS, 20);
int bossthreads = cConf.getInt(Constants.Metrics.BOSS_THREADS, 1);
int workerthreads = cConf.getInt(Constants.Metrics.WORKER_THREADS, 10);
Set<HttpHandler> handlers = metricsQueryService.getHandlers();

NettyHttpService.Builder builder = commonNettyHttpServiceFactory.builder(
Constants.Service.METRICS)
.setHttpHandlers(handlers)
.setHost(address)
.setPort(cConf.getInt(Constants.Metrics.PORT))
.setConnectionBacklog(backlogcnxs)
.setExecThreadPoolSize(execthreads)
.setBossThreadPoolSize(bossthreads)
.setWorkerThreadPoolSize(workerthreads);

if (cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED)) {
new HttpsEnabler().configureKeyStore(cConf, sConf).enable(builder);
}

this.httpService = builder.build();
this.discoveryService = discoveryService;

LOG.info("Configuring MetricsService "
+ ", address: " + address
+ ", backlog connections: " + backlogcnxs
+ ", execthreads: " + execthreads
+ ", bossthreads: " + bossthreads
+ ", workerthreads: " + workerthreads);
}

@Override
protected void startUp() throws Exception {
LoggingContextAccessor.setLoggingContext(new ServiceLoggingContext(Id.Namespace.SYSTEM.getId(),
Constants.Logging.COMPONENT_NAME,
Constants.Service.METRICS));

LOG.info("Starting Metrics Service...");
httpService.start();
LOG.info("Started Metrics HTTP Service...");
// Register the service
cancelDiscovery = discoveryService.register(
ResolvingDiscoverable.of(
URIScheme.createDiscoverable(Constants.Service.METRICS, httpService)));
LOG.info("Metrics Service started successfully on {}", httpService.getBindAddress());
}

@Override
protected void shutDown() throws Exception {
LOG.info("Stopping Metrics Service...");

// Unregister the service
try {
if (cancelDiscovery != null) {
cancelDiscovery.cancel();
}
} finally {
httpService.stop();
}
}
}

0 comments on commit fb804c6

Please sign in to comment.