diff --git a/common/client/src/main/java/zingg/common/client/Arguments.java b/common/client/src/main/java/zingg/common/client/Arguments.java index 570849305..4918a46a6 100644 --- a/common/client/src/main/java/zingg/common/client/Arguments.java +++ b/common/client/src/main/java/zingg/common/client/Arguments.java @@ -111,7 +111,7 @@ public class Arguments implements Serializable { float stopWordsCutoff = 0.1f; long blockSize = 100L; String column; - String obviousDupeCondition; + ObviousDupes[] obviousDupes; public void setThreshold(double threshold) { @@ -478,12 +478,12 @@ public void setColumn(String column) { this.column = column; } - public String getObviousDupeCondition() { - return obviousDupeCondition; + public ObviousDupes[] getObviousDupes() { + return obviousDupes; } - public void setObviousDupeCondition(String obviousDupeCondition) { - this.obviousDupeCondition = obviousDupeCondition; + public void setObviousDupes(ObviousDupes[] obviousDupes) { + this.obviousDupes = obviousDupes; } public long getBlockSize() { diff --git a/common/client/src/main/java/zingg/common/client/ObviousDupes.java b/common/client/src/main/java/zingg/common/client/ObviousDupes.java new file mode 100644 index 000000000..df50d5455 --- /dev/null +++ b/common/client/src/main/java/zingg/common/client/ObviousDupes.java @@ -0,0 +1,40 @@ +package zingg.common.client; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class ObviousDupes implements Serializable { + + private static final long serialVersionUID = 1L; + public static final Log LOG = LogFactory.getLog(ObviousDupes.class); + + public static final String fieldName = "fieldName"; + + public ObviousDupes() { + + } + + public ObviousDupes(HashMap[] matchCondition) { + this.matchCondition = matchCondition; + } + + HashMap[] matchCondition; + + public HashMap[] getMatchCondition() { + return matchCondition; 
+ } + + public void setMatchCondition(HashMap[] matchCondition) { + this.matchCondition = matchCondition; + } + + @Override + public String toString() { + return Arrays.toString(matchCondition); + } + +} diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index 0cfb3e058..e94e271e6 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -2,7 +2,6 @@ import java.util.List; - //Dataset, Row, column public interface ZFrame { @@ -11,11 +10,7 @@ public interface ZFrame { public static final String COL_COUNT = "count"; public static final String COL_VALUE = "VALUE"; - - public static final String orSeperator = "\\|"; - public static final String andSeperator = "\\&"; - - + public ZFrame cache(); public ZFrame as(String s); public String[] columns(); @@ -99,7 +94,9 @@ public interface ZFrame { public C gt(String c, double val); public C equalTo(String c, String e); - + + public C equalTo(C column1, C column2); + public C notEqual(String c, String e); public C notEqual(String e); @@ -110,6 +107,13 @@ public interface ZFrame { public C notEqual(String c, int e); + public C not(C col); + + public C isNotNull(C col); + + public C and(C col1, C col2); + + public C or(C col1, C col2); public void show(int num); public void show(); @@ -160,13 +164,5 @@ public interface ZFrame { public ZFrame filterNotNullCond(String colName); public ZFrame filterNullCond(String colName); - - public C getObviousDupesFilter(String obviousDupeString, C extraAndCond); - - public C getObviousDupesFilter(ZFrame dfToJoin, String obviousDupeString, C extraAndCond); - - public C getReverseObviousDupesFilter(String obviousDupeString, C extraAndCond); - - public C getReverseObviousDupesFilter(ZFrame dfToJoin, String obviousDupeString, C extraAndCond); - + } \ No newline at end of file diff --git 
a/common/client/src/test/java/zingg/common/client/TestArguments.java b/common/client/src/test/java/zingg/common/client/TestArguments.java index de6eee10b..60b9913d9 100644 --- a/common/client/src/test/java/zingg/common/client/TestArguments.java +++ b/common/client/src/test/java/zingg/common/client/TestArguments.java @@ -7,6 +7,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -241,4 +242,25 @@ public void testMatchTypeWrong() { } + + @Test + public void testObvDupe() { + Arguments args; + try { + args = argsUtil.createArgumentsFromJSON(getClass().getResource("../../../testArguments/configObvDupe.json").getFile(), "test"); + + ObviousDupes[] obviousDupes = args.getObviousDupes(); + HashMap[] matchCondition = obviousDupes[0].getMatchCondition(); + + assertEquals("fname", matchCondition[0].get(ObviousDupes.fieldName)); + + } catch (Exception | ZinggClientException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + fail("Could not read config"); + } + + } + + } diff --git a/common/client/src/test/resources/testArguments/configObvDupe.json b/common/client/src/test/resources/testArguments/configObvDupe.json new file mode 100644 index 000000000..724f2f3b7 --- /dev/null +++ b/common/client/src/test/resources/testArguments/configObvDupe.json @@ -0,0 +1,117 @@ +{ + "fieldDefinition":[ + { + "fieldName" : "recId", + "matchType" : "dont_use", + "fields" : "recId", + "dataType": "string" + }, + { + "fieldName" : "fname", + "matchType" : "fuzzy", + "fields" : "fname", + "dataType": "string" + }, + { + "fieldName" : "lname", + "matchType" : "fuzzy", + "fields" : "lname", + "dataType": "string" + }, + { + "fieldName" : "stNo", + "matchType": "fuzzy", + "fields" : "stNo", + "dataType": "string" + }, + { + "fieldName" : "add1", + "matchType": "fuzzy", + "fields" : "add1", + "dataType": "string" + }, + { + "fieldName" 
: "add2", + "matchType": "fuzzy", + "fields" : "add2", + "dataType": "string" + }, + { + "fieldName" : "city", + "matchType": "fuzzy", + "fields" : "city", + "dataType": "string" + }, + { + "fieldName" : "areacode", + "matchType": "fuzzy", + "fields" : "areacode", + "dataType": "string" + }, + { + "fieldName" : "state", + "matchType": "fuzzy", + "fields" : "state", + "dataType": "string" + }, + { + "fieldName" : "dob", + "matchType": "fuzzy", + "fields" : "dob", + "dataType": "string" + }, + { + "fieldName" : "ssn", + "matchType": "fuzzy", + "fields" : "ssn", + "dataType": "string" + } + ], + "output" : [{ + "name":"output", + "format":"csv", + "props": { + "location": "/tmp/zinggOutput", + "delimiter": ",", + "header":true + } + }], + "data" : [{ + "name":"test", + "format":"csv", + "props": { + "location": "examples/febrl/test.csv", + "delimiter": ",", + "header":false + }, + "schema": "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string" + } + ], + "obviousDupes":[ + { + "matchCondition":[ + { + "fieldName":"fname" + }, + { + "fieldName":"stNo" + }, + { + "fieldName":"add1" + } + ] + }, + { + "matchCondition":[ + { + "fieldName":"recId" + } + ] + } + ], + "labelDataSampleSize" : 0.5, + "numPartitions":4, + "modelId": 100, + "zinggDir": "models" + +} diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index 4fc351c33..7093d153e 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -14,16 +14,18 @@ import zingg.common.core.block.Canopy; import zingg.common.core.block.Tree; import zingg.common.core.model.Model; +import zingg.common.core.obviousdupes.ObviousDupesUtil; +import zingg.common.core.preprocess.StopWordsRemover; import zingg.common.core.util.Analytics; import 
zingg.common.core.util.Metric; -import zingg.common.core.preprocess.StopWordsRemover; public abstract class Matcher extends ZinggBase{ - + private static final long serialVersionUID = 1L; protected static String name = "zingg.Matcher"; public static final Log LOG = LogFactory.getLog(Matcher.class); - + protected ObviousDupesUtil obvDupeUtil; + public Matcher() { setZinggOptions(ZinggOptions.MATCH); } @@ -74,12 +76,6 @@ protected ZFrame getBlocks(ZFrameblocked, ZFramebAll) throw return joinH; } - protected ZFramemassageAllEquals(ZFrameallEqual) { - allEqual = allEqual.withColumn(ColName.PREDICTION_COL, ColValues.IS_MATCH_PREDICTION); - allEqual = allEqual.withColumn(ColName.SCORE_COL, ColValues.FULL_MATCH_SCORE); - return allEqual; - } - protected abstract Model getModel() throws ZinggClientException; protected ZFrameselectColsFromBlocked(ZFrameblocked) { @@ -124,13 +120,7 @@ public void execute() throws ZinggClientException { //get obvious dupes ZFrame obvDupePairs = getObvDupePairs(blocked); - if (obvDupePairs != null) { - long obvDupeCount = obvDupePairs.count(); - LOG.info("obvDupePairs count " + obvDupeCount); - if (obvDupeCount > 0) { - blocks = removeObvDupesFromBlocks(blocks); - } - } + blocks = removeObvDupesFromBlocks(blocks,obvDupePairs); //send remaining to model Model model = getModel(); @@ -172,6 +162,15 @@ public void execute() throws ZinggClientException { throw new ZinggClientException(e.getMessage()); } } + + protected ZFrame getObvDupePairs(ZFrame blocked) { + return getObvDupeUtil().getObvDupePairs(blocked); + } + + protected ZFrame removeObvDupesFromBlocks(ZFrame blocks,ZFrame obvDupePairs) { + return getObvDupeUtil().removeObvDupesFromBlocks(blocks,obvDupePairs); + } + protected ZFrame addObvDupes(ZFrame obvDupePairs, ZFrame dupesActual) { if (obvDupePairs != null) { // ensure same columns in both @@ -181,54 +180,6 @@ protected ZFrame addObvDupes(ZFrame obvDupePairs, ZFrame removeObvDupesFromBlocks(ZFrame blocks) { - LOG.info("blocks count 
before removing obvDupePairs " + blocks.count()); - C reverseOBVDupeDFFilter = blocks.getReverseObviousDupesFilter(args.getObviousDupeCondition(),null); - if (reverseOBVDupeDFFilter != null) { - // remove dupes as already considered in obvDupePairs - blocks = blocks.filter(reverseOBVDupeDFFilter); - } - LOG.info("blocks count after removing obvDupePairs " + blocks.count()); - return blocks; - } - - protected ZFrame getObvDupePairs(ZFrame blocked) { - - String obviousDupeString = args.getObviousDupeCondition(); - - if (obviousDupeString == null || obviousDupeString.trim().isEmpty()) { - return null; - } - - ZFrame prefixBlocked = getDSUtil().getPrefixedColumnsDS(blocked); - C gtCond = blocked.gt(prefixBlocked,ColName.ID_COL); - - ZFrame onlyIds = null; - - // split on || (orSeperator) - String[] obvDupeORConditions = obviousDupeString.trim().split(ZFrame.orSeperator); - // loop thru the values and build a filter condition - for (int i = 0; i < obvDupeORConditions.length; i++) { - - C obvDupeDFFilter = blocked.getObviousDupesFilter(prefixBlocked,obvDupeORConditions[i],gtCond); - ZFrame onlyIdsTemp = blocked - .joinOnCol(prefixBlocked, obvDupeDFFilter).select(ColName.ID_COL, ColName.COL_PREFIX + ColName.ID_COL); - - if(onlyIds==null) { - onlyIds = onlyIdsTemp; - } else { - onlyIds = onlyIds.unionAll(onlyIdsTemp); - } - - } - - // remove duplicate pairs - onlyIds = onlyIds.distinct(); - onlyIds = massageAllEquals(onlyIds); - onlyIds = onlyIds.cache(); - - return onlyIds; - } public void writeOutput( ZFrame blocked, ZFrame dupesActual) throws ZinggClientException { try{ @@ -369,5 +320,16 @@ protected ZFrame getGraphWithScores(ZFrame graph, ZFrame getStopWords(); + + public ObviousDupesUtil getObvDupeUtil() { + if (obvDupeUtil==null) { + obvDupeUtil = new ObviousDupesUtil(context.getDSUtil(), args); + } + return obvDupeUtil; + } + + public void setObvDupeUtil(ObviousDupesUtil obvDupeUtil) { + this.obvDupeUtil = obvDupeUtil; + } } diff --git 
a/common/core/src/main/java/zingg/common/core/executor/TrainingDataFinder.java b/common/core/src/main/java/zingg/common/core/executor/TrainingDataFinder.java index f171063ba..f2b2feffd 100644 --- a/common/core/src/main/java/zingg/common/core/executor/TrainingDataFinder.java +++ b/common/core/src/main/java/zingg/common/core/executor/TrainingDataFinder.java @@ -12,13 +12,16 @@ import zingg.common.core.block.Canopy; import zingg.common.core.block.Tree; import zingg.common.core.model.Model; +import zingg.common.core.obviousdupes.ObviousDupesUtil; import zingg.common.core.preprocess.StopWordsRemover; public abstract class TrainingDataFinder extends ZinggBase{ + private static final long serialVersionUID = 1L; protected static String name = "zingg.TrainingDataFinder"; public static final Log LOG = LogFactory.getLog(TrainingDataFinder.class); - + protected ObviousDupesUtil obvDupeUtil; + public TrainingDataFinder() { setZinggOptions(ZinggOptions.FIND_TRAINING_DATA); } @@ -83,13 +86,18 @@ public void execute() throws ZinggClientException { ZFrame blocked = getBlockingTreeUtil().getBlockHashes(sample, tree); blocked = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache(); - System.out.println("blocked"); + LOG.debug("blocked"); if (LOG.isDebugEnabled()) { blocked.show(true); } ZFrame blocks = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true); + // remove obv dupe pairs + blocks = getObvDupeUtil().removeObvDupesFromBlocks(blocks); + if (blocks.isEmpty()) { + LOG.warn("unable to find any pairs as all pairs sampled are part of the obvious duplicate condition"); + } blocks = blocks.cache(); - System.out.println("blocks"); + LOG.debug("blocks"); if (LOG.isDebugEnabled()) { blocks.show(); } @@ -192,5 +200,16 @@ public ZFrame getPositiveSamples(ZFrame data) throws Exception { } protected abstract StopWordsRemover getStopWords(); + + public ObviousDupesUtil getObvDupeUtil() { + if (obvDupeUtil==null) { + obvDupeUtil = new 
ObviousDupesUtil(context.getDSUtil(), args); + } + return obvDupeUtil; + } + + public void setObvDupeUtil(ObviousDupesUtil obvDupeUtil) { + this.obvDupeUtil = obvDupeUtil; + } } diff --git a/common/core/src/main/java/zingg/common/core/obviousdupes/ObviousDupesFilter.java b/common/core/src/main/java/zingg/common/core/obviousdupes/ObviousDupesFilter.java new file mode 100644 index 000000000..fdf86e00a --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/obviousdupes/ObviousDupesFilter.java @@ -0,0 +1,178 @@ +package zingg.common.core.obviousdupes; + +import java.util.HashMap; + +import zingg.common.client.ObviousDupes; +import zingg.common.client.ZFrame; +import zingg.common.client.util.ColName; + +public class ObviousDupesFilter { + + public ObviousDupesFilter() { + + } + + /** + * Returns a Column filter for the DF given the obviousDupes condition + * + * @param df1 :DF containing the self joined data e.g. fname, z_fname + * @param obviousDupes obvious dupe conditions. Those in one MatchCondition are "AND" condition, will "OR" with other MatchCondition + * @param extraAndCond Any extra condition to be applied e.g. z_zid > z_z_id + * @return + */ + public C getObviousDupesFilter(ZFrame df1, ObviousDupes[] obviousDupes, C extraAndCond) { + return getObviousDupesFilter(df1,df1,obviousDupes,extraAndCond); + } + /** + * Returns a Column filter for the DFs given the obviousDupes condition + * + * @param df1 : 1st DF to join + * @param dfToJoin : 2nd DF to join with (the one having cols with z_ as prefix) + * @param obviousDupes obvious dupe conditions. Those in one MatchCondition are "AND" condition, will "OR" with other MatchCondition + * @param extraAndCond Any extra condition to be applied e.g. 
z_zid > z_z_id + * @return Column filter for the DFs given the obviousDupes condition + */ + public C getObviousDupesFilter(ZFrame df1, ZFrame dfToJoin, ObviousDupes[] obviousDupes, C extraAndCond) { + + if (dfToJoin==null || obviousDupes == null) { + return null; + } + + + C filterExpr = getFilterExpr(df1, dfToJoin, obviousDupes); + + filterExpr = addExtraAndCond(df1, extraAndCond, filterExpr); + + return filterExpr; + } + + /** + * loop thru the values and build a filter condition + * @param df1 : 1st DF to join + * @param dfToJoin : 2nd DF to join with (the one having cols with z_ as prefix) + * @param obviousDupes obvious dupe conditions. Those in one MatchCondition are "AND" condition, will "OR" with other MatchCondition + * @return Column filter for the DFs given the obviousDupes condition + */ + private C getFilterExpr(ZFrame df1, ZFrame dfToJoin, ObviousDupes[] obviousDupes) { + C filterExpr = null; + + for (int i = 0; i < obviousDupes.length; i++) { + + C andCond = getAndCondition(df1, dfToJoin, obviousDupes[i].getMatchCondition()); + + filterExpr = addOrCond(df1, filterExpr, andCond); + + } + return filterExpr; + } + /** + * Get the AND condition for particular match condition passed + * + * @param df1 : 1st DF to join + * @param dfToJoin : 2nd DF to join with (the one having cols with z_ as prefix) + * @param andConditions : The match condition having various cols to be part of "AND" condition + * @return AND condition for particular match condition passed + */ + private C getAndCondition(ZFrame df1, ZFrame dfToJoin, HashMap[] andConditions) { + C andCond = null; + if (andConditions != null) { + for (int j = 0; j < andConditions.length; j++) { + andCond = getAndCondForCol(df1, dfToJoin, andCond, andConditions[j].get(ObviousDupes.fieldName)); + } + } + return andCond; + } + + /** + * Form the "AND" cond for particular col and add to already existing and cond + * + * @param df1 : 1st DF to join + * @param dfToJoin : 2nd DF to join with (the one having cols 
with z_ as prefix) + * @param andCond The condition constructed so far before calling for this column + * @param colName The col for which condition is required + * @return AND condition for particular col passed added to what already is there + */ + private C getAndCondForCol(ZFrame df1, ZFrame dfToJoin, C andCond, String colName) { + + if (colName==null) return null; + + C column = df1.col(colName); + C columnWithPrefix = dfToJoin.col(ColName.COL_PREFIX + colName); + + C eqCond = getEqCond(df1, column, columnWithPrefix); + + andCond = (andCond != null) ? df1.and(andCond, eqCond) : eqCond; + return andCond; + } + + /** + * Form a condition like x = z_x along with null checks + * + * @param df1 + * @param column + * @param columnWithPrefix + * @return + */ + private C getEqCond(ZFrame df1, C column, C columnWithPrefix) { + C eqCond = df1.and( + df1.and( + df1.equalTo(column, columnWithPrefix), + df1.isNotNull(column) + ), + df1.isNotNull(columnWithPrefix) + ); + return eqCond; + } + + /** + * Combine multiple match conditions via OR + * @param df1 + * @param filterExpr + * @param andCond + * @return + */ + private C addOrCond(ZFrame df1, C filterExpr, C andCond) { + if (andCond != null) { + filterExpr = (filterExpr != null) ? df1.or(filterExpr, andCond) : andCond; + } + return filterExpr; + } + + /** + * Any extra AND condition like z_zid > z_z_zid is added to existing condition + * @param df1 + * @param extraAndCond + * @param filterExpr + * @return + */ + private C addExtraAndCond(ZFrame df1, C extraAndCond, C filterExpr) { + if (extraAndCond != null) { + filterExpr = (filterExpr != null) ? 
df1.and(filterExpr, extraAndCond) : extraAndCond; + } + return filterExpr; + } + + /** + * Used to filter out obv dupes by forming a NOT over obv dupe filter condition + * @param df1 + * @param obviousDupes + * @param extraAndCond + * @return + */ + public C getReverseObviousDupesFilter(ZFrame df1,ObviousDupes[] obviousDupes, C extraAndCond) { + return getReverseObviousDupesFilter(df1,df1,obviousDupes,extraAndCond); + } + + /** + * Used to filter out obv dupes by forming a NOT over obv dupe filter condition + * @param df1 + * @param dfToJoin + * @param obviousDupes + * @param extraAndCond + * @return + */ + public C getReverseObviousDupesFilter(ZFrame df1,ZFrame dfToJoin, ObviousDupes[] obviousDupes, C extraAndCond) { + return df1.not(getObviousDupesFilter(df1,dfToJoin,obviousDupes,extraAndCond)); + } + +} diff --git a/common/core/src/main/java/zingg/common/core/obviousdupes/ObviousDupesUtil.java b/common/core/src/main/java/zingg/common/core/obviousdupes/ObviousDupesUtil.java new file mode 100644 index 000000000..ef3702edd --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/obviousdupes/ObviousDupesUtil.java @@ -0,0 +1,145 @@ +package zingg.common.core.obviousdupes; + +import java.io.Serializable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.Arguments; +import zingg.common.client.ObviousDupes; +import zingg.common.client.ZFrame; +import zingg.common.client.util.ColName; +import zingg.common.client.util.ColValues; +import zingg.common.core.util.DSUtil; + +public class ObviousDupesUtil implements Serializable { + + private static final long serialVersionUID = 1L; + public static final Log LOG = LogFactory.getLog(ObviousDupesUtil.class); + + protected Arguments args; + protected DSUtil dsUtil; + protected ObviousDupesFilter obvDupeFilter; + + public ObviousDupesUtil(DSUtil dsUtil, Arguments args) { + this.dsUtil = dsUtil; + this.args = args; + this.obvDupeFilter = new 
ObviousDupesFilter(); + } + + /** + * Input data example : + * + * Z_ZID FNAME LNAME DOB Z_HASH Z_ZSOURCE + * 3 Érik Guay 19830807 -798 customers + * 11 xani green 19390410 890 customers + * 19 sachin step 19461101 700 customers + * 23 Érika Charles 19830807 991 customers + * + * Output data example (if say DOB is obv dupe condition): + * Z_ZID Z_Z_ZID Z_PREDICTION Z_SCORE + * 23 3 1.0 1.0 + * + * + * @param blocked + * @return + */ + public ZFrame getObvDupePairs(ZFrame blocked) { + + ObviousDupes[] obviousDupes = args.getObviousDupes(); + + // no condition specified + if (obviousDupes == null || obviousDupes.length==0) { + return null; + } + + ZFrame prefixBlocked = dsUtil.getPrefixedColumnsDS(blocked); + C gtCond = blocked.gt(prefixBlocked,ColName.ID_COL); + + ZFrame onlyIds = null; + + // loop thru the values and build a filter condition + // instead of using one big condition with AND , OR + // we are breaking it down and than doing UNION / DISTINCT in end + // this is being done due to performance issues + // please do not condense it into one big condition with and / or + // ((col(ssn).eq(col(z_ssn)).or(col(dob).eq(z_col(dob)) => does not work due to performance + // as we are doing a kind of cartesian join across all data in table to find obv dupe + // in reverse i.e. 
in blocks it works as the data to be compared is within the row + // but here in blocked we don't use hash and we have to search across the table + // col(ssn).eq(col(z_ssn)) separately + // col(dob).eq(z_col(dob) separately + // union / distinct in end works + for (int i = 0; i < obviousDupes.length; i++) { + + C obvDupeDFFilter = obvDupeFilter.getObviousDupesFilter(blocked,prefixBlocked,new ObviousDupes[] {obviousDupes[i]},gtCond); + ZFrame onlyIdsTemp = blocked + .joinOnCol(prefixBlocked, obvDupeDFFilter).select(ColName.ID_COL, ColName.COL_PREFIX + ColName.ID_COL); + + if(onlyIds==null) { + onlyIds = onlyIdsTemp; + } else { + onlyIds = onlyIds.unionAll(onlyIdsTemp); + } + + } + + // remove duplicate pairs + onlyIds = onlyIds.distinct(); + onlyIds = massageObvDupes(onlyIds); + onlyIds = onlyIds.cache(); + + return onlyIds; + } + + /** + * Input data format : + * + * Z_ZID FNAME LNAME DOB Z_HASH Z_ZSOURCE Z_Z_ZID Z_FNAME Z_LNAME Z_DOB Z_Z_HASH Z_Z_ZSOURCE + * 3 Érik Guay 19830807 -798 customers 23 Érika Charles 19830807 -798 customers + * 11 xani green 19390410 890 customers 19 x g 19461101 890 customers + * + * Output data example (if say DOB is obv dupe condition): + * Z_ZID FNAME LNAME DOB Z_HASH Z_ZSOURCE Z_Z_ZID Z_FNAME Z_LNAME Z_DOB Z_Z_HASH Z_Z_ZSOURCE + * 11 xani green 19390410 890 customers 19 x g 19461101 890 customers + * + * + * @param blocked + * @return + */ + public ZFrame removeObvDupesFromBlocks(ZFrame blocks) { + + LOG.debug("blocks count before removing obvDupePairs " + blocks.count()); + ObviousDupes[] obviousDupes = args.getObviousDupes(); + if (obviousDupes == null || obviousDupes.length == 0) { + return blocks; + } + C reverseOBVDupeDFFilter = obvDupeFilter.getReverseObviousDupesFilter(blocks,obviousDupes,null); + // remove dupes as already considered in obvDupePairs + blocks = blocks.filter(reverseOBVDupeDFFilter); + LOG.debug("blocks count after removing obvDupePairs " + blocks.count()); + return blocks; + } + + public ZFrame 
removeObvDupesFromBlocks(ZFrame blocks,ZFrame obvDupePairs) { + + if(obvDupePairs==null || obvDupePairs.isEmpty()) { + return blocks; + } + + return removeObvDupesFromBlocks(blocks); + + } + + /** + * Add prediction and score cols + * @param obvDupes + * @return + */ + public ZFrame massageObvDupes(ZFrame obvDupes) { + obvDupes = obvDupes.withColumn(ColName.PREDICTION_COL, ColValues.IS_MATCH_PREDICTION); + obvDupes = obvDupes.withColumn(ColName.SCORE_COL, ColValues.FULL_MATCH_SCORE); + return obvDupes; + } + +} diff --git a/examples/febrl/configObvDupe.json b/examples/febrl/configObvDupe.json index bef337501..724f2f3b7 100644 --- a/examples/febrl/configObvDupe.json +++ b/examples/febrl/configObvDupe.json @@ -87,7 +87,28 @@ "schema": "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string" } ], - "obviousDupeCondition" : "FNAME & STNO & ADD1", + "obviousDupes":[ + { + "matchCondition":[ + { + "fieldName":"fname" + }, + { + "fieldName":"stNo" + }, + { + "fieldName":"add1" + } + ] + }, + { + "matchCondition":[ + { + "fieldName":"recId" + } + ] + } + ], "labelDataSampleSize" : 0.5, "numPartitions":4, "modelId": 100, diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index dcea8bfe9..7b1bc13cc 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -221,11 +221,15 @@ public Column equalTo(String c, String e){ return df.col(c).equalTo(e); } + public Column equalTo(Column column1, Column column2) { + return column1.equalTo(column2); + } + public Column notEqual(String e) { return df.col(e).notEqual(df.col(ColName.COL_PREFIX + e)); } - public Column notEqual(String c, String e) { + public Column notEqual(String c, String e) { return df.col(c).notEqual(e); } @@ -240,7 +244,27 @@ public Column equalTo(String 
c, double e){ public Column notEqual(String c, int e) { return df.col(c).notEqual(e); } - + + @Override + public Column not(Column col) { + return functions.not(col); + } + + @Override + public Column isNotNull(Column col) { + return col.isNotNull(); + } + + @Override + public Column and(Column col1, Column col2) { + return col1.and(col2); + } + + @Override + public Column or(Column col1, Column col2) { + return col1.or(col2); + } + public ZFrame, Row, Column> sample(boolean withReplacement, float num){ return new SparkFrame(df.sample(withReplacement, num)); } @@ -389,122 +413,5 @@ public ZFrame, Row, Column> filterNotNullCond(String colName) { public ZFrame, Row, Column> filterNullCond(String colName) { return this.filter(df.col(colName).isNull()); } - - - /** - * - * obviousDupeString format col1 & col2 | col3 | col4 & col5 - * - * @param obviousDupeString - * @return - */ - public Column getObviousDupesFilter(String obviousDupeString, Column extraAndCond) { - return getObviousDupesFilter(this,obviousDupeString,extraAndCond); - } - - /** - * - * obviousDupeString format col1 & col2 | col3 | col4 & col5 - * - * @param obviousDupeString - * @return - */ - @Override - public Column getObviousDupesFilter(ZFrame, Row, Column> dfToJoin, String obviousDupeString, Column extraAndCond) { - - if (dfToJoin==null || obviousDupeString == null || obviousDupeString.trim().isEmpty()) { - return null; - } - - // split on || (orSeperator) - String[] obvDupeORConditions = new String[] {}; - - obvDupeORConditions = obviousDupeString.trim().split(orSeperator); - - // loop thru the values and build a filter condition - Column filterExpr = null; - for (int i = 0; i < obvDupeORConditions.length; i++) { - - // parse on &(andSeperator) for obvDupeCond[i] and form a column filter - // expression [keep adding to filterExpr] - // if number of columns in and condition = 1 => something like uid or ssn => - // direct match if equal - Column andCond = null; - String orCondStr = 
obvDupeORConditions[i]; - - if (orCondStr != null && !orCondStr.isEmpty()) { - - String[] andConditions = orCondStr.trim().split(andSeperator); - - if (andConditions != null) { - for (int j = 0; j < andConditions.length; j++) { - - String andCondStr = andConditions[j]; - - if (andCondStr != null && !andCondStr.trim().isEmpty()) { - - String colName = andCondStr.trim(); - Column column = this.col(colName); - Column columnWithPrefix = dfToJoin.col(ColName.COL_PREFIX + colName); - - Column eqCond = column.equalTo(columnWithPrefix).and(column.isNotNull()) - .and(columnWithPrefix.isNotNull()); - - if (andCond != null) { - andCond = andCond.and(eqCond); - } else { - andCond = eqCond; - } - - } - } - } - } - - if (andCond != null) { - if (filterExpr != null) { - filterExpr = filterExpr.or(andCond); - } else { - filterExpr = andCond; - } - } - - } - - if (extraAndCond != null) { - if (filterExpr != null) { - filterExpr = filterExpr.and(extraAndCond); - } else { - filterExpr = extraAndCond; - } - } - - return filterExpr; - } - - /** - * - * obviousDupeString format col1 & col2 | col3 | col4 & col5 - * - * @param obviousDupeString - * @return - */ - @Override - public Column getReverseObviousDupesFilter(String obviousDupeString, Column extraAndCond) { - return getReverseObviousDupesFilter(this,obviousDupeString,extraAndCond); - } - - /** - * - * obviousDupeString format col1 & col2 | col3 | col4 & col5 - * - * @param obviousDupeString - * @return - */ - @Override - public Column getReverseObviousDupesFilter(ZFrame, Row, Column> dfToJoin, String obviousDupeString, Column extraAndCond) { - return functions.not(getObviousDupesFilter(dfToJoin,obviousDupeString,extraAndCond)); - } - } \ No newline at end of file diff --git a/spark/client/src/test/java/zingg/client/TestSparkFrame.java b/spark/client/src/test/java/zingg/client/TestSparkFrame.java index 8e69e4e06..0166f2412 100644 --- a/spark/client/src/test/java/zingg/client/TestSparkFrame.java +++ 
package zingg.common.core.obviousdupes;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.HashMap;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import zingg.common.client.ObviousDupes;
import zingg.common.client.ZinggClientException;
import zingg.spark.client.SparkFrame;
import zingg.spark.core.executor.ZinggSparkTester;

/**
 * Verifies the SQL filter {@link Column} expressions that
 * {@code ObviousDupesFilter} builds from {@link ObviousDupes} match conditions
 * (AND within one condition, OR across conditions, plus NOT-NULL guards on both
 * sides of every compared column). Replaces the older string-condition tests
 * that previously lived in TestSparkFrame.
 */
public class TestObviousDupesFilter extends ZinggSparkTester {

	// NOTE(review): generic arguments reconstructed as <Dataset<Row>, Row, Column>
	// from the mangled source — confirm against ObviousDupesFilter's declaration.
	ObviousDupesFilter<Dataset<Row>, Row, Column> obvDupeFilter = new ObviousDupesFilter<Dataset<Row>, Row, Column>();

	/** (name AND event AND comment) OR dob OR (comment AND year), no extra condition. */
	@Test
	public void testGetObviousDupesFilter() throws ZinggClientException {

		SparkFrame posDF = getPosPairDF();

		Column filter = obvDupeFilter.getObviousDupesFilter(posDF, getObvDupeCond(), null);

		String expectedCond = "(((((((name = z_name) AND (name IS NOT NULL)) AND (z_name IS NOT NULL)) AND (((event = z_event) AND (event IS NOT NULL)) AND (z_event IS NOT NULL))) AND (((comment = z_comment) AND (comment IS NOT NULL)) AND (z_comment IS NOT NULL))) OR (((dob = z_dob) AND (dob IS NOT NULL)) AND (z_dob IS NOT NULL))) OR ((((comment = z_comment) AND (comment IS NOT NULL)) AND (z_comment IS NOT NULL)) AND (((year = z_year) AND (year IS NOT NULL)) AND (z_year IS NOT NULL))))";

		assertEquals(expectedCond, filter.toString());
	}

	/** Same dupe conditions, plus an extra condition that must be ANDed onto the whole filter. */
	@Test
	public void testGetObviousDupesFilterWithExtraCond() throws ZinggClientException {

		SparkFrame posDF = getPosPairDF();
		Column gtCond = posDF.gt("z_zid");

		Column filter = obvDupeFilter.getObviousDupesFilter(posDF, getObvDupeCond(), gtCond);

		String expectedCond = "((((((((name = z_name) AND (name IS NOT NULL)) AND (z_name IS NOT NULL)) AND (((event = z_event) AND (event IS NOT NULL)) AND (z_event IS NOT NULL))) AND (((comment = z_comment) AND (comment IS NOT NULL)) AND (z_comment IS NOT NULL))) OR (((dob = z_dob) AND (dob IS NOT NULL)) AND (z_dob IS NOT NULL))) OR ((((comment = z_comment) AND (comment IS NOT NULL)) AND (z_comment IS NOT NULL)) AND (((year = z_year) AND (year IS NOT NULL)) AND (z_year IS NOT NULL)))) AND (z_zid > z_z_zid))";

		assertEquals(expectedCond, filter.toString());
	}

	/** The reverse filter is the plain filter wrapped in NOT. */
	@Test
	public void testGetReverseObviousDupesFilter() throws ZinggClientException {

		SparkFrame posDF = getPosPairDF();
		ObviousDupes[] obvDupe = getObvDupeCond();

		Column filter = obvDupeFilter.getReverseObviousDupesFilter(posDF, obvDupe, null);

		String expectedCond = "(NOT (((((((name = z_name) AND (name IS NOT NULL)) AND (z_name IS NOT NULL)) AND (((event = z_event) AND (event IS NOT NULL)) AND (z_event IS NOT NULL))) AND (((comment = z_comment) AND (comment IS NOT NULL)) AND (z_comment IS NOT NULL))) OR (((dob = z_dob) AND (dob IS NOT NULL)) AND (z_dob IS NOT NULL))) OR ((((comment = z_comment) AND (comment IS NOT NULL)) AND (z_comment IS NOT NULL)) AND (((year = z_year) AND (year IS NOT NULL)) AND (z_year IS NOT NULL)))))";

		assertEquals(expectedCond, filter.toString());
	}

	/** Builds a paired (left/z_-prefixed right) test DataFrame. */
	private SparkFrame getPosPairDF() {
		Row[] posData = getPosPairRows();
		StructType schema = getPairSchema();
		SparkFrame posDF = new SparkFrame(spark.createDataFrame(Arrays.asList(posData), schema));
		return posDF;
	}

	/** Raw pair rows; last four rows are deliberate obvious-dupe candidates (one with a null name). */
	private Row[] getPosPairRows() {
		int row_id = 1;
		// Create a DataFrame containing test data.
		// Deprecated `new Integer(..)` replaced with Integer.valueOf(..); values unchanged.
		Row[] posData = {
				RowFactory.create( row_id++, "1675683807452:31", "nicole","event1","comment1", 1992, Integer.valueOf(100), 1, row_id++, "1675683807452:31", "nicol","event11","comment11" , 1992, Integer.valueOf(101),1),
				RowFactory.create( row_id++, "1675683807452:32", "vkas","event2","comment2",1993, Integer.valueOf(200),1, row_id++, "1675683807452:32", "vikas","event12","comment21" ,1992, Integer.valueOf(201),1 ),
				RowFactory.create(row_id++, "1675683807452:33", "agrawaal","event3","comment3",1994, Integer.valueOf(300),1, row_id++, "1675683807452:33", "agarwal","event13","comment31" ,1992, Integer.valueOf(301),1 ),
				RowFactory.create( row_id++, "1675683807452:31", "nicole","event1","comment1", 1992, Integer.valueOf(100), 1, row_id++, "1675683807452:31", "nicol","event11","comment11" , 1992, Integer.valueOf(101),1),
				RowFactory.create( row_id++, "1675683807452:32", "vkas","event2","comment2",1993, Integer.valueOf(200),1, row_id++, "1675683807452:32", "vikas","event12","comment21" ,1992, Integer.valueOf(201),1 ),
				RowFactory.create(row_id++, "1675683807452:33", "agrawaal","event3","comment3",1994, Integer.valueOf(300),1, row_id++, "1675683807452:33", "agarwal","event13","comment31" ,1992, Integer.valueOf(301),1 ),
				RowFactory.create( row_id++, "1675683807452:31", "nicole","event1","comment1", 1992, Integer.valueOf(100), 1, row_id++, "1675683807452:31", "nicol","event11","comment11" , 1992, Integer.valueOf(101),1),
				RowFactory.create( row_id++, "1675683807452:32", "vkas","event2","comment2",1993, Integer.valueOf(200),1, row_id++, "1675683807452:32", "vikas","event12","comment21" ,1992, Integer.valueOf(201),1 ),
				RowFactory.create(row_id++, "1675683807452:33", "agrawaal","event3","comment3",1994, Integer.valueOf(300),1, row_id++, "1675683807452:33", "agarwal","event13","comment31" ,1992, Integer.valueOf(301),1 ),
				RowFactory.create( row_id++, "1675683807452:31", "nicole","event1","comment1", 1992, Integer.valueOf(100), 1, row_id++, "1675683807452:31", "nicol","event11","comment11" , 1992, Integer.valueOf(101),1),
				RowFactory.create( row_id++, "1675683807452:32", "vkas","event2","comment2",1993, Integer.valueOf(200),1, row_id++, "1675683807452:32", "vikas","event12","comment21" ,1992, Integer.valueOf(201),1 ),
				RowFactory.create(row_id++, "1675683807452:33", "agrawaal","event3","comment3",1994, Integer.valueOf(300),1, row_id++, "1675683807452:33", "agarwal","event13","comment31" ,1992, Integer.valueOf(301),1 ),
				RowFactory.create( ++row_id, "52", "nameObvDupe1" ,"def" ,"geh" ,1900, Integer.valueOf(1900), 0,++row_id, "410", "nameObvDupe1", "lmn", "opq", 2001, Integer.valueOf(1900), 0),
				RowFactory.create( ++row_id, "53", "nameObvDupe2" ,"eventObvDupe2" ,"commentObvDupe2" ,1900, Integer.valueOf(1900), 0,++row_id, "54", "nameObvDupe2", "eventObvDupe2", "commentObvDupe2", 2001, Integer.valueOf(1901), 0),
				RowFactory.create( ++row_id, "53", "nameObvDupe3" ,"eventObvDupe3" ,"commentObvDupe3" ,1900, Integer.valueOf(1901), 0,++row_id, "54", "nameObvDupe3", "eventObvDupe3", "commentObvDupe3", 2001, Integer.valueOf(1901), 0),
				RowFactory.create( ++row_id, "53", "nameObvDupe3" ,"eventObvDupe3" ,"commentObvDupe3" ,1900, Integer.valueOf(1901), 0,++row_id, "54", null, "eventObvDupe3", "commentObvDupe3", 2001, Integer.valueOf(1901), 0)
		};
		return posData;
	}

	/** Schema for a pair row: left columns followed by the same columns with the z_ prefix. */
	private StructType getPairSchema() {
		StructType schema = new StructType(new StructField[] {
				new StructField("z_zid", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("z_cluster", DataTypes.StringType, true, Metadata.empty()),
				new StructField("name", DataTypes.StringType, true, Metadata.empty()),
				new StructField("event", DataTypes.StringType, true, Metadata.empty()),
				new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
				new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("dob", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("z_isMatch", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("z_z_zid", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("z_z_cluster", DataTypes.StringType, true, Metadata.empty()),
				new StructField("z_name", DataTypes.StringType, true, Metadata.empty()),
				new StructField("z_event", DataTypes.StringType, true, Metadata.empty()),
				new StructField("z_comment", DataTypes.StringType, true, Metadata.empty()),
				new StructField("z_year", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("z_dob", DataTypes.IntegerType, true, Metadata.empty()),
				new StructField("z_z_isMatch", DataTypes.IntegerType, true, Metadata.empty())}
		);
		return schema;
	}

	/** Single-field match condition. */
	protected ObviousDupes getObviousDupes(String field) {
		return getObviousDupes(new String[] {field});
	}

	/** Multi-field match condition: one map per field, keyed by {@link ObviousDupes#fieldName}. */
	@SuppressWarnings("unchecked")
	protected ObviousDupes getObviousDupes(String[] fields) {
		// Generic array creation is not allowed in Java, hence the raw-typed new + cast-free assignment.
		HashMap<String, String>[] matchCondition = new HashMap[fields.length];
		for (int i = 0; i < fields.length; i++) {
			matchCondition[i] = new HashMap<String, String>();
			matchCondition[i].put(ObviousDupes.fieldName, fields[i]);
		}
		return new ObviousDupes(matchCondition);
	}

	/** (name AND event AND comment) OR (dob) OR (comment AND year). */
	public ObviousDupes[] getObvDupeCond() {
		ObviousDupes obvDupe1 = getObviousDupes(new String[]{"name","event","comment"});
		ObviousDupes obvDupe2 = getObviousDupes("dob");
		ObviousDupes obvDupe3 = getObviousDupes(new String[]{"comment","year"});
		ObviousDupes[] obvDupe = new ObviousDupes[] {obvDupe1, obvDupe2, obvDupe3};
		return obvDupe;
	}

}
package zingg.common.core.obviousdupes;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.Arrays;
import java.util.HashMap;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import zingg.common.client.Arguments;
import zingg.common.client.FieldDefinition;
import zingg.common.client.MatchType;
import zingg.common.client.ObviousDupes;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.pipe.Pipe;
import zingg.common.client.util.ColName;
import zingg.spark.client.SparkFrame;
import zingg.spark.client.ZSparkSession;
import zingg.spark.client.pipe.SparkPipe;
import zingg.spark.core.executor.ZinggSparkTester;

/**
 * Tests {@code ObviousDupesUtil}: extracting obvious-dupe pairs from a blocked
 * DataFrame, and removing obvious dupes from block pairs — including the
 * degenerate cases (no obvious-dupe config, null/empty obvious-dupe frame).
 */
public class TestObviousDupesUtil extends ZinggSparkTester {

	/** The dob-based condition pairs row 23 with row 3 (same dob 19830807). */
	@Test
	public void testGetObvDupePairs() throws ZinggClientException {
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtil().getObvDupePairs(getBlockedDF());
		assertEquals(1, pairs.count());
		Row r = pairs.head();
		assertEquals(23, pairs.getAsInt(r, ColName.ID_COL));
		assertEquals(3, pairs.getAsInt(r, ColName.COL_PREFIX + ColName.ID_COL));
	}

	/** No obviousDupes configured => null result. */
	@Test
	public void testGetObvDupePairsNull() throws ZinggClientException {
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtilEmptyArgs().getObvDupePairs(getBlockedDF());
		assertNull(pairs);
	}

	/** The dob-equal pair (3, 23) is dropped; the non-dupe pair (11, 19) survives. */
	@Test
	public void testRemoveObvDupesFromBlocks() throws ZinggClientException {
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtil().removeObvDupesFromBlocks(getBlocksDF());
		assertEquals(1, pairs.count());
		Row r = pairs.head();
		assertEquals(11, pairs.getAsInt(r, ColName.ID_COL));
		assertEquals(19, pairs.getAsInt(r, ColName.COL_PREFIX + ColName.ID_COL));
	}

	@Test
	public void testRemoveObvDupesFromBlocks2() throws ZinggClientException {
		// obv dupe df is null => don't remove dupes
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtil().removeObvDupesFromBlocks(getBlocksDF(), null);
		assertEquals(2, pairs.count());
	}

	@Test
	public void testRemoveObvDupesFromBlocks3() throws ZinggClientException {
		// as long as obv dupe df is not empty => remove dupes
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtil().removeObvDupesFromBlocks(getBlocksDF(), getBlocksDF());
		assertEquals(1, pairs.count());
		Row r = pairs.head();
		assertEquals(11, pairs.getAsInt(r, ColName.ID_COL));
		assertEquals(19, pairs.getAsInt(r, ColName.COL_PREFIX + ColName.ID_COL));
	}

	@Test
	public void testRemoveObvDupesFromBlocks4() throws ZinggClientException {
		// filterNullCond on a non-null column yields an empty frame
		ZFrame<Dataset<Row>, Row, Column> emptyDF = getBlocksDF().filterNullCond(ColName.ID_COL);

		// obv dupe df is empty => don't remove dupes
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtil().removeObvDupesFromBlocks(getBlocksDF(), emptyDF);
		assertEquals(2, pairs.count());
	}

	/** No obviousDupes configured => nothing removed. */
	@Test
	public void testRemoveObvDupesFromBlocksNull() throws ZinggClientException {
		ZFrame<Dataset<Row>, Row, Column> pairs = getObvDupeUtilEmptyArgs().removeObvDupesFromBlocks(getBlocksDF());
		assertEquals(2, pairs.count());
	}

	/**
	 * Arguments with three field definitions, CSV in/out pipes, and a single
	 * obvious-dupe condition on dob.
	 */
	public Arguments getArgs() throws ZinggClientException {
		Arguments args = new Arguments();
		FieldDefinition fname = new FieldDefinition();
		fname.setFieldName("fname");
		fname.setDataType("string");
		fname.setMatchType(Arrays.asList(MatchType.EXACT, MatchType.FUZZY));
		fname.setFields("fname");

		FieldDefinition lname = new FieldDefinition();
		lname.setFieldName("lname");
		lname.setDataType("string");
		lname.setMatchType(Arrays.asList(MatchType.FUZZY));
		lname.setFields("lname");

		// FIX: these four setters previously mutated `lname` (copy-paste bug),
		// leaving the dob definition empty and clobbering lname's settings.
		FieldDefinition dob = new FieldDefinition();
		dob.setFieldName("dob");
		dob.setDataType("long");
		dob.setMatchType(Arrays.asList(MatchType.FUZZY));
		dob.setFields("dob");

		args.setFieldDefinition(Arrays.asList(fname, lname, dob));

		Pipe inputPipe = new SparkPipe();
		inputPipe.setName("test");
		inputPipe.setFormat(Pipe.FORMAT_CSV);
		inputPipe.setProp("location", "examples/febrl/test.csv");
		args.setData(new Pipe[] {inputPipe});

		Pipe outputPipe = new SparkPipe();
		outputPipe.setName("output");
		outputPipe.setFormat(Pipe.FORMAT_CSV);
		outputPipe.setProp("location", "examples/febrl/output.csv");
		args.setOutput(new Pipe[] {outputPipe});

		args.setBlockSize(400L);
		args.setCollectMetrics(true);
		args.setModelId("500");

		args.setObviousDupes(new ObviousDupes[]{getObviousDupes("dob")});

		return args;
	}

	/** Blocked (unpaired) rows; 3 and 23 share dob 19830807. */
	protected SparkFrame getBlockedDF() {
		Row[] rows = {
				RowFactory.create(3, "Érik", "Guay", 19830807, -798, "customers"),
				RowFactory.create(11, "xani", "green", 19390410, 890, "customers"),
				RowFactory.create(19, "sachin", "step", 19461101, 700, "customers"),
				RowFactory.create(23, "Érika", "Charles", 19830807, 991, "customers")
		};
		StructType schema = new StructType(
				new StructField[] {
						new StructField(ColName.ID_COL, DataTypes.IntegerType, false, Metadata.empty()),
						new StructField("fname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("lname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("dob", DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.HASH_COL, DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.SOURCE_COL, DataTypes.StringType, false, Metadata.empty())
				});
		SparkFrame df = new SparkFrame(spark.createDataFrame(Arrays.asList(rows), schema));
		return df;
	}

	/** Paired rows: (3, 23) have equal dob (an obvious dupe); (11, 19) do not. */
	protected SparkFrame getBlocksDF() {
		Row[] rows = {
				RowFactory.create(3, "Érik", "Guay", 19830807, -798, "customers", 23, "Érika", "Charles", 19830807, -798, "customers"),
				RowFactory.create(11, "xani", "green", 19390410, 890, "customers", 19, "x", "g", 19461101, 890, "customers")
		};
		StructType schema = new StructType(
				new StructField[] {
						new StructField(ColName.ID_COL, DataTypes.IntegerType, false, Metadata.empty()),
						new StructField("fname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("lname", DataTypes.StringType, false, Metadata.empty()),
						new StructField("dob", DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.HASH_COL, DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.SOURCE_COL, DataTypes.StringType, false, Metadata.empty()),
						new StructField(ColName.COL_PREFIX + ColName.ID_COL, DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.COL_PREFIX + "fname", DataTypes.StringType, false, Metadata.empty()),
						new StructField(ColName.COL_PREFIX + "lname", DataTypes.StringType, false, Metadata.empty()),
						new StructField(ColName.COL_PREFIX + "dob", DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.COL_PREFIX + ColName.HASH_COL, DataTypes.IntegerType, false, Metadata.empty()),
						new StructField(ColName.COL_PREFIX + ColName.SOURCE_COL, DataTypes.StringType, false, Metadata.empty())
				});
		SparkFrame df = new SparkFrame(spark.createDataFrame(Arrays.asList(rows), schema));
		return df;
	}

	/** Single-field match condition. */
	private ObviousDupes getObviousDupes(String field) {
		return getObviousDupes(new String[] {field});
	}

	/** Multi-field match condition: one map per field, keyed by {@link ObviousDupes#fieldName}. */
	@SuppressWarnings("unchecked")
	private ObviousDupes getObviousDupes(String[] fields) {
		// Generic array creation is not allowed in Java, hence the raw-typed new.
		HashMap<String, String>[] matchCondition = new HashMap[fields.length];
		for (int i = 0; i < fields.length; i++) {
			matchCondition[i] = new HashMap<String, String>();
			matchCondition[i].put(ObviousDupes.fieldName, fields[i]);
		}
		return new ObviousDupes(matchCondition);
	}

	// NOTE(review): generic arguments reconstructed as <Dataset<Row>, Row, Column>
	// from the mangled source — confirm against ObviousDupesUtil's declaration.
	protected ObviousDupesUtil<Dataset<Row>, Row, Column> getObvDupeUtil() throws ZinggClientException {
		return new ObviousDupesUtil<Dataset<Row>, Row, Column>(zsCTX.getDSUtil(), getArgs());
	}

	/** Util built from default Arguments, i.e. no obviousDupes configured. */
	protected ObviousDupesUtil<Dataset<Row>, Row, Column> getObvDupeUtilEmptyArgs() {
		return new ObviousDupesUtil<Dataset<Row>, Row, Column>(zsCTX.getDSUtil(), new Arguments());
	}

}