Skip to content

Commit

Permalink
HIVE-28542: OTEL: Implement OTEL Exporter to expose JVM details of Hi…
Browse files Browse the repository at this point in the history
…veServer2 (apache#5469). (Ayush Saxena, reviewed by Simhadri Govindappa, Tanishq Chugh)
  • Loading branch information
ayushtkn authored Sep 30, 2024
1 parent d85b87c commit 4472bba
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 8 deletions.
36 changes: 30 additions & 6 deletions common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ private MetricsInfo[] getGcInfo(String gcName) {
}

private void getThreadUsage(MetricsRecordBuilder rb) {
ThreadCountResult result = getThreadCountResult(threadMXBean);
rb.addGauge(ThreadsNew, result.threadsNew)
.addGauge(ThreadsRunnable, result.threadsRunnable)
.addGauge(ThreadsBlocked, result.threadsBlocked)
.addGauge(ThreadsWaiting, result.threadsWaiting)
.addGauge(ThreadsTimedWaiting, result.threadsTimedWaiting)
.addGauge(ThreadsTerminated, result.threadsTerminated);
}

public static ThreadCountResult getThreadCountResult(ThreadMXBean threadMXBean) {
int threadsNew = 0;
int threadsRunnable = 0;
int threadsBlocked = 0;
Expand All @@ -170,12 +180,26 @@ private void getThreadUsage(MetricsRecordBuilder rb) {
case TERMINATED: threadsTerminated++; break;
}
}
rb.addGauge(ThreadsNew, threadsNew)
.addGauge(ThreadsRunnable, threadsRunnable)
.addGauge(ThreadsBlocked, threadsBlocked)
.addGauge(ThreadsWaiting, threadsWaiting)
.addGauge(ThreadsTimedWaiting, threadsTimedWaiting)
.addGauge(ThreadsTerminated, threadsTerminated);
return new ThreadCountResult(threadsNew, threadsRunnable, threadsBlocked, threadsWaiting, threadsTimedWaiting, threadsTerminated);
}

public static class ThreadCountResult {
public final int threadsNew;
public final int threadsRunnable;
public final int threadsBlocked;
public final int threadsWaiting;
public final int threadsTimedWaiting;
public final int threadsTerminated;

public ThreadCountResult(int threadsNew, int threadsRunnable, int threadsBlocked, int threadsWaiting, int threadsTimedWaiting,
int threadsTerminated) {
this.threadsNew = threadsNew;
this.threadsRunnable = threadsRunnable;
this.threadsBlocked = threadsBlocked;
this.threadsWaiting = threadsWaiting;
this.threadsTimedWaiting = threadsTimedWaiting;
this.threadsTerminated = threadsTerminated;
}
}

