Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exception handling in PipeUtil::read() #229

Merged
merged 6 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/src/main/java/zingg/client/ZinggClientException.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,14 @@ public ZinggClientException(String m) {
this.message = m;
}

public ZinggClientException(String m, Throwable cause) {
super(m, cause);
this.message = m;
}

public ZinggClientException(Throwable cause) {
super(cause);
this.message = cause.getMessage();
}

}
2 changes: 1 addition & 1 deletion client/src/main/java/zingg/client/pipe/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class Pipe implements Serializable{
String name;
Format format;
String preprocessors;
Map<String, String> props;
Map<String, String> props = new HashMap<String, String>();
@JsonSerialize(using = CustomSchemaSerializer.class)
StructType schema = null;
Map<String, String> sparkProps;
Expand Down
114 changes: 57 additions & 57 deletions core/src/main/java/zingg/LabelUpdater.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,70 +41,70 @@ public void execute() throws ZinggClientException {

public void processRecordsCli(Dataset<Row> lines) throws ZinggClientException {
LOG.info("Processing Records for CLI updateLabelling");
getMarkedRecordsStat(lines);
printMarkedRecordsStat();
if (lines == null || lines.count() == 0) {
LOG.info("There is no marked record for updating. Please run findTrainingData/label jobs to generate training data.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to print this out in the else branch, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. made changes in Labeller and UpdateLabeller.

return;
}

List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
try {
int matchFlag;
Dataset<Row> updatedRecords = null;
Dataset<Row> recordsToUpdate = lines;
int selectedOption = -1;
String postMsg;
if (lines != null && lines.count() > 0) {
getMarkedRecordsStat(lines);
printMarkedRecordsStat();

Scanner sc = new Scanner(System.in);
do {
System.out.print("\n\tPlease enter the cluster id (or 9 to exit): ");
String cluster_id = sc.next();
if (cluster_id.equals("9")) {
LOG.info("User has exit in the middle. Updating the records.");
break;
}
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(cluster_id));
if (currentPair.isEmpty()) {
System.out.println("\tInvalid cluster id. Enter '9' to exit");
continue;
}
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
try {
int matchFlag;
Dataset<Row> updatedRecords = null;
Dataset<Row> recordsToUpdate = lines;
int selectedOption = -1;
String postMsg;

Scanner sc = new Scanner(System.in);
do {
System.out.print("\n\tPlease enter the cluster id (or 9 to exit): ");
String cluster_id = sc.next();
if (cluster_id.equals("9")) {
LOG.info("User has exit in the middle. Updating the records.");
break;
}
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(cluster_id));
if (currentPair.isEmpty()) {
System.out.println("\tInvalid cluster id. Enter '9' to exit");
continue;
}

matchFlag = currentPair.head().getAs(ColName.MATCH_FLAG_COL);
String preMsg = String.format("\n\tThe record pairs belonging to the input cluster id %s are:", cluster_id);
String matchType = LabelMatchType.get(matchFlag).msg;
postMsg = String.format("\tThe above pair is labeled as %s\n", matchType);
selectedOption = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), preMsg, postMsg);
updateLabellerStat(selectedOption, +1);
updateLabellerStat(matchFlag, -1);
printMarkedRecordsStat();
if (selectedOption == 9) {
LOG.info("User has quit in the middle. Updating the records.");
break;
}
recordsToUpdate = recordsToUpdate
.filter(recordsToUpdate.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
if (updatedRecords != null) {
updatedRecords = updatedRecords
.filter(updatedRecords.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
}
updatedRecords = updateRecords(selectedOption, currentPair, updatedRecords);
} while (selectedOption != 9);

matchFlag = currentPair.head().getAs(ColName.MATCH_FLAG_COL);
String preMsg = String.format("\n\tThe record pairs belonging to the input cluster id %s are:", cluster_id);
String matchType = LabelMatchType.get(matchFlag).msg;
postMsg = String.format("\tThe above pair is labeled as %s\n", matchType);
selectedOption = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), preMsg, postMsg);
updateLabellerStat(selectedOption, +1);
updateLabellerStat(matchFlag, -1);
printMarkedRecordsStat();
if (selectedOption == 9) {
LOG.info("User has quit in the middle. Updating the records.");
break;
}
recordsToUpdate = recordsToUpdate
.filter(recordsToUpdate.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
if (updatedRecords != null) {
updatedRecords = updatedRecords
.filter(updatedRecords.col(ColName.CLUSTER_COLUMN).notEqual(cluster_id));
updatedRecords = updatedRecords.union(recordsToUpdate);
}
updatedRecords = updateRecords(selectedOption, currentPair, updatedRecords);
} while (selectedOption != 9);

if (updatedRecords != null) {
updatedRecords = updatedRecords.union(recordsToUpdate);
}
writeLabelledOutput(updatedRecords);
sc.close();
LOG.info("Processing finished.");
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
e.printStackTrace();
writeLabelledOutput(updatedRecords);
sc.close();
LOG.info("Processing finished.");
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
e.printStackTrace();
}
LOG.warn("An error has occured while Updating Label. " + e.getMessage());
throw new ZinggClientException("An error while updating label", e);
}
LOG.warn("An error has occured while Updating Label. " + e.getMessage());
throw new ZinggClientException(e.getMessage());
} else {
LOG.info("There is no marked record for updating. Please run findTrainingData/label jobs to generate training data.");
}
return;
}


