Skip to content

Commit

Permalink
Indexing instrumentation changes for auto-create indices and threadpo…
Browse files Browse the repository at this point in the history
…ol queue waiting
  • Loading branch information
rayshrey committed Nov 3, 2023
1 parent 8673fa9 commit 7dedbe9
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,19 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
Span span = tracer.startSpan(SpanBuilder.from("autoCreateIndex", autoCreateIndices.size()));
SpanScope spanScope = tracer.withSpanInScope(span);
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
span.endSpan();
threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(listener) {

@Override
protected void doRun() {
spanScope.close();
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
});
Expand All @@ -352,13 +356,15 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
span.endSpan();
final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
});
threadPool.executor(executorName).execute(new ActionRunnable<BulkResponse>(wrappedListener) {
@Override
protected void doRun() {
spanScope.close();
executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,43 @@ protected void shardOperationOnPrimary(
ActionListener<PrimaryResult<ReplicaRequest, Response>> listener
) {
final String executor = executorFunction.apply(primary);
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
@Override
protected void doRun() {
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer));
Span queueTimeSpan = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnPrimaryQueued", clusterService.localNode().getId(), request)
);

try (SpanScope spanScope = tracer.withSpanInScope(queueTimeSpan)) {
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
@Override
public void onFailure(Exception e) {
queueTimeSpan.setError(e);
queueTimeSpan.endSpan();
super.onFailure(e);
}
}

@Override
public boolean isForceExecution() {
return force(request);
}
});
@Override
public void onRejection(Exception e) {
queueTimeSpan.setError(e);
queueTimeSpan.endSpan();
super.onRejection(e);
}

@Override
protected void doRun() {
queueTimeSpan.endSpan();
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnPrimary", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
public boolean isForceExecution() {
return force(request);
}
});
}
}

protected abstract void dispatchedShardOperationOnPrimary(
Expand All @@ -258,22 +279,42 @@ protected abstract void dispatchedShardOperationOnPrimary(
*/
@Override
protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
@Override
protected void doRun() {
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer));
Span queueTimeSpan = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnReplicaQueued", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(queueTimeSpan)) {
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
@Override
public void onFailure(Exception e) {
queueTimeSpan.setError(e);
queueTimeSpan.endSpan();
super.onFailure(e);
}
}

@Override
public boolean isForceExecution() {
return true;
}
});
@Override
public void onRejection(Exception e) {
queueTimeSpan.setError(e);
queueTimeSpan.endSpan();
super.onRejection(e);
}

@Override
protected void doRun() {
queueTimeSpan.endSpan();
Span span = tracer.startSpan(
SpanBuilder.from("dispatchedShardOperationOnReplica", clusterService.localNode().getId(), request)
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
public boolean isForceExecution() {
return true;
}
});
}
}

protected abstract void dispatchedShardOperationOnReplica(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,9 @@ private AttributeNames() {
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";

/**
* Number of Indices
*/
public static final String NUM_INDICES = "num_indices";
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public static SpanCreationContext from(String spanName, String nodeId, Replicate
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request));
}

public static SpanCreationContext from(String spanName, int numIndices) {
return SpanCreationContext.server()
.name(spanName)
.attributes(Attributes.create().addAttribute(AttributeNames.NUM_INDICES, numIndices));
}

private static String createSpanName(HttpRequest httpRequest) {
return httpRequest.method().name() + SEPARATOR + httpRequest.uri();
}
Expand Down

0 comments on commit 7dedbe9

Please sign in to comment.