diff --git a/datashare-api/src/main/java/org/icij/datashare/text/Hasher.java b/datashare-api/src/main/java/org/icij/datashare/text/Hasher.java
index ae3775504..76e7bdae1 100644
--- a/datashare-api/src/main/java/org/icij/datashare/text/Hasher.java
+++ b/datashare-api/src/main/java/org/icij/datashare/text/Hasher.java
@@ -39,6 +39,9 @@
     public String toString() {
         return algorithm;
     }
+    public String toStringWithoutDash() {
+        return algorithm.replace("-", "");
+    }
 
     public String hash(String message) {
         return hash(message, DEFAULT_ENCODING);
diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java
index 9fcdeb29e..a084227e2 100644
--- a/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java
+++ b/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java
@@ -23,35 +23,34 @@
 import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_DEFAULT_PROJECT;
 import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_PROJECT_OPT;
 
-public class ArtifactTask extends PipelineTask<Pair<String, String>> {
+public class ArtifactTask extends PipelineTask<String> {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final Indexer indexer;
     private final Project project;
     private final Path artifactDir;
-    static Pair<String, String> POISON = new Pair<>("POISON", "POISON");
 
-    public ArtifactTask(DocumentCollectionFactory<Pair<String, String>> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
-        super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, (Class<Pair<String, String>>)(Object)Pair.class);
+    public ArtifactTask(DocumentCollectionFactory<String> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
+        super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, String.class);
         this.indexer = indexer;
-        project = Project.project(ofNullable((String)taskView.args.get(DEFAULT_PROJECT_OPT)).orElse(DEFAULT_DEFAULT_PROJECT));
-        artifactDir = Path.of(ofNullable((String)taskView.args.get(ARTIFACT_DIR_OPT)).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
+        project = Project.project(propertiesProvider.get(DEFAULT_PROJECT_OPT).orElse(DEFAULT_DEFAULT_PROJECT));
+        artifactDir = Path.of(propertiesProvider.get(ARTIFACT_DIR_OPT).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
     }
 
     @Override
     public Long call() throws Exception {
         super.call();
         logger.info("creating artifact cache in {} for project {} from queue {}", artifactDir, project, inputQueue.getName());
-        SourceExtractor extractor = new SourceExtractor(artifactDir.resolve(project.name));
+        SourceExtractor extractor = new SourceExtractor(propertiesProvider);
         List<String> sourceExcludes = List.of("content", "content_translated");
-        Pair<String, String> docRef;
+        String docId;
        long nbDocs = 0;
-        while (!(POISON.equals(docRef = inputQueue.poll(60, TimeUnit.SECONDS)))) {
+        while (!(STRING_POISON.equals(docId = inputQueue.poll(60, TimeUnit.SECONDS)))) {
             try {
-                if (docRef != null) {
-                    Document doc = indexer.get(project.name, docRef._1(), docRef._2(), sourceExcludes);
+                if (docId != null) {
+                    Document doc = indexer.get(project.name, docId, sourceExcludes);
                     // we are getting a file input stream that is only created if we call the Supplier.get()
                     // so it is safe to ignore the return value, it will just create the file
-                    extractor.getEmbeddedSource(project, doc);
+                    extractor.extractEmbeddedSources(project, doc);
                     nbDocs++;
                 }
             } catch (Throwable e) {
diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java
index 2125576ce..72593601d 100644
--- a/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java
+++ b/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java
@@ -4,7 +4,6 @@
 import org.icij.datashare.PropertiesProvider;
 import org.icij.datashare.asynctasks.Task;
 import org.icij.datashare.extract.MemoryDocumentCollectionFactory;
-import org.icij.datashare.function.Pair;
 import org.icij.datashare.text.indexing.Indexer;
 import org.icij.datashare.user.User;
 import org.icij.extract.queue.DocumentQueue;
@@ -15,6 +14,7 @@
 import org.mockito.Mock;
 
 import java.nio.file.Path;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.fest.assertions.Assertions.assertThat;
@@ -24,7 +24,7 @@ public class ArtifactTaskTest {
     @Rule public TemporaryFolder artifactDir = new TemporaryFolder();
     @Mock Indexer mockEs;
     MockIndexer mockIndexer;
-    private final MemoryDocumentCollectionFactory<Pair<String, String>> factory = new MemoryDocumentCollectionFactory<>();
+    private final MemoryDocumentCollectionFactory<String> factory = new MemoryDocumentCollectionFactory<>();
 
     @Test(expected = IllegalArgumentException.class)
     public void test_missing_artifact_dir() {
@@ -34,14 +34,15 @@ public void test_missing_artifact_dir() {
     @Test
     public void test_create_artifact_cache_one_file() throws Exception {
         Path path = Path.of(getClass().getResource("/docs/embedded_doc.eml").getPath());
-        mockIndexer.indexFile("prj", "6abb96950946b62bb993307c8945c0c096982783bab7fa24901522426840ca3e", path, "application/pdf", "embedded_doc");
+        String sha256 = "0f95ef97e4619f7bae2a585c6cf24587cd7a3a81a26599c8774d669e5c175e5e";
+        mockIndexer.indexFile("prj", sha256, path, "message/rfc822");
 
-        DocumentQueue<Pair<String, String>> queue = factory.createQueue("extract:queue:artifact", (Class<Pair<String, String>>)(Object)Pair.class);
-        queue.add(new Pair<>("6abb96950946b62bb993307c8945c0c096982783bab7fa24901522426840ca3e", "embedded_doc"));
-        queue.add(new Pair<>("POISON", "POISON"));
+        DocumentQueue<String> queue = factory.createQueue("extract:queue:artifact", String.class);
+        queue.add(sha256);
+        queue.add("POISON");
 
-        Long numberOfDocuments = new ArtifactTask(factory, mockEs, new PropertiesProvider(Map.of()),
-                new Task<>(ArtifactTask.class.getName(), User.local(), Map.of("artifactDir", artifactDir.getRoot().toString(), "defaultProject", "prj")), null)
+        Long numberOfDocuments = new ArtifactTask(factory, mockEs, new PropertiesProvider(Map.of("artifactDir", artifactDir.getRoot().toString(), "defaultProject", "prj")),
+                new Task<>(ArtifactTask.class.getName(), User.local(), new HashMap<>()), null)
                 .call();
 
         assertThat(numberOfDocuments).isEqualTo(1);
diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/MockIndexer.java b/datashare-app/src/test/java/org/icij/datashare/tasks/MockIndexer.java
index d44b5e150..d9e434c7a 100644
--- a/datashare-app/src/test/java/org/icij/datashare/tasks/MockIndexer.java
+++ b/datashare-app/src/test/java/org/icij/datashare/tasks/MockIndexer.java
@@ -21,6 +21,9 @@ public MockIndexer(Indexer mockIndexer) {
         this.mockIndexer = mockIndexer;
     }
 
+    public void indexFile(String index, String _id, Path path, String contentType) {
+        indexFile(index, _id, path, contentType, null);
+    }
     public void indexFile(String index, String _id, Path path, String contentType, String routing) {
         Document document = DocumentBuilder.createDoc(_id)
                 .with(path)
@@ -48,6 +51,7 @@ public void indexFile(String index, Document rootDocument, Document document) {
     public void indexFile(String index, Document document) {
         List<String> sourceExcludes = List.of("content", "content_translated");
         when(mockIndexer.get(index, document.getId())).thenReturn(document);
+        when(mockIndexer.get(index, document.getId(), sourceExcludes)).thenReturn(document);
         when(mockIndexer.get(index, document.getId(), document.getId(), sourceExcludes)).thenReturn(document);
     }
 
diff --git a/datashare-index/src/main/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractor.java b/datashare-index/src/main/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractor.java
index e4c92317c..07b6c7119 100644
--- a/datashare-index/src/main/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractor.java
+++ b/datashare-index/src/main/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractor.java
@@ -42,19 +42,10 @@ public class SourceExtractor {
     private final boolean filterMetadata;
     private final MetadataCleaner metadataCleaner = new MetadataCleaner();
 
-    public SourceExtractor(Path artifactDir) {
-        this(new PropertiesProvider(Map.of(DatashareCliOptions.ARTIFACT_DIR_OPT, artifactDir.toString())), false);
-    }
-
     public SourceExtractor(PropertiesProvider propertiesProvider) {
         this(propertiesProvider, false);
     }
 
-    public SourceExtractor(Path artifactDir, boolean filterMetadata) {
-        this.propertiesProvider = new PropertiesProvider(Map.of(DatashareCliOptions.ARTIFACT_DIR_OPT, artifactDir.toString()));
-        this.filterMetadata = filterMetadata;
-    }
-
     public SourceExtractor(PropertiesProvider propertiesProvider, boolean filterMetadata) {
         this.propertiesProvider = propertiesProvider;
         this.filterMetadata = filterMetadata;
@@ -83,16 +74,15 @@ public InputStream getSource(final Project project, final Document document) thr
 
     public InputStream getEmbeddedSource(final Project project, final Document document) {
         Hasher hasher = Hasher.valueOf(document.getId().length());
-        String algorithm = hasher.toString();
         int i = 0;
         List<DigestingParser.Digester> digesters = new ArrayList<>(List.of());
-        // Digester without project name
-        digesters.add(new CommonsDigester(20 * 1024 * 1024, algorithm.replace("-", "")));
+        // Digester without the project name
+        digesters.add(new CommonsDigester(20 * 1024 * 1024, hasher.toStringWithoutDash()));
         // Digester with the project name
-        digesters.add(new UpdatableDigester(project.getId(), algorithm));
+        digesters.add(new UpdatableDigester(project.getId(), hasher.toString()));
         // Digester with the project name set on "defaultProject" for retro-compatibility
         if (mightUseLegacyDigester(document)) {
-            digesters.add(new UpdatableDigester(getDefaultProject(), algorithm));
+            digesters.add(new UpdatableDigester(getDefaultProject(), hasher.toString()));
         }
 
         // Try each digester to find embedded doc and ensure we
@@ -103,8 +93,8 @@
             try {
                 EmbeddedDocumentExtractor embeddedExtractor = new EmbeddedDocumentExtractor(
-                        digester, algorithm,
-                        propertiesProvider.get(DatashareCliOptions.ARTIFACT_DIR_OPT).map(dir -> Path.of(dir).resolve(project.name)).orElse(null),false);
+                        digester, hasher.toString(),
+                        getArtifactPath(project),false);
                 TikaDocumentSource source = embeddedExtractor.extract(rootDocument, document.getId());
                 InputStream inputStream = source.get();
                 if (filterMetadata) {
@@ -115,13 +105,34 @@ public InputStream getEmbeddedSource(final Project project, final Document docum
                 LOGGER.debug("Extract attempt {}/{} for embedded document {}/{} failed (algorithm={}, digester={}, project={})",
                         ++i, digesters.size(),
                         document.getId(), document.getRootDocument(),
-                        algorithm, digester.getClass().getSimpleName(),
+                        hasher, digester.getClass().getSimpleName(),
                         document.getProject(), ex);
             }
         }
+
         throw new ContentNotFoundException(document.getRootDocument(), document.getId());
     }
 
+    private Path getArtifactPath(Project project) {
+        return propertiesProvider.get(DatashareCliOptions.ARTIFACT_DIR_OPT).map(dir -> Path.of(dir).resolve(project.name)).orElse(null);
+    }
+
+    public void extractEmbeddedSources(final Project project, Document document) throws TikaException, IOException, SAXException {
+        Hasher hasher = Hasher.valueOf(document.getId().length());
+        DigestingParser.Digester digester = noDigestProject() ?
+                new CommonsDigester(20 * 1024 * 1024, hasher.toStringWithoutDash()):
+                new UpdatableDigester(project.getId(), hasher.toString());
+
+        Identifier identifier = new DigestIdentifier(hasher.toString(), Charset.defaultCharset());
+        TikaDocument tikaDocument = new DocumentFactory().withIdentifier(identifier).create(document.getPath());
+        EmbeddedDocumentExtractor embeddedExtractor = new EmbeddedDocumentExtractor(digester, hasher.toString(), getArtifactPath(project),false);
+        embeddedExtractor.extractAll(tikaDocument);
+    }
+
+    private boolean noDigestProject() {
+        return Boolean.parseBoolean(propertiesProvider.get(DatashareCliOptions.NO_DIGEST_PROJECT_OPT).orElse("true"));
+    }
+
     private boolean mightUseLegacyDigester(Document document) {
         return !isServerMode() && document.getExtractionLevel() > 0 && !document.getProject().name.equals(getDefaultProject());
     }
@@ -133,6 +144,4 @@ private String getDefaultProject() {
     private boolean isServerMode() {
         return propertiesProvider.get("mode").orElse(Mode.SERVER.name()).equals((Mode.SERVER.name()));
     }
-
-
 }
\ No newline at end of file
diff --git a/datashare-index/src/test/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractorTest.java b/datashare-index/src/test/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractorTest.java
index e25af7e03..0ff736d45 100644
--- a/datashare-index/src/test/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractorTest.java
+++ b/datashare-index/src/test/java/org/icij/datashare/text/indexing/elasticsearch/SourceExtractorTest.java
@@ -14,6 +14,7 @@
 import org.icij.spewer.FieldNames;
 import org.icij.task.Options;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -29,19 +30,22 @@
 
 import static java.nio.file.Paths.get;
 import static org.fest.assertions.Assertions.assertThat;
+import static org.icij.datashare.cli.DatashareCliOptions.ARTIFACT_DIR_OPT;
+import static org.icij.datashare.cli.DatashareCliOptions.NO_DIGEST_PROJECT_OPT;
 import static org.icij.datashare.test.ElasticsearchRule.TEST_INDEX;
 import static org.icij.datashare.text.Project.project;
 
 public class SourceExtractorTest {
-    @ClassRule static public TemporaryFolder tmpDir = new TemporaryFolder();
-    @ClassRule public static ElasticsearchRule es = new ElasticsearchRule();
+    @Rule public TemporaryFolder tmpDir = new TemporaryFolder();
+    @ClassRule
+    public static ElasticsearchRule es = new ElasticsearchRule();
 
     @Test(expected = FileNotFoundException.class)
     public void test_file_not_found() throws IOException {
         File file = tmpDir.newFile("foo.bar");
         Document document = DocumentBuilder.createDoc(project("project"), file.toPath()).build();
         assertThat(file.delete()).isTrue();
-        new SourceExtractor(tmpDir.getRoot().toPath()).getSource(document);
+        new SourceExtractor(getPropertiesProvider()).getSource(document);
     }
 
     @Test(expected = EmbeddedDocumentExtractor.ContentNotFoundException.class)
@@ -54,7 +58,7 @@ public void test_content_not_found() {
                 .with(new HashMap<>())
                 .with(Document.Status.INDEXED)
                 .withContentLength(45L).build();
-        new SourceExtractor(new PropertiesProvider()).getEmbeddedSource(project("project"), document);
+        new SourceExtractor(getPropertiesProvider()).getEmbeddedSource(project("project"), document);
     }
 
     @Test
@@ -68,7 +72,7 @@ public void test_get_source_for_root_doc() throws IOException {
                 .with(Document.Status.INDEXED)
                 .withContentLength(45L).build();
 
-        InputStream source = new SourceExtractor(new PropertiesProvider()).getSource(document);
+        InputStream source = new SourceExtractor(getPropertiesProvider()).getSource(document);
         assertThat(source).isNotNull();
         assertThat(getBytes(source)).hasSize(70574);
     }
@@ -84,8 +88,8 @@ public void test_get_source_for_doc_and_pdf_with_without_metadata() throws IOExc
                 .with(Document.Status.INDEXED)
                 .withContentLength(0L).build();
 
-        InputStream inputStreamWithMetadata = new SourceExtractor(new PropertiesProvider(), false).getSource(document);
-        InputStream inputStreamWithoutMetadata = new SourceExtractor(new PropertiesProvider(), true).getSource(document);
+        InputStream inputStreamWithMetadata = new SourceExtractor(getPropertiesProvider(), false).getSource(document);
+        InputStream inputStreamWithoutMetadata = new SourceExtractor(getPropertiesProvider(), true).getSource(document);
         assertThat(inputStreamWithMetadata).isNotNull();
         assertThat(inputStreamWithoutMetadata).isNotNull();
         assertThat(getBytes(inputStreamWithMetadata).length).isEqualTo(9216);
@@ -118,13 +122,14 @@ public void test_get_source_for_embedded_doc_with_artifact_dir() throws Exceptio
         Map<String, String> stringProperties = Map.of(
                 "digestAlgorithm", Document.DEFAULT_DIGESTER.toString(),
                 "digestProjectName", TEST_INDEX,
+                ARTIFACT_DIR_OPT, tmpDir.getRoot().toString(),
                 "defaultProject", TEST_INDEX);
         ElasticsearchIndexer elasticsearchIndexer = indexDocument(stringProperties, path, stringProperties);
         Document attachedPdf = elasticsearchIndexer.
                 get(TEST_INDEX, "1bf2b6aa27dd8b45c7db58875004b8cb27a78ced5200b4976b63e351ebbae5ececb86076d90e156a7cdea06cde9573ca", "f4078910c3e73a192e3a82d205f3c0bdb749c4e7b23c1d05a622db0f07d7f0ededb335abdb62aef41ace5d3cdb9298bc");
 
-        InputStream source = new SourceExtractor(tmpDir.getRoot().toPath()).getSource(project(TEST_INDEX), attachedPdf);
+        InputStream source = new SourceExtractor(new PropertiesProvider(stringProperties)).getSource(project(TEST_INDEX), attachedPdf);
         assertThat(source).isNotNull();
         assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile()).isDirectory();
         Path cachedArtifact = tmpDir.getRoot().toPath()
@@ -227,6 +232,39 @@ private static ElasticsearchIndexer createIndexer(String defaultProject) {
         return new ElasticsearchIndexer(es.client, new PropertiesProvider(Map.of("defaultProject", defaultProject))).withRefresh(Refresh.True);
     }
 
+    @Test
+    public void test_extract_embeds_for_doc() throws Exception {
+        Document document = DocumentBuilder.createDoc(project(TEST_INDEX),get(getClass().getResource("/docs/embedded_doc.eml").getPath()))
+                .with("it has been parsed")
+                .with(Language.FRENCH)
+                .with(Charset.defaultCharset())
+                .ofContentType("message/rfc822")
+                .with(new HashMap<>())
+                .with(Document.Status.INDEXED)
+                .withContentLength(45L).build();
+
+        new SourceExtractor(getPropertiesProvider()).extractEmbeddedSources(project(TEST_INDEX), document);
+        assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile()).isDirectory();
+        assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile().listFiles()).containsOnly(
+                tmpDir.getRoot().toPath().resolve(TEST_INDEX).resolve("1b").toFile());
+    }
+
+    @Test
+    public void test_extract_embeds_for_doc_with_no_digest_project_opt() throws Exception {
+        Document document = DocumentBuilder.createDoc(project(TEST_INDEX),get(getClass().getResource("/docs/embedded_doc.eml").getPath()))
+                .with("it has been parsed")
+                .with(Language.FRENCH)
+                .with(Charset.defaultCharset())
+                .ofContentType("message/rfc822")
+                .with(new HashMap<>())
+                .with(Document.Status.INDEXED)
+                .withContentLength(45L).build();
+
+        new SourceExtractor(getPropertiesProvider(true)).extractEmbeddedSources(project(TEST_INDEX), document);
+        assertThat(tmpDir.getRoot().toPath().resolve(TEST_INDEX).toFile().listFiles()).containsOnly(
+                tmpDir.getRoot().toPath().resolve(TEST_INDEX).resolve("75").toFile());
+    }
+
     private byte[] getBytes(InputStream source) throws IOException {
         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
         int nbTmpBytesRead;
@@ -235,4 +273,15 @@ private byte[] getBytes(InputStream source) throws IOException {
         }
         return buffer.toByteArray();
     }
+
+    private PropertiesProvider getPropertiesProvider() {
+        return getPropertiesProvider(false);
+    }
+
+    private PropertiesProvider getPropertiesProvider(boolean noDigestProject) {
+        return new PropertiesProvider(Map.of(
+                ARTIFACT_DIR_OPT, tmpDir.getRoot().toString(),
+                NO_DIGEST_PROJECT_OPT, String.valueOf(noDigestProject))
+        );
+    }
 }
diff --git a/pom.xml b/pom.xml
index ee1d9b5e6..bd51af1dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,7 @@
         4.2.3
         1.12.411
-        7.2.0
+        7.3.0
         1.6.0
         7.17.9
         8.11.1