This repository provides a set of data processing tasks for reading and transforming DataPackages.
The primary objective of this project is to implement routines that ingest DataPackages and convert them into various output formats, such as Elasticsearch indices, Parquet files, and Apache Iceberg tables.
At this stage, the project focuses on basic transformation from a DataPackage into a set of Elasticsearch indices.
This project uses the following dependencies:
- Apache Spark 3.5.1 for distributed and local data processing.
- Elasticsearch 7.11.2 for indexing and searching data.
To build the project, run:

```bash
mvn clean package verify -U
```
Note: All Apache Spark dependencies are marked as provided. You must ensure they are available at runtime, either locally or in your cluster environment.
This Spark job reads a DataPackage and transforms it into a set of Elasticsearch indices.
It begins by reading the datapackage.json descriptor and generating equivalent Elasticsearch mappings using ElasticsearchMappingBuilder.java. It then loads the tabular data files into the corresponding target indices.
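For illustration only (the field name is hypothetical and the exact types chosen by ElasticsearchMappingBuilder may differ), a Table Schema field declared as `{"name": "assertionValue", "type": "number"}` could be translated into a mapping property along these lines:

```json
{
  "mappings": {
    "properties": {
      "assertionValue": { "type": "double" }
    }
  }
}
```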
Primary keys are used at ingestion time: a field defined as the primary key is later used as the document ID in Elasticsearch.
For example, a schema with `"primaryKey": "assertionID"` will result in:

```java
.option("es.mapping.id", "assertionID")
```

If the schema does not define a primary key, the job does not set the `es.mapping.id` option, and Elasticsearch generates a unique ID for each document.
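A minimal sketch of how this ingestion step could look with the elasticsearch-spark connector is shown below. The `spark` session, the `primaryKey` variable, the input file, and the index name are assumptions borrowed from the examples in this README, not the exact code of this job:

```java
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Read one tabular resource of the DataPackage (TSV with a header row).
Dataset<Row> assertions = spark.read()
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("occurrence-assertion.tsv");

DataFrameWriter<Row> writer = assertions.write()
    .format("org.elasticsearch.spark.sql")          // elasticsearch-hadoop Spark SQL connector
    .option("es.nodes", "localhost:9200")
    .mode(SaveMode.Append);

// Only set es.mapping.id when the Table Schema defines a primary key;
// otherwise Elasticsearch assigns its own document IDs.
if (primaryKey != null) {
    writer = writer.option("es.mapping.id", primaryKey);
}

writer.save("occurrence-assertion_dp_abcd-1234");   // target index
```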
Foreign keys are interpreted in a simple way: they are only used to enable searching for related documents in other indices by a field. For example, a schema with:
"foreignKeys": [
{
"fields": "basedOnOccurrenceID",
"reference": {
"resource": "occurrence",
"fields": "occurrenceID"
}
]
is transformed into an alias field in Elasticsearch:
"occurrence_occurrenceID": {
"type": "alias",
"path": "basedOnOccurrenceID"
}
This allows using the field `basedOnOccurrenceID` to search for documents in the `occurrence` index by the `occurrenceID` field.
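For example (a minimal sketch: the index name matches the naming example further below and the ID value is a placeholder), a term query on the alias field behaves as if it were issued against `basedOnOccurrenceID`:

```bash
curl -X POST "http://localhost:9200/occurrence-assertion_dp_abcd-1234/_search" \
  -H 'Content-Type: application/json' \
  -d '{ "query": { "term": { "occurrence_occurrenceID": "some-occurrence-id" } } }'
```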
By convention, the job uses the `_dp` suffix for indices and aliases.
Each index is named using the pattern:
`<schemaName>_dp_<datasetKey>`
Where:

- `schemaName` comes from the resource schema defined in `datapackage.json`
- `datasetKey` is a user-defined or generated identifier

Each index also points to an alias `<schemaName>_dp` that is used to query the data.
The schema names are derived from the DataPackage schemas, which are defined in the `datapackage.json` file.
Given the following `datapackage.json` file:
```json
{
  "profile": "tabular-data-package",
  "name": "occurrence",
  "title": "Occurrence Example",
  "resources": [
    {
      "profile": "tabular-data-resource",
      "name": "occurrence-assertion",
      "path": "occurrence-assertion.tsv",
      "format": "tsv",
      "mediatype": "text/tab-separated-values",
      "schema": {
        "name": "occurrence-assertion",
        ...
      }
    }
  ]
}
```
And a `datasetKey` value of `abcd-1234`, the Spark job will create:

- Index: `occurrence-assertion_dp_abcd-1234`
- Alias: `occurrence-assertion_dp` (the index will point to this alias)
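To check the result, you can list the created index and its alias with the Elasticsearch cat APIs, for example:

```bash
curl "http://localhost:9200/_cat/indices/occurrence-assertion_dp_*?v"
curl "http://localhost:9200/_cat/aliases/occurrence-assertion_dp?v"
```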
This project has been tested using the data packages available in the GBIF Darwin Core DataPackage Examples.
Ensure you have an Elasticsearch instance running locally on port 9200. You can use Docker to quickly set up an Elasticsearch instance:
```bash
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.11.2
docker run --name es711 -p 9200:9200 -p 9300:9300 --ulimit nofile=65536:65536 --security-opt seccomp=unconfined -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.11.2
```
You can download Apache Spark from the official website.
Download Spark 3.5.1 with Hadoop 3.3 and extract it to a directory of your choice.
Set the `SPARK_HOME` environment variable to point to the Spark installation directory:
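For example (adjust the path to wherever you extracted the Spark distribution):

```bash
export SPARK_HOME=/opt/spark-3.5.1-bin-hadoop3
```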
First, build the project using the command mentioned above. Then, you can run the job using the following command:
```bash
java --add-exports java.base/sun.util.calendar=ALL-UNNAMED \
  --add-exports java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens java.base/java.io=ALL-UNNAMED \
  -cp "$SPARK_HOME/jars/*:target/datapackages-pipeline-1.0-SNAPSHOT-shaded.jar" org.gbif.dp.elasticsearch.spark.DataPackageToElasticsearch \
  abcd-1234 /dwc-dp-examples/organism_interaction/inaturalist/output_data/datapackage.json http://localhost:9200 LOCAL
```