From 5be5bb9776f968003bb0f23f144157dffa1423c7 Mon Sep 17 00:00:00 2001 From: kaidaguerre Date: Wed, 17 Apr 2024 10:33:44 -0500 Subject: [PATCH] Add support for pushing down sort order. Closes #447 --- CHANGELOG.md | 5 ++- fdw.go | 84 +++++++++++++++++++++++++++++++++++++- fdw/common.h | 2 + fdw/fdw.c | 1 + fdw/fdw_helpers.h | 2 + fdw/query.c | 9 ++-- go.mod | 16 ++++---- go.sum | 28 ++++++------- hub/hub.go | 3 +- hub/hub_base.go | 5 +++ hub/hub_local.go | 12 +++--- hub/hub_remote.go | 41 +++++++++++++++++-- hub/scan_iterator.go | 4 +- hub/scan_iterator_base.go | 4 +- hub/scan_iterator_local.go | 4 +- 15 files changed, 173 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51093c8f..46c44fa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## v1.11.0 [tbd] -Add support for running plugins in-process. ([#383](https://github.com/turbot/steampipe-postgres-fdw/issues/383)) -Fixes issue where the install script fails if pg_config in not in users path. ([#404](https://github.com/turbot/steampipe-postgres-fdw/issues/404)) +* Update Steampipe timing output to show all scans for all connections. ([#439](https://github.com/turbot/steampipe-postgres-fdw/issues/439)) +* Add support for running plugins in-process. ([#383](https://github.com/turbot/steampipe-postgres-fdw/issues/383)) +* Fixes issue where the install script fails if pg_config in not in users path. ([#404](https://github.com/turbot/steampipe-postgres-fdw/issues/404)) ## v1.10.0 [2024-03-04] _Whats new_ diff --git a/fdw.go b/fdw.go index 96382605..5efa22ec 100644 --- a/fdw.go +++ b/fdw.go @@ -4,9 +4,13 @@ package main #cgo linux LDFLAGS: -Wl,-unresolved-symbols=ignore-all #cgo darwin LDFLAGS: -Wl,-undefined,dynamic_lookup #include "fdw_helpers.h" + #include "utils/rel.h" #include "nodes/pg_list.h" #include "utils/timestamp.h" + +static Name deserializeDeparsedSortListCell(ListCell *lc); + */ import "C" @@ -18,6 +22,7 @@ import ( "unsafe" "github.com/hashicorp/go-hclog" + "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto" "github.com/turbot/steampipe-plugin-sdk/v5/logging" "github.com/turbot/steampipe-plugin-sdk/v5/sperr" "github.com/turbot/steampipe-postgres-fdw/hub" @@ -65,6 +70,63 @@ func init() { } +//export goLog +func goLog(msg *C.char) { + log.Println("[WARN] " + C.GoString(msg)) +} + +// Given a list of FdwDeparsedSortGroup and a FdwPlanState, +// construct a list FdwDeparsedSortGroup that can be pushed down +// +//export goFdwCanSort +func goFdwCanSort(deparsed *C.List, planstate *C.FdwPlanState) *C.List { + log.Println("[WARN] goFdwCanSort deparsed", deparsed) + // This will be the list of FdwDeparsedSortGroup items that can be pushed down + var pushDownList *C.List = nil + + // Iterate over the deparsed list + if deparsed == nil { + return pushDownList + } + + // Convert the sortable fields into a lookup + sortableFields := getSortableFields(planstate.foreigntableid) + if len(sortableFields) == 0 { + return pushDownList + } + + for it := C.list_head(deparsed); it != nil; it = C.lnext(deparsed, it) { + deparsedSortGroup := C.cellGetFdwDeparsedSortGroup(it) + columnName := C.GoString(C.nameStr(deparsedSortGroup.attname)) + + supportedOrder := sortableFields[columnName] + requiredOrder := proto.SortOrder_Asc + if deparsedSortGroup.reversed { + requiredOrder = proto.SortOrder_Desc + } + log.Println("[INFO] goFdwCanSort column", columnName, "supportedOrder", supportedOrder, "requiredOrder", requiredOrder) + + if supportedOrder == requiredOrder || supportedOrder == proto.SortOrder_All { + log.Printf("[INFO] goFdwCanSort column %s can be pushed down", columnName) + // add deparsedSortGroup to pushDownList + pushDownList = C.lappend(pushDownList, unsafe.Pointer(deparsedSortGroup)) + } else { + log.Printf("[INFO] goFdwCanSort column %s CANNOT be pushed down", columnName) + } + } + + return pushDownList +} + +func getSortableFields(foreigntableid C.Oid) map[string]proto.SortOrder { + opts := GetFTableOptions(types.Oid(foreigntableid)) + connection := GetSchemaNameFromForeignTableId(types.Oid(foreigntableid)) + + tableName := opts["table"] + pluginHub := hub.GetHub() + return pluginHub.GetSortableFields(tableName, connection) +} + //export goFdwGetRelSize func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double, width *C.int, baserel *C.RelOptInfo) { logging.ClearProfileData() @@ -242,8 +304,9 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) { } // if we are NOT explaining, create an iterator to scan for us if !explain { + var sortOrder = getSortColumns(execState) ts := int64(C.GetSQLCurrentTimestamp(0)) - iter, err := pluginHub.GetIterator(columns, quals, unhandledRestrictions, int64(execState.limit), opts, ts) + iter, err := pluginHub.GetIterator(columns, quals, unhandledRestrictions, int64(execState.limit), sortOrder, ts, opts) if err != nil { log.Printf("[WARN] pluginHub.GetIterator FAILED: %s", err) FdwError(err) @@ -258,6 +321,25 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) { logging.LogTime("[fdw] BeginForeignScan end") } +func getSortColumns(state *C.FdwExecState) []*proto.SortColumn { + sortGroups := state.pathkeys + var res []*proto.SortColumn + for it := C.list_head(sortGroups); it != nil; it = C.lnext(sortGroups, it) { + deparsedSortGroup := C.cellGetFdwDeparsedSortGroup(it) + columnName := C.GoString(C.nameStr(deparsedSortGroup.attname)) + requiredOrder := proto.SortOrder_Asc + if deparsedSortGroup.reversed { + requiredOrder = proto.SortOrder_Desc + } + + res = append(res, &proto.SortColumn{ + Column: columnName, + Order: requiredOrder, + }) + } + return res +} + //export goFdwIterateForeignScan func goFdwIterateForeignScan(node *C.ForeignScanState) *C.TupleTableSlot { defer func() { diff --git a/fdw/common.h b/fdw/common.h index d39dc768..ce1b7b56 100644 --- a/fdw/common.h +++ b/fdw/common.h @@ -89,6 +89,8 @@ typedef struct FdwDeparsedSortGroup PathKey *key; } FdwDeparsedSortGroup; +static inline FdwDeparsedSortGroup *cellGetFdwDeparsedSortGroup(ListCell *n) { return (FdwDeparsedSortGroup *)n->ptr_value; } + // datum.c char *datumString(Datum datum, ConversionInfo *cinfo); int64 datumInt16(Datum datum, ConversionInfo *cinfo); diff --git a/fdw/fdw.c b/fdw/fdw.c index 4c2724e0..49913711 100644 --- a/fdw/fdw.c +++ b/fdw/fdw.c @@ -207,6 +207,7 @@ static void fdwGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid forei NULL, NULL)); + /* Handle sort pushdown */ if (root->query_pathkeys) { diff --git a/fdw/fdw_helpers.h b/fdw/fdw_helpers.h index 91ae072b..1333d8ec 100644 --- a/fdw/fdw_helpers.h +++ b/fdw/fdw_helpers.h @@ -101,6 +101,8 @@ static inline BooleanTest *cellGetBooleanTest(ListCell *n) { return (BooleanTest static inline BoolExpr *cellGetBoolExpr(ListCell *n) { return (BoolExpr *)n->ptr_value; } static inline RestrictInfo *cellGetRestrictInfo(ListCell *n) { return (RestrictInfo *)n->ptr_value; } +static inline char *nameStr(Name n) { return NameStr(*n); } + // logging char *tagTypeToString(NodeTag type); \ No newline at end of file diff --git a/fdw/query.c b/fdw/query.c index 637784c8..8d1462b8 100644 --- a/fdw/query.c +++ b/fdw/query.c @@ -408,12 +408,11 @@ void computeDeparsedSortGroup(List *deparsed, FdwPlanState *planstate, Assert(*deparsed_pathkeys == NIL); /* Don't ask FDW if nothing to sort */ - if (deparsed == NIL) + if (deparsed == NIL){ return; + } - // TODO - Fdw doesn't support this yet - // sortable_fields = canSort(planstate, deparsed); - sortable_fields = NIL; + sortable_fields = goFdwCanSort(deparsed,planstate); /* Don't go further if FDW can't enforce any sort */ if (sortable_fields == NIL) @@ -546,7 +545,7 @@ findPaths(PlannerInfo *root, RelOptInfo *baserel, List *possiblePaths, /* * Deparse a list of PathKey and return a list of FdwDeparsedSortGroup. - * This function will return data iif all the PathKey belong to the current + * This function will return data if all the PathKey belong to the current * foreign table. */ List * diff --git a/go.mod b/go.mod index 2b894f7f..5d7aaed8 100644 --- a/go.mod +++ b/go.mod @@ -10,16 +10,16 @@ require ( github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/go-version v1.6.0 // indirect github.com/turbot/go-kit v0.10.0-rc.0 - //main (join_timing) - github.com/turbot/steampipe v1.7.0-rc.0.0.20240417091217-07782a2b1346 - github.com/turbot/steampipe-plugin-sdk/v5 v5.10.0 - go.opentelemetry.io/otel v1.24.0 + //main + github.com/turbot/steampipe v1.7.0-rc.0.0.20240417105518-bd915693c2ea + github.com/turbot/steampipe-plugin-sdk/v5 v5.11.0-alpha.0 + go.opentelemetry.io/otel v1.25.0 google.golang.org/protobuf v1.33.0 ) require ( github.com/Masterminds/semver/v3 v3.2.1 - go.opentelemetry.io/otel/metric v1.24.0 + go.opentelemetry.io/otel/metric v1.25.0 ) require ( @@ -168,9 +168,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect + go.opentelemetry.io/otel/sdk v1.25.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.25.0 // indirect + go.opentelemetry.io/otel/trace v1.25.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20231006140011-7918f672742d diff --git a/go.sum b/go.sum index 9a70ae69..dabd3d5c 100644 --- a/go.sum +++ b/go.sum @@ -709,10 +709,10 @@ github.com/turbot/go-kit v0.10.0-rc.0 h1:kd+jp2ibbIV33Hc8SsMAN410Dl9Pz6SJ40axbKU github.com/turbot/go-kit v0.10.0-rc.0/go.mod h1:fFQqR59I5z5JeeBLfK1PjSifn4Oprs3NiQx0CxeSJxs= github.com/turbot/pipe-fittings v1.1.1 h1:W1F/O3tWaR2W9HTnFskJS5sLHpZXwOTbhTtDorIw744= github.com/turbot/pipe-fittings v1.1.1/go.mod h1:Cgy232VEhVjtDibJS8v5Zf4lKQnfZJOBtsuUNZ7MzTc= -github.com/turbot/steampipe v1.7.0-rc.0.0.20240417091217-07782a2b1346 h1:MxpsEaeo+9BlEbkC14hIFexI2eo7lY++xhEQpqLz4SI= -github.com/turbot/steampipe v1.7.0-rc.0.0.20240417091217-07782a2b1346/go.mod h1:If2VvE6PYUVhP6HDenst19E//vyniNLCxafj8OHFH5E= -github.com/turbot/steampipe-plugin-sdk/v5 v5.10.0 h1:2g2rPRzY3N5+94yjUW2jnbFm9DGGsUx3d77tjqgDT4M= -github.com/turbot/steampipe-plugin-sdk/v5 v5.10.0/go.mod h1:DJ9gPbPzmCe4M2sp+KzCmOhFuucl5/6hXnXvFvS/9nQ= +github.com/turbot/steampipe v1.7.0-rc.0.0.20240417105518-bd915693c2ea h1:wb2sAxZ+haYdUSM5wJuYRGPMr0wC2vu1voHyH8CfLHo= +github.com/turbot/steampipe v1.7.0-rc.0.0.20240417105518-bd915693c2ea/go.mod h1:If2VvE6PYUVhP6HDenst19E//vyniNLCxafj8OHFH5E= +github.com/turbot/steampipe-plugin-sdk/v5 v5.11.0-alpha.0 h1:lABwRQmtTFgsHxzzeA5PSEmtu3JVqfSo2JBZsQ6NLGw= +github.com/turbot/steampipe-plugin-sdk/v5 v5.11.0-alpha.0/go.mod h1:a6f869uShOJDiU4p6fHnSGE9xVTJF1ZyCGf6k0CR31Q= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7/go.mod h1:5hzpfalEjfcJWp9yq75/EZoEu2Mzm34eJAPm3HOW2tw= github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= @@ -746,22 +746,22 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= +go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0 h1:f2jriWfOdldanBwS9jNBdeOKAQN7b4ugAMaNu1/1k9g= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.24.0/go.mod h1:B+bcQI1yTY+N0vqMpoZbEN7+XU4tNM0DmUiOwebFJWI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= -go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= +go.opentelemetry.io/otel/metric v1.25.0/go.mod h1:rkDLUSd2lC5lq2dFNrX9LGAbINP5B7WBkC78RXCpH5s= +go.opentelemetry.io/otel/sdk v1.25.0 h1:PDryEJPC8YJZQSyLY5eqLeafHtG+X7FWnf3aXMtxbqo= +go.opentelemetry.io/otel/sdk v1.25.0/go.mod h1:oFgzCM2zdsxKzz6zwpTZYLLQsFwc+K0daArPdIhuxkw= +go.opentelemetry.io/otel/sdk/metric v1.25.0 h1:7CiHOy08LbrxMAp4vWpbiPcklunUshVpAvGBrdDRlGw= +go.opentelemetry.io/otel/sdk/metric v1.25.0/go.mod h1:LzwoKptdbBBdYfvtGCzGwk6GWMA3aUzBOwtQpR6Nz7o= +go.opentelemetry.io/otel/trace v1.25.0 h1:tqukZGLwQYRIFtSQM2u2+yfMVTgGVeqRLPUYx1Dq6RM= +go.opentelemetry.io/otel/trace v1.25.0/go.mod h1:hCCs70XM/ljO+BeQkyFnbK28SBIJ/Emuha+ccrCRT7I= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= diff --git a/hub/hub.go b/hub/hub.go index 9317768c..8f39fbd4 100644 --- a/hub/hub.go +++ b/hub/hub.go @@ -12,7 +12,7 @@ type Hub interface { GetConnectionConfigByName(string) *proto.ConnectionConfig LoadConnectionConfig() (bool, error) GetSchema(remoteSchema string, localSchema string) (*proto.Schema, error) - GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, opts types.Options, queryTimestamp int64) (Iterator, error) + GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, opts types.Options) (Iterator, error) GetRelSize(columns []string, quals []*proto.Qual, opts types.Options) (types.RelSize, error) GetPathKeys(opts types.Options) ([]types.PathKey, error) Explain(columns []string, quals []*proto.Qual, sortKeys []string, verbose bool, opts types.Options) ([]string, error) @@ -30,4 +30,5 @@ type Hub interface { ValidateCacheCommand(command string) error cacheTTL(name string) time.Duration cacheEnabled(name string) bool + GetSortableFields(table, connection string) map[string]proto.SortOrder } diff --git a/hub/hub_base.go b/hub/hub_base.go index d81008c0..c286f3e4 100644 --- a/hub/hub_base.go +++ b/hub/hub_base.go @@ -543,3 +543,8 @@ func (h *hubBase) cacheTTL(connectionName string) time.Duration { log.Printf("[INFO] cacheTTL 5") return ttl } + +// GetSortableFields +func (h *hubBase) GetSortableFields(tableName, connectionName string) map[string]proto.SortOrder { + return nil +} diff --git a/hub/hub_local.go b/hub/hub_local.go index e437eaa8..c49341de 100644 --- a/hub/hub_local.go +++ b/hub/hub_local.go @@ -108,7 +108,7 @@ func (l *HubLocal) GetSchema(_, connectionName string) (*proto.Schema, error) { return res.GetSchema(), nil } -func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, opts types.Options, queryTimestamp int64) (Iterator, error) { +func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, opts types.Options) (Iterator, error) { logging.LogTime("GetIterator start") qualMap, err := buildQualMap(quals) connectionName := opts["connection"] @@ -121,7 +121,7 @@ func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRe // create a span for this scan scanTraceCtx := l.traceContextForScan(table, columns, limit, qualMap, connectionName) - iterator, err := l.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, scanTraceCtx) + iterator, err := l.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx) if err != nil { log.Printf("[TRACE] RemoteHub GetIterator() failed :( %s", err) @@ -144,8 +144,8 @@ func (l *HubLocal) GetPathKeys(opts types.Options) ([]types.PathKey, error) { } -func (h *HubLocal) GetConnectionConfigByName(name string) *proto.ConnectionConfig { - return h.connections[name] +func (l *HubLocal) GetConnectionConfigByName(name string) *proto.ConnectionConfig { + return l.connections[name] } func (l *HubLocal) ProcessImportForeignSchemaOptions(opts types.Options, connection string) error { @@ -165,7 +165,7 @@ func (l *HubLocal) ProcessImportForeignSchemaOptions(opts types.Options, connect } // startScanForConnection starts a scan for a single connection, using a scanIterator or a legacyScanIterator -func (l *HubLocal) startScanForConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, scanTraceCtx *telemetry.TraceCtx) (_ Iterator, err error) { +func (l *HubLocal) startScanForConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, scanTraceCtx *telemetry.TraceCtx) (_ Iterator, err error) { defer func() { if err != nil { // close the span in case of errir @@ -197,7 +197,7 @@ func (l *HubLocal) startScanForConnection(connectionName string, table string, q } log.Printf("[TRACE] startScanForConnection creating a new scan iterator") - iterator := newScanIteratorLocal(l, connectionName, table, l.pluginName, connectionLimitMap, qualMap, columns, limit, scanTraceCtx) + iterator := newScanIteratorLocal(l, connectionName, table, l.pluginName, connectionLimitMap, qualMap, columns, limit, sortOrder, queryTimestamp, scanTraceCtx) return iterator, nil } diff --git a/hub/hub_remote.go b/hub/hub_remote.go index ae8e6da6..01cd6cd4 100644 --- a/hub/hub_remote.go +++ b/hub/hub_remote.go @@ -90,7 +90,7 @@ func (h *RemoteHub) GetSchema(remoteSchema string, localSchema string) (*proto.S } // GetIterator creates and returns an iterator -func (h *RemoteHub) GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, opts types.Options, queryTimestamp int64) (Iterator, error) { +func (h *RemoteHub) GetIterator(columns []string, quals *proto.Quals, unhandledRestrictions int, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, opts types.Options) (Iterator, error) { logging.LogTime("GetIterator start") qualMap, err := buildQualMap(quals) connectionName := opts["connection"] @@ -103,7 +103,7 @@ func (h *RemoteHub) GetIterator(columns []string, quals *proto.Quals, unhandledR // create a span for this scan scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName) - iterator, err := h.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, scanTraceCtx, queryTimestamp) + iterator, err := h.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx) if err != nil { log.Printf("[TRACE] RemoteHub GetIterator() failed :( %s", err) @@ -156,7 +156,7 @@ func (h *RemoteHub) GetPathKeys(opts types.Options) ([]types.PathKey, error) { //// internal implementation //// // startScanForConnection starts a scan for a single connection, using a scanIterator or a legacyScanIterator -func (h *RemoteHub) startScanForConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, scanTraceCtx *telemetry.TraceCtx, queryTimestamp int64) (_ Iterator, err error) { +func (h *RemoteHub) startScanForConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, scanTraceCtx *telemetry.TraceCtx) (_ Iterator, err error) { defer func() { if err != nil { // close the span in case of errir @@ -205,7 +205,7 @@ func (h *RemoteHub) startScanForConnection(connectionName string, table string, } log.Printf("[TRACE] startScanForConnection creating a new scan iterator") - iterator := newScanIterator(h, connectionPlugin, connectionName, table, connectionLimitMap, qualMap, columns, limit, scanTraceCtx, queryTimestamp) + iterator := newScanIterator(h, connectionPlugin, connectionName, table, connectionLimitMap, qualMap, columns, limit, sortOrder, queryTimestamp, scanTraceCtx) return iterator, nil } @@ -344,3 +344,36 @@ func (h *RemoteHub) getServerCacheEnabled() bool { return res } + +// GetSortableFields returns a slice of fields which are defined as sortable bythe plugin schema, +// as well as the sort order(s) supported +func (h *RemoteHub) GetSortableFields(tableName, connectionName string) map[string]proto.SortOrder { + connectionPlugin, err := h.getConnectionPlugin(connectionName) + if err != nil { + log.Printf("[WARN] GetSortableFields getConnectionPlugin failed for connection %s: %s", connectionName, err.Error()) + return nil + } + + schema, err := connectionPlugin.GetSchema(connectionName) + if err != nil { + log.Printf("[WARN] GetSortableFields GetSchema failed for connection %s: %s", connectionName, err.Error()) + return nil + } + + tableSchema, ok := schema.Schema[tableName] + if !ok { + log.Printf("[WARN] GetSortableFields table schema not found for connection %s, table %s", connectionName, tableName) + return nil + } + + // build map of sortable fields + var sortableFields = make(map[string]proto.SortOrder) + for _, column := range tableSchema.Columns { + sortableFields[column.Name] = column.SortOrder + } + + if len(sortableFields) > 0 { + log.Printf("[INFO] GetSortableFields for connection '%s`, table `%s`: %v", connectionName, tableName, sortableFields) + } + return sortableFields +} diff --git a/hub/scan_iterator.go b/hub/scan_iterator.go index 256f465c..a81e3b7a 100644 --- a/hub/scan_iterator.go +++ b/hub/scan_iterator.go @@ -21,9 +21,9 @@ type scanIterator struct { hub *RemoteHub } -func newScanIterator(hub Hub, connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, traceCtx *telemetry.TraceCtx, queryTimestamp int64) *scanIterator { +func newScanIterator(hub Hub, connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, traceCtx *telemetry.TraceCtx) *scanIterator { return &scanIterator{ - scanIteratorBase: newBaseScanIterator(hub, connectionName, table, connectionLimitMap, qualMap, columns, limit, traceCtx, queryTimestamp), + scanIteratorBase: newBaseScanIterator(hub, connectionName, table, connectionLimitMap, qualMap, columns, limit, sortOrder, queryTimestamp, traceCtx), connectionPlugin: connectionPlugin, } } diff --git a/hub/scan_iterator_base.go b/hub/scan_iterator_base.go index edb26a53..cb79c26b 100644 --- a/hub/scan_iterator_base.go +++ b/hub/scan_iterator_base.go @@ -40,7 +40,7 @@ type scanIteratorBase struct { callId string } -func newBaseScanIterator(hub Hub, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, traceCtx *telemetry.TraceCtx, queryTimestamp int64) scanIteratorBase { +func newBaseScanIterator(hub Hub, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, traceCtx *telemetry.TraceCtx) scanIteratorBase { return scanIteratorBase{ status: QueryStatusReady, rows: make(chan *proto.Row, rowBufferSize), @@ -51,7 +51,7 @@ func newBaseScanIterator(hub Hub, connectionName, table string, connectionLimitM connectionLimitMap: connectionLimitMap, traceCtx: traceCtx, startTime: time.Now(), - queryContext: proto.NewQueryContext(columns, qualMap, limit), + queryContext: proto.NewQueryContext(columns, qualMap, limit, sortOrder), callId: grpc.BuildCallId(), queryTimestamp: queryTimestamp, } diff --git a/hub/scan_iterator_local.go b/hub/scan_iterator_local.go index df4d98a0..ce47e94f 100644 --- a/hub/scan_iterator_local.go +++ b/hub/scan_iterator_local.go @@ -15,9 +15,9 @@ type scanIteratorLocal struct { pluginName string } -func newScanIteratorLocal(hub Hub, connectionName, table, pluginName string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, traceCtx *telemetry.TraceCtx) *scanIteratorLocal { +func newScanIteratorLocal(hub Hub, connectionName, table, pluginName string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, limit int64, sortOrder []*proto.SortColumn, queryTimestamp int64, traceCtx *telemetry.TraceCtx) *scanIteratorLocal { return &scanIteratorLocal{ - scanIteratorBase: newBaseScanIterator(hub, connectionName, table, connectionLimitMap, qualMap, columns, limit, traceCtx, 0), + scanIteratorBase: newBaseScanIterator(hub, connectionName, table, connectionLimitMap, qualMap, columns, limit, sortOrder, queryTimestamp, traceCtx), pluginName: pluginName, } }