Skip to content

Commit

Permalink
* Improvements to serialization efficiency of replicated Cache and …
Browse files Browse the repository at this point in the history
…`CachedValue`
  • Loading branch information
lbwexler committed Sep 5, 2024
1 parent 85b7147 commit d71c955
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 38 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 22.0-SNAPSHOT - unreleased

### ⚙️ Technical
* Improvements to serialization efficiency of replicated `Cache` and `CachedValue`

## 21.0.0 - 2024-09-03

### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW - latest Hoist React + DB col additions)
Expand Down
26 changes: 4 additions & 22 deletions src/main/groovy/io/xh/hoist/cache/BaseCache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

package io.xh.hoist.cache

import com.hazelcast.core.EntryEvent
import com.hazelcast.core.EntryListener

import groovy.transform.CompileStatic
import io.xh.hoist.BaseService
import io.xh.hoist.cluster.ClusterService
Expand Down Expand Up @@ -53,7 +52,7 @@ abstract class BaseCache<V> {
public final boolean serializeOldValue

/** Handlers to be called on change with a {@link CacheValueChanged} object. */
public final List<Closure> onChange
public final List<Closure> onChange = []

BaseCache(
BaseService svc,
Expand All @@ -62,8 +61,7 @@ abstract class BaseCache<V> {
Closure expireFn,
Closure timestampFn,
boolean replicate,
boolean serializeOldValue,
Closure onChange
boolean serializeOldValue
) {
this.svc = svc
this.name = name
Expand All @@ -72,13 +70,10 @@ abstract class BaseCache<V> {
this.timestampFn = timestampFn
this.replicate = replicate
this.serializeOldValue = serializeOldValue
this.onChange = onChange ? [onChange] : []
}

/** @param handler called on change with a {@link CacheValueChanged} object. */
void addChangeHandler(Closure handler) {
onChange << handler
}
abstract void addChangeHandler(Closure handler)

/** Clear all values. */
abstract void clear()
Expand All @@ -90,19 +85,6 @@ abstract class BaseCache<V> {
return replicate && ClusterService.multiInstanceEnabled
}

protected EntryListener getHzEntryListener() {
Closure onChg = { EntryEvent<?, Entry<V>> it ->
fireOnChange(it.key, it.oldValue?.value, it.value?.value)
}
return [
entryAdded : onChg,
entryUpdated: onChg,
entryRemoved: onChg,
entryEvicted: onChg,
entryExpired: onChg
] as EntryListener
}

protected void fireOnChange(Object key, V oldValue, V value) {
def change = new CacheValueChanged(this, key, oldValue, value)
onChange.each { it.call(change) }
Expand Down
18 changes: 11 additions & 7 deletions src/main/groovy/io/xh/hoist/cache/Cache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ class Cache<K,V> extends BaseCache<V> {
@NamedParam boolean serializeOldValue = false,
@NamedParam Closure onChange = null
) {
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue, onChange)
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue)

if (replicate && !name) {
throw new IllegalArgumentException("Cannot create a replicated Cache without a unique name")
}

if (useCluster) {
_map = svc.getReplicatedMap(name)
(_map as ReplicatedMap).addEntryListener(getHzEntryListener())
} else {
_map = new ConcurrentHashMap()
}
_map = useCluster ? svc.getReplicatedMap(name) : new ConcurrentHashMap()
if (onChange) addChangeHandler(onChange)

timer = new Timer(
owner: svc,
Expand Down Expand Up @@ -131,6 +127,14 @@ class Cache<K,V> extends BaseCache<V> {
_map.each { k, v -> remove(k)}
}

void addChangeHandler(Closure handler) {
if (!onChange && _map instanceof ReplicatedMap) {
_map.addEntryListener(new HzEntryListener(this))
}
onChange << handler
}


/**
* Wait for the cache entry to be populated.
* @param key, entry to check
Expand Down
19 changes: 10 additions & 9 deletions src/main/groovy/io/xh/hoist/cache/CachedValue.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import java.util.concurrent.TimeoutException
import static io.xh.hoist.util.DateTimeUtils.SECONDS
import static io.xh.hoist.util.DateTimeUtils.intervalElapsed
import static java.lang.System.currentTimeMillis
import static java.util.Collections.emptyMap


/**
* Similar to {@link Cache}, but a single value that can be read, written, and expired.
Expand All @@ -32,14 +30,10 @@ class CachedValue<V> extends BaseCache<V> {
@NamedParam boolean serializeOldValue = false,
@NamedParam Closure onChange = null
) {
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue, onChange)
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue)

if (useCluster) {
_map = svc.replicatedCachedValuesMap
(_map as ReplicatedMap).addEntryListener(getHzEntryListener(), name)
} else {
_map = svc.localCachedValuesMap
}
_map = useCluster ? svc.replicatedCachedValuesMap : svc.localCachedValuesMap
if (onChange) addChangeHandler(onChange)
}

/** @returns the cached value. */
Expand Down Expand Up @@ -108,4 +102,11 @@ class CachedValue<V> extends BaseCache<V> {
throw new TimeoutException(msg)
}
}

void addChangeHandler(Closure handler) {
if (!onChange && _map instanceof ReplicatedMap) {
_map.addEntryListener(new HzEntryListener(this), name)
}
onChange << handler
}
}
43 changes: 43 additions & 0 deletions src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.xh.hoist.cache

import com.hazelcast.core.EntryEvent
import com.hazelcast.core.EntryListener
import com.hazelcast.map.MapEvent

class HzEntryListener implements EntryListener {

private BaseCache target

HzEntryListener(BaseCache target) {
this.target = target
}

void entryAdded(EntryEvent event) {
fireEvent(event)
}

void entryEvicted(EntryEvent event) {
fireEvent(event)
}

void entryExpired(EntryEvent event) {
fireEvent(event)
}

void entryRemoved(EntryEvent event) {
fireEvent(event)
}

void entryUpdated(EntryEvent event) {
fireEvent(event)
}

void mapCleared(MapEvent event) {}

void mapEvicted(MapEvent event) {}

private fireEvent(EntryEvent event) {
target.fireOnChange(event.key, event.oldValue?.value, event.value?.value)
}

}

0 comments on commit d71c955

Please sign in to comment.