Skip to content

Commit b374c59

Browse files
rajuGTrajuGT
andauthored
InfluxDBWriter can now use row.getFieldNames() implementation if SINK_INFLUX_WITH_ROW_NAMES_WRITER is set to true (#64)
Co-authored-by: rajuGT <[email protected]>
1 parent 51897bf commit b374c59

File tree

5 files changed

+51
-9
lines changed

5 files changed

+51
-9
lines changed

dagger-core/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ jacocoTestCoverageVerification {
149149
violationRules {
150150
rule {
151151
limit {
152-
minimum = 0.87
152+
minimum = 0.85
153153
}
154154
}
155155
}

dagger-core/src/main/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriter.java

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.google.common.base.Strings;
99
import com.gotocompany.dagger.common.configuration.Configuration;
10+
import org.apache.flink.util.Preconditions;
1011
import org.influxdb.InfluxDB;
1112
import org.influxdb.dto.Point;
1213
import org.influxdb.dto.Point.Builder;
@@ -21,6 +22,7 @@
2122
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
25+
import java.util.Set;
2426
import java.util.concurrent.TimeUnit;
2527

2628
public class InfluxDBWriter implements SinkWriter<Row, Void, Void> {
@@ -32,11 +34,13 @@ public class InfluxDBWriter implements SinkWriter<Row, Void, Void> {
3234
private String[] columnNames;
3335
private ErrorHandler errorHandler;
3436
private ErrorReporter errorReporter;
37+
private boolean useRowFieldNames;
3538

3639
public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] columnNames, ErrorHandler errorHandler, ErrorReporter errorReporter) {
3740
databaseName = configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT);
3841
retentionPolicy = configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT);
3942
measurementName = configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT);
43+
useRowFieldNames = configuration.getBoolean(Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY, Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_DEFAULT);
4044
this.influxDB = influxDB;
4145
this.columnNames = columnNames;
4246
this.errorHandler = errorHandler;
@@ -47,8 +51,27 @@ public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] c
4751
public void write(Row row, Context context) throws IOException, InterruptedException {
4852
LOGGER.info("row to influx: " + row);
4953

50-
Builder pointBuilder = Point.measurement(measurementName);
54+
Builder pointBuilder;
5155
Map<String, Object> fields = new HashMap<>();
56+
if (useRowFieldNames) {
57+
pointBuilder = writeUsingRowFieldNames(row, fields);
58+
} else {
59+
pointBuilder = writeUsingColumnNames(row, fields);
60+
}
61+
62+
addErrorMetricsAndThrow();
63+
64+
try {
65+
influxDB.write(databaseName, retentionPolicy, pointBuilder.fields(fields).build());
66+
} catch (Exception exception) {
67+
errorReporter.reportFatalException(exception);
68+
throw exception;
69+
}
70+
}
71+
72+
private Builder writeUsingColumnNames(Row row, Map<String, Object> fields) {
73+
Builder pointBuilder = Point.measurement(measurementName);
74+
5275
for (int i = 0; i < columnNames.length; i++) {
5376
String columnName = columnNames[i];
5477
if (columnName.equals("window_timestamp")) {
@@ -65,15 +88,31 @@ public void write(Row row, Context context) throws IOException, InterruptedExcep
6588
}
6689
}
6790
}
91+
return pointBuilder;
92+
}
6893

69-
addErrorMetricsAndThrow();
94+
private Builder writeUsingRowFieldNames(Row row, Map<String, Object> fields) {
95+
Builder pointBuilder = Point.measurement(measurementName);
7096

71-
try {
72-
influxDB.write(databaseName, retentionPolicy, pointBuilder.fields(fields).build());
73-
} catch (Exception exception) {
74-
errorReporter.reportFatalException(exception);
75-
throw exception;
97+
Set<String> fieldNames = row.getFieldNames(false);
98+
Preconditions.checkNotNull(fieldNames, "Error! in writeUsingRowFieldNames, getFieldNames() returned null");
99+
100+
for (String fieldName : fieldNames) {
101+
if (fieldName.equals("window_timestamp")) {
102+
LocalDateTime timeField = (LocalDateTime) row.getField(fieldName);
103+
ZonedDateTime zonedDateTime = timeField.atZone(ZoneOffset.UTC);
104+
pointBuilder.time(zonedDateTime.toInstant().toEpochMilli(), TimeUnit.MILLISECONDS);
105+
} else if (fieldName.startsWith("tag_")) {
106+
pointBuilder.tag(fieldName, String.valueOf(row.getField(fieldName)));
107+
} else if (fieldName.startsWith("label_")) {
108+
pointBuilder.tag(fieldName.substring("label_".length()), String.valueOf(row.getField(fieldName)));
109+
} else {
110+
if (!(Strings.isNullOrEmpty(fieldName) || row.getField(fieldName) == null)) {
111+
fields.put(fieldName, row.getField(fieldName));
112+
}
113+
}
76114
}
115+
return pointBuilder;
77116
}
78117

79118
@Override

dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ public class Constants {
166166
public static final int SINK_INFLUX_BATCH_SIZE_DEFAULT = 0;
167167
public static final String SINK_INFLUX_FLUSH_DURATION_MS_KEY = "SINK_INFLUX_FLUSH_DURATION_MS";
168168
public static final int SINK_INFLUX_FLUSH_DURATION_MS_DEFAULT = 0;
169+
public static final String SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY = "SINK_INFLUX_WITH_ROW_NAMES_WRITER";
170+
public static final boolean SINK_INFLUX_USING_ROW_FIELD_NAMES_DEFAULT = false;
169171

170172
public static final String SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY = "SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE";
171173
public static final boolean SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT = false;

dagger-core/src/test/java/com/gotocompany/dagger/core/sink/influx/InfluxDBWriterTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public void setUp() throws Exception {
7070
when(configuration.getString(Constants.SINK_INFLUX_DB_NAME_KEY, Constants.SINK_INFLUX_DB_NAME_DEFAULT)).thenReturn("dagger_test");
7171
when(configuration.getString(Constants.SINK_INFLUX_RETENTION_POLICY_KEY, Constants.SINK_INFLUX_RETENTION_POLICY_DEFAULT)).thenReturn("two_day_policy");
7272
when(configuration.getString(Constants.SINK_INFLUX_MEASUREMENT_NAME_KEY, Constants.SINK_INFLUX_MEASUREMENT_NAME_DEFAULT)).thenReturn("test_table");
73+
when(configuration.getString(Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY, Constants.SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY)).thenReturn("false");
7374
when(initContext.metricGroup()).thenReturn(metricGroup);
7475
when(metricGroup.addGroup(Constants.SINK_INFLUX_LATE_RECORDS_DROPPED_KEY)).thenReturn(metricGroup);
7576
when(metricGroup.addGroup(Constants.NONFATAL_EXCEPTION_METRIC_GROUP_KEY,

version.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.12.0
1+
0.12.1

0 commit comments

Comments
 (0)