private void getEventCounters(MetricsRecordBuilder rb) {
Expand Down
133 changes: 131 additions & 2 deletions service/src/java/org/apache/hive/service/servlet/OTELExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,53 @@

package org.apache.hive.service.servlet;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.sun.management.UnixOperatingSystemMXBean;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.internal.AttributesMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hive.common.JvmMetrics;
import org.apache.hadoop.hive.common.JvmMetricsInfo;
import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.QueryInfo;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.session.SessionManager;

public class OTELExporter extends Thread {
private static final String INSTRUMENTATION_NAME = OTELExporter.class.getName();
private static final String QUERY_SCOPE = OTELExporter.class.getName();
private static final String JVM_SCOPE = JVMMetrics.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(OTELExporter.class);
private final OperationManager operationManager;
private final Set<String> historicalQueryId;
private final long frequency;
private final Tracer tracer;
private final Map<String, Span> queryIdToSpanMap;
private final Map<String, Set<String>> queryIdToTasksMap;
private final JVMMetrics jvmMetrics;


public OTELExporter(OpenTelemetry openTelemetry, SessionManager sessionManager, long frequency) {
this.tracer = openTelemetry.getTracer(INSTRUMENTATION_NAME);
this.tracer = openTelemetry.getTracer(QUERY_SCOPE);
this.jvmMetrics = new JVMMetrics(openTelemetry.getMeter(JVM_SCOPE));
this.operationManager = sessionManager.getOperationManager();
this.historicalQueryId = new HashSet<>();
this.frequency = frequency;
Expand All @@ -61,6 +75,7 @@ public OTELExporter(OpenTelemetry openTelemetry, SessionManager sessionManager,
@Override
public void run() {
while (true) {
jvmMetrics.setJvmMetrics();
exposeMetricsToOTEL();
try {
Thread.sleep(frequency);
Expand Down Expand Up @@ -221,4 +236,118 @@ private AttributesMap addTaskAttributes(QueryDisplay.TaskDisplay taskDisplay) {
attributes.put(AttributeKey.longKey("EndTime"), taskDisplay.getEndTime());
return attributes;
}

static class JVMMetrics {

// The MXBean used to fetch values
private final MemoryMXBean memoryMXBean;
private final ThreadMXBean threadMXBean;
private final OperatingSystemMXBean osMXBean;

// Memory Level Gauge
private final DoubleGauge memNonHeapUsedMGauge;
private final DoubleGauge memNonHeapMaxM;
private final DoubleGauge memHeapUsedM;
private final DoubleGauge memHeapCommittedM;
private final DoubleGauge memHeapMaxM;
private final DoubleGauge memMaxM;
private final DoubleGauge memNonHeapCommittedM;

// Thread Level Gauge
private final DoubleGauge threadsNew;
private final DoubleGauge threadsRunnable;
private final DoubleGauge threadsBlocked;
private final DoubleGauge threadsWaiting;
private final DoubleGauge threadsTimedWaiting;
private final DoubleGauge threadsTerminated;

// OS Level Gauge
private final DoubleGauge systemLoadAverage;
private final DoubleGauge systemCpuLoad;
private final DoubleGauge committedVirtualMemorySize;
private final DoubleGauge processCpuTime;
private final DoubleGauge freePhysicalMemorySize;
private final DoubleGauge freeSwapSpaceSize;
private final DoubleGauge totalPhysicalMemorySize;
private final DoubleGauge processCpuLoad;

// 1 MB Constant
static final float M = 1024 * 1024;

public JVMMetrics(Meter meter) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
threadMXBean = ManagementFactory.getThreadMXBean();
osMXBean = ManagementFactory.getOperatingSystemMXBean();
memNonHeapUsedMGauge = meter.gaugeBuilder(JvmMetricsInfo.MemNonHeapUsedM.name()).build();
memNonHeapCommittedM = meter.gaugeBuilder(JvmMetricsInfo.MemNonHeapCommittedM.name()).build();
memNonHeapMaxM = meter.gaugeBuilder(JvmMetricsInfo.MemNonHeapMaxM.name()).build();
memHeapUsedM = meter.gaugeBuilder(JvmMetricsInfo.MemHeapUsedM.name()).build();
memHeapCommittedM = meter.gaugeBuilder(JvmMetricsInfo.MemHeapCommittedM.name()).build();
memHeapMaxM = meter.gaugeBuilder(JvmMetricsInfo.MemHeapMaxM.name()).build();
memMaxM = meter.gaugeBuilder(JvmMetricsInfo.MemMaxM.name()).build();

// Thread Level Counters
threadsNew = meter.gaugeBuilder(JvmMetricsInfo.ThreadsNew.name()).build();
threadsRunnable = meter.gaugeBuilder(JvmMetricsInfo.ThreadsRunnable.name()).build();
threadsBlocked = meter.gaugeBuilder(JvmMetricsInfo.ThreadsBlocked.name()).build();
threadsWaiting = meter.gaugeBuilder(JvmMetricsInfo.ThreadsWaiting.name()).build();
threadsTimedWaiting = meter.gaugeBuilder(JvmMetricsInfo.ThreadsTimedWaiting.name()).build();
threadsTerminated = meter.gaugeBuilder(JvmMetricsInfo.ThreadsTerminated.name()).build();

// Os Level Counters
systemLoadAverage = meter.gaugeBuilder("SystemLoadAverage").build();
systemCpuLoad = meter.gaugeBuilder("SystemCpuLoad").build();
committedVirtualMemorySize = meter.gaugeBuilder("CommittedVirtualMemorySize").build();

processCpuTime = meter.gaugeBuilder("ProcessCpuTime").build();
freePhysicalMemorySize = meter.gaugeBuilder("FreePhysicalMemorySize").build();

freeSwapSpaceSize = meter.gaugeBuilder("FreeSwapSpaceSize").build();
totalPhysicalMemorySize = meter.gaugeBuilder("TotalPhysicalMemorySize").build();
processCpuLoad = meter.gaugeBuilder("ProcessCpuLoad").build();
}

public void setJvmMetrics() {
setMemoryValuesValues();
setThreadCountValues();
setOsLevelValues();
}

private void setMemoryValuesValues() {
MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
Runtime runtime = Runtime.getRuntime();
memNonHeapUsedMGauge.set(memNonHeap.getUsed() / M);
memNonHeapCommittedM.set(memNonHeap.getCommitted() / M);
memNonHeapMaxM.set(memNonHeap.getMax() / M);
memHeapUsedM.set(memHeap.getUsed() / M);
memHeapCommittedM.set(memHeap.getCommitted() / M);
memHeapMaxM.set(memHeap.getMax() / M);
memMaxM.set(runtime.maxMemory() / M);
}

private void setThreadCountValues() {
JvmMetrics.ThreadCountResult threadCountResult = JvmMetrics.getThreadCountResult(threadMXBean);
threadsNew.set(threadCountResult.threadsNew);
threadsRunnable.set(threadCountResult.threadsRunnable);
threadsBlocked.set(threadCountResult.threadsBlocked);
threadsWaiting.set(threadCountResult.threadsWaiting);
threadsTimedWaiting.set(threadCountResult.threadsTimedWaiting);
threadsTerminated.set(threadCountResult.threadsTerminated);
}

private void setOsLevelValues() {
systemLoadAverage.set(osMXBean.getSystemLoadAverage());
if (osMXBean instanceof UnixOperatingSystemMXBean) {
UnixOperatingSystemMXBean unixMxBean = (UnixOperatingSystemMXBean) osMXBean;
systemCpuLoad.set(unixMxBean.getSystemCpuLoad());
committedVirtualMemorySize.set(unixMxBean.getCommittedVirtualMemorySize());
processCpuTime.set(unixMxBean.getProcessCpuTime());
freePhysicalMemorySize.set(unixMxBean.getFreePhysicalMemorySize());
freeSwapSpaceSize.set(unixMxBean.getFreeSwapSpaceSize());
totalPhysicalMemorySize.set(unixMxBean.getTotalPhysicalMemorySize());
processCpuLoad.set(unixMxBean.getProcessCpuLoad());
}
}
}
}

0 comments on commit 4472bba

Please sign in to comment.