From 726bd6c6a53cc4a369dd5bab47dbe57001b17b74 Mon Sep 17 00:00:00 2001
From: Dmitry Ulyumdzhiev
Date: Mon, 7 Oct 2024 21:48:25 +0100
Subject: [PATCH 1/3] Handle Date type in HCatToRow

Some initial notes:

- The issue (#20685) deals with java.sql.Date, which I wasn't able to
  reproduce fully (I can currently write an hcatalog hadoop.hive Date).
- On this note, 267f76f3c2036c27dcbc94c563ecd1a2d4481f65 changed the code
  involved so that there's a direct cast to AbstractInstant in RowUtils.java.
  This doesn't change much, but jfyi.
---
 .../beam/sdk/io/hcatalog/HCatToRow.java       | 13 +++++-
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java  | 41 +++++++++++++++++++
 .../io/hcatalog/test/HCatalogIOTestUtils.java | 10 +++++
 3 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
index 8e29650f3fc3e..2152a72192132 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
@@ -25,6 +25,9 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.hive.hcatalog.data.HCatRecord;
+import java.util.stream.Collectors;
+import java.util.List;
+import org.joda.time.Instant;
 
 /** Utilities to convert {@link HCatRecord HCatRecords} to {@link Row Rows}. */
 @SuppressWarnings({
@@ -74,6 +77,13 @@ public PCollection<Row> expand(PBegin input) {
   private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
     private final Schema schema;
 
+    private Object castHDate(Object obj) {
+      if (obj instanceof org.apache.hadoop.hive.common.type.Date) {
+        return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli());
+      }
+      return obj;
+    }
+
     HCatToRowFn(Schema schema) {
       this.schema = schema;
     }
@@ -81,7 +91,8 @@ private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
     @ProcessElement
     public void processElement(ProcessContext c) {
       HCatRecord hCatRecord = c.element();
-      c.output(Row.withSchema(schema).addValues(hCatRecord.getAll()).build());
+      List<Object> recordValues = hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());
+      c.output(Row.withSchema(schema).addValues(recordValues).build());
     }
   }
 }
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index 4bb7e1bd70441..d07e0e61d42a5 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -22,10 +22,13 @@
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_TABLE;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecords;
+import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecordsWithDate;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getConfigPropertiesAsMap;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getExpectedRecords;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getReaderContext;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.insertTestData;
+import org.apache.beam.sdk.io.hcatalog.HCatToRow;
+import org.apache.beam.sdk.values.Row;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -57,6 +60,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
@@ -230,6 +234,43 @@ public void processElement(ProcessContext c) {
     readAfterWritePipeline.run();
   }
 
+  /** Perform test for reading Date column type from an hcatalog. */
+  @Test
+  public void testReadHCatalogDateType() throws Exception {
+    service.executeQuery("drop table if exists " + TEST_TABLE);
+    service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 date)");
+
+    defaultPipeline
+      .apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT)))
+      .apply(
+          HCatalogIO.write()
+              .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+              .withDatabase(TEST_DATABASE)
+              .withTable(TEST_TABLE)
+              .withPartition(new java.util.HashMap<>()));
+    defaultPipeline.run().waitUntilFinish();
+
+    final PCollection<String> output = readAfterWritePipeline
+            .apply(
+                HCatToRow.fromSpec(
+                    HCatalogIO.read()
+                        .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                        .withDatabase(TEST_DATABASE)
+                        .withTable(TEST_TABLE)
+                        .withFilter(TEST_FILTER)))
+            .apply(
+                ParDo.of(
+                    new DoFn<Row, String>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        c.output(c.element().getDateTime("mycol2").toString("yyyy-MM-dd HH:mm:ss"));
+                      }
+                    }))
+            .apply(Distinct.create());
+    PAssert.that(output).containsInAnyOrder(ImmutableList.of("2014-01-20 00:00:00"));
+    readAfterWritePipeline.run();
+  }
+
   /** Test of Write to a non-existent table. */
   @Test
   public void testWriteFailureTableDoesNotExist() {
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java
index d0d1d850a6cbe..ec102c8fe5355 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.hadoop.hive.common.type.Date;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.values.KV;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -120,4 +121,13 @@ public static Map<String, String> getConfigPropertiesAsMap(HiveConf hiveConf) {
   private static DefaultHCatRecord toHCatRecord(int value) {
     return new DefaultHCatRecord(Arrays.asList("record " + value, value));
   }
+
+  /** Returns a list of HCatRecords of passed size with some dummy date as a field. */
+  public static List<HCatRecord> buildHCatRecordsWithDate(int size) {
+    List<HCatRecord> expected = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      expected.add(new DefaultHCatRecord(Arrays.asList("record " + i, Date.valueOf("2014-01-20"))));
+    }
+    return expected;
+  }
 }

From 7c09ed854639cde5c86769824bbf5f17a78f6b9f Mon Sep 17 00:00:00 2001
From: Dmitry Ulyumdzhiev
Date: Tue, 8 Oct 2024 10:58:07 +0100
Subject: [PATCH 2/3] Run: ./gradlew :sdks:java:io:hcatalog:spotlessApply

---
 .../beam/sdk/io/hcatalog/HCatToRow.java       |  7 +++---
 .../beam/sdk/io/hcatalog/HCatalogIOTest.java  | 22 +++++++++----------
 .../io/hcatalog/test/HCatalogIOTestUtils.java |  2 +-
 3 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
index 2152a72192132..dedbfd77cf788 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.hcatalog;
 
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -25,8 +27,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.hive.hcatalog.data.HCatRecord;
-import java.util.stream.Collectors;
-import java.util.List;
 import org.joda.time.Instant;
 
 /** Utilities to convert {@link HCatRecord HCatRecords} to {@link Row Rows}. */
 @SuppressWarnings({
@@ -91,7 +91,8 @@ private Object castHDate(Object obj) {
     @ProcessElement
     public void processElement(ProcessContext c) {
       HCatRecord hCatRecord = c.element();
-      List<Object> recordValues = hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());
+      List<Object> recordValues =
+          hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());
       c.output(Row.withSchema(schema).addValues(recordValues).build());
     }
   }
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index d07e0e61d42a5..3d97a2ccc1d98 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -27,8 +27,6 @@
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getExpectedRecords;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getReaderContext;
 import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.insertTestData;
-import org.apache.beam.sdk.io.hcatalog.HCatToRow;
-import org.apache.beam.sdk.values.Row;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -57,13 +55,14 @@
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Watch;
-import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -241,16 +240,17 @@ public void testReadHCatalogDateType() throws Exception {
     service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 date)");
 
     defaultPipeline
-      .apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT)))
-      .apply(
-          HCatalogIO.write()
-              .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-              .withDatabase(TEST_DATABASE)
-              .withTable(TEST_TABLE)
-              .withPartition(new java.util.HashMap<>()));
+        .apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT)))
+        .apply(
+            HCatalogIO.write()
+                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
+                .withDatabase(TEST_DATABASE)
+                .withTable(TEST_TABLE)
+                .withPartition(new java.util.HashMap<>()));
     defaultPipeline.run().waitUntilFinish();
 
-    final PCollection<String> output = readAfterWritePipeline
+    final PCollection<String> output =
+        readAfterWritePipeline
             .apply(
                 HCatToRow.fromSpec(
                     HCatalogIO.read()
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java
index ec102c8fe5355..c09c2c906d649 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/test/HCatalogIOTestUtils.java
@@ -24,9 +24,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.hadoop.hive.common.type.Date;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.data.DefaultHCatRecord;

From e811481305452aac94c5b9d5ece50067ecd52ffb Mon Sep 17 00:00:00 2001
From: Dmitry Ulyumdzhiev
Date: Wed, 9 Oct 2024 18:23:09 +0100
Subject: [PATCH 3/3] review cr: castTypes util

- s/castHDate/maybeCastHDate/ to be more descriptive
- move values manipulation to a separate util (hopefully I understood the
  cr in the right way)
---
 .../org/apache/beam/sdk/io/hcatalog/HCatToRow.java | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
index dedbfd77cf788..e5bdf18ecbcf4 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java
@@ -77,13 +77,18 @@ public PCollection<Row> expand(PBegin input) {
   private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
     private final Schema schema;
 
-    private Object castHDate(Object obj) {
+    private Object maybeCastHDate(Object obj) {
       if (obj instanceof org.apache.hadoop.hive.common.type.Date) {
         return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli());
       }
       return obj;
     }
 
+    /** Cast objects of the types that aren't supported by {@link Row}. */
+    private List<Object> castTypes(List<Object> values) {
+      return values.stream().map(this::maybeCastHDate).collect(Collectors.toList());
+    }
+
     HCatToRowFn(Schema schema) {
       this.schema = schema;
     }
@@ -91,9 +96,7 @@ private Object castHDate(Object obj) {
     @ProcessElement
     public void processElement(ProcessContext c) {
       HCatRecord hCatRecord = c.element();
-      List<Object> recordValues =
-          hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());
-      c.output(Row.withSchema(schema).addValues(recordValues).build());
+      c.output(Row.withSchema(schema).addValues(castTypes(hCatRecord.getAll())).build());
     }
   }
 }
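
Editorial note (not part of the patches): the core of the series is the maybeCastHDate
conversion, which turns the org.apache.hadoop.hive.common.type.Date that HCatalog yields
for a Hive DATE column into a Joda Instant, the type Beam's Row accepts for a DATETIME
field. Below is a minimal standalone sketch of that conversion, assuming hive-common and
joda-time are on the classpath; the class name HCatDateCastExample is made up for
illustration, while the Hive and Joda calls are the same ones the patches use.

import org.apache.hadoop.hive.common.type.Date;
import org.joda.time.Instant;

// Hypothetical example class, for illustration only.
public class HCatDateCastExample {
  public static void main(String[] args) {
    // Hive's own Date type, as an HCatalog reader hands it over for a DATE column.
    Date hiveDate = Date.valueOf("2014-01-20");

    // The same conversion maybeCastHDate applies: epoch milliseconds -> Joda Instant.
    Instant instant = new Instant(hiveDate.toEpochMilli());

    System.out.println(instant); // prints 2014-01-20T00:00:00.000Z
  }
}

The test above relies on the same round trip: after HCatToRow.fromSpec(...) the value
comes back as a DATETIME, so row.getDateTime("mycol2") can be formatted and compared
against "2014-01-20 00:00:00".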