Skip to content

Commit

Permalink
Spikes ES support
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Jul 22, 2017
1 parent dc72958 commit 2b28827
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 493 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,18 @@
*/
package zipkin.storage.elasticsearch.http;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.simplespan.SimpleSpan;
import zipkin.simplespan.SimpleSpanCodec;
import zipkin.simplespan.SimpleSpanConverter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;

class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing

Expand All @@ -50,20 +43,15 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
}
try {
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
if (!indexToServiceSpans.isEmpty()) {
indexNames(indexer, indexToServiceSpans);
}
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
}

/** Indexes spans and returns a mapping of indexes that may need a names update */
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
void indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Long timestampMillis;
Expand All @@ -83,40 +71,11 @@ Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> sp
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
}
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
byte[] document = Codec.JSON.writeSpan(span);
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
}
return indexToServiceSpans;
}

void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
for (String serviceName : s.serviceNames()) {
serviceSpans.add(Pair.create(serviceName, s.name));
}
}

/**
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
* a large order of duplicates ending up in the daily index. This also means queries do not need
* to deduplicate.
*/
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
throws IOException {
Buffer buffer = new Buffer();
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
String index = entry.getKey();
for (Pair<String> serviceSpan : entry.getValue()) {
JsonWriter writer = JsonWriter.of(buffer);
writer.beginObject();
writer.name("serviceName").value(serviceSpan._1);
writer.name("spanName").value(serviceSpan._2);
writer.endObject();
byte[] document = buffer.readByteArray();
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
for (SimpleSpan simpleSpan: SimpleSpanConverter.fromSpan(span)) {
byte[] document = SimpleSpanCodec.JSON.writeSpan(simpleSpan);
if (timestampMillis != null)
document = prefixWithTimestampMillis(document, timestampMillis);
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -63,39 +62,21 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
if (request.serviceName != null) {
filters.addNestedTerms(asList(
"annotations.endpoint.serviceName",
"binaryAnnotations.endpoint.serviceName"
), request.serviceName);
filters.addTerm("localEndpoint.serviceName", request.serviceName);
}

if (request.spanName != null) {
filters.addTerm("name", request.spanName);
}

for (String annotation : request.annotations) {
Map<String, String> annotationValues = new LinkedHashMap<>();
annotationValues.put("annotations.value", annotation);
Map<String, String> binaryAnnotationKeys = new LinkedHashMap<>();
binaryAnnotationKeys.put("binaryAnnotations.key", annotation);
if (request.serviceName != null) {
annotationValues.put("annotations.endpoint.serviceName", request.serviceName);
binaryAnnotationKeys.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
}
filters.addNestedTerms(annotationValues, binaryAnnotationKeys);
filters.should()
.addTerm("annotations.value", annotation)
.addExists("tags." + annotation);
}

for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
// In our index template, we make sure the binaryAnnotation value is indexed as string,
// meaning non-string values won't even be indexed at all. This means that we can only
// match string values here, which happens to be exactly what we want.
Map<String, String> nestedTerms = new LinkedHashMap<>();
nestedTerms.put("binaryAnnotations.key", kv.getKey());
nestedTerms.put("binaryAnnotations.value", kv.getValue());
if (request.serviceName != null) {
nestedTerms.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
}
filters.addNestedTerms(nestedTerms);
filters.addTerm("tags." + kv.getKey(), kv.getValue());
}

if (request.minDuration != null) {
Expand Down Expand Up @@ -193,28 +174,15 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
long beginMillis = endMillis - namesLookback;

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
@Override public void onSuccess(List<String> value) {
if (!value.isEmpty()) callback.onSuccess(value);

// Special cased code until sites update their collectors. What this does is do a more
// expensive nested query to get service names when the servicespan type returns nothing.
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
});
// Service name queries include both local and remote endpoints. This is different than
// Span name, as a span name can only be on a local endpoint.
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
.addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}

@Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
Expand All @@ -228,32 +196,15 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>

List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);

SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
.term("serviceName", serviceName.toLowerCase(Locale.ROOT))
.addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));

search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
@Override public void onSuccess(List<String> value) {
if (!value.isEmpty()) callback.onSuccess(value);

// Special cased code until sites update their collectors. What this does is do a more
// expensive nested query to get span names when the servicespan type returns nothing.
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);
filters.addNestedTerms(asList(
"annotations.endpoint.serviceName",
"binaryAnnotations.endpoint.serviceName"
), serviceName.toLowerCase(Locale.ROOT));
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}
// A span name is only valid on a local endpoint, as a span name is defined locally
SearchRequest.Filters filters = new SearchRequest.Filters()
.addRange("timestamp_millis", beginMillis, endMillis)
.addTerm("localEndpoint.serviceName", serviceName.toLowerCase(Locale.ROOT));

@Override public void onError(Throwable t) {
callback.onError(t);
}
});
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
.filters(filters)
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
}

