Commit db8dade

[FLINK-35825][hive] HiveTableSource supports report statistics for text file
1 parent: b95c118

File tree: 3 files changed (+212 −3)

Diff for: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java (+7 −3)
@@ -34,6 +34,7 @@
 import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
+import org.apache.flink.connectors.hive.util.TextFormatStatisticsReportUtil;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.utils.ParquetFormatStatisticsReportUtil;
 import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
@@ -379,7 +380,7 @@ private TableStats getMapRedInputFormatStatistics(
         Preconditions.checkArgument(
                 statisticsThreadNum >= 1,
                 TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM.key() + " cannot be less than 1");
-        // Now we only support Parquet, Orc formats.
+        // Now we only support Parquet, Orc and Text formats.
         if (serializationLib.contains("parquet")) {
             return ParquetFormatStatisticsReportUtil.getTableStatistics(
                     files,
@@ -390,10 +391,13 @@ private TableStats getMapRedInputFormatStatistics(
         } else if (serializationLib.contains("orc")) {
             return OrcFormatStatisticsReportUtil.getTableStatistics(
                     files, producedDataType, jobConf, statisticsThreadNum);
+        } else if (serializationLib.contains("simple")) {
+            return TextFormatStatisticsReportUtil.estimateTableStatistics(
+                    files, producedDataType, jobConf);
         } else {
-            // Now, only support Orc and Parquet Formats.
+            // Now, only support Orc, Parquet and Text formats.
             LOG.info(
-                    "Now for hive table source, reporting statistics only support Orc and Parquet formats.");
+                    "Now for hive table source, reporting statistics only supports Orc, Parquet and Text formats.");
             return TableStats.UNKNOWN;
         }
     }
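A note on the dispatch above: serializationLib holds the fully qualified class name of the table's Hive serde, so the substring checks key off the default serde per storage format — ParquetHiveSerDe for Parquet, OrcSerde for ORC, and LazySimpleSerDe for plain text tables, which is what contains("simple") matches. A minimal sketch of the same dispatch; formatFor and its return values are illustrative assumptions, not part of the commit:

// Hedged sketch of the serde-name dispatch; formatFor is a hypothetical helper.
static String formatFor(String serializationLib) {
    if (serializationLib.contains("parquet")) {
        // e.g. org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
        return "parquet";
    } else if (serializationLib.contains("orc")) {
        // e.g. org.apache.hadoop.hive.ql.io.orc.OrcSerde
        return "orc";
    } else if (serializationLib.contains("simple")) {
        // e.g. org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe (text tables)
        return "text";
    } else {
        return "unknown"; // statistics reporting falls back to TableStats.UNKNOWN
    }
}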
Diff for: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/util/TextFormatStatisticsReportUtil.java (+109, new file)
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connectors.hive.util;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/** Utils for text format statistics report. */
public class TextFormatStatisticsReportUtil {
    private static final Logger LOG = LoggerFactory.getLogger(TextFormatStatisticsReportUtil.class);

    public static TableStats estimateTableStatistics(
            List<Path> files, DataType producedDataType, Configuration hadoopConfig) {
        try {
            long rowCount;
            RowType rowType = (RowType) producedDataType.getLogicalType();
            double totalFileSize = 0.0;
            for (Path file : files) {
                totalFileSize += getTextFileSize(hadoopConfig, file);
            }
            rowCount = (long) (totalFileSize / estimateRowSize(rowType));
            return new TableStats(rowCount);
        } catch (Exception e) {
            LOG.warn("Estimating statistics failed for text format: {}", e.getMessage());
            return TableStats.UNKNOWN;
        }
    }

    private static int estimateRowSize(RowType rowType) {
        int rowSize = 0;
        for (int index = 0; index < rowType.getFieldCount(); ++index) {
            LogicalType logicalType = rowType.getTypeAt(index);
            rowSize += getAverageTypeValueSize(logicalType);
        }
        return rowSize;
    }

    /** Estimation rules based on Hive field types. */
    private static double getAverageTypeValueSize(LogicalType logicalType) {
        LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
        switch (typeRoot) {
            case CHAR:
            case TINYINT:
                return 1;
            case VARCHAR:
            case DATE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case DECIMAL:
                return 12;
            case SMALLINT:
                return 2;
            case INTEGER:
            case FLOAT:
            case INTERVAL_DAY_TIME:
                return 4;
            case BIGINT:
            case DOUBLE:
            case INTERVAL_YEAR_MONTH:
                return 8;
            case VARBINARY:
                return 16;
            case ARRAY:
                return getAverageTypeValueSize(((ArrayType) logicalType).getElementType()) * 16;
            case MAP:
                return (getAverageTypeValueSize(((MapType) logicalType).getKeyType())
                                + getAverageTypeValueSize(((MapType) logicalType).getValueType()))
                        * 16;
            case ROW:
                return estimateRowSize((RowType) logicalType);
            default:
                // For unknown data types, we use a smaller data size for estimation.
                return 8;
        }
    }

    private static long getTextFileSize(Configuration hadoopConfig, Path file) throws IOException {
        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
        return hadoopPath.getFileSystem(hadoopConfig).getContentSummary(hadoopPath).getLength();
    }
}
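To make the heuristic concrete: the estimated row count is simply the total file size divided by the per-row byte estimate from the type weights above. A hedged sketch of that arithmetic, using a hypothetical schema (the weights mirror getAverageTypeValueSize, which is private and cannot be called directly):

// Hedged sketch: the row-size arithmetic behind estimateTableStatistics,
// for a hypothetical schema id BIGINT (8) + name VARCHAR (12) + score DOUBLE (8).
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

class RowSizeSketch {
    public static void main(String[] args) {
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new BigIntType(), new VarCharType(), new DoubleType()},
                        new String[] {"id", "name", "score"});
        double estimatedRowSize = 8 + 12 + 8; // 28 bytes per row under the weights above
        long rowCount = (long) (280 / estimatedRowSize); // a 280-byte text file -> 10 rows
        System.out.println(rowType + " -> estimated rows: " + rowCount);
    }
}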
Diff for: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/util/TextFormatStatisticsReportUtilTest.java (+96, new file)
@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connectors.hive.util;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.junit.utils.TempDirUtils;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

import static java.nio.file.StandardOpenOption.APPEND;
import static org.junit.jupiter.api.Assertions.assertEquals;

/** Tests for {@link TextFormatStatisticsReportUtil}. */
class TextFormatStatisticsReportUtilTest {
    private Configuration hadoopConfig;
    private DataType producedDataType;

    @TempDir private java.nio.file.Path temporaryFolder;

    @BeforeEach
    void setUp() {
        hadoopConfig = new Configuration();
        // Create a sample producedDataType with a RowType
        List<RowType.RowField> fields = new ArrayList<>();
        fields.add(new RowType.RowField("field1", new VarCharType()));
        fields.add(new RowType.RowField("field2", new VarCharType()));
        fields.add(new RowType.RowField("field3", new VarCharType()));
        producedDataType = new AtomicDataType(new RowType(fields));
    }

    @Test
    void testEstimateTableStatisticsCase1() throws IOException {
        // Create sample files for testing
        File tempFile = TempDirUtils.newFile(temporaryFolder, "flink_test_file.txt");

        List<Path> files = new ArrayList<>();
        files.add(new Path(tempFile.toURI()));

        String sampleString = "sample data";
        Files.write(tempFile.toPath(), sampleString.getBytes());
        TableStats stats =
                TextFormatStatisticsReportUtil.estimateTableStatistics(
                        files, producedDataType, hadoopConfig);
        assertEquals(0, stats.getRowCount());
        for (int i = 0; i < 10; ++i) {
            Files.write(tempFile.toPath(), sampleString.getBytes(), APPEND);
        }
        stats =
                TextFormatStatisticsReportUtil.estimateTableStatistics(
                        files, producedDataType, hadoopConfig);
        assertEquals(3, stats.getRowCount());
    }

    @Test
    void testEstimateFailedToUnknown() {
        List<Path> files = new ArrayList<>();
        files.add(new Path(URI.create("file:///non_existent_file.txt")));
        // Estimate table statistics
        TableStats stats =
                TextFormatStatisticsReportUtil.estimateTableStatistics(
                        files, producedDataType, hadoopConfig);
        // Verify that it returns TableStats.UNKNOWN on failure
        assertEquals(TableStats.UNKNOWN, stats);
    }
}
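For reference, the expected counts in testEstimateTableStatisticsCase1 follow directly from the heuristic above: three VarCharType fields estimate to 3 × 12 = 36 bytes per row; the initial 11-byte file ("sample data") yields (long) (11 / 36) = 0 rows, and after ten more 11-byte appends the 121-byte file yields (long) (121 / 36) = 3 rows.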
