Skip to content

Commit c6c0dbb

Browse files
113 output various formats (#114)
* feat: Implement arrow output * feat: Implement arrow output * feat: Add csv format --------- Co-authored-by: Romuald Rousseau <romuald.rousseau@servier.com>
1 parent 80815d6 commit c6c0dbb

File tree

10 files changed

+230
-34
lines changed

10 files changed

+230
-34
lines changed

archery-dbf/src/main/java/com/github/romualdrousseau/archery/loader/dbf/DbfDocument.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void autoRecipe(final BaseSheet sheet) {
8686
private boolean openWithEncoding(final File dbfFile, final String encoding, final String sheetName) {
8787
try {
8888
final var reader = new DBFReader(new FileInputStream(dbfFile), Charset.forName(encoding));
89-
this.sheet = new DbfSheet(sheetName, reader);
89+
this.sheet = new DbfSheet(this, sheetName, reader);
9090
return true;
9191
} catch (final IOException | UnsupportedCharsetException x) {
9292
return false;

archery-dbf/src/main/java/com/github/romualdrousseau/archery/loader/dbf/DbfSheet.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.text.SimpleDateFormat;
77
import java.util.Date;
88

9+
import com.github.romualdrousseau.archery.Document;
910
import com.github.romualdrousseau.archery.base.PatcheableSheetStore;
1011
import com.github.romualdrousseau.archery.commons.collections.DataFrame;
1112
import com.github.romualdrousseau.archery.commons.collections.DataFrameWriter;
@@ -17,14 +18,17 @@
1718
class DbfSheet extends PatcheableSheetStore implements Closeable {
1819

1920
private static final int BATCH_SIZE = 50000;
21+
private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy-MM-dd");
22+
private static final SimpleDateFormat DATETIME_FORMATTER = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
2023

21-
private final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy-MM-dd");
24+
private final DbfDocument document;
2225
private final String name;
2326

2427
private DBFReader reader;
2528
private DataFrame rows;
2629

27-
public DbfSheet(final String name, final DBFReader reader) {
30+
public DbfSheet(DbfDocument document, final String name, final DBFReader reader) {
31+
this.document = document;
2832
this.name = name;
2933
this.reader = reader;
3034
this.rows = null;
@@ -121,7 +125,11 @@ private String convertToString(final Object v) {
121125
return v.toString();
122126
}
123127
} else if (v instanceof Date) {
124-
return DATE_FORMATTER.format((Date) v);
128+
if (this.document.getHints().contains(Document.Hint.INTELLI_TIME)) {
129+
return DATETIME_FORMATTER.format((Date) v);
130+
} else {
131+
return DATE_FORMATTER.format((Date) v);
132+
}
125133
} else {
126134
return v.toString();
127135
}

archery/src/main/java/com/github/romualdrousseau/archery/Table.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.github.romualdrousseau.archery;
22

33
import java.io.Closeable;
4+
import java.io.IOException;
45

56
public interface Table extends Closeable {
67

@@ -28,9 +29,7 @@ public interface Table extends Closeable {
2829

2930
void updateHeaderTags();
3031

31-
void to_arrow(final String filePath);
32+
void to_arrow(final String outputfilePath) throws IOException;
3233

33-
void to_csv(final String filePath);
34-
35-
void to_json(final String filePath);
34+
void to_csv(final String outputFilePath) throws IOException;
3635
}

archery/src/main/java/com/github/romualdrousseau/archery/TagClassifier.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.util.List;
44

5+
import com.github.romualdrousseau.archery.commons.preprocessing.Text;
6+
57
public interface TagClassifier extends AutoCloseable {
68

79
enum TagStyle {
@@ -25,4 +27,6 @@ enum TagStyle {
2527
TagClassifier setLexicon(final List<String> lexion);
2628

2729
String ensureTagStyle(final String text);
30+
31+
Text.ITokenizer getTagTokenizer();
2832
}

archery/src/main/java/com/github/romualdrousseau/archery/base/BaseTable.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import com.github.romualdrousseau.archery.Header;
99
import com.github.romualdrousseau.archery.Row;
1010
import com.github.romualdrousseau.archery.Table;
11+
import com.github.romualdrousseau.archery.writer.ArrowWriter;
12+
import com.github.romualdrousseau.archery.writer.CsvWriter;
1113

1214
public class BaseTable implements Table, Visitable {
1315

@@ -132,18 +134,13 @@ public void updateHeaderTags() {
132134
}
133135

134136
@Override
135-
public void to_arrow(final String filePath) {
136-
throw new UnsupportedOperationException();
137+
public void to_arrow(final String filePath) throws IOException {
138+
new ArrowWriter(this).write(filePath);
137139
}
138140

139141
@Override
140-
public void to_csv(final String filePath) {
141-
throw new UnsupportedOperationException();
142-
}
143-
144-
@Override
145-
public void to_json(final String filePath) {
146-
throw new UnsupportedOperationException();
142+
public void to_csv(final String filePath) throws IOException {
143+
new CsvWriter(this).write(filePath);
147144
}
148145

149146
public int getFirstColumn() {

archery/src/main/java/com/github/romualdrousseau/archery/classifier/SimpleTagClassifier.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.github.romualdrousseau.archery.Model;
88
import com.github.romualdrousseau.archery.Table;
99
import com.github.romualdrousseau.archery.TagClassifier;
10+
import com.github.romualdrousseau.archery.commons.preprocessing.Text.ITokenizer;
1011
import com.github.romualdrousseau.archery.commons.preprocessing.tokenizer.ShingleTokenizer;
1112
import com.github.romualdrousseau.archery.commons.strings.StringUtils;
1213

@@ -102,6 +103,11 @@ public String predict(final Table table, final Header header) {
102103
}
103104
}
104105

106+
@Override
107+
public ITokenizer getTagTokenizer() {
108+
return this.tagTokenizer;
109+
}
110+
105111
private final Pattern pattern = Pattern.compile(" \\(\\$(.*)\\)$");
106112
private final ShingleTokenizer tagTokenizer;
107113

archery/src/main/java/com/github/romualdrousseau/archery/intelli/IntelliTable.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,6 @@ public BaseRow getRowAt(final int rowIndex) {
6767
return new IntelliRow(this, rowIndex, this.rows.getRow(rowIndex));
6868
}
6969

70-
@Override
71-
public void to_arrow(final String filePath) {
72-
throw new UnsupportedOperationException();
73-
}
74-
75-
@Override
76-
public void to_csv(final String filePath) {
77-
throw new UnsupportedOperationException();
78-
}
79-
80-
@Override
81-
public void to_json(final String filePath) {
82-
throw new UnsupportedOperationException();
83-
}
84-
8570
private HashSet<String> collectPivotEntryTypes(final BaseTableGraph root) {
8671
final var pivotEntryTypes = new HashSet<String>();
8772
root.parse(e -> e.getTable().headers()
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package com.github.romualdrousseau.archery.writer;
2+
3+
import java.io.FileOutputStream;
4+
import java.io.IOException;
5+
import java.nio.charset.StandardCharsets;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
import org.apache.arrow.memory.RootAllocator;
10+
import org.apache.arrow.vector.FieldVector;
11+
import org.apache.arrow.vector.VarCharVector;
12+
import org.apache.arrow.vector.VectorSchemaRoot;
13+
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
14+
import org.apache.arrow.vector.types.pojo.ArrowType;
15+
import org.apache.arrow.vector.types.pojo.Field;
16+
import org.apache.arrow.vector.types.pojo.FieldType;
17+
import org.apache.arrow.vector.types.pojo.Schema;
18+
19+
import com.github.romualdrousseau.archery.Header;
20+
import com.github.romualdrousseau.archery.base.BaseTable;
21+
import com.github.romualdrousseau.archery.commons.preprocessing.tokenizer.ShingleTokenizer;
22+
import com.github.romualdrousseau.archery.commons.strings.StringUtils;
23+
24+
public class ArrowWriter {
25+
26+
private static final int BATCH_SIZE = 65536;
27+
28+
private final BaseTable table;
29+
30+
public ArrowWriter(final BaseTable table) {
31+
this.table = table;
32+
}
33+
34+
public void write(final String outputFilePath) throws IOException {
35+
try (
36+
final var allocator = new RootAllocator();
37+
final var out = new FileOutputStream(outputFilePath)) {
38+
39+
// 1) Build Arrow schema from your framework schema
40+
41+
final var arrowSchema = this.buildArrowSchema(table.headers());
42+
43+
// 2) Create vectors for each field
44+
45+
final var fields = arrowSchema.getFields();
46+
final var vectors = new ArrayList<FieldVector>();
47+
for (final var field : fields) {
48+
final var vector = field.createVector(allocator);
49+
vector.setInitialCapacity(BATCH_SIZE);
50+
vector.allocateNew();
51+
vectors.add(vector);
52+
}
53+
54+
try (
55+
final var root = new VectorSchemaRoot(fields, vectors, 0);
56+
final var writer = new ArrowStreamWriter(root, null, out)) {
57+
58+
writer.start();
59+
60+
// 3) Stream rows from your framework and write in batches
61+
62+
int currentIndex = 0;
63+
final var iterator = table.rows().iterator();
64+
while (iterator.hasNext()) {
65+
final var row = iterator.next();
66+
67+
// Set values in each vector
68+
for (int colIndex = 0; colIndex < fields.size(); colIndex++) {
69+
final var vector = vectors.get(colIndex);
70+
final var value = row.getCellAt(colIndex).getValue();
71+
this.setValue(vector, currentIndex, value);
72+
}
73+
74+
currentIndex++;
75+
76+
// If batch full, flush to writer
77+
if (currentIndex == BATCH_SIZE) {
78+
this.flushBatch(root, vectors, currentIndex, writer);
79+
currentIndex = 0;
80+
}
81+
}
82+
83+
// flush remaining rows (if not multiple of BATCH_SIZE)
84+
if (currentIndex > 0) {
85+
this.flushBatch(root, vectors, currentIndex, writer);
86+
}
87+
88+
writer.end();
89+
90+
} finally {
91+
for (final var v : vectors) {
92+
v.close();
93+
}
94+
}
95+
}
96+
}
97+
98+
private Schema buildArrowSchema(final Iterable<Header> headers) {
99+
final var arrowFields = new ArrayList<Field>();
100+
101+
final var tagTokenizer = this.table.getSheet().getDocument().getTagClassifier().getTagTokenizer();
102+
((ShingleTokenizer) tagTokenizer).disableLemmatization();
103+
104+
for (final var header : headers) {
105+
final var name = header.hasTag() ? header.getTag().getValue() : header.getName();
106+
final var field = new Field(
107+
StringUtils.toSnake(name, tagTokenizer),
108+
FieldType.nullable(ArrowType.Utf8.INSTANCE),
109+
null);
110+
arrowFields.add(field);
111+
}
112+
113+
return new Schema(arrowFields);
114+
}
115+
116+
private void setValue(final FieldVector vector, final int index, final String value) {
117+
if (value == null) {
118+
vector.setNull(index);
119+
return;
120+
}
121+
final var bytes = value.toString().getBytes(StandardCharsets.UTF_8);
122+
((VarCharVector) vector).setSafe(index, bytes);
123+
}
124+
125+
private void flushBatch(final VectorSchemaRoot root, final List<FieldVector> vectors, final int rowCount,
126+
final ArrowStreamWriter writer) throws IOException {
127+
128+
root.setRowCount(rowCount);
129+
130+
for (final FieldVector v : vectors) {
131+
v.setValueCount(rowCount);
132+
}
133+
134+
writer.writeBatch();
135+
136+
// Reuse vectors for the next batch: reset but keep allocated memory
137+
for (final FieldVector v : vectors) {
138+
v.reset();
139+
// v.setInitialCapacity(BATCH_SIZE);
140+
}
141+
}
142+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.github.romualdrousseau.archery.writer;
2+
3+
import java.io.BufferedWriter;
4+
import java.io.FileWriter;
5+
import java.io.IOException;
6+
import java.util.ArrayList;
7+
import java.util.stream.Collectors;
8+
9+
import com.github.romualdrousseau.archery.Cell;
10+
import com.github.romualdrousseau.archery.Header;
11+
import com.github.romualdrousseau.archery.base.BaseTable;
12+
13+
public class CsvWriter {
14+
15+
private final BaseTable table;
16+
17+
public CsvWriter(final BaseTable table) {
18+
this.table = table;
19+
}
20+
21+
public void write(final String outputFilePath) throws IOException {
22+
try (var writer = new BufferedWriter(new FileWriter(outputFilePath))) {
23+
24+
final var headers = new ArrayList<Header>();
25+
this.table.headers().forEach(headers::add);
26+
final var headerNames = headers.stream()
27+
.map(h -> h.hasTag() ? h.getTag().getValue() : h.getName())
28+
// .map(this::escapeCsv)
29+
.collect(Collectors.joining(","));
30+
writer.write(headerNames);
31+
writer.newLine();
32+
33+
for (final var row : this.table.rows()) {
34+
final var cells = new ArrayList<Cell>();
35+
row.cells().forEach(cells::add);
36+
final var values = cells.stream()
37+
.map(c -> c.getValue())
38+
.map(this::escapeCsv)
39+
.collect(Collectors.joining(","));
40+
writer.write(values);
41+
writer.newLine();
42+
}
43+
}
44+
}
45+
46+
private String escapeCsv(final String data) {
47+
if (data == null) {
48+
return "";
49+
}
50+
if (data.contains(",") || data.contains("\"") || data.contains("\n")) {
51+
return "\"" + data.replace("\"", "\"\"") + "\"";
52+
}
53+
return data;
54+
}
55+
}

justfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ update-deps:
7979
mvn -DcreateChecksum=true -DprocessDependencyManagement=false versions:display-dependency-updates
8080

8181
# Copy dependencies
82-
copy-deps outdir='$PWD/target/jars':
83-
mvn dependency:copy-dependencies -DoutputDirectory={{outdir}}
82+
copy-deps outdir='$PWD/target/jars': install
83+
mvn -DoutputDirectory={{outdir}} dependency:copy-dependencies
8484

8585
@copy-pdfs:
8686
cp ./archery-documents/whitepapers/Semi-structured\ Document\ Feature\ Extraction/misc/main.pdf ./archery-documents/docs/resources/feature-extraction.pdf

0 commit comments

Comments
 (0)