Expand Down
106 changes: 51 additions & 55 deletions core/src/main/java/zingg/Labeller.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package zingg;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;

Expand Down Expand Up @@ -51,7 +49,7 @@ public Dataset<Row> getUnmarkedRecords() throws ZinggClientException {
unmarkedRecords = PipeUtil.read(spark, false, false, PipeUtil.getTrainingDataUnmarkedPipe(args));
try {
markedRecords = PipeUtil.read(spark, false, false, PipeUtil.getTrainingDataMarkedPipe(args));
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we still need to catch the other exceptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read() throws only ZinggClientException.
Earlier, why didn't read() mandate catching Exception everywhere it is called? Is there any difference between Exception and ZinggClientException?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ZinggClientException extends from Throwable.

} catch (ZinggClientException e) {
LOG.warn("No record has been marked yet");
}
if (markedRecords != null ) {
Expand All @@ -60,7 +58,7 @@ public Dataset<Row> getUnmarkedRecords() throws ZinggClientException {
"left_anti");
getMarkedRecordsStat(markedRecords);
}
} catch (Exception e) {
} catch (ZinggClientException e) {
LOG.warn("No unmarked record for labelling");
}
return unmarkedRecords;
Expand All @@ -75,61 +73,59 @@ protected void getMarkedRecordsStat(Dataset<Row> markedRecords) {

public void processRecordsCli(Dataset<Row> lines) throws ZinggClientException {
LOG.info("Processing Records for CLI Labelling");
printMarkedRecordsStat();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see earlier comment about returns

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed return statement.

if (lines == null || lines.count() == 0) {
LOG.info("It seems there are no unmarked records at this moment. Please run findTrainingData job to build some pairs to be labelled and then run this labeler.");
return;
}

lines = lines.cache();
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
if (lines != null && lines.count() > 0) {
printMarkedRecordsStat();

List<Row> clusterIDs = lines.select(ColName.CLUSTER_COLUMN).distinct().collectAsList();
try {
double score;
double prediction;
Dataset<Row> updatedRecords = null;
int selected_option = -1;
String msg1, msg2;
int totalPairs = clusterIDs.size();

for (int index = 0; index < totalPairs; index++){
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(
clusterIDs.get(index).getAs(ColName.CLUSTER_COLUMN))).cache();

score = currentPair.head().getAs(ColName.SCORE_COL);
prediction = currentPair.head().getAs(ColName.PREDICTION_COL);

msg1 = String.format("\tCurrent labelling round : %d/%d pairs labelled\n", index, totalPairs);
String matchType = LabelMatchType.get(prediction).msg;
if (prediction == ColValues.IS_NOT_KNOWN_PREDICTION) {
msg2 = String.format(
"\tZingg does not do any prediction for the above pairs as Zingg is still collecting training data to build the preliminary models.");
} else {
msg2 = String.format("\tZingg predicts the above records %s with a similarity score of %.2f",
matchType, Math.floor(score * 100) * 0.01);
lines = lines.cache();
List<Column> displayCols = DSUtil.getFieldDefColumns(lines, args, false, args.getShowConcise());
List<Row> clusterIDs = lines.select(ColName.CLUSTER_COLUMN).distinct().collectAsList();
try {
double score;
double prediction;
Dataset<Row> updatedRecords = null;
int selected_option = -1;
String msg1, msg2;
int totalPairs = clusterIDs.size();

for (int index = 0; index < totalPairs; index++) {
Dataset<Row> currentPair = lines.filter(lines.col(ColName.CLUSTER_COLUMN).equalTo(
clusterIDs.get(index).getAs(ColName.CLUSTER_COLUMN))).cache();

score = currentPair.head().getAs(ColName.SCORE_COL);
prediction = currentPair.head().getAs(ColName.PREDICTION_COL);

msg1 = String.format("\tCurrent labelling round : %d/%d pairs labelled\n", index, totalPairs);
String matchType = LabelMatchType.get(prediction).msg;
if (prediction == ColValues.IS_NOT_KNOWN_PREDICTION) {
msg2 = String.format(
"\tZingg does not do any prediction for the above pairs as Zingg is still collecting training data to build the preliminary models.");
} else {
msg2 = String.format("\tZingg predicts the above records %s with a similarity score of %.2f",
matchType, Math.floor(score * 100) * 0.01);
}
//String msgHeader = msg1 + msg2;

selected_option = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), msg1, msg2);
updateLabellerStat(selected_option, 1);
printMarkedRecordsStat();
if (selected_option == 9) {
LOG.info("User has quit in the middle. Updating the records.");
break;
}
updatedRecords = updateRecords(selected_option, currentPair, updatedRecords);
}
//String msgHeader = msg1 + msg2;

selected_option = displayRecordsAndGetUserInput(DSUtil.select(currentPair, displayCols), msg1, msg2);
updateLabellerStat(selected_option, 1);
printMarkedRecordsStat();
if (selected_option == 9) {
LOG.info("User has quit in the middle. Updating the records.");
break;
writeLabelledOutput(updatedRecords);
LOG.warn("Processing finished.");
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
e.printStackTrace();
}
updatedRecords = updateRecords(selected_option, currentPair, updatedRecords);
}
writeLabelledOutput(updatedRecords);
LOG.warn("Processing finished.");
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
e.printStackTrace();
LOG.warn("Labelling error has occured " + e.getMessage());
throw new ZinggClientException("An error has occured while Labelling.", e);
}
LOG.warn("Labelling error has occured " + e.getMessage());
throw new ZinggClientException(e.getMessage());
} else {
LOG.info("It seems there are no unmarked records at this moment. Please run findTrainingData job to build some pairs to be labelled and then run this labeler.");
}
return;
}


