Skip to content

Commit

Permalink
Add support for memoization to resource group state endpoint
Browse files Browse the repository at this point in the history
Adding memoization support to /v1/resourceGroupState endpoint. This will help reduce load on the coordinator to
calculate the resource group state for concurrent/frequent requests in a short period of time.
  • Loading branch information
swapsmagic committed May 21, 2024
1 parent ddf6f39 commit dbf6c37
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import io.airlift.units.Duration;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
Expand All @@ -42,16 +45,23 @@
import java.net.URI;
import java.net.URLDecoder;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;

import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
Expand All @@ -60,10 +70,62 @@
@RolesAllowed(ADMIN)
public class ResourceGroupStateInfoResource
{
private static class ResourceGroupStateInfoKey
{
private final ResourceGroupId resourceGroupId;
private final boolean includeQueryInfo;
private final boolean summarizeSubGroups;
private final boolean includeStaticSubgroupsOnly;

public ResourceGroupStateInfoKey(ResourceGroupId resourceGroupId, boolean includeQueryInfo, boolean summarizeSubGroups, boolean includeStaticSubgroupsOnly)
{
this.resourceGroupId = requireNonNull(resourceGroupId, "resourceGroupId is null");
this.includeQueryInfo = includeQueryInfo;
this.summarizeSubGroups = summarizeSubGroups;
this.includeStaticSubgroupsOnly = includeStaticSubgroupsOnly;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourceGroupStateInfoKey that = (ResourceGroupStateInfoKey) o;
return Objects.equals(that.resourceGroupId, resourceGroupId) &&
that.includeQueryInfo == includeQueryInfo &&
that.summarizeSubGroups == summarizeSubGroups &&
that.includeStaticSubgroupsOnly == includeStaticSubgroupsOnly;
}

@Override
public int hashCode()
{
return Objects.hash(resourceGroupId, includeQueryInfo, summarizeSubGroups, includeStaticSubgroupsOnly);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("resourceGroupId", resourceGroupId)
.add("includeQueryInfo", includeQueryInfo)
.add("summarizeSubGroups", summarizeSubGroups)
.add("includeStaticSubgroupsOnly", includeStaticSubgroupsOnly)
.toString();
}
}

private final ResourceGroupManager<?> resourceGroupManager;
private final boolean resourceManagerEnabled;
private final InternalNodeManager internalNodeManager;
private final Optional<ResourceManagerProxy> proxyHelper;
private final Map<ResourceGroupStateInfoKey, Supplier<ResourceGroupInfo>> resourceGroupStateInfoKeySupplierMap;
private final Supplier<List<ResourceGroupInfo>> rootResourceGroupInfoSupplier;
private final Duration expirationDuration;

@Inject
public ResourceGroupStateInfoResource(
Expand All @@ -76,6 +138,11 @@ public ResourceGroupStateInfoResource(
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
this.resourceGroupStateInfoKeySupplierMap = new HashMap<>();
this.expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterResourceGroupStateInfoExpirationDuration();
this.rootResourceGroupInfoSupplier = expirationDuration.getValue() > 0 ?
memoizeWithExpiration(() -> resourceGroupManager.getRootResourceGroups(), expirationDuration.toMillis(), MILLISECONDS) :
() -> resourceGroupManager.getRootResourceGroups();
}

@GET
Expand All @@ -100,27 +167,44 @@ public void getResourceGroupInfos(
try {
if (isNullOrEmpty(resourceGroupIdString)) {
// return root groups if no group id is specified
asyncResponse.resume(Response.ok().entity(resourceGroupManager.getRootResourceGroups()).build());
return;
asyncResponse.resume(Response.ok().entity(rootResourceGroupInfoSupplier.get()).build());
}
else {
asyncResponse.resume(Response.ok().entity(resourceGroupManager.getResourceGroupInfo(
new ResourceGroupId(
Arrays.stream(resourceGroupIdString.split("/"))
.map(ResourceGroupStateInfoResource::urlDecode)
.collect(toImmutableList())),
includeQueryInfo,
summarizeSubgroups,
includeStaticSubgroupsOnly)).build());
return;
ResourceGroupId resourceGroupId = getResourceGroupId(resourceGroupIdString);

ResourceGroupStateInfoKey resourceGroupStateInfoKey = new ResourceGroupStateInfoKey(resourceGroupId, includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly);

Supplier<ResourceGroupInfo> resourceGroupInfoSupplier = resourceGroupStateInfoKeySupplierMap.getOrDefault(resourceGroupStateInfoKey, expirationDuration.getValue() > 0 ?
Suppliers.memoizeWithExpiration(() -> getResourceGroupInfo(resourceGroupId, includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly), expirationDuration.toMillis(), MILLISECONDS) :
Suppliers.ofInstance(getResourceGroupInfo(resourceGroupId, includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly)));

resourceGroupStateInfoKeySupplierMap.putIfAbsent(resourceGroupStateInfoKey, resourceGroupInfoSupplier);

asyncResponse.resume(Response.ok().entity(resourceGroupInfoSupplier.get()).build());
}
}
catch (NoSuchElementException | IllegalArgumentException e) {
asyncResponse.resume(Response.status(NOT_FOUND).build());
return;
}
}

private ResourceGroupInfo getResourceGroupInfo(ResourceGroupId resourceGroupId, boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly)
{
return resourceGroupManager.getResourceGroupInfo(
resourceGroupId,
includeQueryInfo,
summarizeSubgroups,
includeStaticSubgroupsOnly);
}

private ResourceGroupId getResourceGroupId(String resourceGroupIdString)
{
return new ResourceGroupId(
Arrays.stream(resourceGroupIdString.split("/"))
.map(ResourceGroupStateInfoResource::urlDecode)
.collect(toImmutableList()));
}

private static String urlDecode(String value)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ServerConfig
private NodePoolType poolType = DEFAULT;
private Duration clusterStatsExpirationDuration = new Duration(0, MILLISECONDS);
private boolean nestedDataSerializationEnabled = true;
private Duration clusterResourceGroupStateInfoExpirationDuration = new Duration(0, MILLISECONDS);

public boolean isResourceManager()
{
Expand Down Expand Up @@ -214,4 +215,16 @@ public ServerConfig setNestedDataSerializationEnabled(boolean nestedDataSerializ
this.nestedDataSerializationEnabled = nestedDataSerializationEnabled;
return this;
}

public Duration getClusterResourceGroupStateInfoExpirationDuration()
{
return clusterResourceGroupStateInfoExpirationDuration;
}

@Config("cluster-resource-group-state-info-expiration-duration")
public ServerConfig setClusterResourceGroupStateInfoExpirationDuration(Duration clusterResourceGroupStateInfoExpirationDuration)
{
this.clusterResourceGroupStateInfoExpirationDuration = clusterResourceGroupStateInfoExpirationDuration;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void testDefaults()
.setCatalogServerEnabled(false)
.setPoolType(DEFAULT)
.setClusterStatsExpirationDuration(new Duration(0, MILLISECONDS))
.setNestedDataSerializationEnabled(true));
.setNestedDataSerializationEnabled(true)
.setClusterResourceGroupStateInfoExpirationDuration(new Duration(0, MILLISECONDS)));
}

@Test
Expand All @@ -68,6 +69,7 @@ public void testExplicitPropertyMappings()
.put("pool-type", "LEAF")
.put("cluster-stats-expiration-duration", "10s")
.put("nested-data-serialization-enabled", "false")
.put("cluster-resource-group-state-info-expiration-duration", "10s")
.build();

ServerConfig expected = new ServerConfig()
Expand All @@ -84,7 +86,8 @@ public void testExplicitPropertyMappings()
.setCatalogServerEnabled(true)
.setPoolType(LEAF)
.setClusterStatsExpirationDuration(new Duration(10, SECONDS))
.setNestedDataSerializationEnabled(false);
.setNestedDataSerializationEnabled(false)
.setClusterResourceGroupStateInfoExpirationDuration(new Duration(10, SECONDS));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.jetty.JettyHttpClient;
import com.facebook.presto.resourceGroups.FileResourceGroupConfigurationManagerFactory;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.airlift.testing.Closeables.closeQuietly;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_USER;
import static com.facebook.presto.tests.tpch.TpchQueryRunner.createQueryRunner;
import static com.facebook.presto.utils.QueryExecutionClientUtil.runToExecuting;
import static com.facebook.presto.utils.ResourceUtils.getResourceFilePath;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

@Test
public class TestResourceGroupStateInfoResource
{
private HttpClient client;
private TestingPrestoServer server;

@BeforeClass
public void setup()
throws Exception
{
client = new JettyHttpClient();
DistributedQueryRunner runner = createQueryRunner(ImmutableMap.of("query.client.timeout", "20s", "cluster-resource-group-state-info-expiration-duration", "20s"));
server = runner.getCoordinator();
server.getResourceGroupManager().get().addConfigurationManagerFactory(new FileResourceGroupConfigurationManagerFactory());
server.getResourceGroupManager().get()
.forceSetConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json")));
}

@AfterClass(alwaysRun = true)
public void teardown()
{
closeQuietly(server);
closeQuietly(client);
server = null;
client = null;
}

@Test
public void testResourceGroupStateInfo()
{
runToExecuting(client, server, "SELECT * from tpch.sf101.orders");

ResourceGroupInfo resourceGroupInfo = getGlobalResourceGroupStateInfo(false);

assertNotNull(resourceGroupInfo);
assertEquals(resourceGroupInfo.getNumRunningQueries(), 1);

runToExecuting(client, server, "SELECT * from tpch.sf101.orders");
resourceGroupInfo = getGlobalResourceGroupStateInfo(false);

assertNotNull(resourceGroupInfo);
//Result will be served from cache so running queries count should remain 1
assertEquals(resourceGroupInfo.getNumRunningQueries(), 1);
}

private ResourceGroupInfo getGlobalResourceGroupStateInfo(boolean followRedirects)
{
Request.Builder builder = prepareGet();
Request request = builder
.setHeader(PRESTO_USER, "user")
.setUri(uriBuilderFrom(server.getBaseUrl().resolve("/v1/resourceGroupState/global")).build())
.setFollowRedirects(followRedirects)
.build();

return client.execute(request, createJsonResponseHandler(jsonCodec(ResourceGroupInfo.class)));
}
}

0 comments on commit dbf6c37

Please sign in to comment.