Skip to content

Commit

Permalink
Rename seekToRowGroup to seekToPosition in SeekableInputStream (facebookincubator#1526)
Browse files Browse the repository at this point in the history

Summary:
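SeekableInputStream will soon be used by the column readers of several
file formats, not only DWRF/ORC. This PR decouples the ORC RowGroup
concept from SeekableInputStream by renaming its seekToRowGroup() method to
seekToPosition(), so that the stream interface can be reused by file formats
that have no notion of ORC RowGroups. Only the stream-level method is renamed
here; the decoder-level seekToRowGroup() methods (ByteRleDecoder,
DirectDecoder, FloatingPointDecoder, RleDecoderV1, RleDecoderV2) keep their
names and now call seekToPosition() on the underlying stream.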

This is the first PR toward resolving facebookincubator#1533.

Pull Request resolved: facebookincubator#1526

Reviewed By: zzhao0

Differential Revision: D36348871

Pulled By: oerling

fbshipit-source-id: ba0baf16c7951f86a5da6f51471800eed6ba3134
  • Loading branch information
yingsu00 authored and U-CCR\zhenhuiz committed Jun 21, 2022
1 parent 4978f40 commit ebe23e8
Show file tree
Hide file tree
Showing 17 changed files with 25 additions and 25 deletions.
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/ByteRLE.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ void ByteRleDecoder::nextBuffer() {

void ByteRleDecoder::seekToRowGroup(PositionProvider& positionProvider) {
// move the input stream
inputStream->seekToRowGroup(positionProvider);
inputStream->seekToPosition(positionProvider);
// force a re-read from the stream
bufferEnd = bufferStart;
// force reading a new header
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/CacheInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ google::protobuf::int64 CacheInputStream::ByteCount() const {
return static_cast<google::protobuf::int64>(position_);
}

void CacheInputStream::seekToRowGroup(PositionProvider& seekPosition) {
void CacheInputStream::seekToPosition(PositionProvider& seekPosition) {
position_ = seekPosition.next();
}

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/CacheInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CacheInputStream : public SeekableInputStream {
void BackUp(int count) override;
bool Skip(int count) override;
google::protobuf::int64 ByteCount() const override;
void seekToRowGroup(PositionProvider& position) override;
void seekToPosition(PositionProvider& position) override;
std::string getName() const override;
size_t loadIndices(const proto::RowIndex& rowIndex, size_t startIndex)
override;
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/DirectDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace facebook::velox::dwrf {
template <bool isSigned>
void DirectDecoder<isSigned>::seekToRowGroup(PositionProvider& location) {
// move the input stream
IntDecoder<isSigned>::inputStream->seekToRowGroup(location);
IntDecoder<isSigned>::inputStream->seekToPosition(location);
// force a re-read from the stream
IntDecoder<isSigned>::bufferEnd = IntDecoder<isSigned>::bufferStart;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/FloatingPointDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FloatingPointDecoder {
}

void seekToRowGroup(PositionProvider& positionProvider) {
input_->seekToRowGroup(positionProvider);
input_->seekToPosition(positionProvider);
bufferStart_ = bufferEnd_;
}

Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/dwrf/common/InputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
return static_cast<google::protobuf::int64>(position);
}

void SeekableArrayInputStream::seekToRowGroup(PositionProvider& seekPosition) {
position = seekPosition.next();
void SeekableArrayInputStream::seekToPosition(PositionProvider& position) {
this->position = position.next();
}

std::string SeekableArrayInputStream::getName() const {
Expand Down Expand Up @@ -249,7 +249,7 @@ google::protobuf::int64 SeekableFileInputStream::ByteCount() const {
return static_cast<google::protobuf::int64>(position);
}

void SeekableFileInputStream::seekToRowGroup(PositionProvider& location) {
void SeekableFileInputStream::seekToPosition(PositionProvider& location) {
position = location.next();
DWIO_ENSURE_LE(position, length, "seek too far");
pushBack = 0;
Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/dwrf/common/InputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class SeekableInputStream : public google::protobuf::io::ZeroCopyInputStream {
public:
~SeekableInputStream() override = default;

virtual void seekToRowGroup(PositionProvider& position) = 0;
virtual void seekToPosition(PositionProvider& position) = 0;

virtual std::string getName() const = 0;

Expand Down Expand Up @@ -99,7 +99,7 @@ class SeekableArrayInputStream : public SeekableInputStream {
virtual void BackUp(int32_t count) override;
virtual bool Skip(int32_t count) override;
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToRowGroup(PositionProvider& position) override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t loadIndices(const proto::RowIndex& rowIndex, size_t startIndex)
override;
Expand Down Expand Up @@ -134,7 +134,7 @@ class SeekableFileInputStream : public SeekableInputStream {
virtual void BackUp(int32_t count) override;
virtual bool Skip(int32_t count) override;
virtual google::protobuf::int64 ByteCount() const override;
virtual void seekToRowGroup(PositionProvider& position) override;
virtual void seekToPosition(PositionProvider& position) override;
virtual std::string getName() const override;
virtual size_t loadIndices(const proto::RowIndex& rowIndex, size_t startIndex)
override;
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/dwrf/common/PagedInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,14 @@ void PagedInputStream::clearDecompressionState() {
inputBufferPtrEnd_ = nullptr;
}

void PagedInputStream::seekToRowGroup(PositionProvider& positionProvider) {
void PagedInputStream::seekToPosition(PositionProvider& positionProvider) {
auto compressedOffset = positionProvider.next();
auto uncompressedOffset = positionProvider.next();

if (compressedOffset != lastHeaderOffset_) {
std::vector<uint64_t> positions = {compressedOffset};
auto provider = PositionProvider(positions);
input_->seekToRowGroup(provider);
input_->seekToPosition(provider);

clearDecompressionState();

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/PagedInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PagedInputStream : public SeekableInputStream {
google::protobuf::int64 ByteCount() const override {
return bytesReturned_;
}
void seekToRowGroup(PositionProvider& position) override;
void seekToPosition(PositionProvider& position) override;
std::string getName() const override {
return folly::to<std::string>(
"PagedInputStream StreamInfo (",
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/RLEv1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ template void RleEncoderV1<false>::writeValues();
template <bool isSigned>
void RleDecoderV1<isSigned>::seekToRowGroup(PositionProvider& location) {
// move the input stream
IntDecoder<isSigned>::inputStream->seekToRowGroup(location);
IntDecoder<isSigned>::inputStream->seekToPosition(location);
// force a re-read from the stream
IntDecoder<isSigned>::bufferEnd = IntDecoder<isSigned>::bufferStart;
// force reading a new header
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/RLEv2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ template RleDecoderV2<false>::RleDecoderV2(
template <bool isSigned>
void RleDecoderV2<isSigned>::seekToRowGroup(PositionProvider& location) {
// move the input stream
IntDecoder<isSigned>::inputStream->seekToRowGroup(location);
IntDecoder<isSigned>::inputStream->seekToPosition(location);
// clear state
IntDecoder<isSigned>::bufferEnd = IntDecoder<isSigned>::bufferStart = 0;
runRead = runLength = 0;
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ void StringDictionaryColumnReader::loadStrideDictionary() {
std::vector<uint64_t> pos(
positions.begin() + positionOffset, positions.end());
PositionProvider pp(pos);
strideDictStream->seekToRowGroup(pp);
strideDictStream->seekToPosition(pp);
strideDictLengthDecoder->seekToRowGroup(pp);

detail::ensureCapacity<int64_t>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void SelectiveStringDictionaryColumnReader::loadStrideDictionary() {
std::vector<uint64_t> pos(
positions.begin() + positionOffset_, positions.end());
PositionProvider pp(pos);
strideDictStream_->seekToRowGroup(pp);
strideDictStream_->seekToPosition(pp);
strideDictLengthDecoder_->seekToRowGroup(pp);

loadDictionary(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SelectiveStringDictionaryColumnReader : public SelectiveColumnReader {
}

if (strideDictStream_) {
strideDictStream_->seekToRowGroup(positionsProvider);
strideDictStream_->seekToPosition(positionsProvider);
strideDictLengthDecoder_->seekToRowGroup(positionsProvider);
// skip row group dictionary size
positionsProvider.next();
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SelectiveStringDirectColumnReader : public SelectiveColumnReader {
notNullDecoder_->seekToRowGroup(positionsProvider);
}

blobStream_->seekToRowGroup(positionsProvider);
blobStream_->seekToPosition(positionsProvider);
lengthDecoder_->seekToRowGroup(positionsProvider);

VELOX_CHECK(!positionsProvider.hasNext());
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/CacheInputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class CacheTest : public testing::Test {
0, region.length / 3, region.length * 2 / 3};
dwrf::PositionProvider positions(offsets);
for (auto i = 0; i < offsets.size(); ++i) {
stream.seekToRowGroup(positions);
stream.seekToPosition(positions);
checkRandomRead(stripe, stream, offsets, i, region);
}
}
Expand Down
8 changes: 4 additions & 4 deletions velox/dwio/dwrf/test/TestDecompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ TEST(TestDecompression, testFileSeek) {
{
std::vector<uint64_t> offsets(1, 100);
PositionProvider posn(offsets);
stream.seekToRowGroup(posn);
stream.seekToPosition(posn);
}
EXPECT_EQ(100, stream.ByteCount());
{
std::vector<uint64_t> offsets(1, 5);
PositionProvider posn(offsets);
stream.seekToRowGroup(posn);
stream.seekToPosition(posn);
}
EXPECT_EQ(5, stream.ByteCount());
ASSERT_EQ(true, stream.Next(&ptr, &len));
Expand All @@ -280,7 +280,7 @@ TEST(TestDecompression, testFileSeek) {
{
std::vector<uint64_t> offsets(1, 201);
PositionProvider posn(offsets);
EXPECT_THROW(stream.seekToRowGroup(posn), exception::LoggedException);
EXPECT_THROW(stream.seekToPosition(posn), exception::LoggedException);
}
}

Expand Down Expand Up @@ -832,7 +832,7 @@ class TestSeek : public ::testing::Test {
auto pos = arr[i];
std::vector<uint64_t> list{pos[0], pos[1]};
PositionProvider pp(list);
stream->seekToRowGroup(pp);
stream->seekToPosition(pp);
stream->Next(&data, &size);
EXPECT_EQ(size, inputSize - pos[1]);
EXPECT_EQ(0, memcmp(data, input[i] + pos[1], size));
Expand Down

0 comments on commit ebe23e8

Please sign in to comment.