Skip to content

Commit

Permalink
Sb:draft-2 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Milos Ljubinkovic committed Jan 26, 2018
1 parent 342028c commit 1c6d16a
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -154,8 +156,8 @@ private Map<String, Object> collectOutputs(SBJob job, File workingDir, HashAlgor
if (resultFile.exists()) {
String resultStr = FileUtils.readFileToString(resultFile);
Map<String, Object> result = JSONHelper.readMap(resultStr);
postprocessToolCreatedResults(result, hashAlgorithm);
return JSONHelper.readMap(resultStr);
postprocessToolCreatedResults(result, hashAlgorithm, workingDir.toPath());
return result;
}

Map<String, Object> result = new HashMap<>();
Expand All @@ -172,37 +174,30 @@ private Map<String, Object> collectOutputs(SBJob job, File workingDir, HashAlgor
return result;
}

private void postprocessToolCreatedResults(Object value, HashAlgorithm hashAlgorithm) {
private void postprocessToolCreatedResults(Object value, HashAlgorithm hashAlgorithm, Path workDir) {
if (value == null) {
return;
}
if ((SBSchemaHelper.isFileFromValue(value))) {
File file = new File(SBFileValueHelper.getPath(value));
if (!file.exists()) {
return;
}
SBFileValueHelper.setSize(file.length(), value);

if (hashAlgorithm != null) {
String checksum = ChecksumHelper.checksum(file, hashAlgorithm);
if (checksum != null) {
SBFileValueHelper.setChecksum(checksum, value);
}
try {
SBFileValueHelper.buildMissingInfo(value, hashAlgorithm, workDir);
} catch (IOException | URISyntaxException e) {
logger.error("Couldn't postprocess file: " + value + " : " + e.getMessage());
}

List<Map<String, Object>> secondaryFiles = SBFileValueHelper.getSecondaryFiles(value);
if (secondaryFiles != null) {
for (Object secondaryFile : secondaryFiles) {
postprocessToolCreatedResults(secondaryFile, hashAlgorithm);
postprocessToolCreatedResults(secondaryFile, hashAlgorithm, workDir);
}
}
} else if (value instanceof List<?>) {
for (Object subvalue : (List<?>) value) {
postprocessToolCreatedResults(subvalue, hashAlgorithm);
postprocessToolCreatedResults(subvalue, hashAlgorithm, workDir);
}
} else if (value instanceof Map<?, ?>) {
for (Object subvalue : ((Map<?, ?>) value).values()) {
postprocessToolCreatedResults(subvalue, hashAlgorithm);
postprocessToolCreatedResults(subvalue, hashAlgorithm, workDir);
}
}
}
Expand Down Expand Up @@ -323,7 +318,7 @@ private List<Map<String, Object>> globFiles(final SBJob job, final File workingD
SBFileValueHelper.setName(file.getName(), fileData);
SBFileValueHelper.setPath(file.getAbsolutePath(), fileData);

List<?> secondaryFiles = getSecondaryFiles(job, hashAlgorithm, fileData, file.getAbsolutePath(), outputBinding);
List<?> secondaryFiles = getSecondaryFiles(job, hashAlgorithm, fileData, file.getAbsolutePath(), outputBinding, true);
if (secondaryFiles != null) {
SBFileValueHelper.setSecondaryFiles(secondaryFiles, fileData);
}
Expand Down Expand Up @@ -357,7 +352,7 @@ private List<Map<String, Object>> globFiles(final SBJob job, final File workingD
* Gets secondary files (absolute paths)
*/
public static List<Map<String, Object>> getSecondaryFiles(SBJob job, HashAlgorithm hashAlgorithm, Map<String, Object> fileValue, String fileName,
Object binding) throws SBExpressionException {
Object binding, boolean onlyExisting) throws SBExpressionException {
List<String> secondaryFileSufixes = SBBindingHelper.getSecondaryFiles(binding);

if (secondaryFileSufixes == null) {
Expand All @@ -383,8 +378,11 @@ public static List<Map<String, Object>> getSecondaryFiles(SBJob job, HashAlgorit
secondaryFilePath += suffix.startsWith(".") ? suffix : "." + suffix;
}
try {
Map<String, Object> file = SBFileValueHelper.pathToRawFile(Paths.get(secondaryFilePath), hashAlgorithm, Paths.get(SBFileValueHelper.getPath(fileValue)));
secondaryFileMaps.add(file);
Path pathToSec = Paths.get(secondaryFilePath);
if (Files.exists(pathToSec) || !onlyExisting) {
Map<String, Object> file = SBFileValueHelper.pathToRawFile(pathToSec, hashAlgorithm, Paths.get(SBFileValueHelper.getPath(fileValue)));
secondaryFileMaps.add(file);
}
} catch (IOException | URISyntaxException e) {
logger.error("Couldn't collect secondary file: " + secondaryFilePath);
}
Expand All @@ -393,6 +391,14 @@ public static List<Map<String, Object>> getSecondaryFiles(SBJob job, HashAlgorit
return secondaryFileMaps;
}

/**
* Gets secondary files (absolute paths)
*/
public static List<Map<String, Object>> getSecondaryFiles(SBJob job, HashAlgorithm hashAlgorithm, Map<String, Object> fileValue, String fileName,
Object binding) throws SBExpressionException {
return getSecondaryFiles(job, hashAlgorithm, fileValue, fileName, binding, true);
}

@Override
public Object transformInputs(Object value, Job job, Object transform) throws BindingException {
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@
public class SBDocumentResolver {

public static final String CWL_VERSION_KEY = "cwlVersion";

public static final String RESOLVER_JSON_POINTER_KEY = "$job";

public static final String DOCUMENT_FRAGMENT_SEPARATOR = "#";

private static final Map<String, Map<String, JsonNode>> fragmentsCache = new HashMap<>();

private static final Map<String, Map<String, SBDocumentResolverReference>> referenceCache = new HashMap<>();
private static final Map<String, LinkedHashSet<SBDocumentResolverReplacement>> replacements = new HashMap<>();

public static String resolve(String appUrl) throws BindingException {
String appUrlBase = appUrl;
URI uri = URI.create(appUrl);
if (uri.getScheme().equals(URIHelper.DATA_URI_SCHEME)) {

if (appUrlBase.startsWith(URIHelper.DATA_URI_SCHEME)) {
appUrlBase = URIHelper.extractBase(appUrl);
}

Path file = null;
JsonNode root = null;
try {
boolean isFile = URIHelper.isFile(appUrlBase);
if (isFile) {
URI uri = URI.create(appUrl);
file = Paths.get(uri.getPath());
} else {
file = Paths.get(".");
Expand All @@ -63,13 +63,13 @@ public static String resolve(String appUrl) throws BindingException {
}

JsonNode cwlVersion = root.get(CWL_VERSION_KEY);
if (cwlVersion == null || !(cwlVersion.asText().equals(ProtocolType.SB.appVersion))){
if (cwlVersion == null || !(cwlVersion.asText().equals(ProtocolType.SB.appVersion))) {
clearReplacements(appUrl);
clearReferenceCache(appUrl);
clearFragmentCache(appUrl);
throw new BindingWrongVersionException("Document version is not " + ProtocolType.SB.appVersion);
}

if (root.isArray()) {
Map<String, JsonNode> fragmentsCachePerUrl = getFragmentsCache(appUrl);
for (JsonNode child : root) {
Expand All @@ -78,7 +78,7 @@ public static String resolve(String appUrl) throws BindingException {
String fragment = URIHelper.extractFragment(appUrl);
root = fragmentsCachePerUrl.get(fragment);
}

traverse(appUrl, root, file, null, root);

for (SBDocumentResolverReplacement replacement : getReplacements(appUrl)) {
Expand All @@ -88,8 +88,8 @@ public static String resolve(String appUrl) throws BindingException {
replaceObjectItem(appUrl, root, replacement);
}
}
if(!(root.get(CWL_VERSION_KEY).asText().equals(ProtocolType.SB.appVersion))) {

if (!(root.get(CWL_VERSION_KEY).asText().equals(ProtocolType.SB.appVersion))) {
clearReplacements(appUrl);
clearReferenceCache(appUrl);
clearFragmentCache(appUrl);
Expand All @@ -101,12 +101,17 @@ public static String resolve(String appUrl) throws BindingException {
clearFragmentCache(appUrl);
return JSONHelper.writeObject(root);
}

private static JsonNode traverse(String appUrl, JsonNode root, Path file, JsonNode parentNode, JsonNode currentNode) throws BindingException {
Preconditions.checkNotNull(currentNode, "current node id is null");


boolean isJsonPointer = currentNode.has(RESOLVER_JSON_POINTER_KEY) && parentNode != null; // we skip the first level $job
boolean isJsonPointer = currentNode.has(RESOLVER_JSON_POINTER_KEY) && parentNode != null; // we
// skip
// the
// first
// level
// $job

if (isJsonPointer) {
String referencePath = currentNode.get(RESOLVER_JSON_POINTER_KEY).textValue();
Expand All @@ -122,7 +127,7 @@ private static JsonNode traverse(String appUrl, JsonNode root, Path file, JsonNo
getReferenceCache(appUrl).put(referencePath, reference);

Map<String, JsonNode> fragmentsCachePerUrl = getFragmentsCache(appUrl);

ParentChild parentChild = null;
JsonNode referenceDocumentRoot = null;
if (fragmentsCachePerUrl != null && fragmentsCachePerUrl.containsKey(referencePath)) {
Expand Down Expand Up @@ -214,7 +219,7 @@ private static JsonNode findDocumentRoot(JsonNode root, Path file, String refere
}
}
}

private static String loadContents(Path file, String path) throws BindingException {
if (path.startsWith("ftp")) {
try {
Expand Down Expand Up @@ -274,7 +279,7 @@ private static ParentChild findReferencedNode(JsonNode rootNode, String absolute
}
return new ParentChild(parent, child);
}

private synchronized static Set<SBDocumentResolverReplacement> getReplacements(String url) {
LinkedHashSet<SBDocumentResolverReplacement> replacementsPerUrl = replacements.get(url);
if (replacementsPerUrl == null) {
Expand All @@ -283,11 +288,11 @@ private synchronized static Set<SBDocumentResolverReplacement> getReplacements(S
}
return replacementsPerUrl;
}

private synchronized static void clearReplacements(String url) {
replacements.remove(url);
}

private synchronized static Map<String, SBDocumentResolverReference> getReferenceCache(String url) {
Map<String, SBDocumentResolverReference> referenceCachePerUrl = referenceCache.get(url);
if (referenceCachePerUrl == null) {
Expand All @@ -296,7 +301,7 @@ private synchronized static Map<String, SBDocumentResolverReference> getReferenc
}
return referenceCachePerUrl;
}

private synchronized static Map<String, JsonNode> getFragmentsCache(String url) {
Map<String, JsonNode> fragmentsCachePerUrl = fragmentsCache.get(url);
if (fragmentsCachePerUrl == null) {
Expand All @@ -305,15 +310,15 @@ private synchronized static Map<String, JsonNode> getFragmentsCache(String url)
}
return fragmentsCachePerUrl;
}

private synchronized static void clearReferenceCache(String url) {
referenceCache.remove(url);
}

private synchronized static void clearFragmentCache(String url) {
fragmentsCache.remove(url);
}

private static class ParentChild {
JsonNode parent;
JsonNode child;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.rabix.common.helper.CloneHelper;
import org.rabix.common.helper.InternalSchemaHelper;
import org.rabix.common.helper.JSONHelper;
import org.rabix.common.logging.VerboseLogger;
import org.rabix.engine.JobHelper;
import org.rabix.engine.event.Event;
import org.rabix.engine.event.impl.ContextStatusEvent;
Expand Down Expand Up @@ -150,6 +151,9 @@ public void handle(JobStatusEvent event, EventHandlingMode mode) throws EventHan
case COMPLETED:
if (!jobRecord.isRoot()) {
jobService.delete(jobRecord.getRootId(), jobRecord.getExternalId());
if (jobRecord.isContainer() || jobRecord.isScatterWrapper()) {
VerboseLogger.log(String.format("Job %s has completed", jobRecord.getId()));
}
}

updateJobStats(jobRecord, jobStatsRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private void doProcessEvent(final ExternalEvent externalEvent) {
invalidateContext(event.getContextId());
} catch (Exception ex) {
logger.error("Failed to call jobFailed handler for job after event {} failed.", e, ex);
jobService.handleJobRootFailed(event.getContextId(), ex.getMessage());
}
} finally {
triggerGC(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public interface JobService {

void handleJobRootFailed(Job job);

void handleJobRootFailed(UUID job, String message);

void handleJobRootCompleted(Job job);

void handleJobFailed(Job failedJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,17 @@ public void handleJobRootFailed(Job job){
jobRepository.update(job);
stoppingRootIds.remove(job.getId());
}
handleJobRootFailed(job.getRootId(), job.getMessage());
}

@Override
public void handleJobRootFailed(UUID job, String message){
try {
engineStatusCallback.onJobRootFailed(job.getRootId(), job.getMessage());
engineStatusCallback.onJobRootFailed(job, message);
} catch (EngineStatusCallbackException e) {
logger.error("Engine status callback failed", e);
} finally {
garbageCollectionService.forceGc(job.getRootId());
garbageCollectionService.forceGc(job);
}
}

Expand Down
Loading

0 comments on commit 1c6d16a

Please sign in to comment.