Skip to content

Commit

Permalink
Remove synchronized on PerOperationSampler.sample
Browse files Browse the repository at this point in the history
- Use a ConcurrentHashMap for operationNameToSampler so that any number
of concurrent calls to sample can be made and safely modify it.
- Add volatile to any fields that can be changed by the update method to
ensure visibility of changes to other threads.
- Retain instances of GuaranteedThroughputSampler to preserve their rate
limit balances across updates when parameters don't change improving on
jaegertracing/jaeger#1729

See jaegertracing#609 for
similar work.

Fixes jaegertracing#807

Signed-off-by: Will Tran <will@autonomic.ai>
  • Loading branch information
Will Tran committed Oct 14, 2021
1 parent f9a9592 commit c0cf6fd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import io.jaegertracing.internal.samplers.http.OperationSamplingParameters;
import io.jaegertracing.internal.samplers.http.PerOperationSamplingParameters;
import io.jaegertracing.spi.Sampler;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
Expand All @@ -38,13 +41,13 @@
@Getter(AccessLevel.PACKAGE) //Visible for testing
public class PerOperationSampler implements Sampler {
private final int maxOperations;
private Map<String, GuaranteedThroughputSampler> operationNameToSampler;
private ProbabilisticSampler defaultSampler;
private double lowerBound;
private final ConcurrentHashMap<String, GuaranteedThroughputSampler> operationNameToSampler;
private volatile ProbabilisticSampler defaultSampler;
private volatile double lowerBound;

public PerOperationSampler(int maxOperations, OperationSamplingParameters strategies) {
this(maxOperations,
new HashMap<String, GuaranteedThroughputSampler>(),
new ConcurrentHashMap<String, GuaranteedThroughputSampler>(),
new ProbabilisticSampler(strategies.getDefaultSamplingProbability()),
strategies.getDefaultLowerBoundTracesPerSecond());
update(strategies);
Expand All @@ -56,55 +59,62 @@ public PerOperationSampler(int maxOperations, OperationSamplingParameters strate
* @return true if any samplers were updated
*/
public synchronized boolean update(final OperationSamplingParameters strategies) {
boolean isUpdated = false;
AtomicBoolean isUpdated = new AtomicBoolean(false);

lowerBound = strategies.getDefaultLowerBoundTracesPerSecond();
ProbabilisticSampler defaultSampler = new ProbabilisticSampler(strategies.getDefaultSamplingProbability());
if (lowerBound != strategies.getDefaultLowerBoundTracesPerSecond()) {
lowerBound = strategies.getDefaultLowerBoundTracesPerSecond();
isUpdated.set(true);
}
ProbabilisticSampler defaultSampler = new ProbabilisticSampler(
strategies.getDefaultSamplingProbability());

if (!defaultSampler.equals(this.defaultSampler)) {
this.defaultSampler = defaultSampler;
isUpdated = true;
isUpdated.set(true);
}

Map<String, GuaranteedThroughputSampler> newOpsSamplers = new HashMap<String, GuaranteedThroughputSampler>();
//add or update operation samples using given strategies
Set<String> configuredOperations = strategies.getPerOperationStrategies().stream()
.map(PerOperationSamplingParameters::getOperation).collect(Collectors.toSet());
for (Entry<String, GuaranteedThroughputSampler> entry : operationNameToSampler.entrySet()) {
if (!configuredOperations.contains(entry.getKey())
&& entry.getValue().update(defaultSampler.getSamplingRate(), lowerBound)) {
isUpdated.set(true);
}
}

// add or update operation samples using given strategies
for (PerOperationSamplingParameters strategy : strategies.getPerOperationStrategies()) {
String operation = strategy.getOperation();
double samplingRate = strategy.getProbabilisticSampling().getSamplingRate();
GuaranteedThroughputSampler sampler = operationNameToSampler.get(operation);
if (sampler != null) {
isUpdated = sampler.update(samplingRate, lowerBound) || isUpdated;
newOpsSamplers.put(operation, sampler);
} else {
if (newOpsSamplers.size() < maxOperations) {
sampler = new GuaranteedThroughputSampler(samplingRate, lowerBound);
newOpsSamplers.put(operation, sampler);
isUpdated = true;
} else {
log.info("Exceeded the maximum number of operations({}) for per operations sampling",
maxOperations);
}
GuaranteedThroughputSampler sampler = operationNameToSampler.computeIfAbsent(operation,
op -> {
if (operationNameToSampler.size() >= maxOperations) {
log.info("Exceeded the maximum number of operations({}) for per operations sampling",
maxOperations);
return null;
}
isUpdated.set(true);
return new GuaranteedThroughputSampler(samplingRate, lowerBound);
});
if (sampler != null && sampler.update(samplingRate, lowerBound)) {
isUpdated.set(true);
}
}

operationNameToSampler = newOpsSamplers;
return isUpdated;
return isUpdated.get();
}

@Override
public synchronized SamplingStatus sample(String operation, long id) {
GuaranteedThroughputSampler sampler = operationNameToSampler.get(operation);
if (sampler != null) {
return sampler.sample(operation, id);
}

if (operationNameToSampler.size() < maxOperations) {
sampler = new GuaranteedThroughputSampler(defaultSampler.getSamplingRate(), lowerBound);
operationNameToSampler.put(operation, sampler);
return sampler.sample(operation, id);
public SamplingStatus sample(String operation, long id) {
Sampler sampler = operationNameToSampler.computeIfAbsent(operation, op -> {
if (operationNameToSampler.size() >= maxOperations) {
return null;
}
return new GuaranteedThroughputSampler(defaultSampler.getSamplingRate(), lowerBound);
});
if (sampler == null) {
sampler = defaultSampler;
}

return defaultSampler.sample(operation, id);
return sampler.sample(operation, id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -29,6 +30,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -48,7 +50,7 @@ public class PerOperationSamplerTest {
private static final String OPERATION = "some OPERATION";

@Mock private ProbabilisticSampler defaultProbabilisticSampler;
private HashMap<String, GuaranteedThroughputSampler> operationToSamplers = new HashMap<>();
private ConcurrentHashMap<String, GuaranteedThroughputSampler> operationToSamplers = new ConcurrentHashMap<>();
private PerOperationSampler undertest;

@Before
Expand Down Expand Up @@ -174,24 +176,46 @@ public void testUpdateAddOperation() {
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND),
undertest.getOperationNameToSampler().get(OPERATION));
}

@Test
public void testAbsentOperationIsRemoved() {
String absentOp = "ShouldBeRemoved";
operationToSamplers.put(absentOp, mock(GuaranteedThroughputSampler.class));
public void testPreviouslyKnownOperationRevertsToDefaultProbabilityAfterUpdate() {
String previouslyKnownOp = "previouslyKnownOp";
operationToSamplers.put(previouslyKnownOp, mock(GuaranteedThroughputSampler.class));

PerOperationSamplingParameters perOperationSamplingParameters1 =
new PerOperationSamplingParameters(OPERATION, new ProbabilisticSamplingStrategy(SAMPLING_RATE));
List<PerOperationSamplingParameters> parametersList = new ArrayList<>();
parametersList.add(perOperationSamplingParameters1);

undertest.update(new OperationSamplingParameters(DEFAULT_SAMPLING_PROBABILITY,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND,
parametersList));
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND,
parametersList));
verify(operationToSamplers.get(previouslyKnownOp)).update(DEFAULT_SAMPLING_PROBABILITY,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND);
}

@Test
public void testUpdateRetainsSamplerInstances() {
String previouslyKnownOp = "previouslyKnownOp";
operationToSamplers.put(previouslyKnownOp, mock(GuaranteedThroughputSampler.class));
operationToSamplers.put(OPERATION, mock(GuaranteedThroughputSampler.class));
undertest.sample(previouslyKnownOp, TRACE_ID);
undertest.sample(OPERATION, TRACE_ID);
verify(operationToSamplers.get(previouslyKnownOp), times(1)).sample(previouslyKnownOp, TRACE_ID);
verify(operationToSamplers.get(OPERATION), times(1)).sample(OPERATION, TRACE_ID);


PerOperationSamplingParameters perOperationSamplingParameters1 =
new PerOperationSamplingParameters(OPERATION, new ProbabilisticSamplingStrategy(SAMPLING_RATE));
List<PerOperationSamplingParameters> parametersList = new ArrayList<>();
parametersList.add(perOperationSamplingParameters1);

assertEquals(1, undertest.getOperationNameToSampler().size());
assertEquals(new GuaranteedThroughputSampler(SAMPLING_RATE,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND),
undertest.getOperationNameToSampler().get(OPERATION));
assertFalse(undertest.getOperationNameToSampler().containsKey(absentOp));
undertest.update(new OperationSamplingParameters(DEFAULT_SAMPLING_PROBABILITY,
DEFAULT_LOWER_BOUND_TRACES_PER_SECOND,
parametersList));
undertest.sample(previouslyKnownOp, TRACE_ID);
undertest.sample(OPERATION, TRACE_ID);
verify(operationToSamplers.get(previouslyKnownOp), times(2)).sample(previouslyKnownOp, TRACE_ID);
verify(operationToSamplers.get(OPERATION), times(2)).sample(OPERATION, TRACE_ID);
}
}

0 comments on commit c0cf6fd

Please sign in to comment.