From 4472bba342efd62ecb90e21ad9838c62dbb1776a Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 30 Sep 2024 14:43:31 +0530 Subject: [PATCH] HIVE-28542: OTEL: Implement OTEL Exporter to expose JVM details of HiveServer2 (#5469). (Ayush Saxena, reviewed by Simhadri Govindappa, Tanishq Chugh) --- .../apache/hadoop/hive/common/JvmMetrics.java | 36 ++++- .../hive/service/servlet/OTELExporter.java | 133 +++++++++++++++++- 2 files changed, 161 insertions(+), 8 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java b/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java index 6edf396288d8..c0d689d550f7 100644 --- a/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java +++ b/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java @@ -152,6 +152,16 @@ private MetricsInfo[] getGcInfo(String gcName) { } private void getThreadUsage(MetricsRecordBuilder rb) { + ThreadCountResult result = getThreadCountResult(threadMXBean); + rb.addGauge(ThreadsNew, result.threadsNew) + .addGauge(ThreadsRunnable, result.threadsRunnable) + .addGauge(ThreadsBlocked, result.threadsBlocked) + .addGauge(ThreadsWaiting, result.threadsWaiting) + .addGauge(ThreadsTimedWaiting, result.threadsTimedWaiting) + .addGauge(ThreadsTerminated, result.threadsTerminated); + } + + public static ThreadCountResult getThreadCountResult(ThreadMXBean threadMXBean) { int threadsNew = 0; int threadsRunnable = 0; int threadsBlocked = 0; @@ -170,12 +180,26 @@ private void getThreadUsage(MetricsRecordBuilder rb) { case TERMINATED: threadsTerminated++; break; } } - rb.addGauge(ThreadsNew, threadsNew) - .addGauge(ThreadsRunnable, threadsRunnable) - .addGauge(ThreadsBlocked, threadsBlocked) - .addGauge(ThreadsWaiting, threadsWaiting) - .addGauge(ThreadsTimedWaiting, threadsTimedWaiting) - .addGauge(ThreadsTerminated, threadsTerminated); + return new ThreadCountResult(threadsNew, threadsRunnable, threadsBlocked, threadsWaiting, threadsTimedWaiting, threadsTerminated); + } + + public static class ThreadCountResult { + public final int threadsNew; + public final int threadsRunnable; + public final int threadsBlocked; + public final int threadsWaiting; + public final int threadsTimedWaiting; + public final int threadsTerminated; + + public ThreadCountResult(int threadsNew, int threadsRunnable, int threadsBlocked, int threadsWaiting, int threadsTimedWaiting, + int threadsTerminated) { + this.threadsNew = threadsNew; + this.threadsRunnable = threadsRunnable; + this.threadsBlocked = threadsBlocked; + this.threadsWaiting = threadsWaiting; + this.threadsTimedWaiting = threadsTimedWaiting; + this.threadsTerminated = threadsTerminated; + } } private void getEventCounters(MetricsRecordBuilder rb) { diff --git a/service/src/java/org/apache/hive/service/servlet/OTELExporter.java b/service/src/java/org/apache/hive/service/servlet/OTELExporter.java index 8315c4e912e9..f8a2d55fea64 100644 --- a/service/src/java/org/apache/hive/service/servlet/OTELExporter.java +++ b/service/src/java/org/apache/hive/service/servlet/OTELExporter.java @@ -18,6 +18,11 @@ package org.apache.hive.service.servlet; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.lang.management.OperatingSystemMXBean; +import java.lang.management.ThreadMXBean; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -25,8 +30,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import com.sun.management.UnixOperatingSystemMXBean; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.DoubleGauge; +import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; @@ -34,13 +42,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.common.JvmMetrics; +import org.apache.hadoop.hive.common.JvmMetricsInfo; import org.apache.hadoop.hive.ql.QueryDisplay; import org.apache.hadoop.hive.ql.QueryInfo; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.session.SessionManager; public class OTELExporter extends Thread { - private static final String INSTRUMENTATION_NAME = OTELExporter.class.getName(); + private static final String QUERY_SCOPE = OTELExporter.class.getName(); + private static final String JVM_SCOPE = JVMMetrics.class.getName(); private static final Logger LOG = LoggerFactory.getLogger(OTELExporter.class); private final OperationManager operationManager; private final Set historicalQueryId; @@ -48,9 +59,12 @@ public class OTELExporter extends Thread { private final Tracer tracer; private final Map queryIdToSpanMap; private final Map> queryIdToTasksMap; + private final JVMMetrics jvmMetrics; + public OTELExporter(OpenTelemetry openTelemetry, SessionManager sessionManager, long frequency) { - this.tracer = openTelemetry.getTracer(INSTRUMENTATION_NAME); + this.tracer = openTelemetry.getTracer(QUERY_SCOPE); + this.jvmMetrics = new JVMMetrics(openTelemetry.getMeter(JVM_SCOPE)); this.operationManager = sessionManager.getOperationManager(); this.historicalQueryId = new HashSet<>(); this.frequency = frequency; @@ -61,6 +75,7 @@ public OTELExporter(OpenTelemetry openTelemetry, SessionManager sessionManager, @Override public void run() { while (true) { + jvmMetrics.setJvmMetrics(); exposeMetricsToOTEL(); try { Thread.sleep(frequency); @@ -221,4 +236,118 @@ private AttributesMap addTaskAttributes(QueryDisplay.TaskDisplay taskDisplay) { attributes.put(AttributeKey.longKey("EndTime"), taskDisplay.getEndTime()); return attributes; } + + static class JVMMetrics { + + // The MXBean used to fetch values + private final MemoryMXBean memoryMXBean; + private final ThreadMXBean threadMXBean; + private final OperatingSystemMXBean osMXBean; + + // Memory Level Gauge + private final DoubleGauge memNonHeapUsedMGauge; + private final DoubleGauge memNonHeapMaxM; + private final DoubleGauge memHeapUsedM; + private final DoubleGauge memHeapCommittedM; + private final DoubleGauge memHeapMaxM; + private final DoubleGauge memMaxM; + private final DoubleGauge memNonHeapCommittedM; + + // Thread Level Gauge + private final DoubleGauge threadsNew; + private final DoubleGauge threadsRunnable; + private final DoubleGauge threadsBlocked; + private final DoubleGauge threadsWaiting; + private final DoubleGauge threadsTimedWaiting; + private final DoubleGauge threadsTerminated; + + // OS Level Gauge + private final DoubleGauge systemLoadAverage; + private final DoubleGauge systemCpuLoad; + private final DoubleGauge committedVirtualMemorySize; + private final DoubleGauge processCpuTime; + private final DoubleGauge freePhysicalMemorySize; + private final DoubleGauge freeSwapSpaceSize; + private final DoubleGauge totalPhysicalMemorySize; + private final DoubleGauge processCpuLoad; + + // 1 MB Constant + static final float M = 1024 * 1024; + + public JVMMetrics(Meter meter) { + memoryMXBean = ManagementFactory.getMemoryMXBean(); + threadMXBean = ManagementFactory.getThreadMXBean(); + osMXBean = ManagementFactory.getOperatingSystemMXBean(); + memNonHeapUsedMGauge = meter.gaugeBuilder(JvmMetricsInfo.MemNonHeapUsedM.name()).build(); + memNonHeapCommittedM = meter.gaugeBuilder(JvmMetricsInfo.MemNonHeapCommittedM.name()).build(); + memNonHeapMaxM = meter.gaugeBuilder(JvmMetricsInfo.MemNonHeapMaxM.name()).build(); + memHeapUsedM = meter.gaugeBuilder(JvmMetricsInfo.MemHeapUsedM.name()).build(); + memHeapCommittedM = meter.gaugeBuilder(JvmMetricsInfo.MemHeapCommittedM.name()).build(); + memHeapMaxM = meter.gaugeBuilder(JvmMetricsInfo.MemHeapMaxM.name()).build(); + memMaxM = meter.gaugeBuilder(JvmMetricsInfo.MemMaxM.name()).build(); + + // Thread Level Counters + threadsNew = meter.gaugeBuilder(JvmMetricsInfo.ThreadsNew.name()).build(); + threadsRunnable = meter.gaugeBuilder(JvmMetricsInfo.ThreadsRunnable.name()).build(); + threadsBlocked = meter.gaugeBuilder(JvmMetricsInfo.ThreadsBlocked.name()).build(); + threadsWaiting = meter.gaugeBuilder(JvmMetricsInfo.ThreadsWaiting.name()).build(); + threadsTimedWaiting = meter.gaugeBuilder(JvmMetricsInfo.ThreadsTimedWaiting.name()).build(); + threadsTerminated = meter.gaugeBuilder(JvmMetricsInfo.ThreadsTerminated.name()).build(); + + // Os Level Counters + systemLoadAverage = meter.gaugeBuilder("SystemLoadAverage").build(); + systemCpuLoad = meter.gaugeBuilder("SystemCpuLoad").build(); + committedVirtualMemorySize = meter.gaugeBuilder("CommittedVirtualMemorySize").build(); + + processCpuTime = meter.gaugeBuilder("ProcessCpuTime").build(); + freePhysicalMemorySize = meter.gaugeBuilder("FreePhysicalMemorySize").build(); + + freeSwapSpaceSize = meter.gaugeBuilder("FreeSwapSpaceSize").build(); + totalPhysicalMemorySize = meter.gaugeBuilder("TotalPhysicalMemorySize").build(); + processCpuLoad = meter.gaugeBuilder("ProcessCpuLoad").build(); + } + + public void setJvmMetrics() { + setMemoryValuesValues(); + setThreadCountValues(); + setOsLevelValues(); + } + + private void setMemoryValuesValues() { + MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + Runtime runtime = Runtime.getRuntime(); + memNonHeapUsedMGauge.set(memNonHeap.getUsed() / M); + memNonHeapCommittedM.set(memNonHeap.getCommitted() / M); + memNonHeapMaxM.set(memNonHeap.getMax() / M); + memHeapUsedM.set(memHeap.getUsed() / M); + memHeapCommittedM.set(memHeap.getCommitted() / M); + memHeapMaxM.set(memHeap.getMax() / M); + memMaxM.set(runtime.maxMemory() / M); + } + + private void setThreadCountValues() { + JvmMetrics.ThreadCountResult threadCountResult = JvmMetrics.getThreadCountResult(threadMXBean); + threadsNew.set(threadCountResult.threadsNew); + threadsRunnable.set(threadCountResult.threadsRunnable); + threadsBlocked.set(threadCountResult.threadsBlocked); + threadsWaiting.set(threadCountResult.threadsWaiting); + threadsTimedWaiting.set(threadCountResult.threadsTimedWaiting); + threadsTerminated.set(threadCountResult.threadsTerminated); + } + + private void setOsLevelValues() { + systemLoadAverage.set(osMXBean.getSystemLoadAverage()); + if (osMXBean instanceof UnixOperatingSystemMXBean) { + UnixOperatingSystemMXBean unixMxBean = (UnixOperatingSystemMXBean) osMXBean; + systemCpuLoad.set(unixMxBean.getSystemCpuLoad()); + committedVirtualMemorySize.set(unixMxBean.getCommittedVirtualMemorySize()); + processCpuTime.set(unixMxBean.getProcessCpuTime()); + freePhysicalMemorySize.set(unixMxBean.getFreePhysicalMemorySize()); + freeSwapSpaceSize.set(unixMxBean.getFreeSwapSpaceSize()); + totalPhysicalMemorySize.set(unixMxBean.getTotalPhysicalMemorySize()); + processCpuLoad.set(unixMxBean.getProcessCpuLoad()); + } + } + } }