Skip to content

Commit cb39e21

Browse files
authored
Merge pull request #229 from navinrathore/zProperErrors
Exception handling in PipeUtil::read()
2 parents da06692 + b4d292a commit cb39e21

File tree

11 files changed

+158
-139
lines changed

11 files changed

+158
-139
lines changed

client/src/main/java/zingg/client/ZinggClientException.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,14 @@ public ZinggClientException(String m) {
1818
this.message = m;
1919
}
2020

21+
public ZinggClientException(String m, Throwable cause) {
22+
super(m, cause);
23+
this.message = m;
24+
}
25+
26+
public ZinggClientException(Throwable cause) {
27+
super(cause);
28+
this.message = cause.getMessage();
29+
}
30+
2131
}

client/src/main/java/zingg/client/pipe/Pipe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class Pipe implements Serializable{
3232
String name;
3333
Format format;
3434
String preprocessors;
35-
Map<String, String> props;
35+
Map<String, String> props = new HashMap<String, String>();
3636
@JsonSerialize(using = CustomSchemaSerializer.class)
3737
StructType schema = null;
3838
Map<String, String> sparkProps;

core/src/main/java/zingg/LabelUpdater.java

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -41,70 +41,70 @@ public void execute() throws ZinggClientException {
4141

4242
public void processRecordsCli(Dataset<Row> lines) throws ZinggClientException {
4343
LOG.info("Processing Records for CLI updateLabelling");
44-
getMarkedRecordsStat(lines);
45-
printMarkedRecordsStat();
46-
if (lines == null || lines.count() == 0) {
47-
LOG.info("There is no marked record for updating. Please run findTrainingData/label jobs to generate training data.");
48-
return;
49-
}
5044

51-
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
52-
try {
53-
int matchFlag;
54-
Dataset<Row> updatedRecords = null;
55-
Dataset<Row> recordsToUpdate = lines;
56-
int selectedOption = -1;
57-
String postMsg;
45+
if (lines != null && lines.count() > 0) {
46+
getMarkedRecordsStat(lines);
47+
printMarkedRecordsStat();
5848

59-
Scanner sc = new Scanner(System.in);
60-
do {
61-
System.out.print("\n\tPlease enter the cluster id (or 9 to exit): ");
62-
String cluster_id = sc.next();
63-
if (cluster_id.equals("9")) {
64-
LOG.info("User has exit in the middle. Updating the records.");
65-
break;
66-
}
67-
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(cluster_id));
68-
if (currentPair.isEmpty()) {
69-
System.out.println("\tInvalid cluster id. Enter '9' to exit");
70-
continue;
71-
}
49+
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
50+
try {
51+
int matchFlag;
52+
Dataset<Row> updatedRecords = null;
53+
Dataset<Row> recordsToUpdate = lines;
54+
int selectedOption = -1;
55+
String postMsg;
56+
57+
Scanner sc = new Scanner(System.in);
58+
do {
59+
System.out.print("\n\tPlease enter the cluster id (or 9 to exit): ");
60+
String cluster_id = sc.next();
61+
if (cluster_id.equals("9")) {
62+
LOG.info("User has exit in the middle. Updating the records.");
63+
break;
64+
}
65+
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(cluster_id));
66+
if (currentPair.isEmpty()) {
67+
System.out.println("\tInvalid cluster id. Enter '9' to exit");
68+
continue;
69+
}
70+
71+
matchFlag = currentPair.head().getAs(ColName.MATCH_FLAG_COL);
72+
String preMsg = String.format("\n\tThe record pairs belonging to the input cluster id %s are:", cluster_id);
73+
String matchType = LabelMatchType.get(matchFlag).msg;
74+
postMsg = String.format("\tThe above pair is labeled as %s\n", matchType);
75+
selectedOption = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), preMsg, postMsg);
76+
updateLabellerStat(selectedOption, +1);
77+
updateLabellerStat(matchFlag, -1);
78+
printMarkedRecordsStat();
79+
if (selectedOption == 9) {
80+
LOG.info("User has quit in the middle. Updating the records.");
81+
break;
82+
}
83+
recordsToUpdate = recordsToUpdate
84+
.filter(recordsToUpdate.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
85+
if (updatedRecords != null) {
86+
updatedRecords = updatedRecords
87+
.filter(updatedRecords.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
88+
}
89+
updatedRecords = updateRecords(selectedOption, currentPair, updatedRecords);
90+
} while (selectedOption != 9);
7291

73-
matchFlag = currentPair.head().getAs(ColName.MATCH_FLAG_COL);
74-
String preMsg = String.format("\n\tThe record pairs belonging to the input cluster id %s are:", cluster_id);
75-
String matchType = LabelMatchType.get(matchFlag).msg;
76-
postMsg = String.format("\tThe above pair is labeled as %s\n", matchType);
77-
selectedOption = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), preMsg, postMsg);
78-
updateLabellerStat(selectedOption, +1);
79-
updateLabellerStat(matchFlag, -1);
80-
printMarkedRecordsStat();
81-
if (selectedOption == 9) {
82-
LOG.info("User has quit in the middle. Updating the records.");
83-
break;
84-
}
85-
recordsToUpdate = recordsToUpdate
86-
.filter(recordsToUpdate.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
8792
if (updatedRecords != null) {
88-
updatedRecords = updatedRecords
89-
.filter(updatedRecords.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
93+
updatedRecords = updatedRecords.union(recordsToUpdate);
9094
}
91-
updatedRecords = updateRecords(selectedOption, currentPair, updatedRecords);
92-
} while (selectedOption != 9);
93-
94-
if (updatedRecords != null) {
95-
updatedRecords = updatedRecords.union(recordsToUpdate);
96-
}
97-
writeLabelledOutput(updatedRecords);
98-
sc.close();
99-
LOG.info("Processing finished.");
100-
} catch (Exception e) {
101-
if (LOG.isDebugEnabled()) {
102-
e.printStackTrace();
95+
writeLabelledOutput(updatedRecords);
96+
sc.close();
97+
LOG.info("Processing finished.");
98+
} catch (Exception e) {
99+
if (LOG.isDebugEnabled()) {
100+
e.printStackTrace();
101+
}
102+
LOG.warn("An error has occured while Updating Label. " + e.getMessage());
103+
throw new ZinggClientException("An error while updating label", e);
103104
}
104-
LOG.warn("An error has occured while Updating Label. " + e.getMessage());
105-
throw new ZinggClientException(e.getMessage());
105+
} else {
106+
LOG.info("There is no marked record for updating. Please run findTrainingData/label jobs to generate training data.");
106107
}
107-
return;
108108
}
109109

110110

core/src/main/java/zingg/Labeller.java

Lines changed: 51 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package zingg;
22

3-
import java.util.ArrayList;
4-
import java.util.Arrays;
53
import java.util.List;
64
import java.util.Scanner;
75

@@ -51,7 +49,7 @@ public Dataset<Row> getUnmarkedRecords() throws ZinggClientException {
5149
unmarkedRecords = PipeUtil.read(spark, false, false, PipeUtil.getTrainingDataUnmarkedPipe(args));
5250
try {
5351
markedRecords = PipeUtil.read(spark, false, false, PipeUtil.getTrainingDataMarkedPipe(args));
54-
} catch (Exception e) {
52+
} catch (ZinggClientException e) {
5553
LOG.warn("No record has been marked yet");
5654
}
5755
if (markedRecords != null ) {
@@ -60,7 +58,7 @@ public Dataset<Row> getUnmarkedRecords() throws ZinggClientException {
6058
"left_anti");
6159
getMarkedRecordsStat(markedRecords);
6260
}
63-
} catch (Exception e) {
61+
} catch (ZinggClientException e) {
6462
LOG.warn("No unmarked record for labelling");
6563
}
6664
return unmarkedRecords;
@@ -75,61 +73,59 @@ protected void getMarkedRecordsStat(Dataset<Row> markedRecords) {
7573

7674
public void processRecordsCli(Dataset<Row> lines) throws ZinggClientException {
7775
LOG.info("Processing Records for CLI Labelling");
78-
printMarkedRecordsStat();
79-
if (lines == null || lines.count() == 0) {
80-
LOG.info("It seems there are no unmarked records at this moment. Please run findTrainingData job to build some pairs to be labelled and then run this labeler.");
81-
return;
82-
}
83-
84-
lines = lines.cache();
85-
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
76+
if (lines != null && lines.count() > 0) {
77+
printMarkedRecordsStat();
8678

87-
List<Row> clusterIDs = lines.select(ColName.CLUSTER_COLUMN).distinct().collectAsList();
88-
try {
89-
double score;
90-
double prediction;
91-
Dataset<Row> updatedRecords = null;
92-
int selected_option = -1;
93-
String msg1, msg2;
94-
int totalPairs = clusterIDs.size();
95-
96-
for (int index = 0; index < totalPairs; index++){
97-
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(
98-
clusterIDs.get(index).getAs(ColName.CLUSTER_COLUMN))).cache();
99-
100-
score = currentPair.head().getAs(ColName.SCORE_COL);
101-
prediction = currentPair.head().getAs(ColName.PREDICTION_COL);
102-
103-
msg1 = String.format("\tCurrent labelling round : %d/%d pairs labelled\n", index, totalPairs);
104-
String matchType = LabelMatchType.get(prediction).msg;
105-
if (prediction == ColValues.IS_NOT_KNOWN_PREDICTION) {
106-
msg2 = String.format(
107-
"\tZingg does not do any prediction for the above pairs as Zingg is still collecting training data to build the preliminary models.");
108-
} else {
109-
msg2 = String.format("\tZingg predicts the above records %s with a similarity score of %.2f",
110-
matchType, Math.floor(score * 100) * 0.01);
79+
lines = lines.cache();
80+
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
81+
List<Row> clusterIDs = lines.select(ColName.CLUSTER_COLUMN).distinct().collectAsList();
82+
try {
83+
double score;
84+
double prediction;
85+
Dataset<Row> updatedRecords = null;
86+
int selected_option = -1;
87+
String msg1, msg2;
88+
int totalPairs = clusterIDs.size();
89+
90+
for (int index = 0; index < totalPairs; index++) {
91+
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(
92+
clusterIDs.get(index).getAs(ColName.CLUSTER_COLUMN))).cache();
93+
94+
score = currentPair.head().getAs(ColName.SCORE_COL);
95+
prediction = currentPair.head().getAs(ColName.PREDICTION_COL);
96+
97+
msg1 = String.format("\tCurrent labelling round : %d/%d pairs labelled\n", index, totalPairs);
98+
String matchType = LabelMatchType.get(prediction).msg;
99+
if (prediction == ColValues.IS_NOT_KNOWN_PREDICTION) {
100+
msg2 = String.format(
101+
"\tZingg does not do any prediction for the above pairs as Zingg is still collecting training data to build the preliminary models.");
102+
} else {
103+
msg2 = String.format("\tZingg predicts the above records %s with a similarity score of %.2f",
104+
matchType, Math.floor(score * 100) * 0.01);
105+
}
106+
//String msgHeader = msg1 + msg2;
107+
108+
selected_option = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), msg1, msg2);
109+
updateLabellerStat(selected_option, 1);
110+
printMarkedRecordsStat();
111+
if (selected_option == 9) {
112+
LOG.info("User has quit in the middle. Updating the records.");
113+
break;
114+
}
115+
updatedRecords = updateRecords(selected_option, currentPair, updatedRecords);
111116
}
112-
//String msgHeader = msg1 + msg2;
113-
114-
selected_option = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), msg1, msg2);
115-
updateLabellerStat(selected_option, 1);
116-
printMarkedRecordsStat();
117-
if (selected_option == 9) {
118-
LOG.info("User has quit in the middle. Updating the records.");
119-
break;
117+
writeLabelledOutput(updatedRecords);
118+
LOG.warn("Processing finished.");
119+
} catch (Exception e) {
120+
if (LOG.isDebugEnabled()) {
121+
e.printStackTrace();
120122
}
121-
updatedRecords = updateRecords(selected_option, currentPair, updatedRecords);
122-
}
123-
writeLabelledOutput(updatedRecords);
124-
LOG.warn("Processing finished.");
125-
} catch (Exception e) {
126-
if (LOG.isDebugEnabled()) {
127-
e.printStackTrace();
123+
LOG.warn("Labelling error has occured " + e.getMessage());
124+
throw new ZinggClientException("An error has occured while Labelling.", e);
128125
}
129-
LOG.warn("Labelling error has occured " + e.getMessage());
130-
throw new ZinggClientException(e.getMessage());
126+
} else {
127+
LOG.info("It seems there are no unmarked records at this moment. Please run findTrainingData job to build some pairs to be labelled and then run this labeler.");
131128
}
132-
return;
133129
}
134130

135131

@@ -203,7 +199,7 @@ protected void printMarkedRecordsStat() {
203199
System.out.println(msg);
204200
}
205201

206-
protected void writeLabelledOutput(Dataset<Row> records) {
202+
protected void writeLabelledOutput(Dataset<Row> records) throws ZinggClientException {
207203
if (records == null) {
208204
LOG.warn("No records to be labelled.");
209205
return;

core/src/main/java/zingg/Linker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected Dataset<Row> selectColsFromBlocked(Dataset<Row> blocked) {
5050
return blocked;
5151
}
5252

53-
public void writeOutput(Dataset<Row> sampleOrginal, Dataset<Row> dupes) {
53+
public void writeOutput(Dataset<Row> sampleOrginal, Dataset<Row> dupes) throws ZinggClientException {
5454
try {
5555
// input dupes are pairs
5656
/// pick ones according to the threshold by user

core/src/main/java/zingg/Matcher.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ public Matcher() {
4141
setZinggOptions(ZinggOptions.MATCH);
4242
}
4343

44-
protected Dataset<Row> getTestData() {
45-
return PipeUtil.read(spark, true, args.getNumPartitions(), true, args.getData());
44+
protected Dataset<Row> getTestData() throws ZinggClientException{
45+
Dataset<Row> data = PipeUtil.read(spark, true, args.getNumPartitions(), true, args.getData());
46+
return data;
4647
}
4748

48-
protected Dataset<Row> getBlocked(Dataset<Row> testData) throws Exception{
49+
protected Dataset<Row> getBlocked(Dataset<Row> testData) throws Exception, ZinggClientException{
4950
LOG.debug("Blocking model file location is " + args.getBlockFile());
5051
Tree<Canopy> tree = BlockingTreeUtil.readBlockingTree(spark, args);
5152
Dataset<Row> blocked = testData.map(new Block.BlockFunction(tree), RowEncoder.apply(Block.appendHashCol(testData.schema())));
@@ -159,7 +160,7 @@ public void execute() throws ZinggClientException {
159160
}
160161
}
161162

162-
public void writeOutput(Dataset<Row> blocked, Dataset<Row> dupesActual) {
163+
public void writeOutput(Dataset<Row> blocked, Dataset<Row> dupesActual) throws ZinggClientException {
163164
try{
164165
//input dupes are pairs
165166
///pick ones according to the threshold by user

core/src/main/java/zingg/TrainingDataFinder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public TrainingDataFinder() {
4444
setZinggOptions(ZinggOptions.FIND_TRAINING_DATA);
4545
}
4646

47-
public Dataset<Row> getTraining() {
47+
public Dataset<Row> getTraining() throws ZinggClientException {
4848
return DSUtil.getTraining(spark, args);
4949
}
5050

@@ -139,7 +139,7 @@ public void execute() throws ZinggClientException {
139139
}
140140
}
141141

142-
public void writeUncertain(Dataset<Row> dupesActual, Dataset<Row> sampleOrginal) {
142+
public void writeUncertain(Dataset<Row> dupesActual, Dataset<Row> sampleOrginal) throws ZinggClientException {
143143
//dupesActual.show(4);
144144
//input dupes are pairs
145145
dupesActual = DFUtil.addClusterRowNumber(dupesActual, spark);

core/src/main/java/zingg/preprocess/StopWords.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import zingg.client.Arguments;
1919
import zingg.client.FieldDefinition;
20+
import zingg.client.ZinggClientException;
2021
import zingg.util.PipeUtil;
2122

2223
public class StopWords {
@@ -25,7 +26,7 @@ public class StopWords {
2526
public static final Log LOG = LogFactory.getLog(StopWords.class);
2627
protected static String stopWordColumn = "StopWord";
2728

28-
public static Dataset<Row> preprocessForStopWords(SparkSession spark, Arguments args, Dataset<Row> ds) {
29+
public static Dataset<Row> preprocessForStopWords(SparkSession spark, Arguments args, Dataset<Row> ds) throws ZinggClientException {
2930

3031
List<String> wordList = new ArrayList<String>();
3132
for (FieldDefinition def : args.getFieldDefinition()) {

core/src/main/java/zingg/util/BlockingTreeUtil.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import zingg.client.Arguments;
2424
import zingg.client.FieldDefinition;
2525
import zingg.client.MatchType;
26+
import zingg.client.ZinggClientException;
2627
import zingg.client.util.ListMap;
2728
import zingg.client.util.Util;
2829
import zingg.hash.HashFunction;
@@ -76,7 +77,7 @@ public static Tree<Canopy> createBlockingTreeFromSample(Dataset<Row> testData,
7677
return createBlockingTree(sample, positives, sampleFraction, blockSize, args, hashFunctions);
7778
}
7879

79-
public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, Tree<Canopy> blockingTree, Arguments args) throws Exception {
80+
public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, Tree<Canopy> blockingTree, Arguments args) throws Exception, ZinggClientException {
8081
byte[] byteArray = Util.convertObjectIntoByteArray(blockingTree);
8182
StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("BlockingTree", DataTypes.BinaryType, false) });
8283
List<Object> objList = new ArrayList<>();
@@ -86,7 +87,7 @@ public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, T
8687
PipeUtil.write(df, args, ctx, PipeUtil.getBlockingTreePipe(args));
8788
}
8889

89-
public static Tree<Canopy> readBlockingTree(SparkSession spark, Arguments args) throws Exception {
90+
public static Tree<Canopy> readBlockingTree(SparkSession spark, Arguments args) throws Exception, ZinggClientException{
9091
Dataset<Row> tree = PipeUtil.read(spark, false, args.getNumPartitions(), false, PipeUtil.getBlockingTreePipe(args));
9192
byte [] byteArrayBack = (byte[]) tree.head().get(0);
9293
Tree<Canopy> blockingTree = null;

0 commit comments

Comments (0)