
Commit 61d8d2a

[feat-35406][http] add http sink plugin
1 parent 5014c4e commit 61d8d2a

8 files changed: +526 −1 lines changed

Diff for: http/http-sink/pom.xml (+81)
@@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.http</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.http</artifactId>
    <name>http-sink</name>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.hadoop:hadoop-common</exclude>
                                    <exclude>org.apache.hadoop:hadoop-auth</exclude>
                                    <exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- runs in the package phase, after the shaded jar has been built -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/httpsink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar" />
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/httpsink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/httpsink/${project.name}-${git.branch}.jar" />
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
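
Two build steps cooperate here: the shade execution bundles the sink's dependencies into a single jar (dropping jar signature files and the excluded slf4j/Hadoop artifacts), and the antrun execution then copies the shaded jar into the shared sqlplugins/httpsink directory and renames it to ${project.name}-${git.branch}.jar. Note that ${git.branch} is not defined in this file; presumably a git-properties plugin configured in a parent pom injects it at build time.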

Diff for: HttpOutputFormat.java (+170)
@@ -0,0 +1,170 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.http;

import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import com.dtstack.flink.sql.sink.http.table.HttpTableInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * OutputFormat that POSTs each record to an HTTP endpoint as a JSON object.
 *
 * @author: chuixue
 * @create: 2021-03-03 10:41
 **/
public class HttpOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
    private static final Logger LOG = LoggerFactory.getLogger(HttpOutputFormat.class);

    private ObjectMapper mapper = new ObjectMapper();

    /**
     * Table definition: target url, optional flag, per-record delay, field names.
     */
    private HttpTableInfo httpTableInfo;

    private transient Map<String, Object> sendMessage;

    private HttpOutputFormat() {
    }

    public static HttpOutputFormatBuilder buildHttpOutputFormat() {
        return new HttpOutputFormatBuilder();
    }

    @Override
    public void configure(Configuration parameters) {

    }

    @Override
    public void open(int taskNumber, int numTasks) {
        initMetric();
        sendMessage = new HashMap<>();
    }

    @Override
    public void writeRecord(Tuple2 record) {
        String value = null;
        CloseableHttpClient client = null;
        try {
            client = HttpClients.createDefault();
            RequestConfig requestConfig = RequestConfig.custom()
                    .setSocketTimeout(10000)
                    .setConnectTimeout(10000)
                    .setConnectionRequestTimeout(10000)
                    .build();
            HttpPost post = new HttpPost(this.httpTableInfo.getUrl());
            post.setConfig(requestConfig);
            // only accumulate messages (f0 == true) are sent; retractions are skipped
            Tuple2<Boolean, Row> tupleTrans = record;
            if (!tupleTrans.f0) {
                return;
            }
            // when a flag is configured, send the flag and table name along with the fields
            if (StringUtils.isNotEmpty(httpTableInfo.getFlag())) {
                sendMessage.put("flag", httpTableInfo.getFlag());
                sendMessage.put("tableName", httpTableInfo.getName());
            }
            // add one entry per field
            String[] fields = httpTableInfo.getFields();
            Row row = tupleTrans.f1;
            for (int i = 0; i < fields.length; i++) {
                sendMessage.put(fields[i], row.getField(i));
            }
            // send data
            value = mapper.writeValueAsString(sendMessage);
            sendMsg(value, client, post);
            // metrics
            outRecords.inc();
            if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0) {
                LOG.info(value);
            }
            TimeUnit.MILLISECONDS.sleep(httpTableInfo.getDelay());
        } catch (Exception e) {
            if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
                LOG.error("record insert failed: {}", value);
                LOG.error("", e);
            }
            outDirtyRecords.inc();
        } finally {
            sendMessage.clear();
            if (client != null) {
                try {
                    client.close();
                } catch (IOException e) {
                    LOG.error(e.getMessage());
                }
            }
        }
    }

    /**
     * Send one serialized record as a JSON POST request.
     *
     * @param value  JSON payload
     * @param client HTTP client executing the request
     * @param post   POST request targeting the configured url
     * @throws Exception if the request fails
     */
    private void sendMsg(String value, CloseableHttpClient client, HttpPost post) throws Exception {
        StringEntity stringEntity = new StringEntity(value);
        stringEntity.setContentEncoding("UTF-8");
        stringEntity.setContentType("application/json");
        post.setEntity(stringEntity);
        client.execute(post);
    }

    @Override
    public void close() {
    }

    public static class HttpOutputFormatBuilder {
        private final HttpOutputFormat httpOutputFormat;

        protected HttpOutputFormatBuilder() {
            this.httpOutputFormat = new HttpOutputFormat();
        }

        public HttpOutputFormatBuilder setHttpTableInfo(HttpTableInfo httpTableInfo) {
            httpOutputFormat.httpTableInfo = httpTableInfo;
            return this;
        }

        public HttpOutputFormat finish() {
            if (httpOutputFormat.httpTableInfo.getUrl() == null) {
                throw new IllegalArgumentException("No url supplied.");
            }
            return httpOutputFormat;
        }
    }
}
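
For reference, a minimal standalone sketch of the JSON body that writeRecord assembles, using illustrative values (a flag of "sync", a table named userTable with fields id and name; none of these appear in the diff). The optional flag/tableName entries go in first, then one entry per field, and the map is serialized with Jackson exactly as above:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class HttpPayloadSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> sendMessage = new HashMap<>();
        // optional routing metadata, present only when a flag is configured
        sendMessage.put("flag", "sync");            // illustrative value
        sendMessage.put("tableName", "userTable");  // illustrative value
        // one entry per table field, keyed by field name
        sendMessage.put("id", 1);
        sendMessage.put("name", "foo");
        // the sink serializes the map and POSTs it as application/json
        System.out.println(new ObjectMapper().writeValueAsString(sendMessage));
        // e.g. {"flag":"sync","tableName":"userTable","id":1,"name":"foo"} (key order may vary)
    }
}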

Diff for: HttpSink.java (+100)
@@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.http;

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.http.table.HttpTableInfo;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Table sink that routes a retract stream to {@link HttpOutputFormat}.
 *
 * @author: chuixue
 * @create: 2021-03-03 10:40
 **/
public class HttpSink implements RetractStreamTableSink<Row>, IStreamSinkGener<HttpSink> {
    protected String[] fieldNames;

    protected TypeInformation<?>[] fieldTypes;

    protected HttpTableInfo httpTableInfo;

    public HttpSink() {
    }

    @Override
    public HttpSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
        this.httpTableInfo = (HttpTableInfo) targetTableInfo;
        return this;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        HttpOutputFormat.HttpOutputFormatBuilder builder = HttpOutputFormat.buildHttpOutputFormat();
        // finish() validates that a url was supplied before the job starts
        HttpOutputFormat httpOutputFormat = builder.setHttpTableInfo(this.httpTableInfo)
                .finish();
        RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(httpOutputFormat);
        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
                .setParallelism(this.httpTableInfo.getParallelism())
                .name(this.httpTableInfo.getName());
        return dataStreamSink;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), getRecordType());
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }
}
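
HttpSink is a RetractStreamTableSink, so each element reaching the output format is a Tuple2<Boolean, Row>: f0 == true marks an accumulate message, f0 == false a retraction. A minimal sketch of the filter writeRecord applies, runnable with flink-core on the classpath (the row values are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.List;

public class RetractFilterSketch {
    public static void main(String[] args) {
        List<Tuple2<Boolean, Row>> records = Arrays.asList(
                Tuple2.of(true, Row.of(1, "foo")),    // accumulate: would be POSTed
                Tuple2.of(false, Row.of(1, "foo")));  // retraction: silently dropped
        for (Tuple2<Boolean, Row> record : records) {
            if (!record.f0) {
                continue; // same early return as in HttpOutputFormat.writeRecord
            }
            System.out.println("would POST: " + record.f1);
        }
    }
}

One consequence of this design is that the sink never issues deletes against the endpoint; updates arrive as a retraction (dropped) followed by a fresh accumulate message.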
