Commit b4d292a

Merge pull request #2 from navinrathore/pipeUtilProps
Pipe's props is initialized with an empty HashMap, so getProps() never returns null.
2 parents 9ea14e4 + 4619542 commit b4d292a

7 files changed: 12 additions, 9 deletions


client/src/main/java/zingg/client/pipe/Pipe.java

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ public class Pipe implements Serializable{
 	String name;
 	Format format;
 	String preprocessors;
-	Map<String, String> props;
+	Map<String, String> props = new HashMap<String, String>();
 	@JsonSerialize(using = CustomSchemaSerializer.class)
 	StructType schema = null;
 	Map<String, String> sparkProps;
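
Why this one-line change matters: with the old declaration, a freshly constructed Pipe whose setProps() was never called returned null from getProps(), so any caller touching the map hit a NullPointerException. Below is a minimal, self-contained sketch of the behaviour; PipeSketch and its main() are illustrative stand-ins, not the actual Zingg class.

import java.util.HashMap;
import java.util.Map;

// Reduced stand-in for zingg.client.pipe.Pipe, keeping only the field
// this commit touches.
public class PipeSketch {
    // Eager initialization, as in the commit: the field is never null.
    Map<String, String> props = new HashMap<String, String>();

    public Map<String, String> getProps() { return props; }

    public void setProps(Map<String, String> props) { this.props = props; }

    public static void main(String[] args) {
        PipeSketch pipe = new PipeSketch();
        // Safe without a null guard; with the old declaration
        // "Map<String, String> props;" this line would throw a
        // NullPointerException.
        System.out.println("props size: " + pipe.getProps().size());
    }
}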

core/src/main/java/zingg/Labeller.java

Lines changed: 1 addition & 1 deletion

@@ -199,7 +199,7 @@ protected void printMarkedRecordsStat() {
 		System.out.println(msg);
 	}

-	protected void writeLabelledOutput(Dataset<Row> records) {
+	protected void writeLabelledOutput(Dataset<Row> records) throws ZinggClientException {
 		if (records == null) {
 			LOG.warn("No records to be labelled.");
 			return;
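
This diff and the next four make the same kind of change: the write step now declares the checked ZinggClientException. The sketch below (illustrative names only, with a local stand-in for the exception class) shows what that means for callers: they must either catch the exception or declare it in their own throws clause, which is why the clause spreads from these write methods up through their callers.

public class ThrowsPropagationSketch {

    // Local stand-in for zingg.client.ZinggClientException.
    static class ZinggClientException extends Exception {
        ZinggClientException(String message) { super(message); }
    }

    // After the commit, the write step declares the checked exception.
    static void writeLabelledOutput() throws ZinggClientException {
        throw new ZinggClientException("simulated write failure");
    }

    // Callers must propagate or handle it; main() handles it locally.
    public static void main(String[] args) {
        try {
            writeLabelledOutput();
        } catch (ZinggClientException e) {
            System.err.println("labelling failed: " + e.getMessage());
        }
    }
}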

core/src/main/java/zingg/Linker.java

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ protected Dataset<Row> selectColsFromBlocked(Dataset<Row> blocked) {
 		return blocked;
 	}

-	public void writeOutput(Dataset<Row> sampleOrginal, Dataset<Row> dupes) {
+	public void writeOutput(Dataset<Row> sampleOrginal, Dataset<Row> dupes) throws ZinggClientException {
 		try {
 			// input dupes are pairs
 			/// pick ones according to the threshold by user

core/src/main/java/zingg/Matcher.java

Lines changed: 1 addition & 1 deletion

@@ -160,7 +160,7 @@ public void execute() throws ZinggClientException {
 		}
 	}

-	public void writeOutput(Dataset<Row> blocked, Dataset<Row> dupesActual) {
+	public void writeOutput(Dataset<Row> blocked, Dataset<Row> dupesActual) throws ZinggClientException {
 		try{
 			//input dupes are pairs
 			///pick ones according to the threshold by user

core/src/main/java/zingg/TrainingDataFinder.java

Lines changed: 1 addition & 1 deletion

@@ -139,7 +139,7 @@ public void execute() throws ZinggClientException {
 		}
 	}

-	public void writeUncertain(Dataset<Row> dupesActual, Dataset<Row> sampleOrginal) {
+	public void writeUncertain(Dataset<Row> dupesActual, Dataset<Row> sampleOrginal) throws ZinggClientException {
 		//dupesActual.show(4);
 		//input dupes are pairs
 		dupesActual = DFUtil.addClusterRowNumber(dupesActual, spark);

core/src/main/java/zingg/util/BlockingTreeUtil.java

Lines changed: 1 addition & 1 deletion

@@ -77,7 +77,7 @@ public static Tree<Canopy> createBlockingTreeFromSample(Dataset<Row> testData,
 		return createBlockingTree(sample, positives, sampleFraction, blockSize, args, hashFunctions);
 	}

-	public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, Tree<Canopy> blockingTree, Arguments args) throws Exception {
+	public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, Tree<Canopy> blockingTree, Arguments args) throws Exception, ZinggClientException {
 		byte[] byteArray = Util.convertObjectIntoByteArray(blockingTree);
 		StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("BlockingTree", DataTypes.BinaryType, false) });
 		List<Object> objList = new ArrayList<>();

core/src/main/java/zingg/util/PipeUtil.java

Lines changed: 6 additions & 3 deletions

@@ -192,7 +192,8 @@ public static Dataset<Row> read(SparkSession spark, boolean addLineNo, int numPa
 		return rows;
 	}

-	public static void write(Dataset<Row> toWriteOrig, Arguments args, JavaSparkContext ctx, Pipe... pipes) {
+	public static void write(Dataset<Row> toWriteOrig, Arguments args, JavaSparkContext ctx, Pipe... pipes) throws ZinggClientException {
+		try {
 		for (Pipe p: pipes) {
 			Dataset<Row> toWrite = toWriteOrig;
 			DataFrameWriter writer = toWrite.write();

@@ -293,11 +294,13 @@ else if (p.getFormat().equals(Format.JDBC)){
 			}


+		}
+		} catch (Exception ex) {
+			throw new ZinggClientException(ex.getMessage());
 		}
-
 	}

-	public static void writePerSource(Dataset<Row> toWrite, Arguments args, JavaSparkContext ctx, Pipe[] pipes ) {
+	public static void writePerSource(Dataset<Row> toWrite, Arguments args, JavaSparkContext ctx, Pipe[] pipes ) throws ZinggClientException {
 		List<Row> sources = toWrite.select(ColName.SOURCE_COL).distinct().collectAsList();
 		for (Row r : sources) {
 			Dataset<Row> toWriteNow = toWrite.filter(toWrite.col(ColName.SOURCE_COL).equalTo(r.get(0)));
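
PipeUtil.write() is where the new exception actually originates: the method body is wrapped in try/catch and any failure is rethrown as ZinggClientException. Below is a self-contained sketch of that wrap-and-rethrow pattern, with the Spark writer loop replaced by a trivial stand-in and all names illustrative.

public class WriteWrapSketch {

    // Local stand-in for zingg.client.ZinggClientException.
    static class ZinggClientException extends Exception {
        ZinggClientException(String message) { super(message); }
    }

    public static void write(String... pipes) throws ZinggClientException {
        try {
            for (String p : pipes) {
                // Any unchecked failure here (a Spark write error in the
                // real code) is caught below and surfaced as the checked,
                // domain-specific exception.
                if (p == null) {
                    throw new IllegalArgumentException("pipe must not be null");
                }
                System.out.println("writing via " + p + " pipe");
            }
        } catch (Exception ex) {
            throw new ZinggClientException(ex.getMessage());
        }
    }

    public static void main(String[] args) {
        try {
            write("csv", null);
        } catch (ZinggClientException e) {
            System.err.println("write failed: " + e.getMessage());
        }
    }
}

One caveat the sketch inherits from the commit: rethrowing only ex.getMessage() discards the original stack trace; passing the caught exception along as a cause would preserve it.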
