From 400769283b070d17ea596dfe856a86d07e9fbeed Mon Sep 17 00:00:00 2001 From: Navin Singh Date: Wed, 5 Jan 2022 14:08:06 +0530 Subject: [PATCH] Giving same cluster id to all records linked from multiple sources #108 --- core/src/main/java/zingg/Linker.java | 7 +++---- core/src/main/java/zingg/util/DSUtil.java | 2 ++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/zingg/Linker.java b/core/src/main/java/zingg/Linker.java index c441543ac..5928e2fcd 100644 --- a/core/src/main/java/zingg/Linker.java +++ b/core/src/main/java/zingg/Linker.java @@ -61,10 +61,9 @@ public void writeOutput(Dataset blocked, Dataset dupes) { if (args.getOutput() != null) { // input dupes are pairs - dupesActual = DFUtil.addClusterRowNumber(dupesActual, spark); - dupesActual = Util.addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN); - Dataset dupes1 = DSUtil.alignLinked(dupesActual, args); - Dataset dupes2 = dupes1.orderBy(ColName.CLUSTER_COLUMN); + //dupesActual = DFUtil.addClusterRowNumber(dupesActual, spark); + dupesActual = Util.addUniqueCol(dupesActual, ColName.ID_COL); + Dataset dupes2 = DSUtil.alignLinked(dupesActual, args); LOG.debug("uncertain output schema is " + dupes2.schema()); PipeUtil.write(dupes2, args, ctx, args.getOutput()); } diff --git a/core/src/main/java/zingg/util/DSUtil.java b/core/src/main/java/zingg/util/DSUtil.java index 2f742adda..cb0fbbc96 100644 --- a/core/src/main/java/zingg/util/DSUtil.java +++ b/core/src/main/java/zingg/util/DSUtil.java @@ -92,6 +92,7 @@ public static Dataset joinWithItselfSourceSensitive(Dataset lines, Str public static Dataset alignLinked(Dataset dupesActual, Arguments args) { dupesActual = dupesActual.cache(); + dupesActual = dupesActual.withColumnRenamed(ColName.ID_COL, ColName.CLUSTER_COLUMN); List cols = new ArrayList(); cols.add(dupesActual.col(ColName.CLUSTER_COLUMN)); cols.add(dupesActual.col(ColName.SOURCE_COL)); @@ -102,6 +103,7 @@ public static Dataset alignLinked(Dataset dupesActual, Arguments args) } Dataset dupes1 = dupesActual.select(JavaConverters.asScalaIteratorConverter(cols.iterator()).asScala().toSeq()); + dupes1 = dupes1.dropDuplicates(ColName.CLUSTER_COLUMN, ColName.SOURCE_COL); List cols1 = new ArrayList(); cols1.add(dupesActual.col(ColName.CLUSTER_COLUMN)); cols1.add(dupesActual.col(ColName.COL_PREFIX +ColName.SOURCE_COL));