Skip to content

Commit

Permalink
feat: add searchQuery parameter to the ArtifactTask #1534
Browse files Browse the repository at this point in the history
  • Loading branch information
bamthomas committed Sep 18, 2024
1 parent cc87ac3 commit d44f92c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@

import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;

import java.util.Optional;
import java.util.function.Function;
import org.icij.datashare.Entity;
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.Stage;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.ProjectProxy;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.indexing.SearchQuery;
import org.icij.datashare.text.nlp.Pipeline;
import org.icij.extract.queue.DocumentQueue;
import org.slf4j.Logger;
Expand All @@ -27,9 +31,11 @@
import static org.icij.datashare.cli.DatashareCliOptions.NLP_PIPELINE_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.SCROLL_DURATION_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.SCROLL_SIZE_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.SEARCH_QUERY_OPT;

public class EnqueueFromIndexTask extends PipelineTask<String> {
private final DocumentCollectionFactory<String> factory;
private final String searchQuery;
Logger logger = LoggerFactory.getLogger(getClass());
private final Pipeline.Type nlpPipeline;
private final String projectName;
Expand All @@ -47,15 +53,21 @@ public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, fin
this.projectName = (String)taskView.args.getOrDefault(DEFAULT_PROJECT_OPT, DEFAULT_DEFAULT_PROJECT);
this.scrollDuration = propertiesProvider.get(SCROLL_DURATION_OPT).orElse(DEFAULT_SCROLL_DURATION);
this.scrollSize = parseInt(propertiesProvider.get(SCROLL_SIZE_OPT).orElse(String.valueOf(DEFAULT_SCROLL_SIZE)));

this.searchQuery = propertiesProvider.get(SEARCH_QUERY_OPT).orElse(null);
}

