77
88import com .google .common .base .Strings ;
99import com .gotocompany .dagger .common .configuration .Configuration ;
10+ import org .apache .flink .util .Preconditions ;
1011import org .influxdb .InfluxDB ;
1112import org .influxdb .dto .Point ;
1213import org .influxdb .dto .Point .Builder ;
2122import java .util .HashMap ;
2223import java .util .List ;
2324import java .util .Map ;
25+ import java .util .Set ;
2426import java .util .concurrent .TimeUnit ;
2527
2628public class InfluxDBWriter implements SinkWriter <Row , Void , Void > {
@@ -32,11 +34,13 @@ public class InfluxDBWriter implements SinkWriter<Row, Void, Void> {
3234 private String [] columnNames ;
3335 private ErrorHandler errorHandler ;
3436 private ErrorReporter errorReporter ;
37+ private boolean useRowFieldNames ;
3538
3639 public InfluxDBWriter (Configuration configuration , InfluxDB influxDB , String [] columnNames , ErrorHandler errorHandler , ErrorReporter errorReporter ) {
3740 databaseName = configuration .getString (Constants .SINK_INFLUX_DB_NAME_KEY , Constants .SINK_INFLUX_DB_NAME_DEFAULT );
3841 retentionPolicy = configuration .getString (Constants .SINK_INFLUX_RETENTION_POLICY_KEY , Constants .SINK_INFLUX_RETENTION_POLICY_DEFAULT );
3942 measurementName = configuration .getString (Constants .SINK_INFLUX_MEASUREMENT_NAME_KEY , Constants .SINK_INFLUX_MEASUREMENT_NAME_DEFAULT );
43+ useRowFieldNames = configuration .getBoolean (Constants .SINK_INFLUX_USING_ROW_FIELD_NAMES_KEY , Constants .SINK_INFLUX_USING_ROW_FIELD_NAMES_DEFAULT );
4044 this .influxDB = influxDB ;
4145 this .columnNames = columnNames ;
4246 this .errorHandler = errorHandler ;
@@ -47,8 +51,27 @@ public InfluxDBWriter(Configuration configuration, InfluxDB influxDB, String[] c
4751 public void write (Row row , Context context ) throws IOException , InterruptedException {
4852 LOGGER .info ("row to influx: " + row );
4953
50- Builder pointBuilder = Point . measurement ( measurementName ) ;
54+ Builder pointBuilder ;
5155 Map <String , Object > fields = new HashMap <>();
56+ if (useRowFieldNames ) {
57+ pointBuilder = writeUsingRowFieldNames (row , fields );
58+ } else {
59+ pointBuilder = writeUsingColumnNames (row , fields );
60+ }
61+
62+ addErrorMetricsAndThrow ();
63+
64+ try {
65+ influxDB .write (databaseName , retentionPolicy , pointBuilder .fields (fields ).build ());
66+ } catch (Exception exception ) {
67+ errorReporter .reportFatalException (exception );
68+ throw exception ;
69+ }
70+ }
71+
72+ private Builder writeUsingColumnNames (Row row , Map <String , Object > fields ) {
73+ Builder pointBuilder = Point .measurement (measurementName );
74+
5275 for (int i = 0 ; i < columnNames .length ; i ++) {
5376 String columnName = columnNames [i ];
5477 if (columnName .equals ("window_timestamp" )) {
@@ -65,15 +88,31 @@ public void write(Row row, Context context) throws IOException, InterruptedExcep
6588 }
6689 }
6790 }
91+ return pointBuilder ;
92+ }
6893
69- addErrorMetricsAndThrow ();
94+ private Builder writeUsingRowFieldNames (Row row , Map <String , Object > fields ) {
95+ Builder pointBuilder = Point .measurement (measurementName );
7096
71- try {
72- influxDB .write (databaseName , retentionPolicy , pointBuilder .fields (fields ).build ());
73- } catch (Exception exception ) {
74- errorReporter .reportFatalException (exception );
75- throw exception ;
97+ Set <String > fieldNames = row .getFieldNames (false );
98+ Preconditions .checkNotNull (fieldNames , "Error! in writeUsingRowFieldNames, getFieldNames() returned null" );
99+
100+ for (String fieldName : fieldNames ) {
101+ if (fieldName .equals ("window_timestamp" )) {
102+ LocalDateTime timeField = (LocalDateTime ) row .getField (fieldName );
103+ ZonedDateTime zonedDateTime = timeField .atZone (ZoneOffset .UTC );
104+ pointBuilder .time (zonedDateTime .toInstant ().toEpochMilli (), TimeUnit .MILLISECONDS );
105+ } else if (fieldName .startsWith ("tag_" )) {
106+ pointBuilder .tag (fieldName , String .valueOf (row .getField (fieldName )));
107+ } else if (fieldName .startsWith ("label_" )) {
108+ pointBuilder .tag (fieldName .substring ("label_" .length ()), String .valueOf (row .getField (fieldName )));
109+ } else {
110+ if (!(Strings .isNullOrEmpty (fieldName ) || row .getField (fieldName ) == null )) {
111+ fields .put (fieldName , row .getField (fieldName ));
112+ }
113+ }
76114 }
115+ return pointBuilder ;
77116 }
78117
79118 @ Override
0 commit comments