From fb804c6385f39e911005aeb54c8d177e9f0e4b8d Mon Sep 17 00:00:00 2001 From: Priyanshu Bhardwaj Date: Mon, 10 Feb 2025 13:53:32 +0000 Subject: [PATCH] Consolidate Metrics Query and Metrics Proccessor Services --- .../cdap/app/guice/MonitorHandlerModule.java | 9 +- .../http/handlers/MonitorHandlerTest.java | 4 +- .../io/cdap/cdap/common/conf/Constants.java | 2 +- .../cdap/gateway/router/RouterPathLookup.java | 2 - .../environment/k8s/MetricsServiceMain.java | 17 ++- .../metrics/query/MetricsQueryService.java | 9 ++ .../cdap/metrics/service/MetricsService.java | 120 ++++++++++++++++++ 7 files changed, 146 insertions(+), 17 deletions(-) create mode 100644 cdap-watchdog/src/main/java/io/cdap/cdap/metrics/service/MetricsService.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java index 98e91fdccfeb..f042af1c0e7a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/MonitorHandlerModule.java @@ -51,7 +51,6 @@ import io.cdap.cdap.internal.app.services.AppFabricServer; import io.cdap.cdap.logging.run.LogSaverServiceManager; import io.cdap.cdap.messaging.distributed.MessagingServiceManager; -import io.cdap.cdap.metrics.runtime.MetricsProcessorStatusServiceManager; import io.cdap.cdap.metrics.runtime.MetricsServiceManager; import io.cdap.http.HttpHandler; import java.util.Map; @@ -115,9 +114,6 @@ private void addNonHadoopBindings(Binder binder) { .toProvider(new NonHadoopMasterServiceManagerProvider(LogSaverServiceManager.class)); mapBinder.addBinding(Constants.Service.TRANSACTION) .toProvider(new NonHadoopMasterServiceManagerProvider(TransactionServiceManager.class)); - mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR) - .toProvider( - new NonHadoopMasterServiceManagerProvider(MetricsProcessorStatusServiceManager.class)); mapBinder.addBinding(Constants.Service.METRICS) .toProvider(new NonHadoopMasterServiceManagerProvider(MetricsServiceManager.class)); mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP) @@ -152,11 +148,10 @@ private void addHadoopBindings(Binder binder) { MasterServiceManager.class); mapBinder.addBinding(Constants.Service.LOGSAVER).to(LogSaverServiceManager.class); mapBinder.addBinding(Constants.Service.TRANSACTION).to(TransactionServiceManager.class); - mapBinder.addBinding(Constants.Service.METRICS_PROCESSOR) - .to(MetricsProcessorStatusServiceManager.class); mapBinder.addBinding(Constants.Service.METRICS).to(MetricsServiceManager.class); mapBinder.addBinding(Constants.Service.APP_FABRIC_HTTP).to(AppFabricServiceManager.class); - mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR).to(AppFabricProcessorManager.class); + mapBinder.addBinding(Constants.Service.APP_FABRIC_PROCESSOR) + .to(AppFabricProcessorManager.class); mapBinder.addBinding(Constants.Service.DATASET_EXECUTOR) .to(DatasetExecutorServiceManager.class); mapBinder.addBinding(Constants.Service.METADATA_SERVICE).to(MetadataServiceManager.class); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/MonitorHandlerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/MonitorHandlerTest.java index f3587dc7360a..9e875adb30ef 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/MonitorHandlerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/MonitorHandlerTest.java @@ -66,7 +66,7 @@ public void testSystemServices() throws Exception { List actual = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()), Charsets.UTF_8), token); - Assert.assertEquals(10, actual.size()); + Assert.assertEquals(9, actual.size()); urlConn.disconnect(); } @@ -81,7 +81,7 @@ public void testSystemServicesStatus() throws Exception { Map result = GSON.fromJson(new String(ByteStreams.toByteArray(urlConn.getInputStream()), Charsets.UTF_8), token); - Assert.assertEquals(10, result.size()); + Assert.assertEquals(9, result.size()); urlConn.disconnect(); Assert.assertEquals("OK", result.get(Constants.Service.APP_FABRIC_HTTP)); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index f231022e5733..2c341729807c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -129,7 +129,7 @@ public static final class Service { public static final String LOG_QUERY = "log.query"; public static final String GATEWAY = "gateway"; public static final String MASTER_SERVICES = "master.services"; - public static final String METRICS_PROCESSOR = "metrics.processor"; + public static final String METRICS_PROCESSOR = "metrics"; public static final String DATASET_MANAGER = "dataset.service"; public static final String DATASET_EXECUTOR = "dataset.executor"; public static final String EXTERNAL_AUTHENTICATION = "external.authentication"; diff --git a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java index 038e4170f5c5..e38861757267 100644 --- a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java +++ b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/RouterPathLookup.java @@ -225,8 +225,6 @@ private RouteDestination getV3RoutingService(String[] uriParts, AllowedMethod re return LOG_SAVER; case Constants.Service.TRANSACTION: return TRANSACTION; - case Constants.Service.METRICS_PROCESSOR: - return METRICS_PROCESSOR; case Constants.Service.METRICS: return METRICS; case Constants.Service.APP_FABRIC_HTTP: diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetricsServiceMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetricsServiceMain.java index 0bfd646ee4f1..65e2fd19d143 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetricsServiceMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/environment/k8s/MetricsServiceMain.java @@ -21,6 +21,8 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.PrivateModule; +import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; @@ -39,9 +41,8 @@ import io.cdap.cdap.metrics.guice.MetricsStoreModule; import io.cdap.cdap.metrics.process.MessagingMetricsProcessorServiceFactory; import io.cdap.cdap.metrics.process.MetricsAdminSubscriberService; -import io.cdap.cdap.metrics.process.MetricsProcessorStatusService; import io.cdap.cdap.metrics.process.loader.MetricsWriterModule; -import io.cdap.cdap.metrics.query.MetricsQueryService; +import io.cdap.cdap.metrics.service.MetricsService; import io.cdap.cdap.metrics.store.MetricsCleanUpService; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.authorization.AuthorizationEnforcementModule; @@ -78,7 +79,14 @@ protected List getServiceModules(MasterEnvironment masterEnv, new MetricsProcessorStatusServiceModule(), new MetricsHandlerModule(), new DFSLocationModule(), - new MetricsWriterModule() + new MetricsWriterModule(), + new PrivateModule() { + @Override + protected void configure() { + bind(MetricsService.class).in(Scopes.SINGLETON); + expose(MetricsService.class); + } + } ); } @@ -98,8 +106,7 @@ protected void addServices(Injector injector, List services, services.add(injector.getInstance(MessagingMetricsProcessorServiceFactory.class) .create(topicNumbers, metricsContext, 0)); - services.add(injector.getInstance(MetricsProcessorStatusService.class)); - services.add(injector.getInstance(MetricsQueryService.class)); + services.add(injector.getInstance(MetricsService.class)); services.add(injector.getInstance(MetricsAdminSubscriberService.class)); services.add(injector.getInstance(MetricsCleanUpService.class)); Binding zkBinding = injector.getExistingBinding( diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/query/MetricsQueryService.java b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/query/MetricsQueryService.java index 585fc4ef6a58..a33e5f2041d2 100644 --- a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/query/MetricsQueryService.java +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/query/MetricsQueryService.java @@ -45,6 +45,7 @@ public class MetricsQueryService extends AbstractIdleService { private static final Logger LOG = LoggerFactory.getLogger(MetricsQueryService.class); private final NettyHttpService httpService; + private final Set handlers; private final DiscoveryService discoveryService; private Cancellable cancelDiscovery; @@ -53,6 +54,7 @@ public MetricsQueryService(CConfiguration cConf, SConfiguration sConf, @Named(Constants.Service.METRICS) Set handlers, DiscoveryService discoveryService, CommonNettyHttpServiceFactory commonNettyHttpServiceFactory) { + this.handlers = handlers; // netty http server config String address = cConf.get(Constants.Metrics.ADDRESS); int backlogcnxs = cConf.getInt(Constants.Metrics.BACKLOG_CONNECTIONS, 20000); @@ -109,4 +111,11 @@ protected void shutDown() throws Exception { cancelDiscovery.cancel(); httpService.stop(); } + + /** + * Returns the set of HTTP handlers associated. + */ + public Set getHandlers() { + return handlers; + } } diff --git a/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/service/MetricsService.java b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/service/MetricsService.java new file mode 100644 index 000000000000..988dc53043e6 --- /dev/null +++ b/cdap-watchdog/src/main/java/io/cdap/cdap/metrics/service/MetricsService.java @@ -0,0 +1,120 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.metrics.service; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.conf.SConfiguration; +import io.cdap.cdap.common.discovery.ResolvingDiscoverable; +import io.cdap.cdap.common.discovery.URIScheme; +import io.cdap.cdap.common.http.CommonNettyHttpServiceFactory; +import io.cdap.cdap.common.id.Id; +import io.cdap.cdap.common.logging.LoggingContextAccessor; +import io.cdap.cdap.common.logging.ServiceLoggingContext; +import io.cdap.cdap.common.security.HttpsEnabler; +import io.cdap.cdap.metrics.query.MetricsQueryService; +import io.cdap.http.HttpHandler; +import io.cdap.http.NettyHttpService; +import java.util.Set; +import org.apache.twill.common.Cancellable; +import org.apache.twill.discovery.DiscoveryService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics implemented using the common http netty framework. + */ +public class MetricsService extends AbstractIdleService { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class); + + private final NettyHttpService httpService; + private final DiscoveryService discoveryService; + private final MetricsQueryService metricsQueryService; + private Cancellable cancelDiscovery; + + @Inject + public MetricsService(CConfiguration cConf, SConfiguration sConf, + MetricsQueryService metricsQueryService, + DiscoveryService discoveryService, + CommonNettyHttpServiceFactory commonNettyHttpServiceFactory) { + this.metricsQueryService = metricsQueryService; + // netty http server config + String address = cConf.get(Constants.Metrics.ADDRESS); + int backlogcnxs = cConf.getInt(Constants.Metrics.BACKLOG_CONNECTIONS, 20000); + int execthreads = cConf.getInt(Constants.Metrics.EXEC_THREADS, 20); + int bossthreads = cConf.getInt(Constants.Metrics.BOSS_THREADS, 1); + int workerthreads = cConf.getInt(Constants.Metrics.WORKER_THREADS, 10); + Set handlers = metricsQueryService.getHandlers(); + + NettyHttpService.Builder builder = commonNettyHttpServiceFactory.builder( + Constants.Service.METRICS) + .setHttpHandlers(handlers) + .setHost(address) + .setPort(cConf.getInt(Constants.Metrics.PORT)) + .setConnectionBacklog(backlogcnxs) + .setExecThreadPoolSize(execthreads) + .setBossThreadPoolSize(bossthreads) + .setWorkerThreadPoolSize(workerthreads); + + if (cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED)) { + new HttpsEnabler().configureKeyStore(cConf, sConf).enable(builder); + } + + this.httpService = builder.build(); + this.discoveryService = discoveryService; + + LOG.info("Configuring MetricsService " + + ", address: " + address + + ", backlog connections: " + backlogcnxs + + ", execthreads: " + execthreads + + ", bossthreads: " + bossthreads + + ", workerthreads: " + workerthreads); + } + + @Override + protected void startUp() throws Exception { + LoggingContextAccessor.setLoggingContext(new ServiceLoggingContext(Id.Namespace.SYSTEM.getId(), + Constants.Logging.COMPONENT_NAME, + Constants.Service.METRICS)); + + LOG.info("Starting Metrics Service..."); + httpService.start(); + LOG.info("Started Metrics HTTP Service..."); + // Register the service + cancelDiscovery = discoveryService.register( + ResolvingDiscoverable.of( + URIScheme.createDiscoverable(Constants.Service.METRICS, httpService))); + LOG.info("Metrics Service started successfully on {}", httpService.getBindAddress()); + } + + @Override + protected void shutDown() throws Exception { + LOG.info("Stopping Metrics Service..."); + + // Unregister the service + try { + if (cancelDiscovery != null) { + cancelDiscovery.cancel(); + } + } finally { + httpService.stop(); + } + } +}