@Override
public Long call() throws Exception {
super.call();
Indexer.Searcher searcher = indexer.search(singletonList(projectName), Document.class)
.without(nlpPipeline).withSource("rootDocument").limit(scrollSize);
logger.info("resuming NLP name finding for index {} and {} with {} scroll and size of {} : {} documents found", projectName, nlpPipeline,
Indexer.Searcher searcher;
if (searchQuery == null) {
searcher = indexer.search(singletonList(projectName), Document.class)
.without(nlpPipeline).withSource("rootDocument").limit(scrollSize);
} else {
searcher = indexer.search(singletonList(projectName), Document.class, new SearchQuery(searchQuery))
.withoutSource("content", "contentTranslated").limit(scrollSize);
}
logger.info("enqueuing doc ids finding for index {} and {} with {} scroll and size of {} : {} documents found", projectName, nlpPipeline,
scrollDuration, scrollSize, searcher.totalHits());
List<? extends Entity> docsToProcess = searcher.scroll(scrollDuration).collect(toList());
long totalHits = searcher.totalHits();
Expand All @@ -66,7 +78,7 @@ public Long call() throws Exception {
docsToProcess = searcher.scroll(scrollDuration).collect(toList());
} while (!docsToProcess.isEmpty());
outputQueue.add(STRING_POISON);
logger.info("enqueued into {} {} files without {} pipeline tags", outputQueue.getName(), totalHits, nlpPipeline);
logger.info("enqueued into {} {} files", outputQueue.getName(), totalHits);
searcher.clearScroll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.icij.datashare.cli.DatashareCliOptions.NLP_PIPELINE_OPT;
import static org.icij.datashare.test.ElasticsearchRule.TEST_INDEX;
import static org.icij.datashare.text.DocumentBuilder.createDoc;
import static org.icij.datashare.text.Project.project;

public class EnqueueFromIndexTaskTest {
@ClassRule
Expand All @@ -38,8 +39,30 @@ public void test_size_of_search() throws Exception {
"queueName", "test:queue",
NLP_PIPELINE_OPT, Pipeline.Type.OPENNLP.name());
MemoryDocumentCollectionFactory<String> factory = new MemoryDocumentCollectionFactory<>();
EnqueueFromIndexTask resumeNlpTask = new EnqueueFromIndexTask(factory, indexer, new Task<>(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null);
resumeNlpTask.call();
EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, new Task<>(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null);
enqueueFromIndex.call();
assertThat(factory.queues.get("test:queue:nlp")).hasSize(21); // with poison
}

// Verifies that setting the "searchQuery" property makes EnqueueFromIndexTask
// enqueue only the documents matching the given raw JSON query (plus the
// poison-pill terminator), instead of the default pipeline-based filtering.
@Test
public void test_with_query() throws Exception {
    // Index a document tagged with the CORENLP pipeline.
    // NOTE(review): per the original comment, CORENLP is the default pipeline,
    // so the default (query-less) path would presumably exclude this doc —
    // matching it here shows the searchQuery path is actually used; confirm.
    indexer.add(TEST_INDEX, createDoc("my_id").with("this is my precious doc")
        .with(Pipeline.Type.CORENLP).with(project(TEST_INDEX)).build()); // because default is CORENLP so it should fail as of now
    Map<String, Object> properties = Map.of(
        "defaultProject", "test-datashare",
        "stages", "ENQUEUEIDX",
        "queueName", "test:queue",
        // Raw Elasticsearch-style JSON query: top-level documents only (extractionLevel 0).
        "searchQuery", """
            {
            "match": {
            "extractionLevel": 0
            }
            }
            """);

    MemoryDocumentCollectionFactory<String> factory = new MemoryDocumentCollectionFactory<>();
    EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, new Task<>(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null);
    enqueueFromIndex.call();
    // 2 entries expected: the matching document(s) plus the poison pill the
    // task appends when it is done (see EnqueueFromIndexTask.call()).
    assertThat(factory.queues.get("test:queue:nlp")).hasSize(2); // with poison
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ OptionParser createParser() {
DatashareCliOptions.digestProjectName(parser);
DatashareCliOptions.noDigestProject(parser);
DatashareCliOptions.logLevel(parser);
DatashareCliOptions.searchQuery(parser);
return parser;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public final class DatashareCliOptions {
public static final String VERSION_ABBR_OPT = "v";
public static final String VERSION_OPT = "version";
public static final String ARTIFACT_DIR_OPT = "artifactDir";
public static final String SEARCH_QUERY_OPT = "searchQuery";

private static final Path DEFAULT_DATASHARE_HOME = Paths.get(System.getProperty("user.home"), ".local/share/datashare");
private static final Integer DEFAULT_NLP_PARALLELISM = 1;
Expand Down Expand Up @@ -789,6 +790,12 @@ public static void oauthClaimIdAttribute(OptionParser parser) {
.ofType(String.class);
}

/**
 * Registers the {@code searchQuery} command-line option on the given parser.
 * The option requires a String argument: a raw JSON query used to filter
 * index matches for the EnqueueFromIndex task.
 *
 * @param parser the jopt-simple parser to register the option on
 */
public static void searchQuery(OptionParser parser) {
    String description = "Json query for filtering index matches for EnqueueFromIndex task.";
    parser.acceptsAll(singletonList(SEARCH_QUERY_OPT), description)
        .withRequiredArg()
        .ofType(String.class);
}

public static ValueConverter<String> toAbsolute() {
return new ValueConverter<String>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ public InputStream getEmbeddedSource(final Project project, final Document docum
throw new ContentNotFoundException(document.getRootDocument(), document.getId());
}

private Path getArtifactPath(Project project) {
return propertiesProvider.get(DatashareCliOptions.ARTIFACT_DIR_OPT).map(dir -> Path.of(dir).resolve(project.name)).orElse(null);
}

public void extractEmbeddedSources(final Project project, Document document) throws TikaException, IOException, SAXException {
Hasher hasher = Hasher.valueOf(document.getId().length());
DigestingParser.Digester digester = noDigestProject() ?
Expand All @@ -129,6 +125,10 @@ public void extractEmbeddedSources(final Project project, Document document) thr
embeddedExtractor.extractAll(tikaDocument);
}

// Directory where this project's artifacts live: <artifactDir>/<project name>.
// Returns null when no artifactDir option is configured.
private Path getArtifactPath(Project project) {
    var artifactDir = propertiesProvider.get(DatashareCliOptions.ARTIFACT_DIR_OPT);
    return artifactDir
        .map(dir -> Path.of(dir).resolve(project.name))
        .orElse(null);
}

// Whether project-name digesting is disabled. The option defaults to "true"
// when absent, so this returns false only when it is explicitly set to a
// non-"true" value.
private boolean noDigestProject() {
    var configured = propertiesProvider.get(DatashareCliOptions.NO_DIGEST_PROJECT_OPT);
    return Boolean.parseBoolean(configured.orElse("true"));
}
Expand Down

0 comments on commit d44f92c

Please sign in to comment.