Skip to content

Commit

Permalink
Clean up session and context data (issue #534)
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasgupta78 committed Mar 15, 2023
1 parent bcdc7bf commit 2c68ad9
Show file tree
Hide file tree
Showing 19 changed files with 33 additions and 111 deletions.
2 changes: 2 additions & 0 deletions common/core/src/main/java/zingg/common/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public interface Context <S,D, R, C,T> extends Serializable {
/**
 * Initializes this context for the given license.
 *
 * @param license the license string used to initialize the context
 * @throws ZinggClientException if initialization fails
 */
public void init(String license)
throws ZinggClientException;

/**
 * Releases resources held by this context so a fresh one can be created
 * later (e.g. the Spark implementation stops its session and drops its
 * references). Counterpart to {@link #init(String)}.
 */
public void cleanup();

/** Convenience method to set all utils.
 * Especially useful when you don't want to create the connection/spark context etc.
 */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ public DataColDocumenter(Context<S,D,R,C,T> context, Arguments args) {
/**
 * Intentionally a no-op: this documenter performs no per-frame processing.
 *
 * @param data the frame passed through the documenter pipeline; unused here
 * @throws ZinggClientException declared for interface compatibility; never thrown
 */
public void process(ZFrame<D, R, C> data) throws ZinggClientException {
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
public void execute() throws ZinggClientException {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ protected List<String[]> getFieldDataList() {
return list;
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}


@Override
public void execute() throws ZinggClientException {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public DocumenterBase(Context<S,D,R,C,T> context, Arguments args) {
config = createConfigurationObject();
}

public abstract void cleanup() throws ZinggClientException;

public Configuration getTemplateConfig() {
if (config == null) {
config = createConfigurationObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ private void prepareAndWriteColumnDocument(String fieldName, String columnsDir)
}
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
public void execute() throws ZinggClientException {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ protected Map<String, Object> populateTemplateData() {
return root;
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
public void execute() throws ZinggClientException {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ public void execute() throws ZinggClientException {
}
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

/**
* To be implemented by concrete implementation of Spark/Snow etc.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ public Long getUnsureMarkedRecordsStat(ZFrame<D,R,C> markedRecords){

public abstract void execute() throws ZinggClientException ;

/**
 * Delegates cleanup to the underlying context so engine resources
 * (e.g. the Spark session) are released after execution.
 *
 * <p>Guards against a null context so that cleanup is safe to invoke
 * even when initialization never completed.</p>
 *
 * @throws ZinggClientException if the context fails to clean up
 */
@Override
public void cleanup() throws ZinggClientException {
	// cleanup may run after a failed/partial init; only delegate when a context exists
	if (context != null) {
		context.cleanup();
	}
}

public HashUtil<S,D,R,C,T> getHashUtil() {
return context.getHashUtil();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ public void init(Arguments args, String license) throws ZinggClientException {
getContext().init(license);
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}


@Override
protected ModelDocumenter<SparkSession, Dataset<Row>, Row, Column, DataType> getModelDocumenter() {
return new SparkModelDocumenter(getContext(),getArgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ public class SparkFindAndLabeller extends FindAndLabeller<SparkSession, Dataset<

public SparkFindAndLabeller() {
setZinggOptions(ZinggOptions.FIND_AND_LABEL);
}


@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ public void init(Arguments args, String license) throws ZinggClientException {
getContext().init(license);
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

protected Pipe setSaveModeOnPipe(Pipe p) {
p.setMode(SaveMode.Overwrite.toString());
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ public void init(Arguments args, String license) throws ZinggClientException {
super.init(args, license);
getContext().init(license);
}


@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public void init(Arguments args, String license) throws ZinggClientException {
getContext().init(license);
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
protected Model getModel() throws ZinggClientException {
Model model = getModelUtil().loadModel(false, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ public void init(Arguments args, String license) throws ZinggClientException {
}


@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}


@Override
protected Model getModel() throws ZinggClientException {
Model model = getModelUtil().loadModel(false, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,5 @@ public void execute() throws ZinggClientException {
}
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}



}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ protected Model getModel() throws ZinggClientException {
return model;
}


@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
protected StopWordsRemover<SparkSession, Dataset<Row>, Row, Column, DataType> getStopWords() {
return new SparkStopWordsRemover(getContext(),getArgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public void init(Arguments args, String license) throws ZinggClientException {
getContext().init(license);
}

@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
protected StopWordsRemover<SparkSession, Dataset<Row>, Row, Column, DataType> getStopWords() {
return new SparkStopWordsRemover(getContext(),getArgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,7 @@ public void init(Arguments args, String license) throws ZinggClientException {
super.init(args, license);
getContext().init(license);
}


@Override
public void cleanup() throws ZinggClientException {
// TODO Auto-generated method stub

}

@Override
protected StopWordsRemover<SparkSession, Dataset<Row>, Row, Column, DataType> getStopWords() {
return new SparkStopWordsRemover(getContext(),getArgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,28 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;

import zingg.common.client.Arguments;
import zingg.common.client.IZingg;
import zingg.common.client.ZinggClientException;
import zingg.common.core.Context;
import zingg.common.core.executor.ZinggBase;
import zingg.spark.core.util.SparkBlockingTreeUtil;
import zingg.spark.core.util.SparkDSUtil;
import zingg.spark.core.util.SparkGraphUtil;
import zingg.spark.core.util.SparkHashUtil;
import zingg.spark.core.util.SparkModelUtil;
import zingg.spark.core.util.SparkPipeUtil;
import zingg.common.core.util.BlockingTreeUtil;
import zingg.common.core.util.DSUtil;
import zingg.common.core.util.GraphUtil;
import zingg.common.core.util.HashUtil;
import zingg.common.core.util.ModelUtil;
import zingg.common.core.util.PipeUtilBase;
import zingg.spark.core.util.SparkBlockingTreeUtil;
import zingg.spark.core.util.SparkDSUtil;
import zingg.spark.core.util.SparkGraphUtil;
import zingg.spark.core.util.SparkHashUtil;
import zingg.spark.core.util.SparkModelUtil;
import zingg.spark.core.util.SparkPipeUtil;


public class ZinggSparkContext implements Context<SparkSession, Dataset<Row>, Row,Column,DataType>{


protected JavaSparkContext ctx;
private static final long serialVersionUID = 1L;
protected JavaSparkContext ctx;
protected SparkSession spark;
protected PipeUtilBase<SparkSession, Dataset<Row>, Row, Column> pipeUtil;
protected HashUtil<SparkSession,Dataset<Row>, Row, Column, DataType> hashUtil;
Expand Down Expand Up @@ -76,6 +75,23 @@ public void init(String license)
}
}

/**
 * Stops the underlying Spark runtime and drops the references so a later
 * run can build a fresh session/context (issue #534).
 *
 * <p>Each resource is stopped in its own try/catch: in the original
 * single-block form, a failure in {@code ctx.stop()} skipped
 * {@code spark.stop()} and left both references set. Cleanup is
 * best-effort, so exceptions are logged and swallowed.</p>
 */
@Override
public void cleanup() {
	try {
		if (ctx != null) {
			ctx.stop();
		}
	} catch (Exception e) {
		// best-effort cleanup: never propagate failures from stopping the context
		e.printStackTrace();
	}
	try {
		if (spark != null) {
			spark.stop();
		}
	} catch (Exception e) {
		// best-effort cleanup: never propagate failures from stopping the session
		e.printStackTrace();
	}
	// always clear the references, even if stop() failed, so they can be GC'd
	ctx = null;
	spark = null;
}

@Override
public void setUtils() {
setPipeUtil(new SparkPipeUtil(spark));
Expand Down

0 comments on commit 2c68ad9

Please sign in to comment.