diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java index 8552c27d5967e..9c015cbf57621 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtils.java @@ -44,6 +44,10 @@ public class WindmillTimeUtils { public static Instant windmillToHarnessTimestamp(long timestampUs) { // Windmill should never send us an unknown timestamp. Preconditions.checkArgument(timestampUs != Long.MIN_VALUE); + // Set windmill min timestamp to beam min timestamp + if(timestampUs == Long.MIN_VALUE +1){ + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } Instant result = new Instant(divideAndRoundDown(timestampUs, 1000)); if (result.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { // End of time. diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java index 5f910c3acb5f5..940f2b8f6a895 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimeUtilsTest.java @@ -56,6 +56,7 @@ public void testWindmillToHarnessTimestamp() { assertEquals(new Instant(-17), windmillToHarnessTimestamp(-16987)); assertEquals(new Instant(-17), windmillToHarnessTimestamp(-17000)); assertEquals(new Instant(-18), windmillToHarnessTimestamp(-17001)); + assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, windmillToHarnessTimestamp(Long.MIN_VALUE + 1)); } @Test