Skip to content

Commit

Permalink
Merge pull request #925 from conveyal/dev
Browse files Browse the repository at this point in the history
January 2024 Point Release 7.1
  • Loading branch information
ansoncfit committed Jan 5, 2024
2 parents 6363b28 + b7d02fd commit 584aafa
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 75 deletions.
21 changes: 14 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ plugins {
id 'application'
id 'maven-publish'
id 'com.palantir.git-version' version '2.0.0'
id 'com.github.johnrengelman.shadow' version '8.1.1'
}

group = 'com.conveyal'
Expand All @@ -19,7 +20,7 @@ jar {
// For Java 11+ Modules, specify a module name.
// Do not create module-info.java until all our dependencies specify a module name.
// Main-Class BackendMain will start a local backend.
// Build-Jdk-Spec mimics a Maven manifest entry that helps us automatically install the right JVM.
// Build-Jdk-Spec mimics a Maven manifest entry that helps us automatically install or select the right JVM.
// Implementation-X attributes are needed for ImageIO (used by Geotools) to initialize in some environments.
manifest {
attributes 'Automatic-Module-Name': 'com.conveyal.r5',
Expand All @@ -31,6 +32,10 @@ jar {
}
}

shadowJar {
mergeServiceFiles()
}

// Allow reflective access by ObjectDiffer to normally closed Java internals. Used for round-trip testing serialization.
// IntelliJ seems not to pass these JVM arguments when running tests from within the IDE, so the Kryo serialization
// tests may only succeed under command line Gradle.
Expand All @@ -42,8 +47,8 @@ test {
'--add-opens=java.base/java.lang=ALL-UNNAMED']
}

