Skip to content

Commit

Permalink
Use loggedAt to replace future occurredAt timestamps (apache#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Aug 2, 2019
1 parent e428da0 commit 2f28c20
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public TypeInformation<WindowedValue<byte[]>> getProducedType() {
@Override
public WindowedValue<byte[]> deserialize(
byte[] messageKey, byte[] message, String topic, int partition, long offset) {
// System.out.println("###Kafka record: " + new String(message, Charset.defaultCharset()));
return WindowedValue.valueInGlobalWindow(message);
}

Expand Down Expand Up @@ -322,24 +321,42 @@ public WindowedValue<byte[]> deserialize(
throw new IOException("Events is not an array");
}

// Determine the timestamp as minimum occurred_at of all contained events.
// For each event, attempt to extract occurred_at from either date string or number.
// Assume timestamp as logged_at, when occurred_at is an (invalid) future timestamp.

Iterator<JsonNode> iter = events.elements();
long timestamp = Long.MAX_VALUE;
while (iter.hasNext()) {
JsonNode occurredAt = iter.next().path(EventField.EventOccurredAt.fieldName());
JsonNode event = iter.next();
JsonNode occurredAt = event.path(EventField.EventOccurredAt.fieldName());
try {
long occurredAtMillis;
if (occurredAt.isTextual()) {
timestamp = Math.min(parseDateTime(occurredAt.textValue()), timestamp);
occurredAtMillis = parseDateTime(occurredAt.textValue());
} else if (occurredAt.isNumber()) {
timestamp = occurredAt.asLong();
occurredAtMillis = occurredAt.asLong();
} else {
continue;
}
if (event.has(EventField.EventLoggedAt.fieldName())) {
long loggedAtMillis = event.path(EventField.EventLoggedAt.fieldName()).asLong();
if (loggedAtMillis > 0) {
if (loggedAtMillis < Integer.MAX_VALUE) {
loggedAtMillis = loggedAtMillis * 1000;
}
occurredAtMillis = Math.min(occurredAtMillis, loggedAtMillis);
}
}
timestamp = Math.min(occurredAtMillis, timestamp);
} catch (DateTimeParseException e) {
// skip this timestamp
}
}

// if we didn't find any valid timestamps, use Long.MIN_VALUE
if (timestamp == Long.MAX_VALUE) {
timestamp = Long.MIN_VALUE;
}
// if we didn't find any valid timestamps, use Long.MIN_VALUE
if (timestamp == Long.MAX_VALUE) {
timestamp = Long.MIN_VALUE;
}

return WindowedValue.timestampedValueInGlobalWindow(recordValue, new Instant(timestamp));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
*/
package org.apache.beam.runners.flink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Base64;
import java.util.TimeZone;
import java.util.zip.Deflater;
import org.apache.beam.runners.flink.LyftFlinkStreamingPortableTranslations.LyftBase64ZlibJsonSchema;
import org.apache.beam.sdk.util.WindowedValue;
import org.junit.Assert;
Expand Down Expand Up @@ -59,8 +64,7 @@ public void testBeamKinesisSchemaLongTimestamp() throws IOException {

@Test
public void testBeamKinesisSchemaNoTimestamp() throws IOException {
// [{"event_id": 1}]
byte[] message = Base64.getDecoder().decode("eJyLrlZKLUvNK4nPTFGyUjCsjQUANv8Fzg==");
byte[] message = encode("[{\"event_id\": 1}]");

LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema();
WindowedValue<byte[]> value = schema.deserialize(message, "", "", 0, "", "");
Expand All @@ -86,4 +90,36 @@ public void testBeamKinesisSchemaMultipleRecords() throws IOException {
// we should output the oldest timestamp in the bundle
Assert.assertEquals(1540599602000L, value.getTimestamp().getMillis());
}

@Test
public void testBeamKinesisSchemaFutureOccurredAtTimestamp() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

long loggedAtMillis = sdf.parse("2018-10-27 00:10:02.000000").getTime();
String events =
"[{\"event_id\": 1, \"occurred_at\": \"2018-10-27 00:20:02.900\", \"logged_at\": "
+ loggedAtMillis / 1000
+ "}]";
byte[] message = encode(events);
LyftBase64ZlibJsonSchema schema = new LyftBase64ZlibJsonSchema();
WindowedValue<byte[]> value = schema.deserialize(message, "", "", 0, "", "");

Assert.assertArrayEquals(message, value.getValue());
Assert.assertEquals(loggedAtMillis, value.getTimestamp().getMillis());
}

private static byte[] encode(String data) throws IOException {
Deflater deflater = new Deflater();
deflater.setInput(data.getBytes(Charset.defaultCharset()));
deflater.finish();
byte[] buf = new byte[4096];
try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length())) {
while (!deflater.finished()) {
int count = deflater.deflate(buf);
bos.write(buf, 0, count);
}
return bos.toByteArray();
}
}
}

0 comments on commit 2f28c20

Please sign in to comment.