Skip to content

Commit

Permalink
Turn off Statsbeat when AMPLS is used (#1994)
Browse files Browse the repository at this point in the history
* Shut down Statsbeat when a Statsbeat request fails with an UnknownHostException

* Fix failing tests

* Fix tests

* Remove

* Address comments
  • Loading branch information
heyams committed Dec 9, 2021
1 parent 8172def commit 7e90a14
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ void scheduleWithFixedDelay(long interval) {
scheduledExecutor.scheduleWithFixedDelay(this, 60, interval, TimeUnit.SECONDS);
}

/**
 * Stops the Azure Metadata Service.
 *
 * <p>Logs an info-level message and then shuts down the same {@code scheduledExecutor} that
 * {@code scheduleWithFixedDelay} uses to run this service.
 */
void shutdown() {
logger.info("Shutting down Azure Metadata Service.");
// NOTE(review): presumably a java.util.concurrent.ScheduledExecutorService, in which case
// shutdown() does not wait for or interrupt a run in progress - confirm against the field
// declaration.
scheduledExecutor.shutdown();
}

// only used by tests
void updateMetadata(String response) throws IOException {
updateMetadata(mapper.readValue(response, MetadataInstanceResponse.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public class StatsbeatModule {
ThreadPoolUtils.createDaemonThreadFactory(BaseStatsbeat.class));

private final CustomDimensions customDimensions;

private final NetworkStatsbeat networkStatsbeat;
private final AttachStatsbeat attachStatsbeat;
private final FeatureStatsbeat featureStatsbeat;
private final FeatureStatsbeat instrumentationStatsbeat;
private final NonessentialStatsbeat nonessentialStatsbeat;
private final AzureMetadataService azureMetadataService;

private final AtomicBoolean started = new AtomicBoolean();

Expand All @@ -57,6 +57,7 @@ public StatsbeatModule(Cache<String, String> ikeyEndpointMap) {
featureStatsbeat = new FeatureStatsbeat(customDimensions, FeatureType.FEATURE);
instrumentationStatsbeat = new FeatureStatsbeat(customDimensions, FeatureType.INSTRUMENTATION);
nonessentialStatsbeat = new NonessentialStatsbeat(customDimensions);
azureMetadataService = new AzureMetadataService(attachStatsbeat, customDimensions);
}

public void start(TelemetryClient telemetryClient, Configuration config) {
Expand Down Expand Up @@ -99,9 +100,7 @@ public void start(TelemetryClient telemetryClient, Configuration config) {
// only turn on AzureMetadataService when the resource provider is VM or UNKNOWN.
if (rp == ResourceProvider.RP_VM || rp == ResourceProvider.UNKNOWN) {
// will only reach here the first time, after instance has been instantiated
AzureMetadataService metadataService =
new AzureMetadataService(attachStatsbeat, customDimensions);
metadataService.scheduleWithFixedDelay(longIntervalSeconds);
azureMetadataService.scheduleWithFixedDelay(longIntervalSeconds);
}

featureStatsbeat.trackConfigurationOptions(config);
Expand All @@ -117,6 +116,12 @@ public void start(TelemetryClient telemetryClient, Configuration config) {
}
}

/**
 * Turns Statsbeat off.
 *
 * <p>Logs a debug-level message, shuts down this module's {@code scheduledExecutor}, and then
 * shuts down the {@code azureMetadataService} owned by this module.
 */
public void shutdown() {
logger.debug("Shutting down Statsbeat scheduler.");
scheduledExecutor.shutdown();
// Shut the metadata service down unconditionally.
// NOTE(review): it is only scheduled in start() for the VM/UNKNOWN resource providers;
// this assumes its shutdown() is harmless when nothing was scheduled - confirm.
azureMetadataService.shutdown();
}

public NetworkStatsbeat getNetworkStatsbeat() {
return networkStatsbeat;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@
import com.microsoft.applicationinsights.agent.internal.httpclient.LazyHttpClient;
import com.microsoft.applicationinsights.agent.internal.httpclient.RedirectPolicy;
import com.microsoft.applicationinsights.agent.internal.localstorage.LocalFileWriter;
import com.microsoft.applicationinsights.agent.internal.statsbeat.NetworkStatsbeat;
import com.microsoft.applicationinsights.agent.internal.statsbeat.StatsbeatModule;
import io.opentelemetry.instrumentation.api.cache.Cache;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.io.StringWriter;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -91,17 +92,19 @@ private static ObjectMapper createObjectMapper() {
private final HttpPipeline pipeline;
private final URL endpointUrl;
@Nullable private final LocalFileWriter localFileWriter;
// this is null for the statsbeat channel
@Nullable private final NetworkStatsbeat networkStatsbeat;
private final StatsbeatModule statsbeatModule;
private final boolean isStatsbeat;

public static TelemetryChannel create(
URL endpointUrl,
LocalFileWriter localFileWriter,
Cache<String, String> ikeyEndpointMap,
@Nullable NetworkStatsbeat networkStatsbeat,
StatsbeatModule statsbeatModule,
boolean isStatsbeat,
@Nullable Configuration.AadAuthentication aadAuthentication) {
HttpPipeline httpPipeline = LazyHttpClient.newHttpPipeLine(aadAuthentication, ikeyEndpointMap);
return new TelemetryChannel(httpPipeline, endpointUrl, localFileWriter, networkStatsbeat);
return new TelemetryChannel(
httpPipeline, endpointUrl, localFileWriter, statsbeatModule, isStatsbeat);
}

public CompletableResultCode sendRawBytes(ByteBuffer buffer, String instrumentationKey) {
Expand All @@ -113,11 +116,13 @@ public TelemetryChannel(
HttpPipeline pipeline,
URL endpointUrl,
LocalFileWriter localFileWriter,
@Nullable NetworkStatsbeat networkStatsbeat) {
StatsbeatModule statsbeatModule,
boolean isStatsbeat) {
this.pipeline = pipeline;
this.endpointUrl = endpointUrl;
this.localFileWriter = localFileWriter;
this.networkStatsbeat = networkStatsbeat;
this.statsbeatModule = statsbeatModule;
this.isStatsbeat = isStatsbeat;
}

public CompletableResultCode send(List<TelemetryItem> telemetryItems) {
Expand Down Expand Up @@ -233,13 +238,16 @@ private CompletableResultCode internalSend(
parseResponseCode(
response.getStatusCode(), instrumentationKey, byteBuffers, persisted);
LazyHttpClient.consumeResponseBody(response);
// networkStatsbeat is null when it's sending a Statsbeat request.
if (networkStatsbeat != null) {
if (!isStatsbeat) {
if (response.getStatusCode() == 200) {
networkStatsbeat.incrementRequestSuccessCount(
System.currentTimeMillis() - startTime, instrumentationKey);
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestSuccessCount(
System.currentTimeMillis() - startTime, instrumentationKey);
} else {
networkStatsbeat.incrementRequestFailureCount(instrumentationKey);
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestFailureCount(instrumentationKey);
}
}
if (!persisted) {
Expand All @@ -253,21 +261,32 @@ private CompletableResultCode internalSend(
}
},
error -> {
if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException(
error, endpointUrl.toString(), friendlyExceptionThrown, logger)) {
operationLogger.recordFailure(
"Error sending telemetry items: " + error.getMessage(), error);
}
// AMPLS
if (isStatsbeat && error instanceof UnknownHostException) {
// When a Statsbeat request fails with an UnknownHostException, the Statsbeat endpoint
// most likely cannot be resolved because the application is running inside a virtual
// network. In that case, we use the kill-switch to turn off Statsbeat.
statsbeatModule.shutdown();
} else {
if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException(
error, endpointUrl.toString(), friendlyExceptionThrown, logger)) {
operationLogger.recordFailure(
"Error sending telemetry items: " + error.getMessage(), error);
}

// networkStatsbeat is null when it's sending a Statsbeat request.
if (networkStatsbeat != null) {
networkStatsbeat.incrementRequestFailureCount(instrumentationKey);
}
// no need to write to disk again when failing to send raw bytes from the persisted
// file
if (!persisted) {
writeToDiskOnFailure(byteBuffers, instrumentationKey);
if (!isStatsbeat) {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestFailureCount(instrumentationKey);
}
// no need to write to disk again when failing to send raw bytes from the persisted
// file
if (!persisted) {
writeToDiskOnFailure(byteBuffers, instrumentationKey);
}
}

if (!persisted) {
// persisted byte buffers don't come from the pool so shouldn't go back to the pool
byteBufferPool.offer(byteBuffers);
Expand Down Expand Up @@ -303,10 +322,9 @@ private void parseResponseCode(
case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME
// TODO handle throttling
// TODO (heya) track throttling count via Statsbeat
// networkStatsbeat is null when it's sending a Statsbeat request.
// instrumentationKey is null when sending persisted file's raw bytes.
if (networkStatsbeat != null) {
networkStatsbeat.incrementThrottlingCount(instrumentationKey);
if (!isStatsbeat) {
statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey);
}
break;
case 200: // SUCCESS
Expand All @@ -318,10 +336,9 @@ private void parseResponseCode(
case 0: // client-side exception
// TODO exponential backoff and retry to a limit
// TODO (heya) track failure count via Statsbeat
// networkStatsbeat is null when it's sending a Statsbeat request.
// instrumentationKey is null when sending persisted file's raw bytes.
if (networkStatsbeat != null) {
networkStatsbeat.incrementRetryCount(instrumentationKey);
if (!isStatsbeat) {
statsbeatModule.getNetworkStatsbeat().incrementRetryCount(instrumentationKey);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ public BatchSpanProcessor getChannelBatcher() {
endpointProvider.getIngestionEndpointUrl(),
localFileWriter,
ikeyEndpointMap,
statsbeatModule.getNetworkStatsbeat(),
statsbeatModule,
false,
aadAuthentication);

if (!readOnlyFileSystem) {
Expand Down Expand Up @@ -254,7 +255,8 @@ public BatchSpanProcessor getStatsbeatChannelBatcher() {
endpointProvider.getStatsbeatEndpointUrl(),
localFileWriter,
ikeyEndpointMap,
null,
statsbeatModule,
true,
null);

if (!readOnlyFileSystem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void setup() throws Exception {
pipelineBuilder.build(),
new URL("http://foo.bar"),
new LocalFileWriter(localFileCache, tempFolder, null),
null);
null,
false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.applicationinsights.agent.internal.MockHttpResponse;
import com.microsoft.applicationinsights.agent.internal.statsbeat.NetworkStatsbeat;
import com.microsoft.applicationinsights.agent.internal.statsbeat.StatsbeatModule;
import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryChannel;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.ByteArrayInputStream;
Expand All @@ -54,6 +56,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;

public class LocalFileLoaderTests {
Expand Down Expand Up @@ -246,9 +249,16 @@ public void testDeleteFilePermanentlyOnSuccess() throws Exception {
LocalFileWriter localFileWriter = new LocalFileWriter(localFileCache, tempFolder, null);
LocalFileLoader localFileLoader = new LocalFileLoader(localFileCache, tempFolder, null);

StatsbeatModule mockedStatsbeatModule = Mockito.mock(StatsbeatModule.class);
when(mockedStatsbeatModule.getNetworkStatsbeat())
.thenReturn(Mockito.mock(NetworkStatsbeat.class));
TelemetryChannel telemetryChannel =
new TelemetryChannel(
pipelineBuilder.build(), new URL("http://foo.bar"), localFileWriter, null);
pipelineBuilder.build(),
new URL("http://foo.bar"),
localFileWriter,
mockedStatsbeatModule,
false);

// persist 10 files to disk
for (int i = 0; i < 10; i++) {
Expand Down Expand Up @@ -298,7 +308,11 @@ public void testDeleteFilePermanentlyOnFailure() throws Exception {

TelemetryChannel telemetryChannel =
new TelemetryChannel(
pipelineBuilder.build(), new URL("http://foo.bar"), localFileWriter, null);
pipelineBuilder.build(),
new URL("http://foo.bar"),
localFileWriter,
Mockito.mock(StatsbeatModule.class),
false);

// persist 10 files to disk
for (int i = 0; i < 10; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.microsoft.applicationinsights.agent.internal.telemetry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpHeaders;
Expand All @@ -36,6 +37,8 @@
import com.microsoft.applicationinsights.agent.internal.httpclient.RedirectPolicy;
import com.microsoft.applicationinsights.agent.internal.localstorage.LocalFileCache;
import com.microsoft.applicationinsights.agent.internal.localstorage.LocalFileWriter;
import com.microsoft.applicationinsights.agent.internal.statsbeat.NetworkStatsbeat;
import com.microsoft.applicationinsights.agent.internal.statsbeat.StatsbeatModule;
import io.opentelemetry.instrumentation.api.cache.Cache;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.ByteArrayInputStream;
Expand All @@ -57,6 +60,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -78,11 +82,14 @@ private TelemetryChannel getTelemetryChannel() throws MalformedURLException {
.policies(policies.toArray(new HttpPipelinePolicy[0]))
.httpClient(recordingHttpClient);
LocalFileCache localFileCache = new LocalFileCache(tempFolder);
StatsbeatModule mockedStatsModule = Mockito.mock(StatsbeatModule.class);
when(mockedStatsModule.getNetworkStatsbeat()).thenReturn(Mockito.mock(NetworkStatsbeat.class));
return new TelemetryChannel(
pipelineBuilder.build(),
new URL(END_POINT_URL),
new LocalFileWriter(localFileCache, tempFolder, null),
null);
mockedStatsModule,
false);
}

@Nullable
Expand Down

0 comments on commit 7e90a14

Please sign in to comment.