// `gradle publish` will upload both shadow and simple JAR to Github Packages
// On GH Actions, GITHUB_ACTOR env variable is supplied without specifying it in action yml.
// Set up publication of jar files to GitHub Packages Maven repository.
// On GitHub Actions, GITHUB_ACTOR env variable is supplied without specifying it in action yml.
publishing {
repositories {
maven {
Expand All @@ -56,10 +61,12 @@ publishing {
}
}
publications {
// The presence of the shadow plugin somehow causes the shadow-jar to also be automatically included in this
// publication. Ideally we want to produce the shadow jar and upload it to S3 as a worker, but only publish the
// much smaller plain JAR without dependencies to Github Packages. On the other hand, we may want to publish
// shadow jars for tagged releases.
// The shadow plugin automatically creates and registers a component called "shadow" for integration with this
// Maven publish plugin. `gradle publish` will then upload both shadow jar and simple jar to GitHub Packages.
// See https://imperceptiblethoughts.com/shadow/getting-started/#default-java-groovy-tasks
// To run R5 with dependencies, Conveyal does not use shadow jars anymore, only the zip distribution or runBackend.
// For development builds and tests we don't need to produce a shadow jar, only publish the much smaller plain
// jar without dependencies to GitHub Packages. For now, we continue to attach shadow jars to tagged releases.
gpr(MavenPublication) {
from(components.java)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void respondToException(Exception e, Request request, Response response,
// Include a stack trace except when the error is known to be about unauthenticated or unauthorized access,
// in which case we don't want to leak information about the server to people scanning it for weaknesses.
if (type != UNAUTHORIZED && type != FORBIDDEN) {
body.put("stackTrace", errorEvent.stackTrace);
body.put("stackTrace", errorEvent.filteredStackTrace);
}
response.status(code);
response.type("application/json");
Expand Down
38 changes: 25 additions & 13 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
}
// Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
// active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
// the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);

// A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
jobs.put(job.workerCategory, job);

// If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
if (config.testTaskRedelivery()) {
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}

Expand Down Expand Up @@ -385,14 +389,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
}

/**
* When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* Record an error that happened while a worker was processing a task on the given job. This method is tolerant
* of job being null, because it's called on a code path where any number of things could be wrong or missing.
* This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
* Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
* This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
if (job != null) {
job.errors.add(error);
// Limit the number of errors recorded to one.
// Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
if (job.errors.isEmpty()) {
job.errors.add(error);
}
}
}

Expand Down Expand Up @@ -488,21 +498,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
MultiOriginAssembler assembler;
try {
MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
// This will mark the job as errored and not-active, stopping distribution of tasks to workers.
// To ensure that happens, record errors before any other conditional that could exit this method.
if (workResult.error != null) {
recordJobError(job, workResult.error);
return;
}
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) {
public int deliveryPass = 0;

/**
* If any error compromises the usabilty or quality of results from any origin, it is recorded here.
* If any error compromises the usability or quality of results from any origin, it is recorded here.
* This is a Set because identical errors are likely to be reported from many workers or individual tasks.
* The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks.
* There is some risk here of accumulating unbounded amounts of large error messages (see #919).
* The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set.
*/
public final Set<String> errors = new HashSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import com.conveyal.r5.util.ExceptionUtils;

import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace;

/**
* This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be
* recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on
* the workers as well, but we'd have to be careful not to generate hundreds of messages at once.
*/
public class ErrorEvent extends Event {

// We may serialize this object, so we convert the Throwable to two strings to control its representation.
// All Events are intended to be eligible for serialization into a log or database, so we convert the Throwable to
// some Strings to determine its representation in a simple way.
// For flexibility in event handlers, it is tempting to hold on to the original Throwable instead of derived
// Strings. Exceptions are famously slow, but it's the initial creation and filling in the stack trace that are
// slow. Once the instace exists, repeatedly examining its stack trace should not be prohibitively costly. Still,
// we do probably gain some efficiency by converting the stack trace to a String once and reusing that.
// slow. Once the instance exists, repeatedly examining its stack trace should not be prohibitively costly.

public final String summary;

Expand All @@ -23,11 +25,16 @@ public class ErrorEvent extends Event {
*/
public final String httpPath;

/** The full stack trace of the exception that occurred. */
public final String stackTrace;

/** A minimal stack trace showing the immediate cause within Conveyal code. */
public final String filteredStackTrace;

public ErrorEvent (Throwable throwable, String httpPath) {
this.summary = ExceptionUtils.shortCauseString(throwable);
this.stackTrace = ExceptionUtils.stackTraceString(throwable);
this.filteredStackTrace = ExceptionUtils.filterStackTrace(throwable);
this.httpPath = httpPath;
}

Expand All @@ -54,25 +61,9 @@ public String traceWithContext (boolean verbose) {
if (verbose) {
builder.append(stackTrace);
} else {
builder.append(filterStackTrace(stackTrace));
builder.append(filteredStackTrace);
}
return builder.toString();
}

private static String filterStackTrace (String stackTrace) {
if (stackTrace == null) return null;
final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
String error = stackTrace.lines().findFirst().get();
String frame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at "))
.findFirst().orElse(unknownFrame);
String conveyalFrame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at com.conveyal."))
.filter(s -> !frame.equals(s))
.findFirst().orElse("");
return String.join("\n", error, frame, conveyalFrame);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region
*/
public abstract CsvResultType resultType ();

/** Override to provide column names for this CSV writer. */
/**
* Override to provide column names for this CSV writer.
* NOTE: Due to Java weirdness, subclass implementations of this method will be called by the CsvResultWriter
* constructor at a time when fields of the subclass remain uninitialized, but uninitialized final primitive
* fields are still readable! Do not read subclass fields in these implementations until/unless this is restructured.
*/
protected abstract String[] columnHeaders ();

/** Override to extract row values from a single origin result. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.cluster.PathResult;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
import com.conveyal.r5.util.ExceptionUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -89,21 +91,27 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
this.job = job;
this.nOriginsTotal = job.nTasksTotal;
this.originsReceived = new BitSet(job.nTasksTotal);
// Check that origin and destination sets are not too big for generating CSV files.
if (!job.templateTask.makeTauiSite &&
job.templateTask.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)
) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = job.templateTask.destinationPointSets[0];
if ((job.templateTask.recordTimes || job.templateTask.includePathResults) && !job.templateTask.oneToOne) {
if (nOriginsTotal * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS ||
destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS
) {
throw new AnalysisServerException(String.format(
"Freeform requests limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
// If results have been requested for freeform origins, check that the origin and
// destination pointsets are not too big for generating CSV files.
RegionalTask task = job.templateTask;
if (!task.makeTauiSite && task.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = task.destinationPointSets[0];
int nDestinations = destinationPointSet.featureCount();
int nODPairs = task.oneToOne ? nOriginsTotal : nOriginsTotal * nDestinations;
if (task.recordTimes &&
(nDestinations > MAX_FREEFORM_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Travel time results limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
if (task.includePathResults &&
(nDestinations > PathResult.MAX_PATH_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Path results limited to %d destinations and %d origin-destination pairs.",
PathResult.MAX_PATH_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
}

Expand Down Expand Up @@ -152,8 +160,11 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName);
}
}
} catch (AnalysisServerException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Exception while creating multi-origin assembler: " + ExceptionUtils.stackTraceString(e));
// Handle any obscure problems we don't want end users to see without context of MultiOriginAssembler.
throw new RuntimeException("Exception while creating multi-origin assembler: " + e.toString(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void load (ShapeDataStore store) throws Exception {
for (SimpleFeatureIterator it = sfc.features(); it.hasNext();) {
GeobufFeature feat = new GeobufFeature(it.next());
feat.id = null;
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID10"));
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID20"));
feat.properties = new HashMap<>();
store.add(feat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class TemporalDensityResult {

/**
* The temporal density of opportunities. For each destination set, for each percentile, for each minute of
* travel from 0 to 120, the number of opportunities reached in travel times from i (inclusive) to i+1 (exclusive).
* travel m from 0 to 119, the number of opportunities reached in travel times from m (inclusive) to m+1
* (exclusive).
*/
public final double[][][] opportunitiesPerMinute;

Expand All @@ -57,7 +58,7 @@ public void recordOneTarget (int target, int[] travelTimePercentilesSeconds) {
break; // If any percentile is unreached, all higher ones are also unreached.
}
int m = travelTimePercentilesSeconds[p] / 60;
if (m <= 120) {
if (m < 120) {
opportunitiesPerMinute[d][p][m] += dps.getOpportunityCount(target);
}
}
Expand Down
Loading

0 comments on commit 584aafa

Please sign in to comment.