Simplify iterator stream management: The iterator stream-read thread is the only one to close the row channel. Also, it will cancel the GRPC stream as necessary. Closes #74 (#75)
kaidaguerre committed Jul 6, 2021
1 parent a6385a6 commit 65140f3
Showing 3 changed files with 127 additions and 148 deletions.
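
For readers skimming the diff, the shape of the change is easier to see in isolation: the read goroutine becomes the sole owner of the row channel (only it ever closes it), it wraps the blocking GRPC Recv in a helper goroutine, and it selects between an incoming row/error and a buffered cancel channel that Close uses to request cancellation of the GRPC stream. The sketch below is a simplified stand-in, not the fdw code itself - rowStream, fakeStream and iterator are hypothetical types and plain strings replace proto.Row - but it follows the same structure as the new readThread/readResult/Close.

    package main

    import (
        "fmt"
        "io"
    )

    // rowStream stands in for the GRPC client stream (proto.WrapperPlugin_ExecuteClient);
    // only Recv matters for this sketch.
    type rowStream interface {
        Recv() (string, error)
    }

    // fakeStream yields a fixed number of rows, then io.EOF.
    type fakeStream struct{ remaining int }

    func (s *fakeStream) Recv() (string, error) {
        if s.remaining == 0 {
            return "", io.EOF
        }
        s.remaining--
        return fmt.Sprintf("row-%d", s.remaining), nil
    }

    type iterator struct {
        rows       chan string
        cancelChan chan bool
        stream     rowStream
        cancel     func() // would cancel the GRPC context in the real code
    }

    func newIterator(stream rowStream, cancel func()) *iterator {
        return &iterator{
            rows:   make(chan string),
            stream: stream,
            cancel: cancel,
            // buffered so Close can always deposit its signal, even while the
            // read thread is blocked in Recv rather than parked in the select
            cancelChan: make(chan bool, 1),
        }
    }

    // readThread is the only goroutine that ever closes the rows channel.
    func (i *iterator) readThread() {
        defer close(i.rows)
        for {
            rcvChan := make(chan string)
            errChan := make(chan error)
            go func() {
                row, err := i.stream.Recv()
                if err != nil {
                    errChan <- err
                    return
                }
                rcvChan <- row
            }()

            select {
            case <-i.cancelChan:
                // Close was called - cancel the GRPC stream and stop reading
                i.cancel()
                return
            case row := <-rcvChan:
                i.rows <- row
            case err := <-errChan:
                if err != io.EOF {
                    fmt.Println("stream error:", err)
                }
                return
            }
        }
    }

    // Close only signals the read thread; it never touches the rows channel.
    func (i *iterator) Close() {
        i.cancelChan <- true
    }

    func main() {
        it := newIterator(&fakeStream{remaining: 3}, func() { fmt.Println("grpc stream cancelled") })
        go it.readThread()
        for row := range it.rows {
            fmt.Println("got", row)
        }
        fmt.Println("rows channel closed by read thread")
    }

The cancel channel is buffered (capacity 1) so that Close never blocks waiting for the read thread to reach the select, which appears to be the rationale behind the comment added in newScanIterator below.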
1 change: 1 addition & 0 deletions fdw.go
@@ -264,6 +264,7 @@ func goFdwReScanForeignScan(node *C.ForeignScanState) {

//export goFdwEndForeignScan
func goFdwEndForeignScan(node *C.ForeignScanState) {

s := GetExecState(node.fdw_state)
pluginHub, _ := hub.GetHub()
if s != nil && pluginHub != nil {
6 changes: 1 addition & 5 deletions hub/hub.go
@@ -233,6 +233,7 @@ func (h *Hub) startScanForConnection(connectionName string, table string, qualMa
}

// cache not enabled - create a scan iterator
log.Printf("[TRACE] startScanForConnection creating a new scan iterator")
queryContext := proto.NewQueryContext(columns, qualMap, limit)
iterator := newScanIterator(h, connection, table, qualMap, columns, limit)

@@ -369,11 +370,6 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
log.Printf("[INFO] StartScan\n table: %s", table)
c := iterator.connection

// if a scanIterator is in progress, fail
if iterator.inProgress() {
return fmt.Errorf("cannot start scanIterator while existing scanIterator is incomplete - cancel first")
}

req := &proto.ExecuteRequest{
Table: table,
QueryContext: queryContext,
268 changes: 125 additions & 143 deletions hub/scan_iterator.go
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log"
"sync"
"time"

"github.com/golang/protobuf/ptypes"
@@ -30,22 +29,22 @@
)

type scanIterator struct {
status queryStatus
err error
rows chan *proto.Row
columns []string
limit int64
stream proto.WrapperPlugin_ExecuteClient
rel *types.Relation
qualMap map[string]*proto.Quals
hub *Hub
cachedRows *cache.QueryResult
cacheEnabled bool
cacheTTL time.Duration
table string
connection *steampipeconfig.ConnectionPlugin
readLock sync.Mutex
cancel context.CancelFunc
status queryStatus
err error
rows chan *proto.Row
columns []string
limit int64
pluginRowStream proto.WrapperPlugin_ExecuteClient
rel *types.Relation
qualMap map[string]*proto.Quals
hub *Hub
cachedRows *cache.QueryResult
cacheEnabled bool
cacheTTL time.Duration
table string
connection *steampipeconfig.ConnectionPlugin
cancel context.CancelFunc
cancelChan chan bool
}

func newScanIterator(hub *Hub, connection *steampipeconfig.ConnectionPlugin, table string, qualMap map[string]*proto.Quals, columns []string, limit int64) *scanIterator {
@@ -64,9 +63,12 @@ func newScanIterator(hub *Hub, connection *steampipeconfig.ConnectionPlugin, tab
cacheTTL: cacheTTL,
table: table,
connection: connection,
// buffer the cancel channel as otherwise we never seem to select from it
cancelChan: make(chan bool, 1),
}
}

// access functions
func (i *scanIterator) ConnectionName() string {
return i.connection.ConnectionName
}
@@ -82,12 +84,10 @@ func (i *scanIterator) Error() error {
// Next implements Iterator
// return the next row. Nil row means there are no more rows to scan.
func (i *scanIterator) Next() (map[string]interface{}, error) {

// check the iterator state - has an error occurred
if i.status == QueryStatusError {
return nil, i.err
}

logging.LogTime("[hub] Next start")

if !i.CanIterate() {
@@ -105,13 +105,14 @@ func (i *scanIterator) Next() (map[string]interface{}, error) {

// if iterator is in error, return the error
if i.Status() == QueryStatusError {
i.onError()
// return error
return nil, i.err
}
// otherwise mark iterator complete, caching result
i.onComplete(true)
i.status = QueryStatusComplete
i.writeToCache()
} else {
// so we got a row
var err error
res, err = i.populateRow(row)
if err != nil {
@@ -127,6 +128,49 @@ func (i *scanIterator) Next() (map[string]interface{}, error) {
return res, nil
}

func (i *scanIterator) Start(stream proto.WrapperPlugin_ExecuteClient, cancel context.CancelFunc) {
logging.LogTime("[hub] start")
if i.status != QueryStatusReady {
panic("attempting to start iterator which is still in progress")
}
i.status = QueryStatusStarted
i.pluginRowStream = stream
i.cancel = cancel

// read the results - this will loop until it hits an error or the stream is closed
go i.readThread()
}

func (i *scanIterator) Close(writeToCache bool) {
log.Println("[TRACE] scanIterator Close")

// ping the cancel channel - if there is an active read thread, it will cancel the GRPC stream if needed
log.Printf("[TRACE] signalling cancel channel")
i.cancelChan <- true

log.Printf("[TRACE] stream cancelled")
if writeToCache {
i.writeToCache()
}
// set status to complete
if i.status != QueryStatusError {
i.status = QueryStatusComplete
}

}

// CanIterate returns true if this iterator has results available to iterate
func (i *scanIterator) CanIterate() bool {
switch i.status {
case QueryStatusError, QueryStatusReady, QueryStatusComplete:
// scan iterator must be explicitly started - so we cannot iterate if in ready state
return false
default:
return true
}

}

func (i *scanIterator) populateRow(row *proto.Row) (map[string]interface{}, error) {
res := make(map[string]interface{}, len(row.Columns))
for columnName, column := range row.Columns {
@@ -165,143 +209,93 @@ func (i *scanIterator) populateRow(row *proto.Row) (map[string]interface{}, erro
return res, nil
}

func (i *scanIterator) Start(stream proto.WrapperPlugin_ExecuteClient, cancel context.CancelFunc) {
logging.LogTime("[hub] start")
if i.status != QueryStatusReady {
panic("attempting to start iterator which is still in progress")
}
i.status = QueryStatusStarted
i.stream = stream
i.cancel = cancel

// read the results - this will loop until it hits an error or the stream is closed
go i.readResults()
}

func (i *scanIterator) Close(writeToCache bool) {

// lock readlock so the stream read process does not try to read from the nil stream
i.readLock.Lock()
defer i.readLock.Unlock()

log.Println("[TRACE] scanIterator Close")

// if there is an active stream, cancel it
if i.stream != nil {
// close our GRPC stream from the plugin
log.Printf("[TRACE] there is a stream - calling cancel")

i.stream.CloseSend()
i.cancel()
close(i.rows)
// clear iterator state, cache results (if enabled)
i.onComplete(writeToCache)
}
}

// read results from plugin stream, saving results in 'rows'.
func (i *scanIterator) readResults() {
log.Printf("[TRACE] readResults - read results from plugin stream, saving results in 'rows'\n")
// readThread is run in a goroutine and reads results from the GRPC stream until either:
// - the stream is complete
// - the stream returns an error
// - there is a signal on the cancel channel
func (i *scanIterator) readThread() {
log.Println("[TRACE] iterator readThread - read results from GRPC stream")
if i.status != QueryStatusStarted {
panic(fmt.Sprintf("attempting to read scan results but no iteration is in progress - iterator status %v", i.status))
}

for i.readResult() {
time.Sleep(10 * time.Millisecond)
}

log.Println("[TRACE] iterator readThread complete")
// we are done
close(i.rows)

}

// read a single result from the GRPC stream. Return true if there are more results to read
func (i *scanIterator) readResult() bool {
var row *proto.Row
continueReading := true

// lock read lock to ensure the stream is not closed from under us by a call to close()
i.readLock.Lock()
defer i.readLock.Unlock()
rcvChan := make(chan *proto.ExecuteResponse)
errChan := make(chan error)
go func() {
rowResult, err := i.pluginRowStream.Recv()
if err != nil {
errChan <- err
return
}

// if iterator has been closed, stream will be nil
if i.stream == nil {
log.Printf("[TRACE] scanIterator readResultstream is nil, it must have been closed")
// stop reading
return false
}
rcvChan <- rowResult
}()

rowResult, err := i.stream.Recv()
if err != nil {
// set error, shut the grpc stream and row channel
return i.onReceiveError(err)
}

if rowResult == nil {
log.Printf("[WARN] nil row received - ending grpc stream read thread\n")
// stop reading
continueReading := true
select {
// check for cancellation first - this takes precedence over reading the grpc stream
case <-i.cancelChan:
log.Printf("[TRACE] readResult received signal on cancelChan")
i.cancel()
continueReading = false
case rowResult := <-rcvChan:
if rowResult == nil {
log.Printf("[TRACE] readResult nil row received - stop reading")
// stop reading
continueReading = false
} else {
// so we have a row
row = rowResult.Row
}
// send row (which may be nil)
i.rows <- row
case err := <-errChan:
if err.Error() == "EOF" {
log.Printf("[TRACE] readResult EOF error received - stop reading")
} else {
log.Printf("[WARN] stream receive error %v\n", err)
i.setError(err)
}
continueReading = false
} else {
// so we have a row
row = rowResult.Row
}

// send row (which may be nil)
i.rows <- row

// continue reading
return continueReading
}

func (i *scanIterator) onReceiveError(err error) bool {
if err.Error() != "EOF" {
log.Printf("[WARN] stream receive error %v\n", err)
}
i.setError(err)
// clear stream - we will not need it any more
i.stream = nil
// close the row channel
close(i.rows)
// stop reading
return false
}

// scanIterator state methods
func (i *scanIterator) inProgress() bool {
return i.status == QueryStatusStarted
}

func (i *scanIterator) failed() bool {
return i.status == QueryStatusError
log.Printf("[TRACE] readResult returning continueReading=%v", continueReading)
return continueReading
}

// called when all the data has been read from the stream - complete status to QueryStatusReady, and clear stream and error
func (i *scanIterator) onComplete(writeToCache bool) {
log.Printf("[WARN] onComplete %s, writeToCache %v", i.ConnectionName(), writeToCache)
// lock readlock so the stream read process does not try to read from the nil stream
i.readLock.Lock()
defer i.readLock.Unlock()
// called when all the data has been read from the stream
func (i *scanIterator) writeToCache() {
log.Printf("[TRACE] writeToCache %s", i.ConnectionName())

if i.status == QueryStatusReady {
if !i.cacheEnabled {
log.Printf("[TRACE] caching disabled - returning")
// nothing to do
return
}

// TODO do we need complete status or can I reset to read
i.status = QueryStatusComplete
i.stream = nil
i.err = nil
// write the data to the cache
if i.cacheEnabled && writeToCache {
res := i.hub.queryCache.Set(i.connection, i.table, i.qualMap, i.columns, i.limit, i.cachedRows, i.cacheTTL)
log.Println("[INFO] scan complete")
if res {
log.Printf("[INFO] adding %d rows to cache", len(i.cachedRows.Rows))
} else {
log.Printf("[WARN] failed to add %d rows to cache", len(i.cachedRows.Rows))
}
res := i.hub.queryCache.Set(i.connection, i.table, i.qualMap, i.columns, i.limit, i.cachedRows, i.cacheTTL)

if res {
log.Printf("[INFO] adding %d rows to cache", len(i.cachedRows.Rows))
} else {
log.Printf("[WARN] failed to add %d rows to cache", len(i.cachedRows.Rows))
}
}

func (i *scanIterator) onError() {
// clear the stream so any subsequent calls to Close do not try to cancel the stream
i.stream = nil
log.Printf("[WARN] writeToCache returning")
}

// if there is an error other than EOF, save error and set state to QueryStatusError
Expand All @@ -311,15 +305,3 @@ func (i *scanIterator) setError(err error) {
i.err = err
}
}

// CanIterate :: return true if this iterator has results available to iterate
func (i *scanIterator) CanIterate() bool {
switch i.status {
case QueryStatusError, QueryStatusReady, QueryStatusComplete:
// scan iterator must be explicitly started - so we cannot iterate if in ready state
return false
default:
return true
}

}
