Skip to content

Commit

Permalink
[Tiered Caching] Serializers for ehcache (opensearch-project#12709)
Browse files Browse the repository at this point in the history
Adds serializers and integrates them into ehcache disk cache

---------

Signed-off-by: Peter Alfonsi <petealft@amazon.com>
Co-authored-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
peteralfonsi and Peter Alfonsi committed Aug 30, 2024
1 parent a45be0f commit 0fc1f40
Show file tree
Hide file tree
Showing 11 changed files with 606 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

Expand Down Expand Up @@ -106,8 +107,11 @@ public MockDiskCacheFactory(long delay, int maxSize) {
}

@Override
@SuppressWarnings({ "unchecked" })
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
return new Builder<K, V>().setMaxSize(maxSize)
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.build();
Expand All @@ -123,6 +127,8 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

int maxSize;
long delay;
Serializer<K, byte[]> keySerializer;
Serializer<V, byte[]> valueSerializer;

@Override
public ICache<K, V> build() {
Expand All @@ -138,5 +144,16 @@ public Builder<K, V> setDeliberateDelay(long millis) {
this.delay = millis;
return this;
}

public Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,33 @@
package org.opensearch.cache.store.disk;

import org.opensearch.cache.EhcacheDiskCacheSettings;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand All @@ -51,6 +60,8 @@ public void testBasicGetAndPut() throws IOException {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -89,6 +100,8 @@ public void testBasicGetAndPutUsingFactory() throws IOException {
new CacheConfig.Builder<String, String>().setValueType(String.class)
.setKeyType(String.class)
.setRemovalListener(removalListener)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setSettings(
Settings.builder()
.put(
Expand Down Expand Up @@ -149,6 +162,8 @@ public void testConcurrentPut() throws Exception {
.setIsEventListenerModeSync(true) // For accurate count
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -194,6 +209,8 @@ public void testEhcacheParallelGets() throws Exception {
.setIsEventListenerModeSync(true) // For accurate count
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -237,6 +254,8 @@ public void testEhcacheKeyIterator() throws Exception {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -274,6 +293,8 @@ public void testEvictions() throws Exception {
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -304,6 +325,8 @@ public void testComputeIfAbsentConcurrently() throws Exception {
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -373,6 +396,8 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception
.setThreadPoolAlias("ehcacheTest")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -430,6 +455,8 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception {
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -491,6 +518,8 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {
.setIsEventListenerModeSync(true)
.setKeyType(String.class)
.setValueType(String.class)
.setKeySerializer(new StringSerializer())
.setValueSerializer(new StringSerializer())
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setExpireAfterAccess(TimeValue.MAX_VALUE)
Expand Down Expand Up @@ -525,6 +554,50 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {

}

public void testBasicGetAndPutBytesReference() throws Exception {
Settings settings = Settings.builder().build();
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ICache<String, BytesReference> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, BytesReference>()
.setThreadPoolAlias("ehcacheTest")
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
.setKeySerializer(new StringSerializer())
.setValueSerializer(new BytesReferenceSerializer())
.setKeyType(String.class)
.setValueType(BytesReference.class)
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
.setSettings(settings)
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen
.setExpireAfterAccess(TimeValue.MAX_VALUE)
.setRemovalListener(new MockRemovalListener<>())
.build();
int randomKeys = randomIntBetween(10, 100);
int valueLength = 100;
Random rand = Randomness.get();
Map<String, BytesReference> keyValueMap = new HashMap<>();
for (int i = 0; i < randomKeys; i++) {
byte[] valueBytes = new byte[valueLength];
rand.nextBytes(valueBytes);
keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes));

// Test a non-BytesArray implementation of BytesReference.
byte[] compositeBytes1 = new byte[valueLength];
byte[] compositeBytes2 = new byte[valueLength];
rand.nextBytes(compositeBytes1);
rand.nextBytes(compositeBytes2);
BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2));
keyValueMap.put(UUID.randomUUID().toString(), composite);
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
BytesReference value = ehCacheDiskCachingTier.get(entry.getKey());
assertEquals(entry.getValue(), value);
}
ehCacheDiskCachingTier.close();
}
}

private static String generateRandomString(int length) {
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder randomString = new StringBuilder(length);
Expand All @@ -546,4 +619,25 @@ public void onRemoval(RemovalNotification<K, V> notification) {
evictionMetric.inc();
}
}

static class StringSerializer implements Serializer<String, byte[]> {
private final Charset charset = StandardCharsets.UTF_8;

@Override
public byte[] serialize(String object) {
return object.getBytes(charset);
}

@Override
public String deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new String(bytes, charset);
}

public boolean equals(String object, byte[] bytes) {
return object.equals(deserialize(bytes));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.serializer;

import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;

import java.util.Arrays;

/**
* A serializer which transforms BytesReference to byte[].
* The type of BytesReference is NOT preserved after deserialization, but nothing in opensearch should care.
*/
public class BytesReferenceSerializer implements Serializer<BytesReference, byte[]> {
// This class does not get passed to ehcache itself, so it's not required that classes match after deserialization.

public BytesReferenceSerializer() {}

@Override
public byte[] serialize(BytesReference object) {
return BytesReference.toBytes(object);
}

@Override
public BytesReference deserialize(byte[] bytes) {
if (bytes == null) {
return null;
}
return new BytesArray(bytes);
}

@Override
public boolean equals(BytesReference object, byte[] bytes) {
return Arrays.equals(serialize(object), bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.serializer;

/**
* Defines an interface for serializers, to be used by pluggable caches.
* T is the class of the original object, and U is the serialized class.
*/
public interface Serializer<T, U> {
/**
* Serializes an object.
* @param object A non-serialized object.
* @return The serialized representation of the object.
*/
U serialize(T object);

/**
* Deserializes bytes into an object.
* @param bytes The serialized representation.
* @return The original object.
*/
T deserialize(U bytes);

/**
* Compares an object to a serialized representation of an object.
* @param object A non-serialized objet
* @param bytes Serialized representation of an object
* @return true if representing the same object, false if not
*/
boolean equals(T object, U bytes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/** A package for serializers used in caches. */
package org.opensearch.common.cache.serializer;
Loading

0 comments on commit 0fc1f40

Please sign in to comment.