diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 442bc57eb0df8..7088498eedac3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -41,6 +41,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.time.temporal.ChronoUnit; import java.util.AbstractMap; @@ -67,6 +69,12 @@ * with the Storage write API. */ public class TableRowToStorageApiProto { + // Custom formatter that accepts "2022-05-09 18:04:59.123456" + // The old dremel parser accepts this format, and so does insertall. We need to accept it + // for backwards compatibility, and it is based on UTC time. + private static final DateTimeFormatter DATETIME_SPACE_FORMATTER = + DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC); + public static class SchemaConversionException extends Exception { SchemaConversionException(String msg) { super(msg); @@ -427,10 +435,19 @@ private static void fieldDescriptorFromTableField( case "TIMESTAMP": if (value instanceof String) { try { - return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value)); - } catch (DateTimeParseException e) { + // '2011-12-03T10:15:30+01:00' '2011-12-03T10:15:30' return ChronoUnit.MICROS.between( - Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value))); + Instant.EPOCH, Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse((String) value))); + } catch (DateTimeParseException e) { + try { + // "12345667" + return ChronoUnit.MICROS.between( + Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value))); + } catch (NumberFormatException e2) { + // "yyyy-MM-dd HH:mm:ss.SSSSSS" + return ChronoUnit.MICROS.between( + Instant.EPOCH, Instant.from(DATETIME_SPACE_FORMATTER.parse((String) value))); + } } } else if (value instanceof Instant) { return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value); @@ -485,7 +502,15 @@ private static void fieldDescriptorFromTableField( break; case "DATETIME": if (value instanceof String) { - return CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) value)); + try { + // '2011-12-03T10:15:30' + return CivilTimeEncoder.encodePacked64DatetimeMicros( + LocalDateTime.parse((String) value)); + } catch (DateTimeParseException e2) { + // '2011-12-03 10:15:30' + return CivilTimeEncoder.encodePacked64DatetimeMicros( + LocalDateTime.parse((String) value, DATETIME_SPACE_FORMATTER)); + } } else if (value instanceof Number) { return ((Number) value).longValue(); } else if (value instanceof LocalDateTime) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index c2d6cf239848a..4a1fc31974438 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -82,6 +82,10 @@ public class TableRowToStorageApiProtoTest { .setType("BYTES") .setMode("REPEATED") .setName("arrayValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace")) .build()); private static final TableSchema BASE_TABLE_SCHEMA_NO_F = @@ -107,6 +111,10 @@ public class TableRowToStorageApiProtoTest { .setType("BYTES") .setMode("REPEATED") .setName("arrayValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace")) .build()); private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO = @@ -223,6 +231,34 @@ public class TableRowToStorageApiProtoTest { .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_REPEATED) .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampisovalue") + .setNumber(17) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampvaluelong") + .setNumber(18) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampvaluespace") + .setNumber(19) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("datetimevaluespace") + .setNumber(20) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) .build(); private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO = @@ -332,6 +368,34 @@ public class TableRowToStorageApiProtoTest { .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_REPEATED) .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampisovalue") + .setNumber(17) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampvaluelong") + .setNumber(20) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestampvaluespace") + .setNumber(21) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("datetimevaluespace") + .setNumber(21) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL) + .build()) .build(); private static final TableSchema NESTED_TABLE_SCHEMA = new TableSchema() @@ -466,7 +530,11 @@ public void testNestedFromTableSchema() { new TableCell().setV("2019-08-16"), new TableCell().setV("23.4"), new TableCell().setV("2312345.4"), - new TableCell().setV(REPEATED_BYTES))); + new TableCell().setV(REPEATED_BYTES), + new TableCell().setV("1970-01-01T00:00:00.000+01:00"), + new TableCell().setV("1234567"), + new TableCell().setV("1970-01-01 00:00:00.000343"), + new TableCell().setV("2019-08-16 00:52:07.123456"))); private static final TableRow BASE_TABLE_ROW_NO_F = new TableRow() @@ -479,13 +547,19 @@ public void testNestedFromTableSchema() { .set("floatValue", "2") .set("boolValue", "true") .set("booleanValue", "true") + // UTC time .set("timestampValue", "1970-01-01T00:00:00.000043Z") .set("timeValue", "00:52:07.123456") .set("datetimeValue", "2019-08-16T00:52:07.123456") .set("dateValue", "2019-08-16") .set("numericValue", "23.4") .set("bigNumericValue", "2312345.4") - .set("arrayValue", REPEATED_BYTES); + .set("arrayValue", REPEATED_BYTES) + .set("timestampISOValue", "1970-01-01T00:00:00.000+01:00") + .set("timestampValueLong", "1234567") + // UTC time for backwards compatibility + .set("timestampValueSpace", "1970-01-01 00:00:00.000343") + .set("datetimeValueSpace", "2019-08-16 00:52:07.123456"); private static final Map BASE_ROW_EXPECTED_PROTO_VALUES = ImmutableMap.builder() @@ -509,6 +583,10 @@ public void testNestedFromTableSchema() { "bignumericvalue", BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4"))) .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES) + .put("timestampisovalue", -3600000000L) + .put("timestampvaluelong", 1234567000L) + .put("timestampvaluespace", 343L) + .put("datetimevaluespace", 142111881387172416L) .build(); private static final Map BASE_ROW_NO_F_EXPECTED_PROTO_VALUES = @@ -532,6 +610,10 @@ public void testNestedFromTableSchema() { "bignumericvalue", BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4"))) .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES) + .put("timestampisovalue", -3600000000L) + .put("timestampvaluelong", 1234567000L) + .put("timestampvaluespace", 343L) + .put("datetimevaluespace", 142111881387172416L) .build(); private void assertBaseRecord(DynamicMessage msg, boolean withF) {