Merge pull request #10781 from IQSS/10623-globus-improvements
Improved handling of Globus uploads (experimental async framework)
pdurbin committed Sep 25, 2024
2 parents ea02478 + 682c89f commit d40ce32
Showing 27 changed files with 1,146 additions and 313 deletions.
1 change: 1 addition & 0 deletions doc/release-notes/10623-globus-improvements.md
@@ -0,0 +1 @@
This release adds an alternative, experimental implementation of Globus polling during upload data transfers. The new framework does not rely on the instance staying up continuously for the duration of the transfer and saves state information about Globus upload requests in the database. See `globus-use-experimental-async-framework` under [Feature Flags](https://dataverse-guide--10781.org.readthedocs.build/en/10781/installation/config.html#feature-flags) and [dataverse.files.globus-monitoring-server](https://dataverse-guide--10781.org.readthedocs.build/en/10781/installation/config.html#dataverse-files-globus-monitoring-server) in the Installation Guide. See also #10623 and #10781.
2 changes: 2 additions & 0 deletions doc/sphinx-guides/source/developers/big-data-support.rst
@@ -187,3 +187,5 @@ As described in that document, Globus transfers can be initiated by choosing the
An overview of the control and data transfer interactions between components was presented at the 2022 Dataverse Community Meeting and can be viewed in the `Integrations and Tools Session Video <https://youtu.be/3ek7F_Dxcjk?t=5289>`_ around the 1 hr 28 min mark.

See also :ref:`Globus settings <:GlobusSettings>`.

An alternative, experimental implementation of Globus polling of ongoing upload transfers has been added in v6.4. This framework does not rely on the instance staying up continuously for the duration of the transfer and saves state information about Globus upload requests in the database. Due to its experimental nature, it is not enabled by default. See the ``globus-use-experimental-async-framework`` feature flag (see :ref:`feature-flags`) and the JVM option :ref:`dataverse.files.globus-monitoring-server`.
2 changes: 2 additions & 0 deletions doc/sphinx-guides/source/developers/globus-api.rst
@@ -185,6 +185,8 @@ As the transfer can take significant time and the API call is asynchronous, the

Once the transfer completes, Dataverse will remove the write permission for the principal.

An alternative, experimental implementation of Globus polling of ongoing upload transfers has been added in v6.4. This new framework does not rely on the instance staying up continuously for the duration of the transfer and saves state information about Globus upload requests in the database. Due to its experimental nature, it is not enabled by default. See the ``globus-use-experimental-async-framework`` feature flag (see :ref:`feature-flags`) and the JVM option :ref:`dataverse.files.globus-monitoring-server`.

Note that when using a managed endpoint that uses the Globus S3 Connector, the checksum should be correct as Dataverse can validate it. For file-based endpoints, the checksum should be included if available but Dataverse cannot verify it.
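As a rough illustration of the kind of fixity check this involves (the helper below is hypothetical, not Dataverse's actual code), validating an upload means hex-encoding a digest of the received bytes and comparing it to the checksum supplied in the upload request:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class ChecksumSketch {

    // Hypothetical helper: hex-encode an MD5 digest of the uploaded bytes
    // and compare it to the checksum supplied with the upload request.
    static boolean checksumMatches(byte[] content, String expectedMd5Hex) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5").digest(content);
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString().equalsIgnoreCase(expectedMd5Hex);
    }

    public static void main(String[] args) throws Exception {
        // MD5("abc") is the well-known value 900150983cd24fb0d6963f7d28e17f72
        System.out.println(checksumMatches("abc".getBytes(StandardCharsets.UTF_8),
                "900150983cd24fb0d6963f7d28e17f72")); // true
    }
}
```

For a managed S3 endpoint, Dataverse can run this kind of comparison itself; for file-based endpoints it can only record the client-supplied value.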

In the remote/reference case, where there is no transfer to monitor, the standard /addFiles API call (see :ref:`direct-add-to-dataset-api`) is used instead. There are no changes for the Globus case.
16 changes: 14 additions & 2 deletions doc/sphinx-guides/source/installation/config.rst
@@ -3312,6 +3312,13 @@ The email for your institution that you'd like to appear in bag-info.txt. See :r

Can also be set via *MicroProfile Config API* sources, e.g. the environment variable ``DATAVERSE_BAGIT_SOURCEORG_EMAIL``.

.. _dataverse.files.globus-monitoring-server:

dataverse.files.globus-monitoring-server
++++++++++++++++++++++++++++++++++++++++

This setting is required in conjunction with the ``globus-use-experimental-async-framework`` feature flag (see :ref:`feature-flags`). Setting it to ``true`` designates this Dataverse instance as the dedicated polling server, which is needed so that the new framework can be used in a multi-node installation.

.. _feature-flags:

Feature Flags
@@ -3348,7 +3355,10 @@ please find all known feature flags below. Any of these flags can be activated u
- Removes the reason field in the `Publish/Return To Author` dialog that was added as a required field in v6.2 and makes the reason an optional parameter in the :ref:`return-a-dataset` API call.
- ``Off``
* - disable-dataset-thumbnail-autoselect
- Turns off automatic selection of a dataset thumbnail from image files in that dataset. When set to ``On``, a user can still manually pick a thumbnail image, or upload a dedicated thumbnail image.
- Turns off automatic selection of a dataset thumbnail from image files in that dataset. When set to ``On``, a user can still manually pick a thumbnail image or upload a dedicated thumbnail image.
- ``Off``
* - globus-use-experimental-async-framework
- Activates a new experimental implementation of Globus polling of ongoing remote data transfers that does not rely on the instance staying up continuously for the duration of the transfers and saves the state information about Globus upload requests in the database. Added in v6.4. Affects :ref:`:GlobusPollingInterval`. Note that the JVM option :ref:`dataverse.files.globus-monitoring-server` described above must also be enabled on one (and only one, in a multi-node installation) Dataverse instance.
- ``Off``

**Note:** Feature flags can be set via any `supported MicroProfile Config API source`_, e.g. the environment variable
@@ -4828,10 +4838,12 @@ The list of parent dataset field names for which the LDN Announce workflow step

The URL where the `dataverse-globus <https://github.com/scholarsportal/dataverse-globus>`_ "transfer" app has been deployed to support Globus integration. See :ref:`globus-support` for details.

.. _:GlobusPollingInterval:

:GlobusPollingInterval
++++++++++++++++++++++

The interval in seconds between Dataverse calls to Globus to check on upload progress. Defaults to 50 seconds. See :ref:`globus-support` for details.
The interval in seconds between Dataverse calls to Globus to check on upload progress. Defaults to 50 seconds (or to 10 minutes, when the ``globus-use-experimental-async-framework`` feature flag is enabled). See :ref:`globus-support` for details.
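The default selection documented above can be sketched as follows (an illustration of the documented behavior only, not the actual implementation; the method name is hypothetical):

```java
public class PollingIntervalSketch {

    // Documented defaults: an explicit :GlobusPollingInterval setting wins;
    // otherwise 600 seconds (10 minutes) when the experimental async framework
    // is enabled, 50 seconds otherwise.
    static int effectivePollingIntervalSeconds(Integer configured, boolean asyncFrameworkEnabled) {
        if (configured != null) {
            return configured;
        }
        return asyncFrameworkEnabled ? 600 : 50;
    }

    public static void main(String[] args) {
        System.out.println(effectivePollingIntervalSeconds(null, false)); // 50
        System.out.println(effectivePollingIntervalSeconds(null, true));  // 600
        System.out.println(effectivePollingIntervalSeconds(30, true));    // 30
    }
}
```

The longer default under the async framework reflects that a dedicated monitoring server can afford to poll less aggressively.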

:GlobusSingleFileTransfer
+++++++++++++++++++++++++
11 changes: 10 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/DatasetServiceBean.java
@@ -412,12 +412,20 @@ public boolean checkDatasetLock(Long datasetId) {
List<DatasetLock> lock = lockCounter.getResultList();
return lock.size()>0;
}


public List<DatasetLock> getLocksByDatasetId(Long datasetId) {
TypedQuery<DatasetLock> locksQuery = em.createNamedQuery("DatasetLock.getLocksByDatasetId", DatasetLock.class);
locksQuery.setParameter("datasetId", datasetId);
return locksQuery.getResultList();
}

public List<DatasetLock> getDatasetLocksByUser( AuthenticatedUser user) {

return listLocks(null, user);
}

// @todo: we'll be better off getting rid of this method and using the other
// version of addDatasetLock() (that uses datasetId instead of Dataset).
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public DatasetLock addDatasetLock(Dataset dataset, DatasetLock lock) {
lock.setDataset(dataset);
@@ -467,6 +475,7 @@ public DatasetLock addDatasetLock(Long datasetId, DatasetLock.Reason reason, Lon
* is {@code aReason}.
* @param dataset the dataset whose locks (for {@code aReason}) will be removed.
* @param aReason The reason of the locks that will be removed.
* @todo this should probably take dataset_id, not a dataset
*/
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void removeDatasetLocks(Dataset dataset, DatasetLock.Reason aReason) {
8 changes: 6 additions & 2 deletions src/main/java/edu/harvard/iq/dataverse/EditDatafilesPage.java
@@ -2127,8 +2127,12 @@ public void handleFileUpload(FileUploadEvent event) throws IOException {
}

/**
* Using information from the DropBox choose, ingest the chosen files
* https://www.dropbox.com/developers/dropins/chooser/js
* External, aka "Direct" Upload.
* The file(s) have been uploaded to physical storage (such as S3) directly;
* this call creates and adds the DataFiles to the Dataset on the Dataverse
* side. The method does NOT finalize saving the datafiles in the database -
* that will happen when the user clicks 'Save', similar to how the "normal"
* uploads are handled.
*
* @param event
*/
@@ -0,0 +1,110 @@
package edu.harvard.iq.dataverse;

import jakarta.persistence.Column;
import jakarta.persistence.Index;
import jakarta.persistence.NamedQueries;
import jakarta.persistence.NamedQuery;
import jakarta.persistence.Table;
import java.io.Serializable;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

/**
*
* @author landreev
*
* The name of the class is provisional. I'm open to better-sounding alternatives,
* if anyone can think of any.
* But I wanted to avoid having the word "Globus" in the entity name. I'm adding
* it specifically for the Globus use case. But I'm guessing there's a chance
* this setup may come in handy for other types of datafile uploads that happen
* externally. (?)
*/
@NamedQueries({
    @NamedQuery(name = "ExternalFileUploadInProgress.deleteByTaskId",
            query = "DELETE FROM ExternalFileUploadInProgress f WHERE f.taskId=:taskId"),
    @NamedQuery(name = "ExternalFileUploadInProgress.findByTaskId",
            query = "SELECT f FROM ExternalFileUploadInProgress f WHERE f.taskId=:taskId")})
@Entity
@Table(indexes = {@Index(columnList = "taskid")})
public class ExternalFileUploadInProgress implements Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    /**
     * Rather than saving various individual fields defining the datafile,
     * which would essentially replicate the DataFile table, we are simply
     * storing the full json record as passed to the API here.
     */
    @Column(columnDefinition = "TEXT", nullable = false)
    private String fileInfo;

    /**
     * The Globus-specific task id associated with the upload in progress.
     */
    @Column(nullable = false)
    private String taskId;

    public ExternalFileUploadInProgress() {
    }

    public ExternalFileUploadInProgress(String taskId, String fileInfo) {
        this.taskId = taskId;
        this.fileInfo = fileInfo;
    }

    public String getFileInfo() {
        return fileInfo;
    }

    public void setFileInfo(String fileInfo) {
        this.fileInfo = fileInfo;
    }

    public String getTaskId() {
        return taskId;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    @Override
    public int hashCode() {
        int hash = 0;
        hash += (id != null ? id.hashCode() : 0);
        return hash;
    }

    @Override
    public boolean equals(Object object) {
        // TODO: Warning - this method won't work in the case the id fields are not set
        if (!(object instanceof ExternalFileUploadInProgress)) {
            return false;
        }
        ExternalFileUploadInProgress other = (ExternalFileUploadInProgress) object;
        if ((this.id == null && other.id != null) || (this.id != null && !this.id.equals(other.id))) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return "edu.harvard.iq.dataverse.ExternalFileUploadInProgress[ id=" + id + " ]";
    }

}
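The two named queries key everything on ``taskId``: while a transfer is in progress the monitoring code looks the record up by task id, and once the transfer finishes it deletes the record. The sketch below illustrates that lifecycle with an in-memory map standing in for the EntityManager (the real code runs the JPA queries shown above):

```java
import java.util.HashMap;
import java.util.Map;

public class UploadInProgressSketch {

    // taskId -> fileInfo JSON, standing in for ExternalFileUploadInProgress rows
    private final Map<String, String> store = new HashMap<>();

    // analogous to persisting a new ExternalFileUploadInProgress entity
    void save(String taskId, String fileInfo) {
        store.put(taskId, fileInfo);
    }

    // analogous to the ExternalFileUploadInProgress.findByTaskId named query
    String findByTaskId(String taskId) {
        return store.get(taskId);
    }

    // analogous to the ExternalFileUploadInProgress.deleteByTaskId named query
    boolean deleteByTaskId(String taskId) {
        return store.remove(taskId) != null;
    }

    public static void main(String[] args) {
        UploadInProgressSketch monitor = new UploadInProgressSketch();
        monitor.save("task-123", "{\"files\": []}");
        System.out.println(monitor.findByTaskId("task-123")); // the stored fileInfo JSON
        monitor.deleteByTaskId("task-123");
        System.out.println(monitor.findByTaskId("task-123")); // null once cleaned up
    }
}
```

Because the state lives in the database rather than in memory, any node (in particular the designated monitoring server) can resume polling after a restart.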
27 changes: 26 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/MailServiceBean.java
@@ -624,6 +624,7 @@ public String getMessageTextBasedOnNotification(UserNotificatio
comment
)) ;
return downloadCompletedMessage;

case GLOBUSUPLOADCOMPLETEDWITHERRORS:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
@@ -634,8 +635,30 @@
comment
)) ;
return uploadCompletedWithErrorsMessage;

case GLOBUSUPLOADREMOTEFAILURE:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
String uploadFailedRemotelyMessage = messageText + BundleUtil.getStringFromBundle("notification.mail.globus.upload.failedRemotely", Arrays.asList(
systemConfig.getDataverseSiteUrl(),
dataset.getGlobalId().asString(),
dataset.getDisplayName(),
comment
)) ;
return uploadFailedRemotelyMessage;

case GLOBUSUPLOADLOCALFAILURE:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
String uploadFailedLocallyMessage = messageText + BundleUtil.getStringFromBundle("notification.mail.globus.upload.failedLocally", Arrays.asList(
systemConfig.getDataverseSiteUrl(),
dataset.getGlobalId().asString(),
dataset.getDisplayName(),
comment
)) ;
return uploadFailedLocallyMessage;

case GLOBUSDOWNLOADCOMPLETEDWITHERRORS:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
String downloadCompletedWithErrorsMessage = messageText + BundleUtil.getStringFromBundle("notification.mail.globus.download.completedWithErrors", Arrays.asList(
@@ -764,6 +787,8 @@ public Object getObjectOfNotification (UserNotification userNotification){
return versionService.find(userNotification.getObjectId());
case GLOBUSUPLOADCOMPLETED:
case GLOBUSUPLOADCOMPLETEDWITHERRORS:
case GLOBUSUPLOADREMOTEFAILURE:
case GLOBUSUPLOADLOCALFAILURE:
case GLOBUSDOWNLOADCOMPLETED:
case GLOBUSDOWNLOADCOMPLETEDWITHERRORS:
return datasetService.find(userNotification.getObjectId());
3 changes: 2 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/UserNotification.java
@@ -39,7 +39,8 @@ public enum Type {
CHECKSUMIMPORT, CHECKSUMFAIL, CONFIRMEMAIL, APIGENERATED, INGESTCOMPLETED, INGESTCOMPLETEDWITHERRORS,
PUBLISHFAILED_PIDREG, WORKFLOW_SUCCESS, WORKFLOW_FAILURE, STATUSUPDATED, DATASETCREATED, DATASETMENTIONED,
GLOBUSUPLOADCOMPLETED, GLOBUSUPLOADCOMPLETEDWITHERRORS,
GLOBUSDOWNLOADCOMPLETED, GLOBUSDOWNLOADCOMPLETEDWITHERRORS, REQUESTEDFILEACCESS;
GLOBUSDOWNLOADCOMPLETED, GLOBUSDOWNLOADCOMPLETEDWITHERRORS, REQUESTEDFILEACCESS,
GLOBUSUPLOADREMOTEFAILURE, GLOBUSUPLOADLOCALFAILURE;

public String getDescription() {
return BundleUtil.getStringFromBundle("notification.typeDescription." + this.name());
4 changes: 4 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/api/ApiConstants.java
@@ -17,4 +17,8 @@ private ApiConstants() {
public static final String DS_VERSION_LATEST = ":latest";
public static final String DS_VERSION_DRAFT = ":draft";
public static final String DS_VERSION_LATEST_PUBLISHED = ":latest-published";

// addFiles call
public static final String API_ADD_FILES_COUNT_PROCESSED = "Total number of files";
public static final String API_ADD_FILES_COUNT_SUCCESSFUL = "Number of files successfully added";
}
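The two new constants act as the JSON keys in the summary that the /addFiles call reports back. A minimal, self-contained sketch of that kind of summary (a plain Map here in place of the actual JSON builder, so the response shape is only illustrative; the constants are copied in so the example compiles on its own):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AddFilesResponseSketch {

    // Copied here so the sketch is self-contained; the real constants live in ApiConstants.
    static final String API_ADD_FILES_COUNT_PROCESSED = "Total number of files";
    static final String API_ADD_FILES_COUNT_SUCCESSFUL = "Number of files successfully added";

    // Build the kind of summary an addFiles response can report.
    static Map<String, Integer> summarize(int processed, int successful) {
        Map<String, Integer> data = new LinkedHashMap<>();
        data.put(API_ADD_FILES_COUNT_PROCESSED, processed);
        data.put(API_ADD_FILES_COUNT_SUCCESSFUL, successful);
        return data;
    }

    public static void main(String[] args) {
        System.out.println(summarize(5, 4));
        // {Total number of files=5, Number of files successfully added=4}
    }
}
```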
43 changes: 33 additions & 10 deletions src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
@@ -98,6 +98,7 @@
import java.util.stream.Collectors;

import static edu.harvard.iq.dataverse.api.ApiConstants.*;
import edu.harvard.iq.dataverse.engine.command.exception.IllegalCommandException;
import edu.harvard.iq.dataverse.dataset.DatasetType;
import edu.harvard.iq.dataverse.dataset.DatasetTypeServiceBean;
import static edu.harvard.iq.dataverse.util.json.JsonPrinter.*;
@@ -3913,7 +3914,7 @@ public Response requestGlobusUpload(@Context ContainerRequestContext crc, @PathP

if (!systemConfig.isGlobusUpload()) {
return error(Response.Status.SERVICE_UNAVAILABLE,
BundleUtil.getStringFromBundle("datasets.api.globusdownloaddisabled"));
BundleUtil.getStringFromBundle("file.api.globusUploadDisabled"));
}

// -------------------------------------
@@ -4013,10 +4014,6 @@ public Response addGlobusFilesToDataset(@Context ContainerRequestContext crc,

logger.info(" ==== (api addGlobusFilesToDataset) jsonData ====== " + jsonData);

if (!systemConfig.isHTTPUpload()) {
return error(Response.Status.SERVICE_UNAVAILABLE, BundleUtil.getStringFromBundle("file.api.httpDisabled"));
}

// -------------------------------------
// (1) Get the user from the API key
// -------------------------------------
@@ -4039,6 +4036,32 @@
return wr.getResponse();
}

// Is Globus upload service available?

// ... on this Dataverse instance?
if (!systemConfig.isGlobusUpload()) {
return error(Response.Status.SERVICE_UNAVAILABLE, BundleUtil.getStringFromBundle("file.api.globusUploadDisabled"));
}

// ... and on this specific Dataset?
String storeId = dataset.getEffectiveStorageDriverId();
// acceptsGlobusTransfers should only be true for an S3 or globus store
if (!GlobusAccessibleStore.acceptsGlobusTransfers(storeId)
&& !GlobusAccessibleStore.allowsGlobusReferences(storeId)) {
return badRequest(BundleUtil.getStringFromBundle("datasets.api.globusuploaddisabled"));
}

// Check if the dataset is already locked
// We are reusing the code and logic used by various commands to determine
// if there are any locks on the dataset that would prevent the current
// user from modifying it:
try {
DataverseRequest dataverseRequest = createDataverseRequest(authUser);
permissionService.checkEditDatasetLock(dataset, dataverseRequest, null);
} catch (IllegalCommandException icex) {
return error(Response.Status.FORBIDDEN, "Dataset " + datasetId + " is locked: " + icex.getLocalizedMessage());
}

JsonObject jsonObject = null;
try {
jsonObject = JsonUtil.getJsonObject(jsonData);
@@ -4069,18 +4092,18 @@ public Response addGlobusFilesToDataset(@Context ContainerRequestContext crc,
logger.log(Level.WARNING, "Failed to lock the dataset (dataset id={0})", dataset.getId());
}


ApiToken token = authSvc.findApiTokenByUser(authUser);

if(uriInfo != null) {
logger.info(" ==== (api uriInfo.getRequestUri()) jsonData ====== " + uriInfo.getRequestUri().toString());
}


String requestUrl = SystemConfig.getDataverseSiteUrlStatic();

// Async Call
globusService.globusUpload(jsonObject, token, dataset, requestUrl, authUser);
try {
globusService.globusUpload(jsonObject, dataset, requestUrl, authUser);
} catch (IllegalArgumentException ex) {
return badRequest("Invalid parameters: "+ex.getMessage());
}

return ok("Async call to Globus Upload started ");

@@ -528,6 +528,8 @@ public void displayNotification() {
case GLOBUSUPLOADCOMPLETEDWITHERRORS:
case GLOBUSDOWNLOADCOMPLETED:
case GLOBUSDOWNLOADCOMPLETEDWITHERRORS:
case GLOBUSUPLOADREMOTEFAILURE:
case GLOBUSUPLOADLOCALFAILURE:
userNotification.setTheObject(datasetService.find(userNotification.getObjectId()));
break;
