Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Date type in HCatToRow #32695

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.hcatalog;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -25,6 +27,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.joda.time.Instant;

/** Utilities to convert {@link HCatRecord HCatRecords} to {@link Row Rows}. */
@SuppressWarnings({
Expand Down Expand Up @@ -74,14 +77,23 @@ public PCollection<Row> expand(PBegin input) {
private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
private final Schema schema;

/**
 * Converts a Hive {@code Date} field value to a Joda {@link Instant} (epoch millis at
 * midnight); any other value is returned unchanged. Beam's DATETIME field type cannot
 * hold Hive's own Date class, hence the conversion.
 */
private Object castHDate(Object obj) {
  if (!(obj instanceof org.apache.hadoop.hive.common.type.Date)) {
    return obj;
  }
  org.apache.hadoop.hive.common.type.Date hiveDate =
      (org.apache.hadoop.hive.common.type.Date) obj;
  return new Instant(hiveDate.toEpochMilli());
}

// Schema used to build the output Rows; field order must match HCatRecord.getAll().
HCatToRowFn(Schema schema) {
this.schema = schema;
}

@ProcessElement
public void processElement(ProcessContext c) {
HCatRecord hCatRecord = c.element();
c.output(Row.withSchema(schema).addValues(hCatRecord.getAll()).build());
List<Object> recordValues =
hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we generalize the function, e.g. just naming as "castTypes", leaving space for future improvement if other type need a conversion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, thanks for the suggestion! I moved this whole logic to a separate new util castTypes. Please let me know if you simply meant s/castHDate/castTypes though!

c.output(Row.withSchema(schema).addValues(recordValues).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_TABLE;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecords;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecordsWithDate;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getConfigPropertiesAsMap;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getExpectedRecords;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getReaderContext;
Expand Down Expand Up @@ -54,12 +55,14 @@
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
Expand Down Expand Up @@ -230,6 +233,44 @@ public void processElement(ProcessContext c) {
readAfterWritePipeline.run();
}

/** Perform test for reading Date column type from an hcatalog. */
@Test
public void testReadHCatalogDateType() throws Exception {
// Recreate the table with an explicit DATE column so the read exercises Date handling.
service.executeQuery("drop table if exists " + TEST_TABLE);
service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 date)");

// Write TEST_RECORDS_COUNT records whose second field is a fixed Hive Date (2014-01-20).
defaultPipeline
.apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT)))
.apply(
HCatalogIO.write()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
.withDatabase(TEST_DATABASE)
.withTable(TEST_TABLE)
// Unpartitioned table: pass an empty partition map.
.withPartition(new java.util.HashMap<>()));
defaultPipeline.run().waitUntilFinish();

// Read back through HCatToRow and format the DATETIME column; every record carries the
// same date, so Distinct collapses the output to a single expected string.
final PCollection<String> output =
readAfterWritePipeline
.apply(
HCatToRow.fromSpec(
HCatalogIO.read()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
.withDatabase(TEST_DATABASE)
.withTable(TEST_TABLE)
.withFilter(TEST_FILTER)))
.apply(
ParDo.of(
new DoFn<Row, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
// getDateTime works only if HCatToRow converted the Hive Date to a
// Joda instant; time-of-day is expected to be midnight.
c.output(c.element().getDateTime("mycol2").toString("yyyy-MM-dd HH:mm:ss"));
}
}))
.apply(Distinct.create());
PAssert.that(output).containsInAnyOrder(ImmutableList.of("2014-01-20 00:00:00"));
readAfterWritePipeline.run();
}

/** Test of Write to a non-existent table. */
@Test
public void testWriteFailureTableDoesNotExist() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map.Entry;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
Expand Down Expand Up @@ -120,4 +121,13 @@ public static Map<String, String> getConfigPropertiesAsMap(HiveConf hiveConf) {
/** Builds a two-field record: the string "record &lt;value&gt;" followed by the integer itself. */
private static DefaultHCatRecord toHCatRecord(int value) {
  List<Object> fields = Arrays.<Object>asList("record " + value, value);
  return new DefaultHCatRecord(fields);
}

/**
 * Returns a list of HCatRecords of passed size with some dummy date as a field.
 *
 * <p>Each record has two fields: the string {@code "record <i>"} and the fixed Hive date
 * 2014-01-20 (matched by the assertion in {@code testReadHCatalogDateType}).
 *
 * @param size number of records to build
 */
public static List<HCatRecord> buildHCatRecordsWithDate(int size) {
  // Loop-invariant: parse the date string once instead of on every iteration.
  Date date = Date.valueOf("2014-01-20");
  // Presize the list; the final size is known up front.
  List<HCatRecord> records = new ArrayList<>(size);
  for (int i = 0; i < size; i++) {
    records.add(new DefaultHCatRecord(Arrays.asList("record " + i, date)));
  }
  return records;
}
}
Loading