Commit b742538

feat: extraction of all artifacts #1534

bamthomas committed Sep 18, 2024
1 parent 419bd0c

Showing 7 changed files with 113 additions and 48 deletions.
Changed file: Hasher
@@ -39,6 +39,9 @@ public String toString() {
return algorithm;
}

+ public String toStringWithoutDash() {
+ return algorithm.replace("-", "");
+ }

public String hash(String message) {
return hash(message, DEFAULT_ENCODING);
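For context, the new helper strips the dash from a JCA-style algorithm name ("SHA-256" becomes "SHA256"), which is the spelling passed to Tika's CommonsDigester in SourceExtractor below. A minimal sketch of the behavior, assuming the algorithm field holds a JCA name such as "SHA-256":

public class HasherNameDemo {
    public static void main(String[] args) {
        String algorithm = "SHA-256";                    // JCA spelling, as in MessageDigest.getInstance("SHA-256")
        String withoutDash = algorithm.replace("-", ""); // "SHA256", the form handed to CommonsDigester
        System.out.println(withoutDash);
    }
}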
Changed file: ArtifactTask
@@ -23,35 +23,34 @@
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_DEFAULT_PROJECT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_PROJECT_OPT;

- public class ArtifactTask extends PipelineTask<Pair<String, String>> {
+ public class ArtifactTask extends PipelineTask<String> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Indexer indexer;
private final Project project;
private final Path artifactDir;
- static Pair<String, String> POISON = new Pair<>("POISON", "POISON");

- public ArtifactTask(DocumentCollectionFactory<Pair<String, String>> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
- super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, (Class<Pair<String, String>>)(Object)Pair.class);
+ public ArtifactTask(DocumentCollectionFactory<String> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
+ super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, String.class);
this.indexer = indexer;
- project = Project.project(ofNullable((String)taskView.args.get(DEFAULT_PROJECT_OPT)).orElse(DEFAULT_DEFAULT_PROJECT));
- artifactDir = Path.of(ofNullable((String)taskView.args.get(ARTIFACT_DIR_OPT)).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
+ project = Project.project(propertiesProvider.get(DEFAULT_PROJECT_OPT).orElse(DEFAULT_DEFAULT_PROJECT));
+ artifactDir = Path.of(propertiesProvider.get(ARTIFACT_DIR_OPT).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
}

@Override
public Long call() throws Exception {
super.call();
logger.info("creating artifact cache in {} for project {} from queue {}", artifactDir, project, inputQueue.getName());
- SourceExtractor extractor = new SourceExtractor(artifactDir.resolve(project.name));
+ SourceExtractor extractor = new SourceExtractor(propertiesProvider);
List<String> sourceExcludes = List.of("content", "content_translated");
- Pair<String, String> docRef;
+ String docId;
long nbDocs = 0;
- while (!(POISON.equals(docRef = inputQueue.poll(60, TimeUnit.SECONDS)))) {
+ while (!(STRING_POISON.equals(docId = inputQueue.poll(60, TimeUnit.SECONDS)))) {
try {
- if (docRef != null) {
- Document doc = indexer.get(project.name, docRef._1(), docRef._2(), sourceExcludes);
+ if (docId != null) {
+ Document doc = indexer.get(project.name, docId, sourceExcludes);
// we are getting a file input stream that is only created if we call the Supplier<InputStream>.get()
// so it is safe to ignore the return value, it will just create the file
- extractor.getEmbeddedSource(project, doc);
+ extractor.extractEmbeddedSources(project, doc);
nbDocs++;
}
} catch (Throwable e) {
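For context, the loop above is a poison-pill consumer: a sentinel value enqueued after the real document ids tells the worker to stop, while a null poll only means the 60-second timeout elapsed and the loop should keep waiting. The test below enqueues the literal "POISON", which suggests STRING_POISON is that sentinel. A self-contained sketch of the pattern (class and variable names here are illustrative, not Datashare's):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PoisonPillDemo {
    static final String POISON = "POISON"; // sentinel marking the end of the work

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("docId1");
        queue.add(POISON);

        String id;
        long processed = 0;
        while (!POISON.equals(id = queue.poll(60, TimeUnit.SECONDS))) {
            if (id != null) {      // null only means the poll timed out
                processed++;       // stand-in for the real per-document work
            }
        }
        System.out.println(processed + " document(s) processed");
    }
}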
Changed file: ArtifactTaskTest
@@ -4,7 +4,6 @@
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.extract.MemoryDocumentCollectionFactory;
- import org.icij.datashare.function.Pair;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.user.User;
import org.icij.extract.queue.DocumentQueue;
@@ -15,6 +14,7 @@
import org.mockito.Mock;

import java.nio.file.Path;
+ import java.util.HashMap;
import java.util.Map;

import static org.fest.assertions.Assertions.assertThat;
@@ -24,7 +24,7 @@ public class ArtifactTaskTest {
@Rule public TemporaryFolder artifactDir = new TemporaryFolder();
@Mock Indexer mockEs;
MockIndexer mockIndexer;
- private final MemoryDocumentCollectionFactory<Pair<String, String>> factory = new MemoryDocumentCollectionFactory<>();
+ private final MemoryDocumentCollectionFactory<String> factory = new MemoryDocumentCollectionFactory<>();

@Test(expected = IllegalArgumentException.class)
public void test_missing_artifact_dir() {
@@ -34,14 +34,15 @@ public void test_missing_artifact_dir() {
@Test
public void test_create_artifact_cache_one_file() throws Exception {
Path path = Path.of(getClass().getResource("/docs/embedded_doc.eml").getPath());
mockIndexer.indexFile("prj", "6abb96950946b62bb993307c8945c0c096982783bab7fa24901522426840ca3e", path, "application/pdf", "embedded_doc");
String sha256 = "0f95ef97e4619f7bae2a585c6cf24587cd7a3a81a26599c8774d669e5c175e5e";
mockIndexer.indexFile("prj", sha256, path, "message/rfc822");

- DocumentQueue<Pair<String, String>> queue = factory.createQueue("extract:queue:artifact", (Class<Pair<String, String>>)(Object)Pair.class);
- queue.add(new Pair<>("6abb96950946b62bb993307c8945c0c096982783bab7fa24901522426840ca3e", "embedded_doc"));
- queue.add(new Pair<>("POISON", "POISON"));
+ DocumentQueue<String> queue = factory.createQueue("extract:queue:artifact", String.class);
+ queue.add(sha256);
+ queue.add("POISON");

- Long numberOfDocuments = new ArtifactTask(factory, mockEs, new PropertiesProvider(Map.of()),
- new Task<>(ArtifactTask.class.getName(), User.local(), Map.of("artifactDir", artifactDir.getRoot().toString(), "defaultProject", "prj")), null)
+ Long numberOfDocuments = new ArtifactTask(factory, mockEs, new PropertiesProvider(Map.of("artifactDir", artifactDir.getRoot().toString(), "defaultProject", "prj")),
+ new Task<>(ArtifactTask.class.getName(), User.local(), new HashMap<>()), null)
.call();

assertThat(numberOfDocuments).isEqualTo(1);
Changed file: MockIndexer
@@ -21,6 +21,9 @@ public MockIndexer(Indexer mockIndexer) {
this.mockIndexer = mockIndexer;
}

+ public void indexFile(String index, String _id, Path path, String contentType) {
+ indexFile(index, _id, path, contentType, null);
+ }
public void indexFile(String index, String _id, Path path, String contentType, String routing) {
Document document = DocumentBuilder.createDoc(_id)
.with(path)
@@ -48,6 +51,7 @@ public void indexFile(String index, Document rootDocument, Document document) {
public void indexFile(String index, Document document) {
List<String> sourceExcludes = List.of("content", "content_translated");
when(mockIndexer.get(index, document.getId())).thenReturn(document);
+ when(mockIndexer.get(index, document.getId(), sourceExcludes)).thenReturn(document);
when(mockIndexer.get(index, document.getId(), document.getId(), sourceExcludes)).thenReturn(document);
}

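The added stub mirrors the lookup change in ArtifactTask: documents are now fetched by id alone rather than by an (id, routing) pair, so the fixture stubs both overloads during the transition. A minimal Mockito sketch of the idea; the Indexer interface below is a reduced stand-in, not Datashare's actual API:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

public class StubBothOverloadsDemo {
    // Reduced stand-in for the indexer: one routed and one un-routed lookup.
    interface Indexer {
        Object get(String index, String id, List<String> excludes);
        Object get(String index, String id, String routing, List<String> excludes);
    }

    public static void main(String[] args) {
        Indexer indexer = mock(Indexer.class);
        Object doc = new Object();
        List<String> excludes = List.of("content", "content_translated");
        // Stub both lookups so either code path finds the document.
        when(indexer.get("prj", "id1", excludes)).thenReturn(doc);
        when(indexer.get("prj", "id1", "root1", excludes)).thenReturn(doc);
        System.out.println(indexer.get("prj", "id1", excludes) == doc); // true
    }
}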
Changed file: SourceExtractor
@@ -42,19 +42,10 @@ public class SourceExtractor {
private final boolean filterMetadata;
private final MetadataCleaner metadataCleaner = new MetadataCleaner();

- public SourceExtractor(Path artifactDir) {
- this(new PropertiesProvider(Map.of(DatashareCliOptions.ARTIFACT_DIR_OPT, artifactDir.toString())), false);
- }

public SourceExtractor(PropertiesProvider propertiesProvider) {
this(propertiesProvider, false);
}

- public SourceExtractor(Path artifactDir, boolean filterMetadata) {
- this.propertiesProvider = new PropertiesProvider(Map.of(DatashareCliOptions.ARTIFACT_DIR_OPT, artifactDir.toString()));
- this.filterMetadata = filterMetadata;
- }

public SourceExtractor(PropertiesProvider propertiesProvider, boolean filterMetadata) {
this.propertiesProvider = propertiesProvider;
this.filterMetadata = filterMetadata;
@@ -83,16 +74,15 @@ public InputStream getSource(final Project project, final Document document) thr

public InputStream getEmbeddedSource(final Project project, final Document document) {
Hasher hasher = Hasher.valueOf(document.getId().length());
- String algorithm = hasher.toString();
int i = 0;
List<DigestingParser.Digester> digesters = new ArrayList<>(List.of());
- // Digester without project name
- digesters.add(new CommonsDigester(20 * 1024 * 1024, algorithm.replace("-", "")));
+ // Digester without the project name
+ digesters.add(new CommonsDigester(20 * 1024 * 1024, hasher.toStringWithoutDash()));
// Digester with the project name
- digesters.add(new UpdatableDigester(project.getId(), algorithm));
+ digesters.add(new UpdatableDigester(project.getId(), hasher.toString()));
// Digester with the project name set on "defaultProject" for retro-compatibility
if (mightUseLegacyDigester(document)) {
- digesters.add(new UpdatableDigester(getDefaultProject(), algorithm));
+ digesters.add(new UpdatableDigester(getDefaultProject(), hasher.toString()));
}

// Try each digester to find embedded doc and ensure we
@@ -103,8 +93,8 @@

try {
EmbeddedDocumentExtractor embeddedExtractor = new EmbeddedDocumentExtractor(
- digester, algorithm,
- propertiesProvider.get(DatashareCliOptions.ARTIFACT_DIR_OPT).map(dir -> Path.of(dir).resolve(project.name)).orElse(null),false);
+ digester, hasher.toString(),
+ getArtifactPath(project),false);
TikaDocumentSource source = embeddedExtractor.extract(rootDocument, document.getId());
InputStream inputStream = source.get();
if (filterMetadata) {
@@ -115,13 +105,34 @@ public InputStream getEmbeddedSource(final Project project, final Document docum
LOGGER.debug("Extract attempt {}/{} for embedded document {}/{} failed (algorithm={}, digester={}, project={})",
++i, digesters.size(),
document.getId(), document.getRootDocument(),
- algorithm, digester.getClass().getSimpleName(),
+ hasher, digester.getClass().getSimpleName(),
document.getProject(), ex);
}
}

throw new ContentNotFoundException(document.getRootDocument(), document.getId());
}

+ private Path getArtifactPath(Project project) {
+ return propertiesProvider.get(DatashareCliOptions.ARTIFACT_DIR_OPT).map(dir -> Path.of(dir).resolve(project.name)).orElse(null);
+ }
+
+ public void extractEmbeddedSources(final Project project, Document document) throws TikaException, IOException, SAXException {
+ Hasher hasher = Hasher.valueOf(document.getId().length());
+ DigestingParser.Digester digester = noDigestProject() ?
+ new CommonsDigester(20 * 1024 * 1024, hasher.toStringWithoutDash()):
+ new UpdatableDigester(project.getId(), hasher.toString());
+
+ Identifier identifier = new DigestIdentifier(hasher.toString(), Charset.defaultCharset());
+ TikaDocument tikaDocument = new DocumentFactory().withIdentifier(identifier).create(document.getPath());
+ EmbeddedDocumentExtractor embeddedExtractor = new EmbeddedDocumentExtractor(digester, hasher.toString(), getArtifactPath(project),false);
+ embeddedExtractor.extractAll(tikaDocument);
+ }
+
+ private boolean noDigestProject() {
+ return Boolean.parseBoolean(propertiesProvider.get(DatashareCliOptions.NO_DIGEST_PROJECT_OPT).orElse("true"));
+ }

private boolean mightUseLegacyDigester(Document document) {
return !isServerMode() && document.getExtractionLevel() > 0 && !document.getProject().name.equals(getDefaultProject());
}
@@ -133,6 +144,4 @@ private String getDefaultProject() {
private boolean isServerMode() {
return propertiesProvider.get("mode").orElse(Mode.SERVER.name()).equals((Mode.SERVER.name()));
}


}
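A note on why getEmbeddedSource iterates over digesters: an embedded document's id may have been computed without a project salt (the noDigestProject option, which defaults to true in the new code), with the current project's salt, or with the legacy defaultProject salt, so extraction tries each candidate and returns on the first success. A generic, self-contained sketch of that first-success fallback shape (not Datashare code):

import java.util.List;
import java.util.function.Supplier;

public class FirstSuccessDemo {
    // Try each candidate in order; return the first success, fail only when all fail.
    static <T> T firstSuccess(List<Supplier<T>> candidates) {
        RuntimeException last = null;
        for (Supplier<T> candidate : candidates) {
            try {
                return candidate.get();
            } catch (RuntimeException e) {
                last = e; // log and fall through to the next candidate
            }
        }
        throw new IllegalStateException("no candidate succeeded", last);
    }

    public static void main(String[] args) {
        List<Supplier<String>> digestAttempts = List.of(
                () -> { throw new RuntimeException("id does not match the unsalted digest"); },
                () -> "extracted with the project-salted digest");
        System.out.println(firstSuccess(digestAttempts));
    }
}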
Changed file: SourceExtractorTest
@@ -14,6 +14,7 @@
import org.icij.spewer.FieldNames;
import org.icij.task.Options;
import org.junit.ClassRule;
+ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@@ -29,19 +30,22 @@

import static java.nio.file.Paths.get;
import static org.fest.assertions.Assertions.assertThat;
+ import static org.icij.datashare.cli.DatashareCliOptions.ARTIFACT_DIR_OPT;
+ import static org.icij.datashare.cli.DatashareCliOptions.NO_DIGEST_PROJECT_OPT;
import static org.icij.datashare.test.ElasticsearchRule.TEST_INDEX;
import static org.icij.datashare.text.Project.project;

public class SourceExtractorTest {
- @ClassRule static public TemporaryFolder tmpDir = new TemporaryFolder();
- @ClassRule public static ElasticsearchRule es = new ElasticsearchRule();
+ @Rule public TemporaryFolder tmpDir = new TemporaryFolder();
+ @ClassRule
+ public static ElasticsearchRule es = new ElasticsearchRule();

@Test(expected = FileNotFoundException.class)
public void test_file_not_found() throws IOException {
File file = tmpDir.newFile("foo.bar");
Document document = DocumentBuilder.createDoc(project("project"), file.toPath()).build();
assertThat(file.delete()).isTrue();
- new SourceExtractor(tmpDir.getRoot().toPath()).getSource(document);
+ new SourceExtractor(getPropertiesProvider()).getSource(document);
}

@Test(expected = EmbeddedDocumentExtractor.ContentNotFoundException.class)
Expand All @@ -54,7 +58,7 @@ public void test_content_not_found() {
.with(new HashMap<>())
.with(Document.Status.INDEXED)
.withContentLength(45L).build();
- new SourceExtractor(new PropertiesProvider()).getEmbeddedSource(project("project"), document);
+ new SourceExtractor(getPropertiesProvider()).getEmbeddedSource(project("project"), document);
}

@Test
@@ -68,7 +72,7 @@ public void test_get_source_for_root_doc() throws IOException {
.with(Document.Status.INDEXED)
.withContentLength(45L).build();

- InputStream source = new SourceExtractor(new PropertiesProvider()).getSource(document);
+ InputStream source = new SourceExtractor(getPropertiesProvider()).getSource(document);
assertThat(source).isNotNull();
assertThat(getBytes(source)).hasSize(70574);
}
@@ -84,8 +88,8 @@ public void test_get_source_for_doc_and_pdf_with_without_metadata() throws IOExc
.with(Document.Status.INDEXED)
.withContentLength(0L).build();

- InputStream inputStreamWithMetadata = new SourceExtractor(new PropertiesProvider(), false).getSource(document);
- InputStream inputStreamWithoutMetadata = new SourceExtractor(new PropertiesProvider(), true).getSource(document);
+ InputStream inputStreamWithMetadata = new SourceExtractor(getPropertiesProvider(), false).getSource(document);
+ InputStream inputStreamWithoutMetadata = new SourceExtractor(getPropertiesProvider(), true).getSource(document);
assertThat(inputStreamWithMetadata).isNotNull();
assertThat(inputStreamWithoutMetadata).isNotNull();
assertThat(getBytes(inputStreamWithMetadata).length).isEqualTo(9216);
@@ -118,13 +122,14 @@ public void test_get_source_for_embedded_doc_with_artifact_dir() throws Exceptio
Map<String, Object> stringProperties = Map.of(
"digestAlgorithm", Document.DEFAULT_DIGESTER.toString(),
"digestProjectName", TEST_INDEX,
+ ARTIFACT_DIR_OPT, tmpDir.getRoot().toString(),
"defaultProject", TEST_INDEX);
ElasticsearchIndexer elasticsearchIndexer = indexDocument(stringProperties, path, stringProperties);
Document attachedPdf = elasticsearchIndexer.
get(TEST_INDEX, "1bf2b6aa27dd8b45c7db58875004b8cb27a78ced5200b4976b63e351ebbae5ececb86076d90e156a7cdea06cde9573ca",
"f4078910c3e73a192e3a82d205f3c0bdb749c4e7b23c1d05a622db0f07d7f0ededb335abdb62aef41ace5d3cdb9298bc");

- InputStream source = new SourceExtractor(tmpDir.getRoot().toPath()).getSource(project(TEST_INDEX), attachedPdf);
+ InputStream source = new SourceExtractor(new PropertiesProvider(stringProperties)).getSource(project(TEST_INDEX), attachedPdf);
assertThat(source).isNotNull();
assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile()).isDirectory();
Path cachedArtifact = tmpDir.getRoot().toPath()
@@ -227,6 +232,39 @@ private static ElasticsearchIndexer createIndexer(String defaultProject) {
return new ElasticsearchIndexer(es.client, new PropertiesProvider(Map.of("defaultProject", defaultProject))).withRefresh(Refresh.True);
}

+ @Test
+ public void test_extract_embeds_for_doc() throws Exception {
+ Document document = DocumentBuilder.createDoc(project(TEST_INDEX),get(getClass().getResource("/docs/embedded_doc.eml").getPath()))
+ .with("it has been parsed")
+ .with(Language.FRENCH)
+ .with(Charset.defaultCharset())
+ .ofContentType("message/rfc822")
+ .with(new HashMap<>())
+ .with(Document.Status.INDEXED)
+ .withContentLength(45L).build();
+
+ new SourceExtractor(getPropertiesProvider()).extractEmbeddedSources(project(TEST_INDEX), document);
+ assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile()).isDirectory();
+ assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile().listFiles()).containsOnly(
+ tmpDir.getRoot().toPath().resolve(TEST_INDEX).resolve("1b").toFile());
+ }
+
+ @Test
+ public void test_extract_embeds_for_doc_with_no_digest_project_opt() throws Exception {
+ Document document = DocumentBuilder.createDoc(project(TEST_INDEX),get(getClass().getResource("/docs/embedded_doc.eml").getPath()))
+ .with("it has been parsed")
+ .with(Language.FRENCH)
+ .with(Charset.defaultCharset())
+ .ofContentType("message/rfc822")
+ .with(new HashMap<>())
+ .with(Document.Status.INDEXED)
+ .withContentLength(45L).build();
+
+ new SourceExtractor(getPropertiesProvider(true)).extractEmbeddedSources(project(TEST_INDEX), document);
+ assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile().listFiles()).containsOnly(
+ tmpDir.getRoot().toPath().resolve(TEST_INDEX).resolve("75").toFile());
+ }

private byte[] getBytes(InputStream source) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nbTmpBytesRead;
Expand All @@ -235,4 +273,15 @@ private byte[] getBytes(InputStream source) throws IOException {
}
return buffer.toByteArray();
}

+ private PropertiesProvider getPropertiesProvider() {
+ return getPropertiesProvider(false);
+ }
+
+ private PropertiesProvider getPropertiesProvider(boolean noDigestProject) {
+ return new PropertiesProvider(Map.of(
+ ARTIFACT_DIR_OPT, tmpDir.getRoot().toString(),
+ NO_DIGEST_PROJECT_OPT, String.valueOf(noDigestProject))
+ );
+ }
}
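The two new tests assert that extracted artifacts land in a subdirectory named after the first two hex characters of the document id ("1b" and "75" above), under <artifactDir>/<project>. A small sketch of that layout as inferred from the assertions; the two-character fan-out is an assumption read off the tests, not a documented API:

import java.nio.file.Path;

public class ArtifactLayoutDemo {
    public static void main(String[] args) {
        Path artifactDir = Path.of("/tmp/artifacts"); // illustrative location
        String project = "prj";                       // illustrative project name
        // Document id taken from the test above.
        String docId = "1bf2b6aa27dd8b45c7db58875004b8cb27a78ced5200b4976b63e351ebbae5ececb86076d90e156a7cdea06cde9573ca";
        Path bucket = artifactDir.resolve(project).resolve(docId.substring(0, 2));
        System.out.println(bucket); // /tmp/artifacts/prj/1b
    }
}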
Changed file: pom.xml (1 addition, 1 deletion)
@@ -88,7 +88,7 @@
<guice.version>4.2.3</guice.version>
<amazon.version>1.12.411</amazon.version>

- <extract.version>7.2.0</extract.version>
+ <extract.version>7.3.0</extract.version>
<opennlp.version>1.6.0</opennlp.version>
<elasticsearch.version>7.17.9</elasticsearch.version>
<lucene.version>8.11.1</lucene.version>
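A plausible reading of this bump: extract 7.3.0 would be the version that ships the EmbeddedDocumentExtractor.extractAll entry point called by the new extractEmbeddedSources method; that link is inferred from the diff, not stated in the commit.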
