Skip to content

Commit

Permalink
Refactor function registry for multi-stage engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jul 9, 2024
1 parent fb9f295 commit 8b0f391
Show file tree
Hide file tree
Showing 57 changed files with 1,405 additions and 2,005 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pinot.broker.querylog.QueryLogger;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.http.MultiHttpRequest;
Expand Down Expand Up @@ -84,7 +83,6 @@
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.config.table.FieldConfig;
Expand Down Expand Up @@ -264,8 +262,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
// Check if the query is a v2 supported query
Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
if (ParserUtils.canCompileQueryUsingV2Engine(query, CalciteSchemaBuilder.asRootSchema(
new PinotCatalog(database, _tableCache), database))) {
if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception(
"It seems that the query is only supported by the multi-stage query engine, please retry the query using "
+ "the multi-stage query engine "
Expand Down Expand Up @@ -398,8 +395,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
if (StringUtils.isNotBlank(failureMessage)) {
failureMessage = "Reason: " + failureMessage;
}
throw new WebApplicationException("Permission denied." + failureMessage,
Response.Status.FORBIDDEN);
throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN);
}

// Get the tables hit by the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pinot.broker.querylog.QueryLogger;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
Expand All @@ -65,8 +64,6 @@
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.DatabaseConflictException;
Expand All @@ -85,6 +82,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {

private final WorkerManager _workerManager;
private final QueryDispatcher _queryDispatcher;
private final PinotCatalog _catalog;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
Expand All @@ -93,6 +91,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
_queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config));
_catalog = new PinotCatalog(tableCache);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs,
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
Expand Down Expand Up @@ -134,9 +133,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
queryTimeoutMs = timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(database, _tableCache), database), _workerManager,
_tableCache);
QueryEnvironment queryEnvironment = new QueryEnvironment(database, _tableCache, _workerManager);
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
queryPlanResult = queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
Expand Down
4 changes: 4 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-babel</artifactId>
</dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value-annotations</artifactId>
</dependency>

<!-- Jersey Libraries -->
<dependency>
Expand Down
Loading

0 comments on commit 8b0f391

Please sign in to comment.