Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include stats api to provide upload metrics #64

Merged
merged 10 commits into from
May 25, 2022
8 changes: 4 additions & 4 deletions .idea/copyright/SPDX_ALv2.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
public class IndexManager {
public static final String FIELD_TYPE_KEY = "type";
public static final String MAPPING_PROPERTIES_KEY = "properties";
public static final String DOCUMENT_TYPE = "_doc";
private static final Logger LOGGER = LogManager.getLogger(IndexManager.class);
private final IndicesAdminClient client;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

public class UploadGeoJSONAction extends ActionType<UploadGeoJSONResponse> {

public static UploadGeoJSONAction INSTANCE = new UploadGeoJSONAction();
public static final UploadGeoJSONAction INSTANCE = new UploadGeoJSONAction();
public static final String NAME = "cluster:admin/upload_geojson_action";

private UploadGeoJSONAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,44 @@

package org.opensearch.geospatial.plugin;

import static java.util.Collections.singletonList;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction;
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONTransportAction;
import org.opensearch.geospatial.processor.FeatureProcessor;
import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction;
import org.opensearch.geospatial.stats.upload.RestUploadStatsAction;
import org.opensearch.geospatial.stats.upload.UploadStats;
import org.opensearch.geospatial.stats.upload.UploadStatsAction;
import org.opensearch.geospatial.stats.upload.UploadStatsTransportAction;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

/**
* Entry point for Geospatial features. It provides additional Processors, Actions
Expand All @@ -44,6 +57,23 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
.immutableMap();
}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
return List.of(UploadStats.getInstance());
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
Expand All @@ -55,11 +85,15 @@ public List<RestHandler> getRestHandlers(
Supplier<DiscoveryNodes> nodesInCluster
) {
RestUploadGeoJSONAction uploadGeoJSONAction = new RestUploadGeoJSONAction();
return singletonList(uploadGeoJSONAction);
RestUploadStatsAction statsAction = new RestUploadStatsAction();
return List.of(statsAction, uploadGeoJSONAction);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return singletonList(new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class));
return List.of(
new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class),
new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.stats.upload;

import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
import static org.opensearch.rest.RestRequest.Method.GET;

import java.util.List;

import org.opensearch.client.node.NodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

public class RestUploadStatsAction extends BaseRestHandler {

private static final String NAME = "upload_stats";
public static final String ACTION_OBJECT = "_upload";

public static final String ACTION_STATS = "stats";

@Override
public String getName() {
return NAME;
}

@Override
public List<Route> routes() {
String path = String.join(URL_DELIMITER, getPluginURLPrefix(), ACTION_OBJECT, ACTION_STATS);
return List.of(new Route(GET, path));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) {
return channel -> nodeClient.execute(UploadStatsAction.INSTANCE, new UploadStatsRequest(), new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static UploadStats getInstance() {
* @throws IOException if cannot read {@link UploadStats} from given input
*/
public static UploadStats fromStreamInput(StreamInput input) throws IOException {
Objects.requireNonNull(input, "StreamInput cannot be null");
UploadStats instance = new UploadStats();
instance.totalAPICount.inc(input.readVLong());
instance.metrics.addAll(input.readSet(UploadMetric.UploadMetricBuilder::fromStreamInput));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.stats.upload;

import org.opensearch.action.ActionType;

public class UploadStatsAction extends ActionType<UploadStatsResponse> {

public static final UploadStatsAction INSTANCE = new UploadStatsAction();
public static final String NAME = "cluster:admin/geospatial/stats";

public UploadStatsAction() {
super(NAME, UploadStatsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.stats.upload;

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

public class UploadStatsNodeRequest extends BaseNodeRequest {

private final UploadStatsRequest request;

public UploadStatsNodeRequest(StreamInput in) throws IOException {
super(in);
request = new UploadStatsRequest(in);
}

public UploadStatsNodeRequest(UploadStatsRequest request) {
this.request = request;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.stats.upload;

import java.io.IOException;
import java.util.Objects;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

public class UploadStatsNodeResponse extends BaseNodeResponse implements ToXContentObject {

private static final String UPLOADS = "uploads";
private final UploadStats uploadStats;

public UploadStatsNodeResponse(DiscoveryNode node, UploadStats uploadStats) {
super(node);
this.uploadStats = Objects.requireNonNull(uploadStats, "upload stats cannot be null");
}

public UploadStatsNodeResponse(StreamInput in) throws IOException {
super(in);
uploadStats = UploadStats.fromStreamInput(in);
}

public UploadStats getUploadStats() {
return uploadStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
uploadStats.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(UPLOADS);
uploadStats.toXContent(builder, params);
return builder.endObject();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.stats.upload;

import java.io.IOException;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.common.io.stream.StreamInput;

public class UploadStatsRequest extends BaseNodesRequest<UploadStatsRequest> {

/**
* Empty constructor needed for UploadStatsTransportAction
*/
public UploadStatsRequest() {
super((String[]) null);
}

protected UploadStatsRequest(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.stats.upload;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

public class UploadStatsResponse extends BaseNodesResponse<UploadStatsNodeResponse> implements Writeable, ToXContentObject {

public UploadStatsResponse(StreamInput in) throws IOException {
super(new ClusterName(in), in.readList(UploadStatsNodeResponse::new), in.readList(FailedNodeException::new));
}

public UploadStatsResponse(ClusterName clusterName, List<UploadStatsNodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

@Override
protected List<UploadStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(UploadStatsNodeResponse::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<UploadStatsNodeResponse> nodeResponses) throws IOException {
out.writeList(nodeResponses);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

final Map<String, UploadStats> nodeIDStatsMap = getNodes().stream()
.collect(Collectors.toMap(response -> response.getNode().getId(), UploadStatsNodeResponse::getUploadStats));
UploadStatsService uploadStatsService = new UploadStatsService(nodeIDStatsMap);
return uploadStatsService.toXContent(builder, params);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UploadStatsResponse otherResponse = (UploadStatsResponse) o;
return Objects.equals(getNodes(), otherResponse.getNodes()) && Objects.equals(failures(), otherResponse.failures());
}

@Override
public int hashCode() {
return Objects.hash(getNodes(), failures());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

// Service to calculate summary of upload stats and generate XContent for StatsResponse
// Service to calculate summary of upload stats and generate XContent for UploadStatsResponse
public class UploadStatsService implements ToXContentFragment {

public static final String UPLOADS = "uploads";
public static final String TOTAL = "total";
public static final String METRICS = "metrics";
public static final String NODE_ID = "node_id";
private final Map<String, UploadStats> uploadStats;
Expand All @@ -35,7 +33,6 @@ public UploadStatsService(Map<String, UploadStats> uploadStats) {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
/*
{
"uploads": {
"total": {
request_count : # of request,
"upload" : sum of documents to upload across API,
Expand All @@ -53,10 +50,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
"duration" : duration in milliseconds to ingest document
}, ......
]
}
}
*/
builder.startObject(UPLOADS);
builder.startObject();
totalUploadStats.toXContent(builder, params);
builder.startArray(METRICS);
if (totalUploadStats.isUploadStatsEmpty()) {
Expand Down
Loading