Merge pull request #472 from hbz/462-sigilProcessNew
462 sigel process new
TobiasNx committed Aug 18, 2023
2 parents b210a04 + f4352f2 commit e0b99ab
Showing 30 changed files with 158 additions and 1,379 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -26,3 +26,4 @@ app/transformation/input/*.csv
 .cache*
 /bin/
 application-log*.gz
+app/transformation/input/*.dat
6 changes: 3 additions & 3 deletions app/controllers/Accept.java
@@ -28,10 +28,10 @@ enum Format {
   N_TRIPLE("nt", "application/n-triples", "text/plain"), //
   TURTLE("ttl", "text/turtle", "application/x-turtle");

-  String[] types;
-  String queryParamString;
+  final String[] types;
+  final String queryParamString;

-  private Format(String format, String... types) {
+  Format(String format, String... types) {
     this.queryParamString = format;
     this.types = types;
   }
11 changes: 6 additions & 5 deletions app/controllers/Application.java
@@ -33,6 +33,7 @@
 import org.elasticsearch.index.query.GeoPolygonQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.search.sort.SortParseElement;

@@ -510,7 +511,7 @@ private static String defaultFields() {

   private static String searchQueryResult(String q, String location, int from,
       int size, String aggregations) {
-    String result = null;
+    String result;
     if (location == null || location.isEmpty()) {
       result = buildSimpleQuery(q, from, size, aggregations);
     } else {
@@ -619,8 +620,8 @@ static String[] defaultAggregations() {

   private static String returnAsJson(SearchResponse queryResponse) {
     List<Map<String, Object>> hits =
-        Arrays.asList(queryResponse.getHits().hits()).stream()
-            .map(hit -> hit.getSource()).collect(Collectors.toList());
+        Arrays.stream(queryResponse.getHits().hits())
+            .map(SearchHit::getSource).collect(Collectors.toList());
     ObjectNode object = Json.newObject();
     object.put("@context",
         CONFIG.getString("host") + routes.Application.context());
@@ -714,8 +715,8 @@ private static Result resultFor(String id, JsonNode json, String format) {

   private static Pair<String, String> contentAndType(JsonNode responseJson,
       String responseFormat) {
-    String content = "";
-    String contentType = "";
+    String content;
+    String contentType;
     switch (responseFormat) {
     case "rdf": {
       content = RdfConverter.toRdf(responseJson.toString(), RdfFormat.RDF_XML);
44 changes: 20 additions & 24 deletions app/controllers/Index.java
@@ -69,14 +69,14 @@ public ConfigurableNode(Settings settings,
     }
   }

-  private static Settings clientSettings =
+  private final static Settings clientSettings =
       Settings.settingsBuilder().put("path.home", ".")
           .put("http.port", Application.CONFIG.getString("index.es.port.http"))
           .put("transport.tcp.port",
               Application.CONFIG.getString("index.es.port.tcp"))
           .put("script.default_lang", "native").build();

-  private static Node node = new ConfigurableNode(
+  private final static Node node = new ConfigurableNode(
       nodeBuilder().settings(clientSettings).local(true).getSettings().build(),
       Arrays.asList(BundlePlugin.class, LocationAggregation.class, Zero.class))
           .start();
@@ -112,8 +112,8 @@ public static void initialize(String pathToJson) throws IOException {
     long minimumSize =
         Long.parseLong(Application.CONFIG.getString("index.file.minsize"));
     if (new File(pathToJson).length() >= minimumSize) {
-      createEmptyIndex(CLIENT, INDEX_NAME, "conf/index-settings.json");
-      indexData(CLIENT, pathToJson, INDEX_NAME);
+      createEmptyIndex();
+      indexData(pathToJson);
     } else {
       throw new IllegalArgumentException(
           "File not large enough: " + pathToJson);
@@ -184,40 +184,36 @@ private static SearchRequestBuilder withAggregations(
     return searchRequest;
   }

-  static void createEmptyIndex(final Client aClient, final String aIndexName,
-      final String aPathToIndexSettings) throws IOException {
-    deleteIndex(aClient, aIndexName);
+  static void createEmptyIndex() throws IOException {
+    deleteIndex(Index.CLIENT, Index.INDEX_NAME);
     CreateIndexRequestBuilder cirb =
-        aClient.admin().indices().prepareCreate(aIndexName);
-    if (aPathToIndexSettings != null) {
-      String settingsMappings = Files.lines(Paths.get(aPathToIndexSettings))
-          .collect(Collectors.joining());
-      cirb.setSource(settingsMappings);
-    }
+        Index.CLIENT.admin().indices().prepareCreate(Index.INDEX_NAME);
+    String settingsMappings = Files.lines(Paths.get("conf/index-settings.json"))
+        .collect(Collectors.joining());
+    cirb.setSource(settingsMappings);
     cirb.execute().actionGet();
-    aClient.admin().indices().refresh(new RefreshRequest()).actionGet();
+    Index.CLIENT.admin().indices().refresh(new RefreshRequest()).actionGet();
   }

-  static void indexData(final Client aClient, final String aPath,
-      final String aIndex) throws IOException {
-    final BulkRequestBuilder bulkRequest = aClient.prepareBulk();
+  static void indexData(final String aPath) throws IOException {
+    final BulkRequestBuilder bulkRequest = Index.CLIENT.prepareBulk();
     try (BufferedReader br =
-        new BufferedReader(new InputStreamReader(new FileInputStream(aPath),
+        new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(aPath)),
             StandardCharsets.UTF_8))) {
-      readData(bulkRequest, br, aClient, aIndex);
+      readData(bulkRequest, br);
     }
     bulkRequest.execute().actionGet();
-    aClient.admin().indices().refresh(new RefreshRequest()).actionGet();
+    Index.CLIENT.admin().indices().refresh(new RefreshRequest()).actionGet();
   }

   private static void readData(final BulkRequestBuilder bulkRequest,
-      final BufferedReader br, final Client client, final String aIndex)
+      final BufferedReader br)
       throws IOException {
     final ObjectMapper mapper = new ObjectMapper();
     String line;
     int currentLine = 1;
-    String organisationData = null;
-    String[] idUriParts = null;
+    String organisationData;
+    String[] idUriParts;
     String organisationId = null;

     // First line: index with id, second line: source
@@ -229,7 +225,7 @@ private static void readData(final BulkRequestBuilder bulkRequest,
       organisationId = idUriParts[idUriParts.length - 1].replace("#!", "");
     } else {
       organisationData = line;
-      bulkRequest.add(client.prepareIndex(aIndex, INDEX_TYPE, organisationId)
+      bulkRequest.add(Index.CLIENT.prepareIndex(Index.INDEX_NAME, INDEX_TYPE, organisationId)
           .setSource(organisationData));
     }
     currentLine++;
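
For orientation: the file consumed here is the two-line Elasticsearch bulk format the transformation writes (an action line, then the source document per record); readData derives the indexed id from the last URI path segment of the first line, stripping the "#!" suffix. A minimal hypothetical sample of one such record — index/type names, URI, and field values are made up for illustration:

    {"index":{"_index":"organisations","_type":"organisation","_id":"http://lobid.org/organisations/DE-38#!"}}
    {"id":"http://lobid.org/organisations/DE-38#!","name":"Universitäts- und Stadtbibliothek Köln"}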
2 changes: 1 addition & 1 deletion app/controllers/RdfConverter.java
@@ -22,7 +22,7 @@ public class RdfConverter {
  * RDF serialization formats.
  */
 @SuppressWarnings("javadoc")
-public static enum RdfFormat {
+public enum RdfFormat {
   RDF_XML("RDF/XML"), //
   N_TRIPLE("N-TRIPLE"), //
   TURTLE("TURTLE");
5 changes: 2 additions & 3 deletions app/controllers/Reconcile.java
@@ -79,7 +79,7 @@ public static Result reconcile() {

   private static List<JsonNode> mapToResults(String mainQuery,
       SearchHits searchHits) {
-    return Arrays.asList(searchHits.getHits()).stream().map(hit -> {
+    return Arrays.stream(searchHits.getHits()).map(hit -> {
       Map<String, Object> map = hit.getSource();
       ObjectNode resultForHit = Json.newObject();
       resultForHit.put("id", hit.getId());
@@ -101,8 +101,7 @@ private static SearchResponse executeQuery(Entry<String, JsonNode> entry,
         QueryBuilders.simpleQueryStringQuery(queryString);
     BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().must(stringQuery)
         .must(QueryBuilders.existsQuery("type"));
-    SearchResponse response = Index.executeQuery(0, limit, boolQuery, "");
-    return response;
+    return Index.executeQuery(0, limit, boolQuery, "");
   }

   private static String buildQueryString(Entry<String, JsonNode> entry) {
13 changes: 7 additions & 6 deletions app/transformation/CsvExport.java
@@ -21,7 +21,7 @@
  */
 public class CsvExport {

-  private JsonNode organisations;
+  private final JsonNode organisations;

   /**
    * @param json The organisations JSON data to export
@@ -35,23 +35,24 @@ public CsvExport(String json) {
    * @return The data for the given fields in CSV format
    */
   public String of(String fields) {
-    String csv = fields + "\n";
+    StringBuilder csv = new StringBuilder(fields + "\n");
     for (Iterator<JsonNode> iter = organisations.elements(); iter.hasNext();) {
       JsonNode org = iter.next();
-      csv += Arrays.asList(fields.split(",")).stream().map(field -> {
+      csv.append(Arrays.asList(fields.split(",")).stream().map(field -> {
         try {
           Object value = JsonPath.read(Configuration.defaultConfiguration()
               .jsonProvider().parse(org.toString()), "$." + field);
           return String.format("\"%s\"",
               value.toString().replaceAll("\"", "\"\""));
-        } catch (PathNotFoundException x) {
+        }
+        catch (PathNotFoundException x) {
           Logger.trace(x.getMessage());
           // https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#empty-and-quoted-cells
           return "";
         }
-      }).collect(Collectors.joining(",")) + "\n";
+      }).collect(Collectors.joining(","))).append("\n");
     }
-    return csv;
+    return csv.toString();
   }

 }
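
A usage sketch of the class's unchanged public API, to show what of() builds with the StringBuilder — the field names and input JSON are made up; values are double-quoted with embedded quotes doubled, and a missing path yields an empty cell per the W3C tabular-data rule linked above:

    CsvExport export = new CsvExport(
        "[{\"isil\":\"DE-38\",\"name\":\"USB Köln\"},{\"isil\":\"DE-605\"}]");
    String csv = export.of("isil,name");
    // isil,name
    // "DE-38","USB Köln"
    // "DE-605",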
4 changes: 2 additions & 2 deletions app/transformation/GeoLookupMap.java
@@ -29,9 +29,9 @@ public class GeoLookupMap extends HashMap<String, String> {
       Application.CONFIG.getString("transformation.geo.lookup.server");
   private static final Double THRESHOLD =
       Application.CONFIG.getDouble("transformation.geo.lookup.threshold");
-  private LookupType lookupType;
+  private final LookupType lookupType;

-  static enum LookupType {
+  enum LookupType {
     LAT, LON
   }

12 changes: 7 additions & 5 deletions app/transformation/TransformAll.java
@@ -51,9 +51,12 @@ public static void process(String startOfUpdates, int intervalSize,
       final String outputPath, String geoServer) throws IOException {
     String dbsOutput = outputPath + "-dbs";
     String sigelOutput = outputPath + "-sigel";
-    TransformSigel.process(startOfUpdates, intervalSize, sigelOutput,
-        geoServer);
-    TransformDbs.process(dbsOutput, geoServer);
+    TransformSigel.processBulk(sigelOutput, geoServer); // Start processing the Sigel PICA binary bulk.
+    TransformSigel.processUpdates(startOfUpdates, intervalSize, sigelOutput, geoServer); // Start processing Sigel PICA XML updates via OAI-PMH.
+    TransformDbs.process(dbsOutput, geoServer); // Start processing DBS data.
+
+    // DBS data, Sigel bulk, and Sigel updates are joined in a single ES bulk file.
+    // DBS data comes first, so that ES prefers the Sigel entries that come later and overwrite DBS entries where available.
     try (FileWriter resultWriter = new FileWriter(outputPath)) {
       writeAll(dbsOutput, resultWriter);
       writeAll(sigelOutput, resultWriter);
@@ -73,10 +76,9 @@ private static void writeAll(String dbsOutput, FileWriter resultWriter)
   }

   static JsonToElasticsearchBulk esBulk() {
-    final JsonToElasticsearchBulk esBulk = new JsonToElasticsearchBulk("id",
+    return new JsonToElasticsearchBulk("id",
         Application.CONFIG.getString("index.es.type"),
         Application.CONFIG.getString("index.es.name"));
-    return esBulk;
   }

   static Metafix fixEnriched(String geoLookupServer) throws FileNotFoundException {
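
For orientation, a hypothetical invocation of the reworked entry point, with the interval size and geo server borrowed from conf/application.conf below (the output path is made up; an empty startOfUpdates skips harvesting further OAI-PMH updates):

    // Transform the Sigel bulk, the Sigel updates, and the DBS data,
    // then join everything into one ES bulk file at outputPath:
    TransformAll.process("2023-06-01", 50, "app/transformation/output/bulk.ndjson",
        "http://gaia.hbz-nrw.de:4000/v1/search");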
51 changes: 31 additions & 20 deletions app/transformation/TransformSigel.java
@@ -20,6 +20,8 @@
 import org.metafacture.json.JsonEncoder;
 import org.metafacture.metafix.Metafix;
 import org.metafacture.biblio.pica.PicaXmlHandler;
+import org.metafacture.io.LineReader;
+import org.metafacture.biblio.pica.PicaDecoder;
 import org.metafacture.xml.XmlDecoder;
 import org.metafacture.xml.XmlElementSplitter;
 import org.metafacture.io.ObjectWriter;
@@ -38,29 +40,48 @@
  *
  */
 public class TransformSigel {

-  static final String DUMP_TOP_LEVEL_TAG = "collection";
-  static final String DUMP_ENTITY = "record";
   static final String UPDATE_TOP_LEVEL_TAG = "harvest";
+  static final String DUMP_TOP_LEVEL_TAG = "collection";
   static final String UPDATE_ENTITY = "metadata";
   static final String XPATH =
       "/*[local-name() = 'record']/*[local-name() = 'global']/*[local-name() = 'tag'][@id='008H']/*[local-name() = 'subf'][@id='e']";
   static final String DUMP_XPATH = "/" + DUMP_TOP_LEVEL_TAG + "/" + XPATH;

-  static void process(String startOfUpdates, int intervalSize,
+  // This opens the PICA binary bulk we have, transforms it, and saves it as a JSON ES bulk file.
+  static void processBulk(final String outputPath, String geoLookupServer) throws IOException {
+    final FileOpener dumpOpener = new FileOpener();
+    PicaDecoder picaDecoder = new PicaDecoder();
+    picaDecoder.setNormalizeUTF8(true);
+    JsonEncoder encodeJson = new JsonEncoder();
+    encodeJson.setPrettyPrinting(true);
+    dumpOpener//
+        .setReceiver(new LineReader())//
+        .setReceiver(picaDecoder)//
+        .setReceiver(new Metafix("conf/fix-sigel.fix"))//
+        .setReceiver(TransformAll.fixEnriched(geoLookupServer))//
+        .setReceiver(encodeJson)//
+        .setReceiver(TransformAll.esBulk())//
+        .setReceiver(new ObjectWriter<>(outputPath));
+    dumpOpener.process(TransformAll.DATA_INPUT_DIR + "sigel.dat");
+    dumpOpener.closeStream();
+  }
+
+  // This opens the updates, transforms them, and appends them to the JSON ES bulk file from the bulk transformation.
+  static void processUpdates(String startOfUpdates, int intervalSize,
       final String outputPath, String geoLookupServer) throws IOException {
-    splitUpSigelDump();
-    final FileOpener splitFileOpener = new FileOpener();
+    final FileOpener splitFileOpener = new FileOpener();
     JsonEncoder encodeJson = new JsonEncoder();
     encodeJson.setPrettyPrinting(true);
+    ObjectWriter objectWriter = new ObjectWriter<>(outputPath);
+    objectWriter.setAppendIfFileExists(true);
     splitFileOpener//
         .setReceiver(new XmlDecoder())//
         .setReceiver(new PicaXmlHandler())//
-        .setReceiver(new Metafix("conf/fix-sigel.fix")) // Fix skips all records that have no "inr" and "isil"
-        .setReceiver(TransformAll.fixEnriched(geoLookupServer))//
+        .setReceiver(new Metafix("conf/fix-sigel.fix")) // Preprocess Sigel data; the fix skips all records that have no "inr" and "isil".
+        .setReceiver(TransformAll.fixEnriched(geoLookupServer))// Process and enrich Sigel data.
         .setReceiver(encodeJson)//
         .setReceiver(TransformAll.esBulk())//
-        .setReceiver(new ObjectWriter<>(outputPath));
+        .setReceiver(objectWriter);
     if (!startOfUpdates.isEmpty()) {
       processSigelUpdates(startOfUpdates, intervalSize);
     }
@@ -73,17 +94,6 @@ static void process(String startOfUpdates, int intervalSize,
     splitFileOpener.closeStream();
   }

-  private static void splitUpSigelDump() {
-    final FileOpener dumpFileOpener = new FileOpener();
-    dumpFileOpener//
-        .setReceiver(new XmlDecoder())//
-        .setReceiver(new XmlElementSplitter(DUMP_TOP_LEVEL_TAG, DUMP_ENTITY))//
-        .setReceiver(
-            xmlFilenameWriter(TransformAll.DATA_OUTPUT_DIR, DUMP_XPATH));
-    dumpFileOpener.process(TransformAll.DATA_INPUT_DIR + "sigel.xml");
-    dumpFileOpener.closeStream();
-  }
-
   private static void processSigelUpdates(String startOfUpdates,
       int intervalSize) {
     int updateIntervals =
@@ -97,6 +107,7 @@ private static void processSigelUpdates(String startOfUpdates,
     }
   }

+
   private static ArrayList<OaiPmhOpener> buildUpdatePipes(int intervalSize,
       String startOfUpdates, int updateIntervals) {
     String start = startOfUpdates;
2 changes: 1 addition & 1 deletion conf/application.conf
@@ -15,7 +15,7 @@ index.es.port.tcp=7310

 index.remote=[10.1.1.106,127.0.0.1]

-transformation.updates.start="2013-06-01"
+transformation.updates.start="2023-06-01"
 transformation.updates.interval.size=50
 transformation.geo.lookup.server="http://gaia.hbz-nrw.de:4000/v1/search"
 transformation.geo.lookup.threshold=0.675
4 changes: 2 additions & 2 deletions test/controllers/AcceptIntegrationTest.java
@@ -83,8 +83,8 @@ public static Collection<Object[]> data() {
         { fakeRequest(GET, "/organisations/DE-38").withHeader("Accept", "application/n-triples"), /*->*/ "application/n-triples" }});
   } // @formatter:on

-  private FakeRequest fakeRequest;
-  private String contentType;
+  private final FakeRequest fakeRequest;
+  private final String contentType;

   public AcceptIntegrationTest(FakeRequest request, String contentType) {
     this.fakeRequest = request;
6 changes: 3 additions & 3 deletions test/controllers/AcceptUnitTest.java
@@ -70,9 +70,9 @@ public static Collection<Object[]> data() {
         { fakeRequest().withHeader("Accept", "text/html"), "json", /*->*/ "json" }});
   } // @formatter:on

-  private FakeRequest fakeRequest;
-  private String passedFormat;
-  private String expectedFormat;
+  private final FakeRequest fakeRequest;
+  private final String passedFormat;
+  private final String expectedFormat;

   public AcceptUnitTest(FakeRequest request, String givenFormat,
       String expectedFormat) {
9 changes: 3 additions & 6 deletions test/controllers/IntegrationTest.java
@@ -35,12 +35,9 @@ public class IntegrationTest extends ElasticsearchTest {
   @Test
   public void testMainPage() {
     running(testServer(3333, fakeApplication(inMemoryDatabase())), HTMLUNIT,
-        new Callback<TestBrowser>() {
-          @Override
-          public void invoke(TestBrowser browser) {
-            browser.goTo("http://localhost:3333/organisations");
-            assertThat(browser.pageSource()).contains("lobid-organisations");
-          }
+        browser -> {
+          browser.goTo("http://localhost:3333/organisations");
+          assertThat(browser.pageSource()).contains("lobid-organisations");
         });
   }