Expand Down Expand Up @@ -203,7 +199,7 @@ protected void printMarkedRecordsStat() {
System.out.println(msg);
}

protected void writeLabelledOutput(Dataset<Row> records) {
protected void writeLabelledOutput(Dataset<Row> records) throws ZinggClientException {
if (records == null) {
LOG.warn("No records to be labelled.");
return;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/zingg/Linker.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected Dataset<Row> selectColsFromBlocked(Dataset<Row> blocked) {
return blocked;
}

public void writeOutput(Dataset<Row> sampleOrginal, Dataset<Row> dupes) {
public void writeOutput(Dataset<Row> sampleOrginal, Dataset<Row> dupes) throws ZinggClientException {
try {
// input dupes are pairs
/// pick ones according to the threshold by user
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/zingg/Matcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ public Matcher() {
setZinggOptions(ZinggOptions.MATCH);
}

protected Dataset<Row> getTestData() {
return PipeUtil.read(spark, true, args.getNumPartitions(), true, args.getData());
protected Dataset<Row> getTestData() throws ZinggClientException{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To satisfy the requirement of catching or throwing ZinggClientException — all the intermediate functions are required to declare it.
In this regard, are the types Exception and ZinggClientException different?

Dataset<Row> data = PipeUtil.read(spark, true, args.getNumPartitions(), true, args.getData());
return data;
}

protected Dataset<Row> getBlocked(Dataset<Row> testData) throws Exception{
protected Dataset<Row> getBlocked(Dataset<Row> testData) throws Exception, ZinggClientException{
LOG.debug("Blocking model file location is " + args.getBlockFile());
Tree<Canopy> tree = BlockingTreeUtil.readBlockingTree(spark, args);
Dataset<Row> blocked = testData.map(new Block.BlockFunction(tree), RowEncoder.apply(Block.appendHashCol(testData.schema())));
Expand Down Expand Up @@ -159,7 +160,7 @@ public void execute() throws ZinggClientException {
}
}

public void writeOutput(Dataset<Row> blocked, Dataset<Row> dupesActual) {
public void writeOutput(Dataset<Row> blocked, Dataset<Row> dupesActual) throws ZinggClientException {
try{
//input dupes are pairs
///pick ones according to the threshold by user
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/zingg/TrainingDataFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public TrainingDataFinder() {
setZinggOptions(ZinggOptions.FIND_TRAINING_DATA);
}

public Dataset<Row> getTraining() {
public Dataset<Row> getTraining() throws ZinggClientException {
return DSUtil.getTraining(spark, args);
}

Expand Down Expand Up @@ -139,7 +139,7 @@ public void execute() throws ZinggClientException {
}
}

public void writeUncertain(Dataset<Row> dupesActual, Dataset<Row> sampleOrginal) {
public void writeUncertain(Dataset<Row> dupesActual, Dataset<Row> sampleOrginal) throws ZinggClientException {
//dupesActual.show(4);
//input dupes are pairs
dupesActual = DFUtil.addClusterRowNumber(dupesActual, spark);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/zingg/preprocess/StopWords.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import zingg.client.Arguments;
import zingg.client.FieldDefinition;
import zingg.client.ZinggClientException;
import zingg.util.PipeUtil;

public class StopWords {
Expand All @@ -25,7 +26,7 @@ public class StopWords {
public static final Log LOG = LogFactory.getLog(StopWords.class);
protected static String stopWordColumn = "StopWord";

public static Dataset<Row> preprocessForStopWords(SparkSession spark, Arguments args, Dataset<Row> ds) {
public static Dataset<Row> preprocessForStopWords(SparkSession spark, Arguments args, Dataset<Row> ds) throws ZinggClientException {

List<String> wordList = new ArrayList<String>();
for (FieldDefinition def : args.getFieldDefinition()) {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/zingg/util/BlockingTreeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import zingg.client.Arguments;
import zingg.client.FieldDefinition;
import zingg.client.MatchType;
import zingg.client.ZinggClientException;
import zingg.client.util.ListMap;
import zingg.client.util.Util;
import zingg.hash.HashFunction;
Expand Down Expand Up @@ -76,7 +77,7 @@ public static Tree<Canopy> createBlockingTreeFromSample(Dataset<Row> testData,
return createBlockingTree(sample, positives, sampleFraction, blockSize, args, hashFunctions);
}

public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, Tree<Canopy> blockingTree, Arguments args) throws Exception {
public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, Tree<Canopy> blockingTree, Arguments args) throws Exception, ZinggClientException {
byte[] byteArray = Util.convertObjectIntoByteArray(blockingTree);
StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField("BlockingTree", DataTypes.BinaryType, false) });
List<Object> objList = new ArrayList<>();
Expand All @@ -86,7 +87,7 @@ public static void writeBlockingTree(SparkSession spark, JavaSparkContext ctx, T
PipeUtil.write(df, args, ctx, PipeUtil.getBlockingTreePipe(args));
}

public static Tree<Canopy> readBlockingTree(SparkSession spark, Arguments args) throws Exception {
public static Tree<Canopy> readBlockingTree(SparkSession spark, Arguments args) throws Exception, ZinggClientException{
Dataset<Row> tree = PipeUtil.read(spark, false, args.getNumPartitions(), false, PipeUtil.getBlockingTreePipe(args));
byte [] byteArrayBack = (byte[]) tree.head().get(0);
Tree<Canopy> blockingTree = null;
Expand Down
Loading