@Override public void getDependencies(long endTs, @Nullable Long lookback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import zipkin.DependencyLink;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.internal.Util;
import zipkin.simplespan.SimpleSpan;
import zipkin.simplespan.SimpleSpanConverter;

import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.lowerHexToUnsignedLong;

/**
* Read-only json adapters resurrected from before we switched to Java 6 as storage components can
Expand All @@ -38,7 +38,7 @@ final class JsonAdapters {
static final JsonAdapter<Span> SPAN_ADAPTER = new JsonAdapter<Span>() {
@Override
public Span fromJson(JsonReader reader) throws IOException {
Span.Builder result = Span.builder();
SimpleSpan.Builder result = SimpleSpan.builder();
reader.beginObject();
while (reader.hasNext()) {
String nextName = reader.nextName();
Expand All @@ -48,50 +48,59 @@ public Span fromJson(JsonReader reader) throws IOException {
}
switch (nextName) {
case "traceId":
String traceId = reader.nextString();
if (traceId.length() == 32) {
result.traceIdHigh(lowerHexToUnsignedLong(traceId, 0));
}
result.traceId(lowerHexToUnsignedLong(traceId));
result.traceId(reader.nextString());
break;
case "name":
result.name(reader.nextString());
case "parentId":
result.parentId(reader.nextString());
break;
case "id":
result.id(Util.lowerHexToUnsignedLong(reader.nextString()));
result.id(reader.nextString());
break;
case "parentId":
result.parentId(Util.lowerHexToUnsignedLong(reader.nextString()));
case "kind":
result.kind(SimpleSpan.Kind.valueOf(reader.nextString()));
break;
case "name":
result.name(reader.nextString());
break;
case "timestamp":
result.timestamp(reader.nextLong());
break;
case "duration":
result.duration(reader.nextLong());
break;
case "localEndpoint":
result.localEndpoint(ENDPOINT_ADAPTER.fromJson(reader));
break;
case "remoteEndpoint":
result.remoteEndpoint(ENDPOINT_ADAPTER.fromJson(reader));
break;
case "annotations":
reader.beginArray();
while (reader.hasNext()) {
result.addAnnotation(ANNOTATION_ADAPTER.fromJson(reader));
Annotation a = ANNOTATION_ADAPTER.fromJson(reader);
result.addAnnotation(a.timestamp, a.value);
}
reader.endArray();
break;
case "binaryAnnotations":
reader.beginArray();
case "tags":
reader.beginObject();
while (reader.hasNext()) {
result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.fromJson(reader));
result.putTag(reader.nextName(), reader.nextString());
}
reader.endArray();
reader.endObject();
break;
case "debug":
result.debug(reader.nextBoolean());
break;
case "shared":
result.shared(reader.nextBoolean());
break;
default:
reader.skipValue();
}
}
reader.endObject();
return result.build();
return SimpleSpanConverter.toSpan(result.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ final class VersionSpecificTemplate {
+ " },\n"
+ " \"mappings\": {\n"
+ " \"_default_\": {\n"
+ " \"dynamic_templates\": [\n"
+ " {\n"
+ " \"strings\": {\n"
+ " \"mapping\": {\n"
+ " KEYWORD,\n"
+ " \"ignore_above\": 256\n"
+ " },\n"
+ " \"match_mapping_type\": \"string\",\n"
+ " \"match\": \"*\"\n"
+ " }\n"
+ " }\n"
+ " ],\n"
+ " \"_all\": {\n"
+ " \"enabled\": false\n"
+ " }\n"
Expand All @@ -67,35 +79,31 @@ final class VersionSpecificTemplate {
+ " \"properties\": {\n"
+ " \"traceId\": ${__TRACE_ID_MAPPING__},\n"
+ " \"name\": { KEYWORD },\n"
+ " \"localEndpoint\": {\n"
+ " \"type\": \"object\",\n"
+ " \"dynamic\": false,\n"
+ " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ " },\n"
+ " \"remoteEndpoint\": {\n"
+ " \"type\": \"object\",\n"
+ " \"dynamic\": false,\n"
+ " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ " },\n"
+ " \"timestamp_millis\": {\n"
+ " \"type\": \"date\",\n"
+ " \"format\": \"epoch_millis\"\n"
+ " },\n"
+ " \"duration\": { \"type\": \"long\" },\n"
+ " \"annotations\": {\n"
+ " \"type\": \"nested\",\n"
+ " \"type\": \"object\",\n"
+ " \"dynamic\": false,\n"
+ " \"properties\": {\n"
+ " \"value\": { KEYWORD },\n"
+ " \"endpoint\": {\n"
+ " \"type\": \"object\",\n"
+ " \"dynamic\": false,\n"
+ " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ " }\n"
+ " \"value\": { KEYWORD }\n"
+ " }\n"
+ " },\n"
+ " \"binaryAnnotations\": {\n"
+ " \"type\": \"nested\",\n"
+ " \"dynamic\": false,\n"
+ " \"properties\": {\n"
+ " \"key\": { KEYWORD },\n"
+ " \"value\": { KEYWORD },\n"
+ " \"endpoint\": {\n"
+ " \"type\": \"object\",\n"
+ " \"dynamic\": false,\n"
+ " \"properties\": { \"serviceName\": { KEYWORD } }\n"
+ " }\n"
+ " }\n"
+ " \"tags\": {\n"
+ " \"type\": \"object\",\n"
+ " \"dynamic\": true\n"
+ " }\n"
+ " }\n"
+ " },\n"
Expand Down
Loading

0 comments on commit 2b28827

Please sign in to comment.