5
5
import java .util .List ;
6
6
import java .util .Map ;
7
7
import java .util .Objects ;
8
- import java .util .function .Function ;
8
+ import java .util .function .UnaryOperator ;
9
9
import java .util .stream .Stream ;
10
10
import javax .annotation .Nullable ;
11
11
import org .apache .commons .collections .CollectionUtils ;
@@ -22,7 +22,7 @@ static ConnectorInfo extract(String className,
22
22
ConnectorTypeDTO type ,
23
23
Map <String , Object > config ,
24
24
List <String > topicsFromApi , // can be empty for old Connect API versions
25
- Function < String , String > topicOddrnBuilder ) {
25
+ UnaryOperator < String > topicOddrnBuilder ) {
26
26
return switch (className ) {
27
27
case "org.apache.kafka.connect.file.FileStreamSinkConnector" ,
28
28
"org.apache.kafka.connect.file.FileStreamSourceConnector" ,
@@ -43,7 +43,7 @@ static ConnectorInfo extract(String className,
43
43
private static ConnectorInfo extractFileIoConnector (ConnectorTypeDTO type ,
44
44
List <String > topics ,
45
45
Map <String , Object > config ,
46
- Function < String , String > topicOddrnBuilder ) {
46
+ UnaryOperator < String > topicOddrnBuilder ) {
47
47
return new ConnectorInfo (
48
48
extractInputs (type , topics , config , topicOddrnBuilder ),
49
49
extractOutputs (type , topics , config , topicOddrnBuilder )
@@ -53,7 +53,7 @@ private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type,
53
53
private static ConnectorInfo extractJdbcSink (ConnectorTypeDTO type ,
54
54
List <String > topics ,
55
55
Map <String , Object > config ,
56
- Function < String , String > topicOddrnBuilder ) {
56
+ UnaryOperator < String > topicOddrnBuilder ) {
57
57
String tableNameFormat = (String ) config .getOrDefault ("table.name.format" , "${topic}" );
58
58
List <String > targetTables = extractTopicNamesBestEffort (topics , config )
59
59
.map (topic -> tableNameFormat .replace ("${kafka}" , topic ))
@@ -106,7 +106,7 @@ private static ConnectorInfo extractDebeziumMysql(Map<String, Object> config) {
106
106
private static ConnectorInfo extractS3Sink (ConnectorTypeDTO type ,
107
107
List <String > topics ,
108
108
Map <String , Object > config ,
109
- Function < String , String > topicOrrdnBuilder ) {
109
+ UnaryOperator < String > topicOrrdnBuilder ) {
110
110
String bucketName = (String ) config .get ("s3.bucket.name" );
111
111
String topicsDir = (String ) config .getOrDefault ("topics.dir" , "topics" );
112
112
String directoryDelim = (String ) config .getOrDefault ("directory.delim" , "/" );
@@ -122,7 +122,7 @@ private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type,
122
122
private static List <String > extractInputs (ConnectorTypeDTO type ,
123
123
List <String > topicsFromApi ,
124
124
Map <String , Object > config ,
125
- Function < String , String > topicOrrdnBuilder ) {
125
+ UnaryOperator < String > topicOrrdnBuilder ) {
126
126
return type == ConnectorTypeDTO .SINK
127
127
? extractTopicsOddrns (config , topicsFromApi , topicOrrdnBuilder )
128
128
: List .of ();
@@ -131,7 +131,7 @@ private static List<String> extractInputs(ConnectorTypeDTO type,
131
131
private static List <String > extractOutputs (ConnectorTypeDTO type ,
132
132
List <String > topicsFromApi ,
133
133
Map <String , Object > config ,
134
- Function < String , String > topicOrrdnBuilder ) {
134
+ UnaryOperator < String > topicOrrdnBuilder ) {
135
135
return type == ConnectorTypeDTO .SOURCE
136
136
? extractTopicsOddrns (config , topicsFromApi , topicOrrdnBuilder )
137
137
: List .of ();
@@ -158,7 +158,7 @@ private static Stream<String> extractTopicNamesBestEffort(
158
158
159
159
private static List <String > extractTopicsOddrns (Map <String , Object > config ,
160
160
List <String > topicsFromApi ,
161
- Function < String , String > topicOrrdnBuilder ) {
161
+ UnaryOperator < String > topicOrrdnBuilder ) {
162
162
return extractTopicNamesBestEffort (topicsFromApi , config )
163
163
.map (topicOrrdnBuilder )
164
164
.toList ();
0 commit comments