From 04b00dea837c47b4951333623bfed602e13e7236 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Thu, 1 Jun 2023 17:32:04 +0530 Subject: [PATCH 01/11] Adding zstd module to source Signed-off-by: Sarthak Aggarwal --- server/licenses/zstd-jni-1.5.5-1.jar.sha1 | 1 + .../opensearch/index/codec/CodecService.java | 9 +- .../PerFieldMappingPostingFormatCodec.java | 6 + .../customcodecs/Lucene95CustomCodec.java | 65 ++++++ .../Lucene95CustomStoredFieldsFormat.java | 121 ++++++++++ .../index/codec/customcodecs/ZstdCodec.java | 44 ++++ .../customcodecs/ZstdCompressionMode.java | 206 ++++++++++++++++ .../codec/customcodecs/ZstdNoDictCodec.java | 44 ++++ .../ZstdNoDictCompressionMode.java | 181 +++++++++++++++ .../opensearch/index/engine/EngineConfig.java | 4 +- .../services/org.apache.lucene.codecs.Codec | 2 + .../org/opensearch/bootstrap/security.policy | 3 + .../customcodecs/AbstractCompressorTests.java | 219 ++++++++++++++++++ .../customcodecs/ZstdCompressorTests.java | 30 +++ .../ZstdNoDictCompressorTests.java | 30 +++ 15 files changed, 963 insertions(+), 2 deletions(-) create mode 100644 server/licenses/zstd-jni-1.5.5-1.jar.sha1 create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java create mode 100644 server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec create mode 100644 server/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java create mode 100644 server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java create mode 100644 server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java diff --git a/server/licenses/zstd-jni-1.5.5-1.jar.sha1 b/server/licenses/zstd-jni-1.5.5-1.jar.sha1 new file mode 100644 index 0000000000000..bfb9e565bc6d5 --- /dev/null +++ b/server/licenses/zstd-jni-1.5.5-1.jar.sha1 @@ -0,0 +1 @@ +fda1d6278299af27484e1cc3c79a060e41b7ef7e \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index e4899c02d37e8..fffcd5ee60b02 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -38,6 +38,8 @@ import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode; import org.opensearch.common.Nullable; import org.opensearch.common.collect.MapBuilder; +import org.opensearch.index.codec.customcodecs.ZstdCodec; +import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec; import org.opensearch.index.mapper.MapperService; import java.util.Map; @@ -58,15 +60,20 @@ public class CodecService { public static final String BEST_COMPRESSION_CODEC = "best_compression"; /** the raw unfiltered lucene default. 
useful for testing */
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
-
+ public static final String ZSTD_CODEC = "zstd";
+ public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";
public CodecService(@Nullable MapperService mapperService, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.newMapBuilder();
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
+ codecs.put(ZSTD_CODEC, new ZstdCodec());
+ codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec());
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
+ codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger));
+ codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger));
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
diff --git a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java
index f1b515534bdeb..3059dc89e85dc 100644
--- a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java
+++ b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java
@@ -69,6 +69,12 @@ public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService map
this.logger = logger;
}
+ /** Creates a new instance, deriving the per-field postings formats from the given mapper service. */
+ public PerFieldMappingPostingFormatCodec(MapperService mapperService, Logger logger) {
+ super();
+ this.mapperService = mapperService;
+ this.logger = logger;
+ }
+
@Override
public PostingsFormat getPostingsFormatForField(String field) {
final MappedFieldType fieldType = mapperService.fieldType(field);
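The two codec names registered above become legal values for the index.codec index setting (EngineConfig validates them later in this patch). A minimal sketch, not part of the patch, of selecting one per index, assuming the standard Settings builder API:

```java
import org.opensearch.common.settings.Settings;

// Minimal sketch (not part of the patch): "zstd" and "zstd_no_dict" join
// "default", "best_compression", and "lucene_default" as index.codec values.
public class ZstdCodecSettingSketch {
    public static void main(String[] args) {
        Settings indexSettings = Settings.builder().put("index.codec", "zstd").build();
        System.out.println(indexSettings.get("index.codec")); // prints: zstd
    }
}
```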
diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java
new file mode 100644
index 0000000000000..1011314f72a4e
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java
@@ -0,0 +1,65 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.codec.customcodecs;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.codecs.FilterCodec;
+import org.apache.lucene.codecs.StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene95.Lucene95Codec;
+import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
+import org.opensearch.index.mapper.MapperService;
+
+abstract class Lucene95CustomCodec extends FilterCodec {
+    public static final int DEFAULT_COMPRESSION_LEVEL = 6;
+
+    /** Each mode represents a compression algorithm. */
+    public enum Mode {
+        ZSTD,
+        ZSTD_NO_DICT
+    }
+
+    private final StoredFieldsFormat storedFieldsFormat;
+
+    /**
+     * Creates a new compression codec with the default compression level.
+     *
+     * @param mode The compression codec (ZSTD or ZSTD_NO_DICT).
+     */
+    public Lucene95CustomCodec(Mode mode) {
+        this(mode, DEFAULT_COMPRESSION_LEVEL);
+    }
+
+    /**
+     * Creates a new compression codec with the given compression level. We use
+     * lowercase letters when registering the codec so that we remain consistent with
+     * the other compression codecs: default, lucene_default, and best_compression.
+     *
+     * @param mode The compression codec (ZSTD or ZSTD_NO_DICT).
+     * @param compressionLevel The compression level.
+     */
+    public Lucene95CustomCodec(Mode mode, int compressionLevel) {
+        super("Lucene95CustomCodec", new Lucene95Codec());
+        this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
+    }
+
+    public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
+        super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(mapperService, logger));
+        this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel);
+    }
+
+    @Override
+    public StoredFieldsFormat storedFieldsFormat() {
+        return storedFieldsFormat;
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName();
+    }
+}
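Lucene95CustomCodec is a FilterCodec: everything delegates to the wrapped Lucene95Codec (or the per-field postings codec) except storedFieldsFormat(). A rough sketch of resolving the codec by the name passed to super() once the SPI entry added below is on the classpath; hedged, since both concrete codecs in this patch register under the same "Lucene95CustomCodec" name:

```java
import org.apache.lucene.codecs.Codec;

// Rough sketch: Lucene's SPI resolves codecs by the name passed to the
// FilterCodec constructor ("Lucene95CustomCodec" above), via the
// META-INF/services entry added later in this patch.
public class CodecSpiLookupSketch {
    public static void main(String[] args) {
        System.out.println(Codec.availableCodecs());         // registered codec names
        Codec custom = Codec.forName("Lucene95CustomCodec"); // assumes the SPI entry is loaded
        System.out.println(custom.storedFieldsFormat());     // the zstd-backed stored fields format
    }
}
```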
+ */ + public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) { + this.mode = Objects.requireNonNull(mode); + zstdCompressionMode = new ZstdCompressionMode(compressionLevel); + zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel); + } + + /** + * Returns a {@link StoredFieldsReader} to load stored fields. + * @param directory The index directory. + * @param si The SegmentInfo that stores segment information. + * @param fn The fieldInfos. + * @param context The IOContext that holds additional details on the merge/search context. + */ + @Override + public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException { + String value = si.getAttribute(MODE_KEY); + if (value == null) { + throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name); + } + Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value); + return impl(mode).fieldsReader(directory, si, fn, context); + } + + /** + * Returns a {@link StoredFieldsReader} to write stored fields. + * @param directory The index directory. + * @param si The SegmentInfo that stores segment information. + * @param context The IOContext that holds additional details on the merge/search context. + */ + @Override + public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException { + String previous = si.putAttribute(MODE_KEY, mode.name()); + if (previous != null && previous.equals(mode.name()) == false) { + throw new IllegalStateException( + "found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name() + ); + } + return impl(mode).fieldsWriter(directory, si, context); + } + + private StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { + switch (mode) { + case ZSTD: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsZstd", + zstdCompressionMode, + ZSTD_BLOCK_LENGTH, + ZSTD_MAX_DOCS_PER_BLOCK, + ZSTD_BLOCK_SHIFT + ); + case ZSTD_NO_DICT: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsZstdNoDict", + zstdNoDictCompressionMode, + ZSTD_BLOCK_LENGTH, + ZSTD_MAX_DOCS_PER_BLOCK, + ZSTD_BLOCK_SHIFT + ); + default: + throw new AssertionError(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java new file mode 100644 index 0000000000000..68da782421e6e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.opensearch.index.mapper.MapperService; + +/** + * ZstdCodec provides ZSTD compressor using the zstd-jni library. + */ +public class ZstdCodec extends Lucene95CustomCodec { + + /** + * Creates a new ZstdCodec instance with the default compression level. + */ + public ZstdCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new ZstdCodec instance. + * + * @param compressionLevel The compression level. 
+ */ + public ZstdCodec(int compressionLevel) { + super(Mode.ZSTD, compressionLevel); + } + + public ZstdCodec(MapperService mapperService, Logger logger) { + super(Mode.ZSTD, DEFAULT_COMPRESSION_LEVEL, mapperService, logger); + } + + /** The name for this codec. */ + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java new file mode 100644 index 0000000000000..7057dac3d6bd2 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java @@ -0,0 +1,206 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdCompressCtx; +import com.github.luben.zstd.ZstdDecompressCtx; +import com.github.luben.zstd.ZstdDictCompress; +import com.github.luben.zstd.ZstdDictDecompress; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; + +/** Zstandard Compression Mode */ +public class ZstdCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DICT_SIZE_FACTOR = 6; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + + /** default constructor */ + protected ZstdCompressionMode() { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + } + + /** + * Creates a new instance. + * + * @param compressionLevel The compression level to use. + */ + protected ZstdCompressionMode(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + /** Creates a new compressor instance.*/ + @Override + public Compressor newCompressor() { + return new ZstdCompressor(compressionLevel); + } + + /** Creates a new decompressor instance. 
diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java
new file mode 100644
index 0000000000000..7057dac3d6bd2
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java
@@ -0,0 +1,206 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.codec.customcodecs;
+
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
+import com.github.luben.zstd.ZstdDecompressCtx;
+import com.github.luben.zstd.ZstdDictCompress;
+import com.github.luben.zstd.ZstdDictDecompress;
+import org.apache.lucene.codecs.compressing.CompressionMode;
+import org.apache.lucene.codecs.compressing.Compressor;
+import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
+
+import java.io.IOException;
+
+/** Zstandard Compression Mode */
+public class ZstdCompressionMode extends CompressionMode {
+
+    private static final int NUM_SUB_BLOCKS = 10;
+    private static final int DICT_SIZE_FACTOR = 6;
+    private static final int DEFAULT_COMPRESSION_LEVEL = 6;
+
+    private final int compressionLevel;
+
+    /** default constructor */
+    protected ZstdCompressionMode() {
+        this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
+    }
+
+    /**
+     * Creates a new instance.
+     *
+     * @param compressionLevel The compression level to use.
+     */
+    protected ZstdCompressionMode(int compressionLevel) {
+        this.compressionLevel = compressionLevel;
+    }
+
+    /** Creates a new compressor instance. */
+    @Override
+    public Compressor newCompressor() {
+        return new ZstdCompressor(compressionLevel);
+    }
+
+    /** Creates a new decompressor instance. */
+    @Override
+    public Decompressor newDecompressor() {
+        return new ZstdDecompressor();
+    }
+
+    /** zstandard compressor */
+    private static final class ZstdCompressor extends Compressor {
+
+        private final int compressionLevel;
+        private byte[] compressedBuffer;
+
+        /** compressor with a given compression level */
+        public ZstdCompressor(int compressionLevel) {
+            this.compressionLevel = compressionLevel;
+            compressedBuffer = BytesRef.EMPTY_BYTES;
+        }
+
+        /* reusable compress function */
+        private void doCompress(byte[] bytes, int offset, int length, ZstdCompressCtx cctx, DataOutput out) throws IOException {
+            if (length == 0) {
+                out.writeVInt(0);
+                return;
+            }
+            final int maxCompressedLength = (int) Zstd.compressBound(length);
+            compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, maxCompressedLength);
+
+            int compressedSize = cctx.compressByteArray(compressedBuffer, 0, compressedBuffer.length, bytes, offset, length);
+
+            out.writeVInt(compressedSize);
+            out.writeBytes(compressedBuffer, compressedSize);
+        }
+
+        private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException {
+            assert offset >= 0 : "offset must be non-negative";
+
+            final int dictLength = length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR);
+            final int blockLength = (length - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS;
+            out.writeVInt(dictLength);
+            out.writeVInt(blockLength);
+
+            final int end = offset + length;
+            assert end >= 0 : "buffer read size must be non-negative";
+
+            try (ZstdCompressCtx cctx = new ZstdCompressCtx()) {
+                cctx.setLevel(compressionLevel);
+
+                // dictionary compression first
+                doCompress(bytes, offset, dictLength, cctx, out);
+                cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel));
+
+                for (int start = offset + dictLength; start < end; start += blockLength) {
+                    int l = Math.min(blockLength, end - start);
+                    doCompress(bytes, start, l, cctx, out);
+                }
+            }
+        }
+
+        @Override
+        public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+            final int length = (int) buffersInput.size();
+            byte[] bytes = new byte[length];
+            buffersInput.readBytes(bytes, 0, length);
+            compress(bytes, 0, length, out);
+        }
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    /** zstandard decompressor */
+    private static final class ZstdDecompressor extends Decompressor {
+
+        private byte[] compressedBuffer;
+
+        /** default decompressor */
+        public ZstdDecompressor() {
+            compressedBuffer = BytesRef.EMPTY_BYTES;
+        }
+
+        /* reusable decompress function */
+        private void doDecompress(DataInput in, ZstdDecompressCtx dctx, BytesRef bytes, int decompressedLen) throws IOException {
+            final int compressedLength = in.readVInt();
+            if (compressedLength == 0) {
+                return;
+            }
+
+            compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, compressedLength);
+            in.readBytes(compressedBuffer, 0, compressedLength);
+
+            bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + decompressedLen);
+            int uncompressed = dctx.decompressByteArray(bytes.bytes, bytes.length, decompressedLen, compressedBuffer, 0, compressedLength);
+
+            if (decompressedLen != uncompressed) {
+                throw new IllegalStateException(decompressedLen + " " + uncompressed);
+            }
+            bytes.length += uncompressed;
+        }
+
+        @Override
+        public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
+            assert offset + length <= originalLength : "buffer read size must be within limit";
+
+            if (length == 0) {
+                bytes.length = 0;
+                return;
+            }
+            final int dictLength = in.readVInt();
+            final int blockLength = in.readVInt();
+            bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, dictLength);
+            bytes.offset = bytes.length = 0;
+
+            try (ZstdDecompressCtx dctx = new ZstdDecompressCtx()) {
+
+                // decompress dictionary first
+                doDecompress(in, dctx, bytes, dictLength);
+
+                dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength));
+
+                int offsetInBlock = dictLength;
+                int offsetInBytesRef = offset;
+
+                // Skip unneeded blocks
+                while (offsetInBlock + blockLength < offset) {
+                    final int compressedLength = in.readVInt();
+                    in.skipBytes(compressedLength);
+                    offsetInBlock += blockLength;
+                    offsetInBytesRef -= blockLength;
+                }
+
+                // Read blocks that intersect with the interval we need
+                while (offsetInBlock < offset + length) {
+                    bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
+                    int l = Math.min(blockLength, originalLength - offsetInBlock);
+                    doDecompress(in, dctx, bytes, l);
+                    offsetInBlock += blockLength;
+                }
+
+                bytes.offset = offsetInBytesRef;
+                bytes.length = length;
+
+                assert bytes.isValid() : "decompression output is corrupted";
+            }
+        }
+
+        @Override
+        public Decompressor clone() {
+            return new ZstdDecompressor();
+        }
+    }
+}
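For each chunk handed to compress(), one sixtieth of the bytes (length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR)) become a shared dictionary and the remainder is cut into NUM_SUB_BLOCKS sub-blocks compressed against it. A worked example of that framing arithmetic for a 60,000-byte chunk:

```java
// Worked example of the framing math in ZstdCompressionMode.compress():
// a 60,000-byte chunk yields a 1,000-byte dictionary and ten 5,900-byte
// sub-blocks, each compressed against that dictionary.
public class ZstdFramingMath {
    public static void main(String[] args) {
        int length = 60_000;
        int dictLength = length / (10 * 6);                    // NUM_SUB_BLOCKS * DICT_SIZE_FACTOR -> 1,000
        int blockLength = (length - dictLength + 10 - 1) / 10; // ceil(59,000 / 10) -> 5,900
        System.out.println(dictLength + " " + blockLength);
    }
}
```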
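ZstdNoDictCodec selects the block-only mode defined next, which skips the dictionary pass entirely. A hedged sketch of wiring it into a bare Lucene IndexWriter, assuming standard Lucene APIs (OpenSearch itself reaches the codec through EngineConfig, as the change below shows):

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;

// Sketch: stored fields written through this writer are zstd-compressed
// without a shared dictionary.
public class ZstdNoDictWriterSketch {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig config = new IndexWriterConfig().setCodec(new ZstdNoDictCodec());
        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), config)) {
            Document doc = new Document();
            doc.add(new StoredField("body", "compressed with zstd_no_dict"));
            writer.addDocument(doc);
        }
    }
}
```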
+ */ + +package org.opensearch.index.codec.customcodecs; + +import com.github.luben.zstd.Zstd; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; + +/** ZSTD Compression Mode (without a dictionary support). */ +public class ZstdNoDictCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + + /** default constructor */ + protected ZstdNoDictCompressionMode() { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + } + + /** + * Creates a new instance with the given compression level. + * + * @param compressionLevel The compression level. + */ + protected ZstdNoDictCompressionMode(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + /** Creates a new compressor instance.*/ + @Override + public Compressor newCompressor() { + return new ZstdCompressor(compressionLevel); + } + + /** Creates a new decompressor instance. */ + @Override + public Decompressor newDecompressor() { + return new ZstdDecompressor(); + } + + /** zstandard compressor */ + private static final class ZstdCompressor extends Compressor { + + private final int compressionLevel; + private byte[] compressedBuffer; + + /** compressor with a given compresion level */ + public ZstdCompressor(int compressionLevel) { + this.compressionLevel = compressionLevel; + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "offset value must be greater than 0"; + + int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; + out.writeVInt(blockLength); + + final int end = offset + length; + assert end >= 0 : "buffer read size must be greater than 0"; + + for (int start = offset; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + + if (l == 0) { + out.writeVInt(0); + return; + } + + final int maxCompressedLength = (int) Zstd.compressBound(l); + compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, maxCompressedLength); + + int compressedSize = (int) Zstd.compressByteArray( + compressedBuffer, + 0, + compressedBuffer.length, + bytes, + start, + l, + compressionLevel + ); + + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} + } + + /** zstandard decompressor */ + private static final class ZstdDecompressor extends Decompressor { + + private byte[] compressed; + + /** default decompressor */ + public ZstdDecompressor() { + compressed = BytesRef.EMPTY_BYTES; + } + + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "buffer read size must be within limit"; + + if (length 
== 0) {
                bytes.length = 0;
                return;
            }
+
+            final int blockLength = in.readVInt();
+            bytes.offset = bytes.length = 0;
+            int offsetInBlock = 0;
+            int offsetInBytesRef = offset;
+
+            // Skip unneeded blocks
+            while (offsetInBlock + blockLength < offset) {
+                final int compressedLength = in.readVInt();
+                in.skipBytes(compressedLength);
+                offsetInBlock += blockLength;
+                offsetInBytesRef -= blockLength;
+            }
+
+            // Read blocks that intersect with the interval we need
+            while (offsetInBlock < offset + length) {
+                bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
+                final int compressedLength = in.readVInt();
+                if (compressedLength == 0) {
+                    return;
+                }
+                compressed = ArrayUtil.growNoCopy(compressed, compressedLength);
+                in.readBytes(compressed, 0, compressedLength);
+
+                int l = Math.min(blockLength, originalLength - offsetInBlock);
+                bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l);
+
+                byte[] output = new byte[l];
+
+                final int uncompressed = (int) Zstd.decompressByteArray(output, 0, l, compressed, 0, compressedLength);
+                System.arraycopy(output, 0, bytes.bytes, bytes.length, uncompressed);
+
+                bytes.length += uncompressed;
+                offsetInBlock += blockLength;
+            }
+
+            bytes.offset = offsetInBytesRef;
+            bytes.length = length;
+
+            assert bytes.isValid() : "decompression output is corrupted.";
+        }
+
+        @Override
+        public Decompressor clone() {
+            return new ZstdDecompressor();
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
index 338a541af387a..7419cf1dadea6 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -129,12 +129,14 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
switch (s) {
case "default":
case "best_compression":
+ case "zstd":
+ case "zstd_no_dict":
case "lucene_default":
return s;
default:
if (Codec.availableCodecs().contains(s) == false) { // we don't error message the not officially supported ones
throw new IllegalArgumentException(
- "unknown value for [index.codec] must be one of [default, best_compression] but was: " + s
+ "unknown value for [index.codec] must be one of [default, best_compression, zstd, zstd_no_dict] but was: " + s
);
}
return s;
diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec
new file mode 100644
index 0000000000000..8b37d91cd8bc4
--- /dev/null
+++ b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec
@@ -0,0 +1,2 @@
+org.opensearch.index.codec.customcodecs.ZstdCodec
+org.opensearch.index.codec.customcodecs.ZstdNoDictCodec
diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy
index 77cd0ab05278e..ce552c123a1fa 100644
--- a/server/src/main/resources/org/opensearch/bootstrap/security.policy
+++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy
@@ -88,6 +88,9 @@ grant codeBase "${codebase.zstd-jni}" {
//// Everything else:
grant {
+
+ permission java.lang.RuntimePermission "loadLibrary.*";
+
// needed by vendored Guice
permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.vm.annotation";
diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java
b/server/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java new file mode 100644 index 0000000000000..cc794eb2c48f1 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java @@ -0,0 +1,219 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.tests.util.LineFileDocs; +import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Random; + +/** + * Test cases for compressors (based on {@See org.opensearch.common.compress.DeflateCompressTests}). + */ +public abstract class AbstractCompressorTests extends OpenSearchTestCase { + + abstract Compressor compressor(); + + abstract Decompressor decompressor(); + + public void testEmpty() throws IOException { + final byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testShortLiterals() throws IOException { + final byte[] bytes = "1234567345673456745608910123".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testRandom() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + final byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)]; + r.nextBytes(bytes); + doTest(bytes); + } + } + + public void testLineDocs() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 10; i++) { + int numDocs = TestUtil.nextInt(r, 1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + doTest(bos.toByteArray()); + } + lineFileDocs.close(); + } + + public void testRepetitionsL() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numLongs = TestUtil.nextInt(r, 1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = r.nextLong(); + for (int j = 0; j < numLongs; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsI() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numInts = TestUtil.nextInt(r, 1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = r.nextInt(); + for (int j = 0; j < numInts; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 
16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsS() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numShorts = TestUtil.nextInt(r, 1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) r.nextInt(65535); + for (int j = 0; j < numShorts; j++) { + if (r.nextInt(10) == 0) { + theValue = (short) r.nextInt(65535); + } + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testMixed() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 2; ++i) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int prevInt = r.nextInt(); + long prevLong = r.nextLong(); + while (bos.size() < 400000) { + switch (r.nextInt(4)) { + case 0: + addInt(r, prevInt, bos); + break; + case 1: + addLong(r, prevLong, bos); + break; + case 2: + addString(lineFileDocs, bos); + break; + case 3: + addBytes(r, bos); + break; + default: + throw new IllegalStateException("Random is broken"); + } + } + doTest(bos.toByteArray()); + } + } + + private void addLong(Random r, long prev, ByteArrayOutputStream bos) { + long theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addInt(Random r, int prev, ByteArrayOutputStream bos) { + int theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + + private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { + byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; + r.nextBytes(bytes); + bos.write(bytes); + } + + private void doTest(byte[] bytes) throws IOException { + final int length = bytes.length; + + ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes))); + ByteBuffersDataOutput out = new ByteBuffersDataOutput(); + + // let's compress + Compressor compressor = compressor(); + compressor.compress(in, out); + byte[] compressed = out.toArrayCopy(); + + // let's decompress + BytesRef outbytes = new BytesRef(); + Decompressor decompressor = decompressor(); + decompressor.decompress(new ByteArrayDataInput(compressed), length, 0, length, outbytes); + + // get the uncompressed array out of outbytes + byte[] restored = new byte[outbytes.length]; + System.arraycopy(outbytes.bytes, 0, restored, 0, outbytes.length); + + assertArrayEquals(bytes, restored); + } + +} diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java b/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java new file mode 100644 index 0000000000000..78cf62c08f889 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java @@ -0,0 +1,30 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test ZSTD compression (with dictionary enabled) + */ +public class ZstdCompressorTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdCompressionMode().newCompressor(); + private final Decompressor decompressor = new ZstdCompressionMode().newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +} diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java b/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java new file mode 100644 index 0000000000000..2eda81a6af2ab --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test ZSTD compression (with no dictionary). + */ +public class ZstdNoDictCompressorTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdNoDictCompressionMode().newCompressor(); + private final Decompressor decompressor = new ZstdNoDictCompressionMode().newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +} From d02fba252d9571ff81c755a1f0f0e7fb9cb75bd3 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Fri, 2 Jun 2023 17:30:21 +0530 Subject: [PATCH 02/11] Removing zstd module from sandbox Signed-off-by: Sarthak Aggarwal --- sandbox/plugins/custom-codecs/build.gradle | 28 --- .../codec/customcodecs/CustomCodecPlugin.java | 26 --- .../customcodecs/Lucene95CustomCodec.java | 59 ----- .../Lucene95CustomStoredFieldsFormat.java | 120 ---------- .../index/codec/customcodecs/ZstdCodec.java | 37 --- .../customcodecs/ZstdCompressionMode.java | 205 ---------------- .../codec/customcodecs/ZstdNoDictCodec.java | 37 --- .../ZstdNoDictCompressionMode.java | 180 -------------- .../codec/customcodecs/package-info.java | 12 - .../plugin-metadata/plugin-security.policy | 11 - .../services/org.apache.lucene.codecs.Codec | 2 - .../customcodecs/AbstractCompressorTests.java | 219 ------------------ .../customcodecs/ZstdCompressorTests.java | 30 --- .../ZstdNoDictCompressorTests.java | 30 --- 14 files changed, 996 deletions(-) delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java delete mode 100644 
sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java delete mode 100644 sandbox/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy delete mode 100644 sandbox/plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec delete mode 100644 sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java delete mode 100644 sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java delete mode 100644 sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java diff --git a/sandbox/plugins/custom-codecs/build.gradle b/sandbox/plugins/custom-codecs/build.gradle index 2183df25044a4..e69de29bb2d1d 100644 --- a/sandbox/plugins/custom-codecs/build.gradle +++ b/sandbox/plugins/custom-codecs/build.gradle @@ -1,28 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -apply plugin: 'opensearch.opensearchplugin' -apply plugin: 'opensearch.yaml-rest-test' - -opensearchplugin { - name 'custom-codecs' - description 'A plugin that implements custom compression codecs.' - classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin' - licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt') - noticeFile rootProject.file('NOTICE.txt') -} - -dependencies { - api "com.github.luben:zstd-jni:${versions.zstd}" -} - -yamlRestTest.enabled = false; -testingConventions.enabled = false; diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java deleted file mode 100644 index 9d36184bf81af..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec.customcodecs; - -import org.opensearch.plugins.Plugin; -import org.opensearch.plugins.EnginePlugin; - -/** - * A plugin that implements custom codecs. Supports these codecs: - *
- * <ul>
- *   <li>zstd</li>
- *   <li>zstdnodict</li>
- * </ul>
- * - * @opensearch.internal - */ -public final class CustomCodecPlugin extends Plugin implements EnginePlugin { - /** Creates a new instance. */ - public CustomCodecPlugin() {} -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java deleted file mode 100644 index ad9e5cd3374fa..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec.customcodecs; - -import org.apache.lucene.codecs.StoredFieldsFormat; -import org.apache.lucene.codecs.FilterCodec; -import org.apache.lucene.codecs.lucene95.Lucene95Codec; - -import java.util.Locale; - -abstract class Lucene95CustomCodec extends FilterCodec { - public static final int DEFAULT_COMPRESSION_LEVEL = 6; - - /** Each mode represents a compression algorithm. */ - public enum Mode { - ZSTD, - ZSTDNODICT - } - - private final StoredFieldsFormat storedFieldsFormat; - - /** - * Creates a new compression codec with the default compression level. - * - * @param mode The compression codec (ZSTD or ZSTDNODICT). - */ - public Lucene95CustomCodec(Mode mode) { - this(mode, DEFAULT_COMPRESSION_LEVEL); - } - - /** - * Creates a new compression codec with the given compression level. We use - * lowercase letters when registering the codec so that we remain consistent with - * the other compression codecs: default, lucene_default, and best_compression. - * - * @param mode The compression codec (ZSTD or ZSTDNODICT). - * @parama compressionLevel The compression level. - */ - public Lucene95CustomCodec(Mode mode, int compressionLevel) { - super(mode.name().toLowerCase(Locale.ROOT), new Lucene95Codec()); - this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel); - } - - @Override - public StoredFieldsFormat storedFieldsFormat() { - return storedFieldsFormat; - } - - @Override - public String toString() { - return getClass().getSimpleName(); - } -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java deleted file mode 100644 index 2bfec2ef171d4..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.index.codec.customcodecs; - -import java.io.IOException; -import java.util.Objects; -import org.apache.lucene.codecs.StoredFieldsFormat; -import org.apache.lucene.codecs.StoredFieldsReader; -import org.apache.lucene.codecs.StoredFieldsWriter; -import org.apache.lucene.codecs.compressing.CompressionMode; -import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat; -import org.apache.lucene.index.FieldInfos; -import org.apache.lucene.index.SegmentInfo; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; - -/** Stored field format used by pluggable codec */ -public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat { - - /** A key that we use to map to a mode */ - public static final String MODE_KEY = Lucene95CustomStoredFieldsFormat.class.getSimpleName() + ".mode"; - - private static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024; - private static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096; - private static final int ZSTD_BLOCK_SHIFT = 10; - - private final CompressionMode zstdCompressionMode; - private final CompressionMode zstdNoDictCompressionMode; - - private final Lucene95CustomCodec.Mode mode; - - /** default constructor */ - public Lucene95CustomStoredFieldsFormat() { - this(Lucene95CustomCodec.Mode.ZSTD, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL); - } - - /** - * Creates a new instance. - * - * @param mode The mode represents ZSTD or ZSTDNODICT - */ - public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) { - this(mode, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL); - } - - /** - * Creates a new instance with the specified mode and compression level. - * - * @param mode The mode represents ZSTD or ZSTDNODICT - * @param compressionLevel The compression level for the mode. - */ - public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) { - this.mode = Objects.requireNonNull(mode); - zstdCompressionMode = new ZstdCompressionMode(compressionLevel); - zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel); - } - - /** - * Returns a {@link StoredFieldsReader} to load stored fields. - * @param directory The index directory. - * @param si The SegmentInfo that stores segment information. - * @param fn The fieldInfos. - * @param context The IOContext that holds additional details on the merge/search context. - */ - @Override - public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException { - String value = si.getAttribute(MODE_KEY); - if (value == null) { - throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name); - } - Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value); - return impl(mode).fieldsReader(directory, si, fn, context); - } - - /** - * Returns a {@link StoredFieldsReader} to write stored fields. - * @param directory The index directory. - * @param si The SegmentInfo that stores segment information. - * @param context The IOContext that holds additional details on the merge/search context. 
- */ - @Override - public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException { - String previous = si.putAttribute(MODE_KEY, mode.name()); - if (previous != null && previous.equals(mode.name()) == false) { - throw new IllegalStateException( - "found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name() - ); - } - return impl(mode).fieldsWriter(directory, si, context); - } - - private StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { - switch (mode) { - case ZSTD: - return new Lucene90CompressingStoredFieldsFormat( - "CustomStoredFieldsZstd", - zstdCompressionMode, - ZSTD_BLOCK_LENGTH, - ZSTD_MAX_DOCS_PER_BLOCK, - ZSTD_BLOCK_SHIFT - ); - case ZSTDNODICT: - return new Lucene90CompressingStoredFieldsFormat( - "CustomStoredFieldsZstdNoDict", - zstdNoDictCompressionMode, - ZSTD_BLOCK_LENGTH, - ZSTD_MAX_DOCS_PER_BLOCK, - ZSTD_BLOCK_SHIFT - ); - default: - throw new AssertionError(); - } - } -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java deleted file mode 100644 index 2b09540d8037d..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec.customcodecs; - -/** - * ZstdCodec provides ZSTD compressor using the zstd-jni library. - */ -public class ZstdCodec extends Lucene95CustomCodec { - - /** - * Creates a new ZstdCodec instance with the default compression level. - */ - public ZstdCodec() { - this(DEFAULT_COMPRESSION_LEVEL); - } - - /** - * Creates a new ZstdCodec instance. - * - * @param compressionLevel The compression level. - */ - public ZstdCodec(int compressionLevel) { - super(Mode.ZSTD, compressionLevel); - } - - /** The name for this codec. */ - @Override - public String toString() { - return getClass().getSimpleName(); - } -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java deleted file mode 100644 index 5b8f1ffcc9569..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.index.codec.customcodecs; - -import com.github.luben.zstd.Zstd; -import com.github.luben.zstd.ZstdCompressCtx; -import com.github.luben.zstd.ZstdDecompressCtx; -import com.github.luben.zstd.ZstdDictCompress; -import com.github.luben.zstd.ZstdDictDecompress; -import java.io.IOException; -import org.apache.lucene.codecs.compressing.CompressionMode; -import org.apache.lucene.codecs.compressing.Compressor; -import org.apache.lucene.codecs.compressing.Decompressor; -import org.apache.lucene.store.DataInput; -import org.apache.lucene.store.DataOutput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.BytesRef; - -/** Zstandard Compression Mode */ -public class ZstdCompressionMode extends CompressionMode { - - private static final int NUM_SUB_BLOCKS = 10; - private static final int DICT_SIZE_FACTOR = 6; - private static final int DEFAULT_COMPRESSION_LEVEL = 6; - - private final int compressionLevel; - - /** default constructor */ - protected ZstdCompressionMode() { - this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; - } - - /** - * Creates a new instance. - * - * @param compressionLevel The compression level to use. - */ - protected ZstdCompressionMode(int compressionLevel) { - this.compressionLevel = compressionLevel; - } - - /** Creates a new compressor instance.*/ - @Override - public Compressor newCompressor() { - return new ZstdCompressor(compressionLevel); - } - - /** Creates a new decompressor instance. */ - @Override - public Decompressor newDecompressor() { - return new ZstdDecompressor(); - } - - /** zstandard compressor */ - private static final class ZstdCompressor extends Compressor { - - private final int compressionLevel; - private byte[] compressedBuffer; - - /** compressor with a given compresion level */ - public ZstdCompressor(int compressionLevel) { - this.compressionLevel = compressionLevel; - compressedBuffer = BytesRef.EMPTY_BYTES; - } - - /*resuable compress function*/ - private void doCompress(byte[] bytes, int offset, int length, ZstdCompressCtx cctx, DataOutput out) throws IOException { - if (length == 0) { - out.writeVInt(0); - return; - } - final int maxCompressedLength = (int) Zstd.compressBound(length); - compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, maxCompressedLength); - - int compressedSize = cctx.compressByteArray(compressedBuffer, 0, compressedBuffer.length, bytes, offset, length); - - out.writeVInt(compressedSize); - out.writeBytes(compressedBuffer, compressedSize); - } - - private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { - assert offset >= 0 : "offset value must be greater than 0"; - - final int dictLength = length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR); - final int blockLength = (length - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; - out.writeVInt(dictLength); - out.writeVInt(blockLength); - - final int end = offset + length; - assert end >= 0 : "buffer read size must be greater than 0"; - - try (ZstdCompressCtx cctx = new ZstdCompressCtx()) { - cctx.setLevel(compressionLevel); - - // dictionary compression first - doCompress(bytes, offset, dictLength, cctx, out); - cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)); - - for (int start = offset + dictLength; start < end; start += blockLength) { - int l = Math.min(blockLength, end - start); - doCompress(bytes, start, l, cctx, out); - } - } - } - - @Override - public void compress(ByteBuffersDataInput 
buffersInput, DataOutput out) throws IOException { - final int length = (int) buffersInput.size(); - byte[] bytes = new byte[length]; - buffersInput.readBytes(bytes, 0, length); - compress(bytes, 0, length, out); - } - - @Override - public void close() throws IOException {} - } - - /** zstandard decompressor */ - private static final class ZstdDecompressor extends Decompressor { - - private byte[] compressedBuffer; - - /** default decompressor */ - public ZstdDecompressor() { - compressedBuffer = BytesRef.EMPTY_BYTES; - } - - /*resuable decompress function*/ - private void doDecompress(DataInput in, ZstdDecompressCtx dctx, BytesRef bytes, int decompressedLen) throws IOException { - final int compressedLength = in.readVInt(); - if (compressedLength == 0) { - return; - } - - compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, compressedLength); - in.readBytes(compressedBuffer, 0, compressedLength); - - bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + decompressedLen); - int uncompressed = dctx.decompressByteArray(bytes.bytes, bytes.length, decompressedLen, compressedBuffer, 0, compressedLength); - - if (decompressedLen != uncompressed) { - throw new IllegalStateException(decompressedLen + " " + uncompressed); - } - bytes.length += uncompressed; - } - - @Override - public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { - assert offset + length <= originalLength : "buffer read size must be within limit"; - - if (length == 0) { - bytes.length = 0; - return; - } - final int dictLength = in.readVInt(); - final int blockLength = in.readVInt(); - bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, dictLength); - bytes.offset = bytes.length = 0; - - try (ZstdDecompressCtx dctx = new ZstdDecompressCtx()) { - - // decompress dictionary first - doDecompress(in, dctx, bytes, dictLength); - - dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength)); - - int offsetInBlock = dictLength; - int offsetInBytesRef = offset; - - // Skip unneeded blocks - while (offsetInBlock + blockLength < offset) { - final int compressedLength = in.readVInt(); - in.skipBytes(compressedLength); - offsetInBlock += blockLength; - offsetInBytesRef -= blockLength; - } - - // Read blocks that intersect with the interval we need - while (offsetInBlock < offset + length) { - bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); - int l = Math.min(blockLength, originalLength - offsetInBlock); - doDecompress(in, dctx, bytes, l); - offsetInBlock += blockLength; - } - - bytes.offset = offsetInBytesRef; - bytes.length = length; - - assert bytes.isValid() : "decompression output is corrupted"; - } - } - - @Override - public Decompressor clone() { - return new ZstdDecompressor(); - } - } -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java deleted file mode 100644 index 4ed6ba57545d0..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.index.codec.customcodecs; - -/** - * ZstdNoDictCodec provides ZSTD compressor without dictionary support. - */ -public class ZstdNoDictCodec extends Lucene95CustomCodec { - - /** - * Creates a new ZstdNoDictCodec instance with the default compression level. - */ - public ZstdNoDictCodec() { - this(DEFAULT_COMPRESSION_LEVEL); - } - - /** - * Creates a new ZstdNoDictCodec instance. - * - * @param compressionLevel The compression level. - */ - public ZstdNoDictCodec(int compressionLevel) { - super(Mode.ZSTDNODICT, compressionLevel); - } - - /** The name for this codec. */ - @Override - public String toString() { - return getClass().getSimpleName(); - } -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java deleted file mode 100644 index 6cfd85b053190..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec.customcodecs; - -import com.github.luben.zstd.Zstd; -import java.io.IOException; -import org.apache.lucene.codecs.compressing.CompressionMode; -import org.apache.lucene.codecs.compressing.Compressor; -import org.apache.lucene.codecs.compressing.Decompressor; -import org.apache.lucene.store.DataInput; -import org.apache.lucene.store.DataOutput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.BytesRef; - -/** ZSTD Compression Mode (without dictionary support). */ -public class ZstdNoDictCompressionMode extends CompressionMode { - - private static final int NUM_SUB_BLOCKS = 10; - private static final int DEFAULT_COMPRESSION_LEVEL = 6; - - private final int compressionLevel; - - /** default constructor */ - protected ZstdNoDictCompressionMode() { - this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; - } - - /** - * Creates a new instance with the given compression level. - * - * @param compressionLevel The compression level. - */ - protected ZstdNoDictCompressionMode(int compressionLevel) { - this.compressionLevel = compressionLevel; - } - - /** Creates a new compressor instance. */ - @Override - public Compressor newCompressor() { - return new ZstdCompressor(compressionLevel); - } - - /** Creates a new decompressor instance. 
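
Both codecs funnel a single integer level into their compression mode, and DEFAULT_COMPRESSION_LEVEL = 6 sits in the middle of zstd's range (roughly 1 through 22): higher levels trade indexing CPU for smaller stored fields. A quick, hedged probe of that trade-off using the same static helpers the no-dictionary compressor calls; the sample buffer and chosen levels are arbitrary.

```java
import com.github.luben.zstd.Zstd;

import java.util.Arrays;

public class ZstdLevelProbe {
    public static void main(String[] args) {
        byte[] sample = new byte[64 * 1024];
        Arrays.fill(sample, (byte) 'x'); // placeholder, highly compressible input

        for (int level : new int[] { 1, 6, 19 }) {
            byte[] dst = new byte[(int) Zstd.compressBound(sample.length)];
            long start = System.nanoTime();
            int size = (int) Zstd.compressByteArray(dst, 0, dst.length, sample, 0, sample.length, level);
            long micros = (System.nanoTime() - start) / 1_000;
            System.out.printf("level %2d -> %6d bytes in %6d us%n", level, size, micros);
        }
    }
}
```

On real stored-fields data the curve is flatter than on this synthetic input, which is why a mid-range default is a reasonable compromise for write-heavy indices.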
*/ - @Override - public Decompressor newDecompressor() { - return new ZstdDecompressor(); - } - - /** zstandard compressor */ - private static final class ZstdCompressor extends Compressor { - - private final int compressionLevel; - private byte[] compressedBuffer; - - /** compressor with a given compression level */ - public ZstdCompressor(int compressionLevel) { - this.compressionLevel = compressionLevel; - compressedBuffer = BytesRef.EMPTY_BYTES; - } - - private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { - assert offset >= 0 : "offset value must be non-negative"; - - int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; - out.writeVInt(blockLength); - - final int end = offset + length; - assert end >= 0 : "buffer read size must be non-negative"; - - for (int start = offset; start < end; start += blockLength) { - int l = Math.min(blockLength, end - start); - - if (l == 0) { - out.writeVInt(0); - return; - } - - final int maxCompressedLength = (int) Zstd.compressBound(l); - compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, maxCompressedLength); - - int compressedSize = (int) Zstd.compressByteArray( - compressedBuffer, - 0, - compressedBuffer.length, - bytes, - start, - l, - compressionLevel - ); - - out.writeVInt(compressedSize); - out.writeBytes(compressedBuffer, compressedSize); - } - } - - @Override - public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { - final int length = (int) buffersInput.size(); - byte[] bytes = new byte[length]; - buffersInput.readBytes(bytes, 0, length); - compress(bytes, 0, length, out); - } - - @Override - public void close() throws IOException {} - } - - /** zstandard decompressor */ - private static final class ZstdDecompressor extends Decompressor { - - private byte[] compressed; - - /** default decompressor */ - public ZstdDecompressor() { - compressed = BytesRef.EMPTY_BYTES; - } - - @Override - public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { - assert offset + length <= originalLength : "buffer read size must be within limit"; - - if (length == 0) { - bytes.length = 0; - return; - } - - final int blockLength = in.readVInt(); - bytes.offset = bytes.length = 0; - int offsetInBlock = 0; - int offsetInBytesRef = offset; - - // Skip unneeded blocks - while (offsetInBlock + blockLength < offset) { - final int compressedLength = in.readVInt(); - in.skipBytes(compressedLength); - offsetInBlock += blockLength; - offsetInBytesRef -= blockLength; - } - - // Read blocks that intersect with the interval we need - while (offsetInBlock < offset + length) { - bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); - final int compressedLength = in.readVInt(); - if (compressedLength == 0) { - return; - } - compressed = ArrayUtil.growNoCopy(compressed, compressedLength); - in.readBytes(compressed, 0, compressedLength); - - int l = Math.min(blockLength, originalLength - offsetInBlock); - bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); - - byte[] output = new byte[l]; - - final int uncompressed = (int) Zstd.decompressByteArray(output, 0, l, compressed, 0, compressedLength); - System.arraycopy(output, 0, bytes.bytes, bytes.length, uncompressed); - - bytes.length += uncompressed; - offsetInBlock += blockLength; - } - - bytes.offset = offsetInBytesRef; - bytes.length = length; - - assert bytes.isValid() : "decompression output is corrupted."; - } - - @Override - public 
Decompressor clone() { - return new ZstdDecompressor(); - } - } -} diff --git a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java deleted file mode 100644 index e996873963b1b..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** - * A plugin that implements compression codecs with native implementation. - */ -package org.opensearch.index.codec.customcodecs; diff --git a/sandbox/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy b/sandbox/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy deleted file mode 100644 index 8161010cfa897..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy +++ /dev/null @@ -1,11 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -grant codeBase "${codebase.zstd-jni}" { - permission java.lang.RuntimePermission "loadLibrary.*"; -}; diff --git a/sandbox/plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/sandbox/plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec deleted file mode 100644 index 8b37d91cd8bc4..0000000000000 --- a/sandbox/plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec +++ /dev/null @@ -1,2 +0,0 @@ -org.opensearch.index.codec.customcodecs.ZstdCodec -org.opensearch.index.codec.customcodecs.ZstdNoDictCodec diff --git a/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java b/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java deleted file mode 100644 index fcfb06ca6b050..0000000000000 --- a/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec.customcodecs; - -import org.apache.lucene.tests.util.LineFileDocs; -import org.apache.lucene.tests.util.TestUtil; -import org.opensearch.test.OpenSearchTestCase; -import org.apache.lucene.codecs.compressing.Compressor; -import org.apache.lucene.codecs.compressing.Decompressor; -import org.apache.lucene.store.ByteArrayDataInput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersDataOutput; -import org.apache.lucene.util.BytesRef; - -import java.util.List; -import java.nio.ByteBuffer; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Random; - -/** - * Test cases for compressors (based on {@See org.opensearch.common.compress.DeflateCompressTests}). 
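
Note the offset/length arithmetic in both decompressors: Lucene may ask for only a window of the original chunk, and the VInt-prefixed blocks let whole leading blocks be skipped with skipBytes instead of decompressed. A hedged sketch of such a partial read follows; it must live in the same package because the mode constructors are protected, the class name is invented, and compressed/originalLength are assumed to come from a prior compress call.

```java
package org.opensearch.index.codec.customcodecs; // the mode constructors are protected

import java.io.IOException;

import org.apache.lucene.codecs.compressing.Decompressor;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.util.BytesRef;

class PartialReadSketch {
    /** Reads length bytes starting at offset, skipping whole blocks that lie before the window. */
    static BytesRef readWindow(byte[] compressed, int originalLength, int offset, int length) throws IOException {
        Decompressor d = new ZstdNoDictCompressionMode().newDecompressor();
        BytesRef window = new BytesRef();
        d.decompress(new ByteArrayDataInput(compressed), originalLength, offset, length, window);
        // requested bytes: window.bytes[window.offset .. window.offset + window.length)
        return window;
    }
}
```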
- */ -public abstract class AbstractCompressorTests extends OpenSearchTestCase { - - abstract Compressor compressor(); - - abstract Decompressor decompressor(); - - public void testEmpty() throws IOException { - final byte[] bytes = "".getBytes(StandardCharsets.UTF_8); - doTest(bytes); - } - - public void testShortLiterals() throws IOException { - final byte[] bytes = "1234567345673456745608910123".getBytes(StandardCharsets.UTF_8); - doTest(bytes); - } - - public void testRandom() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - final byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)]; - r.nextBytes(bytes); - doTest(bytes); - } - } - - public void testLineDocs() throws IOException { - Random r = random(); - LineFileDocs lineFileDocs = new LineFileDocs(r); - for (int i = 0; i < 10; i++) { - int numDocs = TestUtil.nextInt(r, 1, 200); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - for (int j = 0; j < numDocs; j++) { - String s = lineFileDocs.nextDoc().get("body"); - bos.write(s.getBytes(StandardCharsets.UTF_8)); - } - doTest(bos.toByteArray()); - } - lineFileDocs.close(); - } - - public void testRepetitionsL() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - int numLongs = TestUtil.nextInt(r, 1, 10000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - long theValue = r.nextLong(); - for (int j = 0; j < numLongs; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextLong(); - } - bos.write((byte) (theValue >>> 56)); - bos.write((byte) (theValue >>> 48)); - bos.write((byte) (theValue >>> 40)); - bos.write((byte) (theValue >>> 32)); - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } - - public void testRepetitionsI() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - int numInts = TestUtil.nextInt(r, 1, 20000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int theValue = r.nextInt(); - for (int j = 0; j < numInts; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextInt(); - } - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } - - public void testRepetitionsS() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - int numShorts = TestUtil.nextInt(r, 1, 40000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - short theValue = (short) r.nextInt(65535); - for (int j = 0; j < numShorts; j++) { - if (r.nextInt(10) == 0) { - theValue = (short) r.nextInt(65535); - } - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } - - public void testMixed() throws IOException { - Random r = random(); - LineFileDocs lineFileDocs = new LineFileDocs(r); - for (int i = 0; i < 2; ++i) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int prevInt = r.nextInt(); - long prevLong = r.nextLong(); - while (bos.size() < 400000) { - switch (r.nextInt(4)) { - case 0: - addInt(r, prevInt, bos); - break; - case 1: - addLong(r, prevLong, bos); - break; - case 2: - addString(lineFileDocs, bos); - break; - case 3: - addBytes(r, bos); - break; - default: - throw new IllegalStateException("Random is broken"); - } - } - doTest(bos.toByteArray()); - } - } - - private void addLong(Random r, long prev, ByteArrayOutputStream bos) { - 
long theValue = prev; - if (r.nextInt(10) != 0) { - theValue = r.nextLong(); - } - bos.write((byte) (theValue >>> 56)); - bos.write((byte) (theValue >>> 48)); - bos.write((byte) (theValue >>> 40)); - bos.write((byte) (theValue >>> 32)); - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - - private void addInt(Random r, int prev, ByteArrayOutputStream bos) { - int theValue = prev; - if (r.nextInt(10) != 0) { - theValue = r.nextInt(); - } - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - - private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { - String s = lineFileDocs.nextDoc().get("body"); - bos.write(s.getBytes(StandardCharsets.UTF_8)); - } - - private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { - byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; - r.nextBytes(bytes); - bos.write(bytes); - } - - private void doTest(byte[] bytes) throws IOException { - final int length = bytes.length; - - ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes))); - ByteBuffersDataOutput out = new ByteBuffersDataOutput(); - - // let's compress - Compressor compressor = compressor(); - compressor.compress(in, out); - byte[] compressed = out.toArrayCopy(); - - // let's decompress - BytesRef outbytes = new BytesRef(); - Decompressor decompressor = decompressor(); - decompressor.decompress(new ByteArrayDataInput(compressed), length, 0, length, outbytes); - - // get the uncompressed array out of outbytes - byte[] restored = new byte[outbytes.length]; - System.arraycopy(outbytes.bytes, 0, restored, 0, outbytes.length); - - assertArrayEquals(bytes, restored); - } - -} diff --git a/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java b/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java deleted file mode 100644 index 78cf62c08f889..0000000000000 --- a/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ -package org.opensearch.index.codec.customcodecs; - -import org.apache.lucene.codecs.compressing.Compressor; -import org.apache.lucene.codecs.compressing.Decompressor; - -/** - * Test ZSTD compression (with dictionary enabled) - */ -public class ZstdCompressorTests extends AbstractCompressorTests { - - private final Compressor compressor = new ZstdCompressionMode().newCompressor(); - private final Decompressor decompressor = new ZstdCompressionMode().newDecompressor(); - - @Override - Compressor compressor() { - return compressor; - } - - @Override - Decompressor decompressor() { - return decompressor; - } -} diff --git a/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java b/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java deleted file mode 100644 index 2eda81a6af2ab..0000000000000 --- a/sandbox/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -package org.opensearch.index.codec.customcodecs; - -import org.apache.lucene.codecs.compressing.Compressor; -import org.apache.lucene.codecs.compressing.Decompressor; - -/** - * Test ZSTD compression (with no dictionary). - */ -public class ZstdNoDictCompressorTests extends AbstractCompressorTests { - - private final Compressor compressor = new ZstdNoDictCompressionMode().newCompressor(); - private final Decompressor decompressor = new ZstdNoDictCompressionMode().newDecompressor(); - - @Override - Compressor compressor() { - return compressor; - } - - @Override - Decompressor decompressor() { - return decompressor; - } -} From b025249afd7da1cab1b2221c251cc142b133a68b Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Sun, 4 Jun 2023 21:49:32 +0530 Subject: [PATCH 03/11] Added tests and refactoring Signed-off-by: Sarthak Aggarwal --- sandbox/plugins/custom-codecs/build.gradle | 0 server/licenses/zstd-jni-1.5.5-1.jar.sha1 | 1 - .../customcodecs/Lucene95CustomCodec.java | 2 +- .../Lucene95CustomStoredFieldsFormat.java | 6 +- .../opensearch/index/codec/CodecTests.java | 81 +++++++++++++++---- ...Lucene95CustomStoredFieldsFormatTests.java | 25 ++++++ 6 files changed, 97 insertions(+), 18 deletions(-) delete mode 100644 sandbox/plugins/custom-codecs/build.gradle delete mode 100644 server/licenses/zstd-jni-1.5.5-1.jar.sha1 create mode 100644 server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java diff --git a/sandbox/plugins/custom-codecs/build.gradle b/sandbox/plugins/custom-codecs/build.gradle deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/server/licenses/zstd-jni-1.5.5-1.jar.sha1 b/server/licenses/zstd-jni-1.5.5-1.jar.sha1 deleted file mode 100644 index bfb9e565bc6d5..0000000000000 --- a/server/licenses/zstd-jni-1.5.5-1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -fda1d6278299af27484e1cc3c79a060e41b7ef7e \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java index 1011314f72a4e..89a87e7c22b0f 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java +++ 
b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java @@ -15,7 +15,7 @@ import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec; import org.opensearch.index.mapper.MapperService; -abstract class Lucene95CustomCodec extends FilterCodec { +public abstract class Lucene95CustomCodec extends FilterCodec { public static final int DEFAULT_COMPRESSION_LEVEL = 6; /** Each mode represents a compression algorithm. */ diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java index 12b5febef8d34..f70306afc8562 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java +++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java @@ -96,7 +96,7 @@ public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOCo return impl(mode).fieldsWriter(directory, si, context); } - private StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { + StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { switch (mode) { case ZSTD: return new Lucene90CompressingStoredFieldsFormat( @@ -118,4 +118,8 @@ private StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { throw new AssertionError(); } } + + Lucene95CustomCodec.Mode getMode() { + return mode; + } } diff --git a/server/src/test/java/org/opensearch/index/codec/CodecTests.java b/server/src/test/java/org/opensearch/index/codec/CodecTests.java index bc50525412954..f844212d7f28c 100644 --- a/server/src/test/java/org/opensearch/index/codec/CodecTests.java +++ b/server/src/test/java/org/opensearch/index/codec/CodecTests.java @@ -47,6 +47,8 @@ import org.opensearch.env.Environment; import org.opensearch.index.IndexSettings; import org.opensearch.index.analysis.IndexAnalyzers; +import org.opensearch.index.codec.customcodecs.Lucene95CustomCodec; +import org.opensearch.index.codec.customcodecs.Lucene95CustomStoredFieldsFormat; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.indices.mapper.MapperRegistry; @@ -63,40 +65,75 @@ public class CodecTests extends OpenSearchTestCase { public void testResolveDefaultCodecs() throws Exception { - CodecService codecService = createCodecService(); + CodecService codecService = createCodecService(false); assertThat(codecService.codec("default"), instanceOf(PerFieldMappingPostingFormatCodec.class)); assertThat(codecService.codec("default"), instanceOf(Lucene95Codec.class)); } public void testDefault() throws Exception { - Codec codec = createCodecService().codec("default"); + Codec codec = createCodecService(false).codec("default"); assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_SPEED, codec); } public void testBestCompression() throws Exception { - Codec codec = createCodecService().codec("best_compression"); + Codec codec = createCodecService(false).codec("best_compression"); assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_COMPRESSION, codec); } + public void testZstd() throws Exception { + Codec codec = createCodecService(false).codec("zstd"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); + } + + public void testZstdNoDict() throws Exception { + Codec codec = createCodecService(false).codec("zstd_no_dict"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); 
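
These tests resolve codecs through CodecService by name, the same lookup EngineConfig performs when a shard's engine starts; with a null MapperService the zstd entries fall back to plain Lucene95Codec delegates rather than per-field mapping codecs. A minimal resolution sketch (the logger and class names are arbitrary):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.codecs.Codec;
import org.opensearch.index.codec.CodecService;

public class CodecLookupSketch {
    public static void main(String[] args) {
        CodecService codecService = new CodecService(null, LogManager.getLogger("codec-demo"));
        Codec zstd = codecService.codec(CodecService.ZSTD_CODEC); // "zstd"
        Codec zstdNoDict = codecService.codec(CodecService.ZSTD_NO_DICT_CODEC); // "zstd_no_dict"
        System.out.println(zstd + " / " + zstdNoDict);
    }
}
```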
+ } + + public void testDefaultMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("default"); + assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_SPEED, codec); + } + + public void testBestCompressionMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("best_compression"); + assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_COMPRESSION, codec); + } + + public void testZstdMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("zstd"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); + } + + public void testZstdNoDictMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("zstd_no_dict"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); + } + + public void testExceptionCodecNull() { + assertThrows(IllegalArgumentException.class, () -> createCodecService(true).codec(null)); + } + // write some docs with it, inspect .si to see this was the used compression private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec actual) throws Exception { - Directory dir = newDirectory(); - IndexWriterConfig iwc = newIndexWriterConfig(null); - iwc.setCodec(actual); - IndexWriter iw = new IndexWriter(dir, iwc); - iw.addDocument(new Document()); - iw.commit(); - iw.close(); - DirectoryReader ir = DirectoryReader.open(dir); - SegmentReader sr = (SegmentReader) ir.leaves().get(0).reader(); + SegmentReader sr = getSegmentReader(actual); String v = sr.getSegmentInfo().info.getAttribute(Lucene90StoredFieldsFormat.MODE_KEY); assertNotNull(v); assertEquals(expected, Lucene95Codec.Mode.valueOf(v)); - ir.close(); - dir.close(); } - private CodecService createCodecService() throws IOException { + private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec actual) throws Exception { + SegmentReader sr = getSegmentReader(actual); + String v = sr.getSegmentInfo().info.getAttribute(Lucene95CustomStoredFieldsFormat.MODE_KEY); + assertNotNull(v); + assertEquals(expected, Lucene95CustomCodec.Mode.valueOf(v)); + } + + private CodecService createCodecService(boolean isMapperServiceNull) throws IOException { + + if (isMapperServiceNull) { + return new CodecService(null, LogManager.getLogger("test")); + } Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); IndexSettings settings = IndexSettingsModule.newIndexSettings("_na", nodeSettings); SimilarityService similarityService = new SimilarityService(settings, null, Collections.emptyMap()); @@ -115,4 +152,18 @@ private CodecService createCodecService() throws IOException { return new CodecService(service, LogManager.getLogger("test")); } + private SegmentReader getSegmentReader(Codec actual) throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(null); + iwc.setCodec(actual); + IndexWriter iw = new IndexWriter(dir, iwc); + iw.addDocument(new Document()); + iw.commit(); + iw.close(); + DirectoryReader ir = DirectoryReader.open(dir); + SegmentReader sr = (SegmentReader) ir.leaves().get(0).reader(); + ir.close(); + dir.close(); + return sr; + } } diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java b/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java new file mode 100644 index 
0000000000000..1f8118cf9444d --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.test.OpenSearchTestCase; + +public class Lucene95CustomStoredFieldsFormatTests extends OpenSearchTestCase { + + public void testDefaultLucene95CustomCodecMode() { + Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(); + assertEquals(Lucene95CustomCodec.Mode.ZSTD, lucene95CustomStoredFieldsFormat.getMode()); + } + + public void testZstdNoDictLucene95CustomCodecMode() { + Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode.ZSTD_NO_DICT); + assertEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, lucene95CustomStoredFieldsFormat.getMode()); + } + +} From a9a74c1c565b3c08d807066500a62c7b445d3819 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Mon, 5 Jun 2023 12:51:07 +0530 Subject: [PATCH 04/11] Fixing gradle issues Signed-off-by: Sarthak Aggarwal --- .../main/java/org/opensearch/index/codec/CodecService.java | 1 + .../customcodecs/Lucene95CustomStoredFieldsFormatTests.java | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java index fffcd5ee60b02..b6dac7bd1596c 100644 --- a/server/src/main/java/org/opensearch/index/codec/CodecService.java +++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java @@ -62,6 +62,7 @@ public class CodecService { public static final String LUCENE_DEFAULT_CODEC = "lucene_default"; public static final String ZSTD_CODEC = "zstd"; public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict"; + public CodecService(@Nullable MapperService mapperService, Logger logger) { final MapBuilder codecs = MapBuilder.newMapBuilder(); if (mapperService == null) { diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java b/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java index 1f8118cf9444d..4f23450ce0b39 100644 --- a/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java @@ -18,7 +18,9 @@ public void testDefaultLucene95CustomCodecMode() { } public void testZstdNoDictLucene95CustomCodecMode() { - Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode.ZSTD_NO_DICT); + Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat( + Lucene95CustomCodec.Mode.ZSTD_NO_DICT + ); assertEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, lucene95CustomStoredFieldsFormat.getMode()); } From 25bf6220054113b0b82257a6c65e22f5e9178561 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Mon, 5 Jun 2023 14:43:44 +0530 Subject: [PATCH 05/11] flaky test Signed-off-by: Sarthak Aggarwal From 0b80afa3e042b8bf05f12306969d3de2995d7893 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal 
Date: Wed, 7 Jun 2023 11:57:03 +0530 Subject: [PATCH 06/11] fixing precommit failure Signed-off-by: Sarthak Aggarwal --- CHANGELOG.md | 1 + .../index/codec/customcodecs/Lucene95CustomCodec.java | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ffb14d6750c5..68db9cfa7b3d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -128,6 +128,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Compress and cache cluster state during validate join request ([#7321](https://github.com/opensearch-project/OpenSearch/pull/7321)) - [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118)) - Allow insecure string settings to warn-log usage and advise to migration of a newer secure variant ([#5496](https://github.com/opensearch-project/OpenSearch/pull/5496)) +- Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java index 89a87e7c22b0f..2d464e7ef4283 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java @@ -15,6 +15,13 @@ import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec; import org.opensearch.index.mapper.MapperService; +/** + * + * Extends {@link FilterCodec} to reuse the functionality of Lucene Codec. + * Supports two modes: zstd and zstd_no_dict. + * + * @opensearch.internal + */ public abstract class Lucene95CustomCodec extends FilterCodec { public static final int DEFAULT_COMPRESSION_LEVEL = 6; @@ -41,7 +48,7 @@ public Lucene95CustomCodec(Mode mode) { * the other compression codecs: default, lucene_default, and best_compression. * * @param mode The compression codec (ZSTD or ZSTDNODICT). - * @parama compressionLevel The compression level. + * @param compressionLevel The compression level. 
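
Because the codec is a FilterCodec over Lucene95Codec, only the stored fields path changes; postings, doc values, and the rest ride the delegate. It can therefore be plugged into a bare IndexWriter, as in this hedged sketch (the path, analyzer choice, field, and class name are illustrative, not part of the patch):

```java
import java.nio.file.Path;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensearch.index.codec.customcodecs.ZstdCodec;

public class ZstdIndexSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = FSDirectory.open(Path.of("/tmp/zstd-demo"))) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
            iwc.setCodec(new ZstdCodec()); // ZSTD stored fields at the default level
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                Document doc = new Document();
                doc.add(new StoredField("body", "stored fields are what the zstd codec compresses"));
                writer.addDocument(doc);
            }
        }
    }
}
```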
*/ public Lucene95CustomCodec(Mode mode, int compressionLevel) { super("Lucene95CustomCodec", new Lucene95Codec()); From 5ca78a4bb0eed25451ce1fe6b334851092062325 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Mon, 12 Jun 2023 13:35:21 +0530 Subject: [PATCH 07/11] Incorporate review comments and fixed precommit failures Signed-off-by: Sarthak Aggarwal --- .../codec/PerFieldMappingPostingFormatCodec.java | 6 ------ .../codec/customcodecs/Lucene95CustomCodec.java | 2 +- .../index/codec/customcodecs/package-info.java | 12 ++++++++++++ .../org/opensearch/bootstrap/security.policy | 3 --- 4 files changed, 13 insertions(+), 10 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java diff --git a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java index 3059dc89e85dc..f1b515534bdeb 100644 --- a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java @@ -69,12 +69,6 @@ public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService map this.logger = logger; } - public PerFieldMappingPostingFormatCodec(MapperService mapperService, Logger logger) { - super(); - this.mapperService = mapperService; - this.logger = logger; - } - @Override public PostingsFormat getPostingsFormatForField(String field) { final MappedFieldType fieldType = mapperService.fieldType(field); diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java index 2d464e7ef4283..3c570f9d0566c 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java @@ -56,7 +56,7 @@ public Lucene95CustomCodec(Mode mode, int compressionLevel) { } public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) { - super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(mapperService, logger)); + super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger)); this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel); } diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/server/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java new file mode 100644 index 0000000000000..e996873963b1b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * A plugin that implements compression codecs with native implementation. 
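
The security.policy change in the hunk below is the flip side of bundling zstd-jni in server: the first use of the Zstd class loads a native library, so the loadLibrary.* permission moves out of the global grant and stays only in the codebase-scoped grant for zstd-jni. A hedged smoke check of that load (class name invented):

```java
import com.github.luben.zstd.Zstd;

public class ZstdLoadCheck {
    public static void main(String[] args) {
        // First touch of the Zstd class loads the bundled native library; under the
        // security manager that load needs RuntimePermission("loadLibrary.*"),
        // which the policy grants only to the zstd-jni codebase.
        System.out.println("compressBound(1024) = " + Zstd.compressBound(1024));
    }
}
```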
+ */ +package org.opensearch.index.codec.customcodecs; diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index ce552c123a1fa..77cd0ab05278e 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -88,9 +88,6 @@ grant codeBase "${codebase.zstd-jni}" { //// Everything else: grant { - - permission java.lang.RuntimePermission "loadLibrary.*"; - // needed by vendored Guice permission java.lang.RuntimePermission "accessClassInPackage.jdk.internal.vm.annotation"; From 6be559147424e1de8c16e3352e35e2adfc6f3dba Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Thu, 15 Jun 2023 22:11:04 +0530 Subject: [PATCH 08/11] Incorporating review comments Signed-off-by: Sarthak Aggarwal --- .../java/org/opensearch/index/codec/CodecTests.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/codec/CodecTests.java b/server/src/test/java/org/opensearch/index/codec/CodecTests.java index f844212d7f28c..099e8a8823c7d 100644 --- a/server/src/test/java/org/opensearch/index/codec/CodecTests.java +++ b/server/src/test/java/org/opensearch/index/codec/CodecTests.java @@ -115,15 +115,15 @@ public void testExceptionCodecNull() { } // write some docs with it, inspect .si to see this was the used compression - private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec actual) throws Exception { - SegmentReader sr = getSegmentReader(actual); + private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec codec) throws Exception { + SegmentReader sr = getSegmentReader(codec); String v = sr.getSegmentInfo().info.getAttribute(Lucene90StoredFieldsFormat.MODE_KEY); assertNotNull(v); assertEquals(expected, Lucene95Codec.Mode.valueOf(v)); } - private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec actual) throws Exception { - SegmentReader sr = getSegmentReader(actual); + private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec codec) throws Exception { + SegmentReader sr = getSegmentReader(codec); String v = sr.getSegmentInfo().info.getAttribute(Lucene95CustomStoredFieldsFormat.MODE_KEY); assertNotNull(v); assertEquals(expected, Lucene95CustomCodec.Mode.valueOf(v)); @@ -152,10 +152,10 @@ private CodecService createCodecService(boolean isMapperServiceNull) throws IOEx return new CodecService(service, LogManager.getLogger("test")); } - private SegmentReader getSegmentReader(Codec actual) throws IOException { + private SegmentReader getSegmentReader(Codec codec) throws IOException { Directory dir = newDirectory(); IndexWriterConfig iwc = newIndexWriterConfig(null); - iwc.setCodec(actual); + iwc.setCodec(codec); IndexWriter iw = new IndexWriter(dir, iwc); iw.addDocument(new Document()); iw.commit(); From 36e7450ad2a3486b095cbdc01c707733ccc24ea5 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Fri, 16 Jun 2023 15:15:58 +0530 Subject: [PATCH 09/11] Incorporating review comments Signed-off-by: Sarthak Aggarwal --- .../test/java/org/opensearch/index/codec/CodecTests.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/codec/CodecTests.java b/server/src/test/java/org/opensearch/index/codec/CodecTests.java index 099e8a8823c7d..016e785f8da13 100644 --- 
a/server/src/test/java/org/opensearch/index/codec/CodecTests.java +++ b/server/src/test/java/org/opensearch/index/codec/CodecTests.java @@ -115,15 +115,15 @@ public void testExceptionCodecNull() { } // write some docs with it, inspect .si to see this was the used compression - private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec codec) throws Exception { - SegmentReader sr = getSegmentReader(codec); + private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec actual) throws Exception { + SegmentReader sr = getSegmentReader(actual); String v = sr.getSegmentInfo().info.getAttribute(Lucene90StoredFieldsFormat.MODE_KEY); assertNotNull(v); assertEquals(expected, Lucene95Codec.Mode.valueOf(v)); } - private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec codec) throws Exception { - SegmentReader sr = getSegmentReader(codec); + private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec actual) throws Exception { + SegmentReader sr = getSegmentReader(actual); String v = sr.getSegmentInfo().info.getAttribute(Lucene95CustomStoredFieldsFormat.MODE_KEY); assertNotNull(v); assertEquals(expected, Lucene95CustomCodec.Mode.valueOf(v)); From be7561b86a85c1d22e179424df7975180881137f Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Mon, 19 Jun 2023 12:45:33 +0530 Subject: [PATCH 10/11] Adding Integ tests Signed-off-by: Sarthak Aggarwal --- .../index/codec/ReindexCodecIT.java | 179 ++++++++++++++++++ .../org/opensearch/index/codec/CodecIT.java | 167 ++++++++++++++++ 2 files changed, 346 insertions(+) create mode 100644 modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java new file mode 100644 index 0000000000000..09fc5108133a3 --- /dev/null +++ b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java @@ -0,0 +1,179 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
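
Both new integration suites drive the feature the way a user would, through the static index.codec setting at index creation. For example, inside a test (createIndex comes from the integration test base class; the index name is arbitrary):

```java
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;

// inside an OpenSearchIntegTestCase subclass
createIndex(
    "zstd-index",
    Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        .put("index.codec", "zstd") // or "zstd_no_dict"
        .build()
);
```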
+ */ + +package org.opensearch.index.codec; + +import org.opensearch.action.admin.indices.flush.FlushResponse; +import org.opensearch.action.admin.indices.refresh.RefreshResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.Segment; +import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.index.reindex.ReindexAction; +import org.opensearch.index.reindex.ReindexRequestBuilder; +import org.opensearch.index.reindex.ReindexTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; + +public class ReindexCodecIT extends ReindexTestCase { + + public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { + internalCluster().ensureAtLeastNumDataNodes(2); + Map<String, String> codecMap = Map.of( + "best_compression", + "BEST_COMPRESSION", + "zstd_no_dict", + "ZSTD_NO_DICT", + "zstd", + "ZSTD", + "default", + "BEST_SPEED" + ); + + for (Map.Entry<String, String> codec : codecMap.entrySet()) { + assertReindexingWithMultipleCodecs(codec.getKey(), codec.getValue(), codecMap); + } + + } + + private void assertReindexingWithMultipleCodecs(String destCodec, String destCodecMode, Map<String, String> codecMap) + throws ExecutionException, InterruptedException { + + final String index = "test-index" + destCodec; + final String destIndex = "dest-index" + destCodec; + + // creating source index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", "default") + .build() + ); + ensureGreen(index); + + final int nbDocs = randomIntBetween(2, 5); + + // indexing with all 4 codecs + for (Map.Entry<String, String> codec : codecMap.entrySet()) { + indexWithDifferentCodecs(index, codec.getKey(), codec.getValue(), nbDocs); + } + + // creating destination index with destination codec + createIndex( + destIndex, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", destCodec) + .build() + ); + + BulkByScrollResponse bulkResponse = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index) + .destination(destIndex) + .refresh(true) + .waitForActiveShards(ActiveShardCount.ONE) + .get(); + + assertEquals(4 * nbDocs, bulkResponse.getCreated()); + assertEquals(4 * nbDocs, bulkResponse.getTotal()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); 
assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); + } + + private void flushAndRefreshIndex(String index) { + + // Request is not blocked + for (String blockSetting : Arrays.asList( + SETTING_BLOCKS_READ, + SETTING_BLOCKS_WRITE, + SETTING_READ_ONLY, + SETTING_BLOCKS_METADATA, + SETTING_READ_ONLY_ALLOW_DELETE + )) { + try { + enableIndexBlock(index, blockSetting); + // flush + FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet(); + assertNoFailures(flushResponse); + + // refresh + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet(); + assertNoFailures(refreshResponse); + } finally { + disableIndexBlock(index, blockSetting); + } + } + } + + private void indexWithDifferentCodecs(String index, String codec, String codecMode, int nbDocs) throws InterruptedException, + ExecutionException { + + assertAcked(client().admin().indices().prepareClose(index)); + + assertAcked( + client().admin() + .indices() + .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index)); + + indexRandom( + randomBoolean(), + false, + randomBoolean(), + IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) + .collect(toList()) + ); + flushAndRefreshIndex(index); + assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode))); + } + + private ArrayList<Segment> getSegments(String index) { + + return new ArrayList<>( + client().admin() + .indices() + .segments(new IndicesSegmentsRequest(index)) + .actionGet() + .getIndices() + .get(index) + .getShards() + .get(0) + .getShards()[0].getSegments() + ); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java b/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java new file mode 100644 index 0000000000000..7024846822512 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
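
The assertions in both suites boil down to one observable: after a flush, each segment's attributes map records the stored-fields mode that wrote it (BEST_SPEED, BEST_COMPRESSION, ZSTD, or ZSTD_NO_DICT), so codec changes can be verified segment by segment. Distilled from the getSegments helper above (the index name is assumed; client() and assertTrue come from the test base class):

```java
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.index.engine.Segment;

// inside an OpenSearchIntegTestCase subclass, after flushing "test-index"
for (Segment segment : client().admin()
    .indices()
    .segments(new IndicesSegmentsRequest("test-index"))
    .actionGet()
    .getIndices()
    .get("test-index")
    .getShards()
    .get(0)
    .getShards()[0].getSegments()) {
    assertTrue(segment.attributes.containsValue("ZSTD"));
}
```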
+ */ + +package org.opensearch.index.codec; + +import org.opensearch.action.admin.indices.flush.FlushResponse; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.opensearch.action.admin.indices.refresh.RefreshResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.Segment; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.is; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) +public class CodecIT extends OpenSearchIntegTestCase { + + public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException { + + Map<String, String> codecMap = Map.of( + "best_compression", + "BEST_COMPRESSION", + "zstd_no_dict", + "ZSTD_NO_DICT", + "zstd", + "ZSTD", + "default", + "BEST_SPEED" + ); + + for (Map.Entry<String, String> codec : codecMap.entrySet()) { + forceMergeMultipleCodecs(codec.getKey(), codec.getValue(), codecMap); + } + + } + + private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, Map<String, String> codecMap) throws ExecutionException, + InterruptedException { + + internalCluster().ensureAtLeastNumDataNodes(2); + final String index = "test-index" + finalCodec; + + // creating index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.codec", "default") + .build() + ); + ensureGreen(index); + + // ingesting and asserting segment codec mode for all four codecs + for (Map.Entry<String, String> codec : codecMap.entrySet()) { + assertSegmentCodec(index, codec.getKey(), codec.getValue()); + } + + // force merge into final codec + assertSegmentCodec(index, finalCodec, finalCodecMode); + final ForceMergeResponse forceMergeResponse = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get(); + + assertThat(forceMergeResponse.getFailedShards(), is(0)); + assertThat(forceMergeResponse.getSuccessfulShards(), is(2)); + + flushAndRefreshIndex(index); + + List<Segment> segments = getSegments(index).stream().filter(Segment::isSearch).collect(Collectors.toList()); + assertEquals(1, segments.size()); + assertTrue(segments.stream().findFirst().get().attributes.containsValue(finalCodecMode)); + } + + private void assertSegmentCodec(String index, String codec, String codecMode) throws InterruptedException, ExecutionException { + + 
assertAcked(client().admin().indices().prepareClose(index)); + + assertAcked( + client().admin() + .indices() + .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index)); + + ingest(index); + flushAndRefreshIndex(index); + + assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode))); + } + + private ArrayList<Segment> getSegments(String index) { + + return new ArrayList<>( + client().admin() + .indices() + .segments(new IndicesSegmentsRequest(index)) + .actionGet() + .getIndices() + .get(index) + .getShards() + .get(0) + .getShards()[0].getSegments() + ); + } + + private void ingest(String index) throws InterruptedException { + + final int nbDocs = randomIntBetween(1, 5); + indexRandom( + randomBoolean(), + false, + randomBoolean(), + IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) + .collect(toList()) + ); + } + + private void flushAndRefreshIndex(String index) { + + // Request is not blocked + for (String blockSetting : Arrays.asList( + SETTING_BLOCKS_READ, + SETTING_BLOCKS_WRITE, + SETTING_READ_ONLY, + SETTING_BLOCKS_METADATA, + SETTING_READ_ONLY_ALLOW_DELETE + )) { + try { + enableIndexBlock(index, blockSetting); + FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet(); + assertNoFailures(flushResponse); + RefreshResponse response = client().admin().indices().prepareRefresh(index).execute().actionGet(); + assertNoFailures(response); + } finally { + disableIndexBlock(index, blockSetting); + } + } + } + +} From 097efd8db36a25f04bb2451af3a0435801d5bcff Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Tue, 20 Jun 2023 21:47:52 +0530 Subject: [PATCH 11/11] Incorporating review comments Signed-off-by: Sarthak Aggarwal --- ...xCodecIT.java => MultiCodecReindexIT.java} | 48 +++++++++++-------- .../{CodecIT.java => MultiCodecMergeIT.java} | 30 +++++++----- 2 files changed, 48 insertions(+), 30 deletions(-) rename modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/{ReindexCodecIT.java => MultiCodecReindexIT.java} (87%) rename server/src/internalClusterTest/java/org/opensearch/index/codec/{CodecIT.java => MultiCodecMergeIT.java} (87%) diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java similarity index 87% rename from modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java rename to modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java index 09fc5108133a3..87f3c68d8af76 100644 --- a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/ReindexCodecIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; @@ -37,10 +38,10 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; -public class ReindexCodecIT extends ReindexTestCase { +public class 
MultiCodecReindexIT extends ReindexTestCase { public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { - internalCluster().ensureAtLeastNumDataNodes(2); + internalCluster().ensureAtLeastNumDataNodes(1); Map<String, String> codecMap = Map.of( "best_compression", "BEST_COMPRESSION", @@ -71,6 +72,7 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.codec", "default") + .put("index.merge.policy.max_merged_segment", "1b") .build() ); ensureGreen(index); @@ -79,9 +81,17 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod // indexing with all 4 codecs for (Map.Entry<String, String> codec : codecMap.entrySet()) { - indexWithDifferentCodecs(index, codec.getKey(), codec.getValue(), nbDocs); + useCodec(index, codec.getKey()); + ingestDocs(index, nbDocs); } + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + // creating destination index with destination codec createIndex( destIndex, @@ -98,8 +108,8 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod .waitForActiveShards(ActiveShardCount.ONE) .get(); - assertEquals(4 * nbDocs, bulkResponse.getCreated()); - assertEquals(4 * nbDocs, bulkResponse.getTotal()); + assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated()); + assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal()); assertEquals(0, bulkResponse.getDeleted()); assertEquals(0, bulkResponse.getNoops()); assertEquals(0, bulkResponse.getVersionConflicts()); @@ -110,6 +120,19 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); } + private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { + assertAcked(client().admin().indices().prepareClose(index)); + + assertAcked( + client().admin() + .indices() + .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index)); + } + private void flushAndRefreshIndex(String index) { // Request is not blocked @@ -135,19 +158,7 @@ private void flushAndRefreshIndex(String index) { } } - private void indexWithDifferentCodecs(String index, String codec, String codecMode, int nbDocs) throws InterruptedException, - ExecutionException { - - assertAcked(client().admin().indices().prepareClose(index)); - - assertAcked( - client().admin() - .indices() - .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) - .get() - ); - - assertAcked(client().admin().indices().prepareOpen(index)); + private void ingestDocs(String index, int nbDocs) throws InterruptedException { indexRandom( randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) .collect(toList()) ); flushAndRefreshIndex(index); - assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode))); } private ArrayList<Segment> getSegments(String index) { diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java b/server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java similarity index 
87% rename from server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java rename to server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java index 7024846822512..2866292e5e2e0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java @@ -38,7 +38,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) -public class CodecIT extends OpenSearchIntegTestCase { +public class MultiCodecMergeIT extends OpenSearchIntegTestCase { public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException { @@ -62,7 +62,7 @@ public void testForceMergeMultipleCodecs() throws ExecutionException, Interrupte private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, Map<String, String> codecMap) throws ExecutionException, InterruptedException { - internalCluster().ensureAtLeastNumDataNodes(2); + internalCluster().ensureAtLeastNumDataNodes(1); final String index = "test-index" + finalCodec; // creating index @@ -70,23 +70,32 @@ private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, index, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.codec", "default") + .put("index.merge.policy.max_merged_segment", "1b") .build() ); ensureGreen(index); - // ingesting and asserting segment codec mode for all four codecs for (Map.Entry<String, String> codec : codecMap.entrySet()) { - assertSegmentCodec(index, codec.getKey(), codec.getValue()); + useCodec(index, codec.getKey()); + ingestDocs(index); } + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + // force merge into final codec - assertSegmentCodec(index, finalCodec, finalCodecMode); + useCodec(index, finalCodec); + flushAndRefreshIndex(index); final ForceMergeResponse forceMergeResponse = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get(); assertThat(forceMergeResponse.getFailedShards(), is(0)); - assertThat(forceMergeResponse.getSuccessfulShards(), is(2)); + assertThat(forceMergeResponse.getSuccessfulShards(), is(1)); flushAndRefreshIndex(index); @@ -95,8 +104,7 @@ private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, - private void assertSegmentCodec(String index, String codec, String codecMode) throws InterruptedException, ExecutionException { - + private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { assertAcked(client().admin().indices().prepareClose(index)); assertAcked( @@ -107,11 +115,11 @@ private void assertSegmentCodec(String index, String codec, String codecMode) th ); assertAcked(client().admin().indices().prepareOpen(index)); + } + private void ingestDocs(String index) throws InterruptedException { ingest(index); flushAndRefreshIndex(index); - - assertTrue(getSegments(index).stream().anyMatch(segment -> segment.attributes.containsValue(codecMode))); } private ArrayList<Segment> getSegments(String index) {
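
Taken together, the merge test documents the practical recipe for moving an existing index onto one of the zstd codecs: close the index, flip the static setting, reopen, then force-merge so every surviving segment is rewritten by the new codec. Roughly (the index name is assumed; client(), assertAcked, and assertEquals come from the integration test base class):

```java
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.common.settings.Settings;

// inside an OpenSearchIntegTestCase subclass; "test-index" is assumed to exist
String index = "test-index";

// index.codec is a static setting: the index must be closed while it changes
assertAcked(client().admin().indices().prepareClose(index));
assertAcked(
    client().admin()
        .indices()
        .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", "zstd")))
        .get()
);
assertAcked(client().admin().indices().prepareOpen(index));

// rewrite all surviving segments with the new codec
ForceMergeResponse merge = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
assertEquals(0, merge.getFailedShards());
```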