Skip to content

Commit

Permalink
Provide tablets metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Jul 27, 2023
1 parent 154bf48 commit ac9676d
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
2 changes: 2 additions & 0 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ func refreshTablets(r *ringDescriber) error {
r.session.ring.setTablets(tablets)
r.session.policy.SetTablets(tablets)

r.session.schemaDescriber.refreshTabletsSchema()

return nil
}

Expand Down
70 changes: 65 additions & 5 deletions metadata_scylla.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build !cassandra || scylla
// +build !cassandra scylla

// Copyright (c) 2015 The gocql Authors. All rights reserved.
Expand Down Expand Up @@ -132,6 +133,29 @@ type IndexMetadata struct {
Options map[string]string
}

// TabletsMetadata holds metadata for tablet list
type TabletsMetadata struct {
Tablets []*TabletMetadata
}

// TabletMetadata holds metadata for single tablet
type TabletMetadata struct {
KeyspaceName string
TableId UUID
LastToken int64
TableName string
TabletCount int
NewReplicas []ReplicaMetadata
Replicas []ReplicaMetadata
Stage string
}

// TabletMetadata holds metadata for single replica
type ReplicaMetadata struct {
HostId UUID
ShardId int
}

const (
IndexKindCustom = "CUSTOM"
)
Expand Down Expand Up @@ -215,20 +239,22 @@ func columnKindFromSchema(kind string) (ColumnKind, error) {
}
}

// queries the cluster for schema information for a specific keyspace
// queries the cluster for schema information for a specific keyspace and for tablets
type schemaDescriber struct {
session *Session
mu sync.Mutex

cache map[string]*KeyspaceMetadata
cache map[string]*KeyspaceMetadata
tabletsCache *TabletsMetadata
}

// creates a session bound schema describer which will query and cache
// keyspace metadata
// keyspace metadata and tablets metadata
func newSchemaDescriber(session *Session) *schemaDescriber {
return &schemaDescriber{
session: session,
cache: map[string]*KeyspaceMetadata{},
session: session,
cache: map[string]*KeyspaceMetadata{},
tabletsCache: &TabletsMetadata{},
}
}

Expand All @@ -252,6 +278,40 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
return metadata, nil
}

func (s *schemaDescriber) getTabletsSchema() *TabletsMetadata {
s.mu.Lock()
defer s.mu.Unlock()

metadata := s.tabletsCache

return metadata
}

func (s *schemaDescriber) refreshTabletsSchema() {
tablets := s.session.getTablets()
s.tabletsCache.Tablets = []*TabletMetadata{}

for _, tablet := range tablets {
t := &TabletMetadata{}
t.KeyspaceName = tablet.KeyspaceName()
t.TableId = tablet.TableId()
t.TableName = tablet.TableName()
t.Stage = tablet.Stage()
t.LastToken = tablet.LastToken()
t.TabletCount = tablet.TabletCount()
t.Replicas = []ReplicaMetadata{}
for _, replica := range tablet.Replicas() {
t.Replicas = append(t.Replicas, ReplicaMetadata{replica.hostId, replica.shardId})
}
t.NewReplicas = []ReplicaMetadata{}
for _, replica := range tablet.NewReplicas() {
t.NewReplicas = append(t.NewReplicas, ReplicaMetadata{replica.hostId, replica.shardId})
}

s.tabletsCache.Tablets = append(s.tabletsCache.Tablets, t)
}
}

// clears the already cached keyspace metadata
func (s *schemaDescriber) clearSchema(keyspaceName string) {
s.mu.Lock()
Expand Down
13 changes: 13 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,18 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) {
return s.schemaDescriber.getSchema(keyspace)
}

// TabletsMetadata returns the metadata about tablets
func (s *Session) TabletsMetadata() (*TabletsMetadata, error) {
// fail fast
if s.Closed() {
return nil, ErrSessionClosed
} else if !s.hostSource.UsesTablets() {
return nil, ErrTabletsNotUsed
}

return s.schemaDescriber.getTabletsSchema(), nil
}

func (s *Session) getConn() *Conn {
hosts := s.ring.allHosts()
for _, host := range hosts {
Expand Down Expand Up @@ -2370,6 +2382,7 @@ var (
ErrNoKeyspace = errors.New("no keyspace provided")
ErrKeyspaceDoesNotExist = errors.New("keyspace does not exist")
ErrNoMetadata = errors.New("no metadata available")
ErrTabletsNotUsed = errors.New("tablets not used")
)

type ErrProtocol struct{ error }
Expand Down

0 comments on commit ac9676d

Please sign in to comment.