diff --git a/cmd/swarm/db.go b/cmd/swarm/db.go index 856d3e1d8a..b3d5a7764b 100644 --- a/cmd/swarm/db.go +++ b/cmd/swarm/db.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "os" - "path/filepath" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" @@ -168,10 +167,6 @@ func dbImport(ctx *cli.Context) { } func openLDBStore(path string, basekey []byte) (*localstore.DB, error) { - if _, err := os.Stat(filepath.Join(path, "CURRENT")); err != nil { - return nil, fmt.Errorf("invalid chunkdb path: %s", err) - } - return localstore.New(path, basekey, nil) } diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go index 1d506e6067..e0e25803f5 100644 --- a/cmd/swarm/main.go +++ b/cmd/swarm/main.go @@ -19,6 +19,7 @@ package main import ( "crypto/ecdsa" "encoding/hex" + "errors" "fmt" "io/ioutil" "net" @@ -46,6 +47,7 @@ import ( "github.com/ethersphere/swarm/internal/debug" swarmmetrics "github.com/ethersphere/swarm/metrics" "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/storage/localstore" "github.com/ethersphere/swarm/storage/mock" mockrpc "github.com/ethersphere/swarm/storage/mock/rpc" "github.com/ethersphere/swarm/tracing" @@ -367,7 +369,16 @@ func registerBzzService(bzzconfig *bzzapi.Config, stack *node.Node) { // create a node store for this swarm key on global store nodeStore = globalStore.NewNodeStore(common.HexToAddress(bzzconfig.BzzKey)) } - return swarm.NewSwarm(bzzconfig, nodeStore) + s, err := swarm.NewSwarm(bzzconfig, nodeStore) + if err != nil { + var e *localstore.BreakingMigrationError + if errors.As(err, &e) { + fmt.Println(e.Manual) + utils.Fatalf("Manual storage migration required.") + } + return nil, err + } + return s, nil } //register within the ethereum node if err := stack.Register(boot); err != nil { diff --git a/storage/fcds/doc.go b/storage/fcds/doc.go new file mode 100644 index 0000000000..e73cc4c091 --- /dev/null +++ b/storage/fcds/doc.go @@ -0,0 +1,43 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +// Package fcds provides storage layers for storing chunk data only. +// +// FCDS stands for Fixed Chunk Data Storage. +// +// Swarm Chunk data limited size property allows a very specific chunk storage +// solution that can be more performant than more generalized key/value +// databases. FCDS stores chunk data in files (shards) at fixed length offsets. +// Relations between chunk address, file number and offset in that file are +// managed by a separate MetaStore implementation. +// +// Package fcds contains the main implementation based on simple file operations +// for persisting chunk data and relaying on specific chunk meta information +// storage. +// +// The reference chunk meta information storage is implemented in fcds/mem +// package. It can be used in tests. 
+// +// LevelDB based chunk meta information storage is implemented in fcds/leveldb +// package. This implementation should be used as default in production. +// +// Additional FCDS Store implementation is in fcds/mock. It uses mock store and +// can be used for centralized chunk storage options that mock storage package +// provides. +// +// Package fcds/test contains test functions which can be used to validate +// behaviour of different FCDS or its MetaStore implementations. +package fcds diff --git a/storage/fcds/fcds.go b/storage/fcds/fcds.go new file mode 100644 index 0000000000..2e7fe2e1c9 --- /dev/null +++ b/storage/fcds/fcds.go @@ -0,0 +1,362 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package fcds + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" + + "github.com/ethersphere/swarm/log" + + "github.com/ethersphere/swarm/chunk" +) + +// Storer specifies methods required for FCDS implementation. +// It can be used where alternative implementations are needed to +// switch at runtime. +type Storer interface { + Get(addr chunk.Address) (ch chunk.Chunk, err error) + Has(addr chunk.Address) (yes bool, err error) + Put(ch chunk.Chunk) (err error) + Delete(addr chunk.Address) (err error) + Count() (count int, err error) + Iterate(func(ch chunk.Chunk) (stop bool, err error)) (err error) + Close() (err error) +} + +var _ Storer = new(Store) + +// Number of files that store chunk data. +const shardCount = 32 + +// ErrStoreClosed is returned if store is already closed. +var ErrStoreClosed = errors.New("closed store") + +// Store is the main FCDS implementation. It stores chunk data into +// a number of files partitioned by the last byte of the chunk address. +type Store struct { + shards []shard // relations with shard id and a shard file and their mutexes + meta MetaStore // stores chunk offsets + free []bool // which shards have free offsets + freeMu sync.RWMutex // protects free field + freeCache *offsetCache // optional cache of free offset values + wg sync.WaitGroup // blocks Close until all other method calls are done + maxChunkSize int // maximal chunk data size + quit chan struct{} // quit disables all operations after Close is called + quitOnce sync.Once // protects quit channel from multiple Close calls +} + +// Option is an optional argument passed to New. +type Option func(*Store) + +// WithCache is an optional argument to New constructor that enables +// in memory cache of free chunk data positions in files +func WithCache(yes bool) Option { + return func(s *Store) { + if yes { + s.freeCache = newOffsetCache(shardCount) + } else { + s.freeCache = nil + } + } +} + +// New constructs a new Store with files at path, with specified max chunk size. 
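For orientation, a minimal usage sketch of the constructor described above (not part of the patch): it wires a Store to the LevelDB-backed MetaStore added in fcds/leveldb and uses the existing chunk and chunk/testing helpers. The directory path is illustrative and error handling is reduced to log.Fatal.

package main

import (
	"log"
	"path/filepath"

	"github.com/ethersphere/swarm/chunk"
	chunktesting "github.com/ethersphere/swarm/chunk/testing"
	"github.com/ethersphere/swarm/storage/fcds"
	fcdsleveldb "github.com/ethersphere/swarm/storage/fcds/leveldb"
)

func main() {
	dir := "/tmp/fcds-example" // illustrative location

	// Chunk meta information (shard, offset, size) is kept by a separate
	// MetaStore, here the LevelDB-backed implementation from fcds/leveldb.
	metaStore, err := fcdsleveldb.NewMetaStore(filepath.Join(dir, "meta"))
	if err != nil {
		log.Fatal(err)
	}

	// Chunk data goes into 32 shard files under dir, selected by the last
	// byte of the chunk address; every chunk occupies one fixed-size slot.
	store, err := fcds.New(dir, chunk.DefaultSize, metaStore, fcds.WithCache(true))
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	ch := chunktesting.GenerateTestRandomChunk()
	if err := store.Put(ch); err != nil {
		log.Fatal(err)
	}
	got, err := store.Get(ch.Address())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read back %d bytes for chunk %s", len(got.Data()), got.Address().Hex())
}

The localstore changes further down use the same constructor with chunk.DefaultSize+8, since chunk data stored there carries an additional 8 byte prefix.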
+func New(path string, maxChunkSize int, metaStore MetaStore, opts ...Option) (s *Store, err error) { + s = &Store{ + shards: make([]shard, shardCount), + meta: metaStore, + free: make([]bool, shardCount), + maxChunkSize: maxChunkSize, + quit: make(chan struct{}), + } + for _, o := range opts { + o(s) + } + if err := os.MkdirAll(path, 0777); err != nil { + return nil, err + } + for i := byte(0); i < shardCount; i++ { + s.shards[i].f, err = os.OpenFile(filepath.Join(path, fmt.Sprintf("chunks-%v.db", i)), os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + s.shards[i].mu = new(sync.Mutex) + } + return s, nil +} + +// Get returns a chunk with data. +func (s *Store) Get(addr chunk.Address) (ch chunk.Chunk, err error) { + if err := s.protect(); err != nil { + return nil, err + } + defer s.unprotect() + + m, err := s.getMeta(addr) + if err != nil { + return nil, err + } + + sh := s.shards[getShard(addr)] + sh.mu.Lock() + defer sh.mu.Unlock() + + data := make([]byte, m.Size) + n, err := sh.f.ReadAt(data, m.Offset) + if err != nil && err != io.EOF { + return nil, err + } + if n != int(m.Size) { + return nil, fmt.Errorf("incomplete chunk data, read %v of %v", n, m.Size) + } + return chunk.NewChunk(addr, data), nil +} + +// Has returns true if chunk is stored. +func (s *Store) Has(addr chunk.Address) (yes bool, err error) { + if err := s.protect(); err != nil { + return false, err + } + defer s.unprotect() + + return s.meta.Has(addr) +} + +// Put stores chunk data. +func (s *Store) Put(ch chunk.Chunk) (err error) { + if err := s.protect(); err != nil { + return err + } + defer s.unprotect() + + addr := ch.Address() + data := ch.Data() + + size := len(data) + if size > s.maxChunkSize { + return fmt.Errorf("chunk data size %v exceeds %v bytes", size, s.maxChunkSize) + } + + section := make([]byte, s.maxChunkSize) + copy(section, data) + + shard := getShard(addr) + sh := s.shards[shard] + + has, err := s.meta.Has(addr) + if err != nil { + return err + } + if has { + return nil + } + + sh.mu.Lock() + defer sh.mu.Unlock() + + offset, reclaimed, err := s.getOffset(shard) + if err != nil { + return err + } + + if offset < 0 { + // no free offsets found, + // append the chunk data by + // seeking to the end of the file + offset, err = sh.f.Seek(0, io.SeekEnd) + } else { + // seek to the offset position + // to replace the chunk data at that position + _, err = sh.f.Seek(offset, io.SeekStart) + } + if err != nil { + return err + } + + if _, err = sh.f.Write(section); err != nil { + return err + } + if reclaimed && s.freeCache != nil { + s.freeCache.remove(shard, offset) + } + return s.meta.Set(addr, shard, reclaimed, &Meta{ + Size: uint16(size), + Offset: offset, + }) +} + +// getOffset returns an offset where chunk data can be written to +// and a flag if the offset is reclaimed from a previously removed chunk. +// If offset is less then 0, no free offsets are available. +func (s *Store) getOffset(shard uint8) (offset int64, reclaimed bool, err error) { + if !s.shardHasFreeOffsets(shard) { + return -1, false, nil + } + + offset = -1 + if s.freeCache != nil { + offset = s.freeCache.get(shard) + } + + if offset < 0 { + offset, err = s.meta.FreeOffset(shard) + if err != nil { + return 0, false, err + } + } + if offset < 0 { + s.markShardWithFreeOffsets(shard, false) + return -1, false, nil + } + + return offset, true, nil +} + +// Delete makes the chunk unavailable. 
+func (s *Store) Delete(addr chunk.Address) (err error) { + if err := s.protect(); err != nil { + return err + } + defer s.unprotect() + + shard := getShard(addr) + s.markShardWithFreeOffsets(shard, true) + + if s.freeCache != nil { + m, err := s.getMeta(addr) + if err != nil { + return err + } + s.freeCache.set(shard, m.Offset) + } + return s.meta.Remove(addr, shard) +} + +// Count returns a number of stored chunks. +func (s *Store) Count() (count int, err error) { + return s.meta.Count() +} + +// Iterate iterates over stored chunks in no particular order. +func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error) { + if err := s.protect(); err != nil { + return err + } + defer s.unprotect() + + for _, sh := range s.shards { + sh.mu.Lock() + } + defer func() { + for _, sh := range s.shards { + sh.mu.Unlock() + } + }() + + return s.meta.Iterate(func(addr chunk.Address, m *Meta) (stop bool, err error) { + data := make([]byte, m.Size) + _, err = s.shards[getShard(addr)].f.ReadAt(data, m.Offset) + if err != nil { + return true, err + } + return fn(chunk.NewChunk(addr, data)) + }) +} + +// Close disables of further operations on the Store. +// Every call to its methods will return ErrStoreClosed error. +// Close will wait for all running operations to finish before +// closing its MetaStore and returning. +func (s *Store) Close() (err error) { + s.quitOnce.Do(func() { + close(s.quit) + }) + + timeout := 15 * time.Second + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(timeout): + log.Debug("timeout on waiting chunk store parallel operations to finish", "timeout", timeout) + } + + for _, sh := range s.shards { + if err := sh.f.Close(); err != nil { + return err + } + } + return s.meta.Close() +} + +// protect protects Store from executing operations +// after the Close method is called and makes sure +// that Close method will wait for all ongoing operations +// to finish before returning. Method unprotect done +// must be closed to unblock the Close method call. +func (s *Store) protect() (err error) { + select { + case <-s.quit: + return ErrStoreClosed + default: + } + s.wg.Add(1) + return nil +} + +// unprotect removes a protection set by the protect method +// allowing the Close method to unblock. +func (s *Store) unprotect() { + s.wg.Done() +} + +// getMeta returns Meta information from MetaStore. +func (s *Store) getMeta(addr chunk.Address) (m *Meta, err error) { + return s.meta.Get(addr) +} + +func (s *Store) markShardWithFreeOffsets(shard uint8, has bool) { + s.freeMu.Lock() + s.free[shard] = has + s.freeMu.Unlock() +} + +func (s *Store) shardHasFreeOffsets(shard uint8) (has bool) { + s.freeMu.RLock() + has = s.free[shard] + s.freeMu.RUnlock() + return has +} + +// getShard returns a shard number for the chunk address. +func getShard(addr chunk.Address) (shard uint8) { + return addr[len(addr)-1] % shardCount +} + +type shard struct { + f *os.File + mu *sync.Mutex +} diff --git a/storage/fcds/leveldb/leveldb.go b/storage/fcds/leveldb/leveldb.go new file mode 100644 index 0000000000..de5f92b7cc --- /dev/null +++ b/storage/fcds/leveldb/leveldb.go @@ -0,0 +1,187 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. 
+// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package leveldb + +import ( + "encoding/binary" + + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/storage/fcds" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +var _ fcds.MetaStore = new(MetaStore) + +// MetaStore implements FCDS MetaStore with LevelDB +// for persistence. +type MetaStore struct { + db *leveldb.DB +} + +// NewMetaStore returns new MetaStore at path. +func NewMetaStore(path string) (s *MetaStore, err error) { + db, err := leveldb.OpenFile(path, &opt.Options{}) + if err != nil { + return nil, err + } + return &MetaStore{ + db: db, + }, err +} + +// Get returns chunk meta information. +func (s *MetaStore) Get(addr chunk.Address) (m *fcds.Meta, err error) { + data, err := s.db.Get(chunkKey(addr), nil) + if err != nil { + if err == leveldb.ErrNotFound { + return nil, chunk.ErrChunkNotFound + } + return nil, err + } + m = new(fcds.Meta) + if err := m.UnmarshalBinary(data); err != nil { + return nil, err + } + return m, nil +} + +// Has returns true if chunk has meta information stored. +func (s *MetaStore) Has(addr chunk.Address) (yes bool, err error) { + if _, err = s.db.Get(chunkKey(addr), nil); err != nil { + if err == leveldb.ErrNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// Set adds a new chunk meta information for a shard. +// Reclaimed flag denotes that the chunk is at the place of +// already deleted chunk, not appended to the end of the file. +func (s *MetaStore) Set(addr chunk.Address, shard uint8, reclaimed bool, m *fcds.Meta) (err error) { + batch := new(leveldb.Batch) + if reclaimed { + batch.Delete(freeKey(shard, m.Offset)) + } + meta, err := m.MarshalBinary() + if err != nil { + return err + } + batch.Put(chunkKey(addr), meta) + return s.db.Write(batch, nil) +} + +// Remove removes chunk meta information from the shard. +func (s *MetaStore) Remove(addr chunk.Address, shard uint8) (err error) { + m, err := s.Get(addr) + if err != nil { + return err + } + batch := new(leveldb.Batch) + batch.Put(freeKey(shard, m.Offset), nil) + batch.Delete(chunkKey(addr)) + return s.db.Write(batch, nil) +} + +// FreeOffset returns an offset that can be reclaimed by +// another chunk. If the returned value is less then 0 +// there are no free offset at this shard. +func (s *MetaStore) FreeOffset(shard uint8) (offset int64, err error) { + i := s.db.NewIterator(nil, nil) + defer i.Release() + + i.Seek([]byte{freePrefix, shard}) + key := i.Key() + if key == nil || key[0] != freePrefix || key[1] != shard { + return -1, nil + } + offset = int64(binary.BigEndian.Uint64(key[2:10])) + return offset, nil +} + +// Count returns a number of chunks in MetaStore. +// This operation is slow for larger numbers of chunks. 
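FreeOffset above relies only on the ordering of the free-slot keys. A small standalone sketch of that key layout (it mirrors the freeKey helper defined near the end of this file; the offset value is illustrative):

package main

import (
	"encoding/binary"
	"fmt"
)

// A free slot is recorded under 0x01 | shard | big-endian offset. Because the
// offset is big-endian, seeking to the two-byte prefix {0x01, shard} yields
// that shard's smallest free offset first.
func freeSlotKey(shard uint8, offset int64) []byte {
	key := make([]byte, 10)
	key[0] = 1 // freePrefix
	key[1] = shard
	binary.BigEndian.PutUint64(key[2:10], uint64(offset))
	return key
}

func main() {
	key := freeSlotKey(7, 12312) // e.g. the third slot of a shard with 4104 byte slots
	fmt.Printf("key=%x shard=%d offset=%d\n",
		key, key[1], binary.BigEndian.Uint64(key[2:10]))
}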
+func (s *MetaStore) Count() (count int, err error) { + it := s.db.NewIterator(nil, nil) + defer it.Release() + + for ok := it.First(); ok; ok = it.Next() { + value := it.Value() + if len(value) == 0 { + continue + } + key := it.Key() + if len(key) < 1 { + continue + } + count++ + } + return count, it.Error() +} + +// Iterate iterates over all chunk meta information. +func (s *MetaStore) Iterate(fn func(chunk.Address, *fcds.Meta) (stop bool, err error)) (err error) { + it := s.db.NewIterator(nil, nil) + defer it.Release() + + for ok := it.First(); ok; ok = it.Next() { + value := it.Value() + if len(value) == 0 { + continue + } + key := it.Key() + if len(key) < 1 { + continue + } + m := new(fcds.Meta) + if err := m.UnmarshalBinary(value); err != nil { + return err + } + stop, err := fn(chunk.Address(key[1:]), m) + if err != nil { + return err + } + if stop { + return nil + } + } + return it.Error() +} + +// Close closes the underlaying LevelDB instance. +func (s *MetaStore) Close() (err error) { + return s.db.Close() +} + +const ( + chunkPrefix = 0 + freePrefix = 1 +) + +func chunkKey(addr chunk.Address) (key []byte) { + return append([]byte{chunkPrefix}, addr...) +} + +func freeKey(shard uint8, offset int64) (key []byte) { + key = make([]byte, 10) + key[0] = freePrefix + key[1] = shard + binary.BigEndian.PutUint64(key[2:10], uint64(offset)) + return key +} diff --git a/storage/fcds/leveldb/leveldb_test.go b/storage/fcds/leveldb/leveldb_test.go new file mode 100644 index 0000000000..c960c2df61 --- /dev/null +++ b/storage/fcds/leveldb/leveldb_test.go @@ -0,0 +1,45 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package leveldb_test + +import ( + "io/ioutil" + "path/filepath" + "testing" + + "github.com/ethersphere/swarm/storage/fcds" + "github.com/ethersphere/swarm/storage/fcds/leveldb" + "github.com/ethersphere/swarm/storage/fcds/test" +) + +// TestFCDS runs a standard series of tests on main Store implementation +// with LevelDB meta store. +func TestFCDS(t *testing.T) { + test.RunAll(t, func(t *testing.T) (fcds.Storer, func()) { + path, err := ioutil.TempDir("", "swarm-fcds-") + if err != nil { + t.Fatal(err) + } + + metaStore, err := leveldb.NewMetaStore(filepath.Join(path, "meta")) + if err != nil { + t.Fatal(err) + } + + return test.NewFCDSStore(t, path, metaStore) + }) +} diff --git a/storage/fcds/mem/mem.go b/storage/fcds/mem/mem.go new file mode 100644 index 0000000000..3f38c570f8 --- /dev/null +++ b/storage/fcds/mem/mem.go @@ -0,0 +1,135 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package mem + +import ( + "sync" + + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/storage/fcds" +) + +var _ fcds.MetaStore = new(MetaStore) + +// MetaStore is the simplest in-memory implementation of FCDS MetaStore. +// It is meant to be used as the reference implementation. +type MetaStore struct { + meta map[string]*fcds.Meta + free map[uint8]map[int64]struct{} + mu sync.RWMutex +} + +// NewMetaStore constructs a new MetaStore. +func NewMetaStore() (s *MetaStore) { + free := make(map[uint8]map[int64]struct{}) + for shard := uint8(0); shard < 255; shard++ { + free[shard] = make(map[int64]struct{}) + } + return &MetaStore{ + meta: make(map[string]*fcds.Meta), + free: free, + } +} + +// Get returns chunk meta information. +func (s *MetaStore) Get(addr chunk.Address) (m *fcds.Meta, err error) { + s.mu.RLock() + m = s.meta[string(addr)] + s.mu.RUnlock() + if m == nil { + return nil, chunk.ErrChunkNotFound + } + return m, nil +} + +// Get returns true is meta information is stored. +func (s *MetaStore) Has(addr chunk.Address) (yes bool, err error) { + s.mu.RLock() + _, yes = s.meta[string(addr)] + s.mu.RUnlock() + return yes, nil +} + +// Set adds a new chunk meta information for a shard. +// Reclaimed flag denotes that the chunk is at the place of +// already deleted chunk, not appended to the end of the file. +func (s *MetaStore) Set(addr chunk.Address, shard uint8, reclaimed bool, m *fcds.Meta) (err error) { + s.mu.Lock() + if reclaimed { + delete(s.free[shard], m.Offset) + } + s.meta[string(addr)] = m + s.mu.Unlock() + return nil +} + +// Remove removes chunk meta information from the shard. +func (s *MetaStore) Remove(addr chunk.Address, shard uint8) (err error) { + s.mu.Lock() + defer s.mu.Unlock() + key := string(addr) + m := s.meta[key] + if m == nil { + return chunk.ErrChunkNotFound + } + s.free[shard][m.Offset] = struct{}{} + delete(s.meta, key) + return nil +} + +// FreeOffset returns an offset that can be reclaimed by +// another chunk. If the returned value is less then 0 +// there are no free offset at this shard. +func (s *MetaStore) FreeOffset(shard uint8) (offset int64, err error) { + s.mu.RLock() + for o := range s.free[shard] { + s.mu.RUnlock() + return o, nil + } + s.mu.RUnlock() + return -1, nil +} + +// Count returns a number of chunks in MetaStore. +func (s *MetaStore) Count() (count int, err error) { + s.mu.RLock() + count = len(s.meta) + s.mu.RUnlock() + return count, nil +} + +// Iterate iterates over all chunk meta information. +func (s *MetaStore) Iterate(fn func(chunk.Address, *fcds.Meta) (stop bool, err error)) (err error) { + s.mu.RLock() + defer s.mu.RUnlock() + for a, m := range s.meta { + stop, err := fn(chunk.Address(a), m) + if err != nil { + return err + } + if stop { + return nil + } + } + return nil +} + +// Close doesn't do anything. +// It exists to implement fcdb.MetaStore interface. 
+func (s *MetaStore) Close() (err error) { + return nil +} diff --git a/storage/fcds/mem/mem_test.go b/storage/fcds/mem/mem_test.go new file mode 100644 index 0000000000..288ab47157 --- /dev/null +++ b/storage/fcds/mem/mem_test.go @@ -0,0 +1,39 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package mem_test + +import ( + "io/ioutil" + "testing" + + "github.com/ethersphere/swarm/storage/fcds" + "github.com/ethersphere/swarm/storage/fcds/mem" + "github.com/ethersphere/swarm/storage/fcds/test" +) + +// TestFCDS runs a standard series of tests on main Store implementation +// with in-memory meta store. +func TestFCDS(t *testing.T) { + test.RunAll(t, func(t *testing.T) (fcds.Storer, func()) { + path, err := ioutil.TempDir("", "swarm-fcds-") + if err != nil { + t.Fatal(err) + } + + return test.NewFCDSStore(t, path, mem.NewMetaStore()) + }) +} diff --git a/storage/fcds/meta.go b/storage/fcds/meta.go new file mode 100644 index 0000000000..f0c9cc4e2b --- /dev/null +++ b/storage/fcds/meta.go @@ -0,0 +1,65 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package fcds + +import ( + "encoding/binary" + "fmt" + + "github.com/ethersphere/swarm/chunk" +) + +// MetaStore defines methods to store and manage +// chunk meta information in Store FCDS implementation. +type MetaStore interface { + Get(addr chunk.Address) (*Meta, error) + Has(addr chunk.Address) (bool, error) + Set(addr chunk.Address, shard uint8, reclaimed bool, m *Meta) error + Remove(addr chunk.Address, shard uint8) error + Count() (int, error) + Iterate(func(chunk.Address, *Meta) (stop bool, err error)) error + FreeOffset(shard uint8) (int64, error) + Close() error +} + +// Meta stores chunk data size and its offset in a file. +type Meta struct { + Size uint16 + Offset int64 +} + +// MarshalBinary returns binary encoded value of meta chunk information. +func (m *Meta) MarshalBinary() (data []byte, err error) { + data = make([]byte, 10) + binary.BigEndian.PutUint64(data[:8], uint64(m.Offset)) + binary.BigEndian.PutUint16(data[8:10], m.Size) + return data, nil +} + +// UnmarshalBinary sets meta chunk information from encoded data. 
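A round-trip sketch of the ten byte encoding defined by MarshalBinary above and UnmarshalBinary below (not part of the patch; the values are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/ethersphere/swarm/storage/fcds"
)

func main() {
	m := &fcds.Meta{Size: 4096, Offset: 12312}

	// Offset is stored as a big-endian uint64 in bytes [0:8],
	// Size as a big-endian uint16 in bytes [8:10].
	data, err := m.MarshalBinary()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d bytes: %x\n", len(data), data)

	var decoded fcds.Meta
	if err := decoded.UnmarshalBinary(data); err != nil {
		log.Fatal(err)
	}
	fmt.Println(decoded.String()) // {Size: 4096, Offset 12312}
}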
+func (m *Meta) UnmarshalBinary(data []byte) error { + m.Offset = int64(binary.BigEndian.Uint64(data[:8])) + m.Size = binary.BigEndian.Uint16(data[8:10]) + return nil +} + +func (m *Meta) String() (s string) { + if m == nil { + return "" + } + return fmt.Sprintf("{Size: %v, Offset %v}", m.Size, m.Offset) +} diff --git a/storage/fcds/mock/mock.go b/storage/fcds/mock/mock.go new file mode 100644 index 0000000000..b4b7503ce4 --- /dev/null +++ b/storage/fcds/mock/mock.go @@ -0,0 +1,125 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package mock + +import ( + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/storage/fcds" + "github.com/ethersphere/swarm/storage/mock" +) + +var _ fcds.Storer = new(Store) + +// Store implements FCDS Interface by using mock +// store for persistence. +type Store struct { + m *mock.NodeStore +} + +// New returns a new store with mock NodeStore +// for storing Chunk data. +func New(m *mock.NodeStore) (s *Store) { + return &Store{ + m: m, + } +} + +// Get returns a chunk with data. +func (s *Store) Get(addr chunk.Address) (c chunk.Chunk, err error) { + data, err := s.m.Get(addr) + if err != nil { + if err == mock.ErrNotFound { + return nil, chunk.ErrChunkNotFound + } + return nil, err + } + return chunk.NewChunk(addr, data), nil +} + +// Has returns true if chunk is stored. +func (s *Store) Has(addr chunk.Address) (yes bool, err error) { + _, err = s.m.Get(addr) + if err != nil { + if err == mock.ErrNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +// Put stores chunk data. +func (s *Store) Put(ch chunk.Chunk) (err error) { + return s.m.Put(ch.Address(), ch.Data()) +} + +// Delete removes chunk data. +func (s *Store) Delete(addr chunk.Address) (err error) { + return s.m.Delete(addr) +} + +// Count returns a number of stored chunks. +func (s *Store) Count() (count int, err error) { + var startKey []byte + for { + keys, err := s.m.Keys(startKey, 0) + if err != nil { + return 0, err + } + count += len(keys.Keys) + if keys.Next == nil { + break + } + startKey = keys.Next + } + return count, nil +} + +// Iterate iterates over stored chunks in no particular order. +func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error) { + var startKey []byte + for { + keys, err := s.m.Keys(startKey, 0) + if err != nil { + return err + } + for _, addr := range keys.Keys { + data, err := s.m.Get(addr) + if err != nil { + return err + } + stop, err := fn(chunk.NewChunk(addr, data)) + if err != nil { + return err + } + if stop { + return nil + } + } + if keys.Next == nil { + break + } + startKey = keys.Next + } + return nil +} + +// Close doesn't do anything. +// It exists to implement fcdb.MetaStore interface. 
+func (s *Store) Close() error { + return nil +} diff --git a/storage/fcds/mock/mock_test.go b/storage/fcds/mock/mock_test.go new file mode 100644 index 0000000000..49029c608a --- /dev/null +++ b/storage/fcds/mock/mock_test.go @@ -0,0 +1,38 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package mock_test + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethersphere/swarm/storage/fcds" + "github.com/ethersphere/swarm/storage/fcds/mock" + "github.com/ethersphere/swarm/storage/fcds/test" + "github.com/ethersphere/swarm/storage/mock/mem" +) + +// TestFCDS runs a standard series of tests on mock Store implementation. +func TestFCDS(t *testing.T) { + test.RunAll(t, func(t *testing.T) (fcds.Storer, func()) { + return mock.New( + mem.NewGlobalStore().NewNodeStore( + common.BytesToAddress(make([]byte, 20)), + ), + ), func() {} + }) +} diff --git a/storage/fcds/offsetcache.go b/storage/fcds/offsetcache.go new file mode 100644 index 0000000000..66311fdbc1 --- /dev/null +++ b/storage/fcds/offsetcache.go @@ -0,0 +1,64 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package fcds + +import "sync" + +// offsetCache is a simple cache of offset integers +// by shard files. +type offsetCache struct { + m map[uint8]map[int64]struct{} + mu sync.RWMutex +} + +// newOffsetCache constructs offsetCache for a fixed number of shards. +func newOffsetCache(shardCount uint8) (c *offsetCache) { + m := make(map[uint8]map[int64]struct{}) + for i := uint8(0); i < shardCount; i++ { + m[i] = make(map[int64]struct{}) + } + return &offsetCache{ + m: m, + } +} + +// get returns a free offset in a shard. If the returned +// value is less then 0, there are no free offset in that +// shard. +func (c *offsetCache) get(shard uint8) (offset int64) { + c.mu.RLock() + for o := range c.m[shard] { + c.mu.RUnlock() + return o + } + c.mu.RUnlock() + return -1 +} + +// set sets a free offset for a shard file. +func (c *offsetCache) set(shard uint8, offset int64) { + c.mu.Lock() + c.m[shard][offset] = struct{}{} + c.mu.Unlock() +} + +// remove removes a free offset for a shard file. 
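To tie the offset cache and the free-slot bookkeeping together, an end-to-end sketch of the delete/reclaim cycle through the public Store API (not part of the patch; it uses the in-memory MetaStore from fcds/mem and an illustrative directory):

package main

import (
	"fmt"
	"log"

	"github.com/ethersphere/swarm/chunk"
	chunktesting "github.com/ethersphere/swarm/chunk/testing"
	"github.com/ethersphere/swarm/storage/fcds"
	"github.com/ethersphere/swarm/storage/fcds/mem"
)

func main() {
	dir := "/tmp/fcds-reclaim-example" // illustrative location
	store, err := fcds.New(dir, chunk.DefaultSize, mem.NewMetaStore(), fcds.WithCache(true))
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	a := chunktesting.GenerateTestRandomChunk()
	if err := store.Put(a); err != nil {
		log.Fatal(err)
	}

	// Delete removes only the meta information and records the slot as free;
	// the shard file itself is not truncated.
	if err := store.Delete(a.Address()); err != nil {
		log.Fatal(err)
	}
	if _, err := store.Get(a.Address()); err == chunk.ErrChunkNotFound {
		fmt.Println("deleted chunk is no longer retrievable")
	}

	// A later Put that lands on the same shard may reuse the freed slot
	// (found through the cache or MetaStore.FreeOffset) instead of appending.
	b := chunktesting.GenerateTestRandomChunk()
	if err := store.Put(b); err != nil {
		log.Fatal(err)
	}
	count, err := store.Count()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("live chunks:", count) // 1
}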
+func (c *offsetCache) remove(shard uint8, offset int64) { + c.mu.Lock() + delete(c.m[shard], offset) + c.mu.Unlock() +} diff --git a/storage/fcds/test/store.go b/storage/fcds/test/store.go new file mode 100644 index 0000000000..0580c16820 --- /dev/null +++ b/storage/fcds/test/store.go @@ -0,0 +1,327 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package test + +import ( + "bytes" + "flag" + "fmt" + "math/rand" + "os" + "sync" + "testing" + + "github.com/ethersphere/swarm/chunk" + chunktesting "github.com/ethersphere/swarm/chunk/testing" + "github.com/ethersphere/swarm/storage/fcds" +) + +var ( + chunksFlag = flag.Int("chunks", 100, "Number of chunks to use in tests.") + concurrencyFlag = flag.Int("concurrency", 8, "Maximal number of parallel operations.") + noCacheFlag = flag.Bool("no-cache", false, "Disable memory cache.") +) + +// Main parses custom cli flags automatically on test runs. +func Main(m *testing.M) { + flag.Parse() + os.Exit(m.Run()) +} + +// RunAll runs all available tests for a Store implementation. +func RunAll(t *testing.T, newStoreFunc func(t *testing.T) (fcds.Storer, func())) { + + t.Run("empty", func(t *testing.T) { + RunStore(t, &RunStoreOptions{ + ChunkCount: *chunksFlag, + NewStoreFunc: newStoreFunc, + }) + }) + + t.Run("cleaned", func(t *testing.T) { + RunStore(t, &RunStoreOptions{ + ChunkCount: *chunksFlag, + NewStoreFunc: newStoreFunc, + Cleaned: true, + }) + }) + + for _, tc := range []struct { + name string + deleteSplit int + }{ + { + name: "delete-all", + deleteSplit: 1, + }, + { + name: "delete-half", + deleteSplit: 2, + }, + { + name: "delete-fifth", + deleteSplit: 5, + }, + { + name: "delete-tenth", + deleteSplit: 10, + }, + { + name: "delete-percent", + deleteSplit: 100, + }, + { + name: "delete-permill", + deleteSplit: 1000, + }, + } { + t.Run(tc.name, func(t *testing.T) { + RunStore(t, &RunStoreOptions{ + ChunkCount: *chunksFlag, + DeleteSplit: tc.deleteSplit, + NewStoreFunc: newStoreFunc, + }) + }) + } + + t.Run("iterator", func(t *testing.T) { + RunIterator(t, newStoreFunc) + }) +} + +// RunStoreOptions define parameters for Store test function. +type RunStoreOptions struct { + NewStoreFunc func(t *testing.T) (fcds.Storer, func()) + ChunkCount int + DeleteSplit int + Cleaned bool +} + +// RunStore tests a single Store implementation for its general functionalities. +// Subtests are deliberately separated into sections that can have timings +// printed on test runs for each of them. 
+func RunStore(t *testing.T, o *RunStoreOptions) { + db, clean := o.NewStoreFunc(t) + defer clean() + + chunks := getChunks(o.ChunkCount) + + if o.Cleaned { + t.Run("clean", func(t *testing.T) { + sem := make(chan struct{}, *concurrencyFlag) + var wg sync.WaitGroup + + wg.Add(o.ChunkCount) + for _, ch := range chunks { + sem <- struct{}{} + + go func(ch chunk.Chunk) { + defer func() { + <-sem + wg.Done() + }() + + if err := db.Put(ch); err != nil { + panic(err) + } + }(ch) + } + wg.Wait() + + wg = sync.WaitGroup{} + + wg.Add(o.ChunkCount) + for _, ch := range chunks { + sem <- struct{}{} + + go func(ch chunk.Chunk) { + defer func() { + <-sem + wg.Done() + }() + + if err := db.Delete(ch.Address()); err != nil { + panic(err) + } + }(ch) + } + wg.Wait() + }) + } + + rand.Shuffle(o.ChunkCount, func(i, j int) { + chunks[i], chunks[j] = chunks[j], chunks[i] + }) + + var deletedChunks sync.Map + + t.Run("write", func(t *testing.T) { + sem := make(chan struct{}, *concurrencyFlag) + var wg sync.WaitGroup + var wantCount int + var wantCountMu sync.Mutex + wg.Add(o.ChunkCount) + for i, ch := range chunks { + sem <- struct{}{} + + go func(i int, ch chunk.Chunk) { + defer func() { + <-sem + wg.Done() + }() + + if err := db.Put(ch); err != nil { + panic(err) + } + if o.DeleteSplit > 0 && i%o.DeleteSplit == 0 { + if err := db.Delete(ch.Address()); err != nil { + panic(err) + } + deletedChunks.Store(string(ch.Address()), nil) + } else { + wantCountMu.Lock() + wantCount++ + wantCountMu.Unlock() + } + }(i, ch) + } + wg.Wait() + }) + + rand.Shuffle(o.ChunkCount, func(i, j int) { + chunks[i], chunks[j] = chunks[j], chunks[i] + }) + + t.Run("read", func(t *testing.T) { + sem := make(chan struct{}, *concurrencyFlag) + var wg sync.WaitGroup + + wg.Add(o.ChunkCount) + for i, ch := range chunks { + sem <- struct{}{} + + go func(i int, ch chunk.Chunk) { + defer func() { + <-sem + wg.Done() + }() + + got, err := db.Get(ch.Address()) + + if _, ok := deletedChunks.Load(string(ch.Address())); ok { + if err != chunk.ErrChunkNotFound { + panic(fmt.Errorf("got error %v, want %v", err, chunk.ErrChunkNotFound)) + } + } else { + if err != nil { + panic(fmt.Errorf("chunk %v %s: %v", i, ch.Address().Hex(), err)) + } + if !bytes.Equal(got.Address(), ch.Address()) { + panic(fmt.Errorf("got chunk %v address %x, want %x", i, got.Address(), ch.Address())) + } + if !bytes.Equal(got.Data(), ch.Data()) { + panic(fmt.Errorf("got chunk %v data %x, want %x", i, got.Data(), ch.Data())) + } + } + }(i, ch) + } + wg.Wait() + }) +} + +// RunIterator validates behaviour of Iterate and Count methods on a Store. 
+func RunIterator(t *testing.T, newStoreFunc func(t *testing.T) (fcds.Storer, func())) { + chunkCount := 1000 + + db, clean := newStoreFunc(t) + defer clean() + + chunks := getChunks(chunkCount) + + for _, ch := range chunks { + if err := db.Put(ch); err != nil { + t.Fatal(err) + } + } + + gotCount, err := db.Count() + if err != nil { + t.Fatal(err) + } + if gotCount != chunkCount { + t.Fatalf("got %v count, want %v", gotCount, chunkCount) + } + + var iteratedCount int + if err := db.Iterate(func(ch chunk.Chunk) (stop bool, err error) { + for _, c := range chunks { + if bytes.Equal(c.Address(), ch.Address()) { + if !bytes.Equal(c.Data(), ch.Data()) { + t.Fatalf("invalid data in iterator for key %s", c.Address()) + } + iteratedCount++ + return false, nil + } + } + return false, nil + }); err != nil { + t.Fatal(err) + } + if iteratedCount != chunkCount { + t.Fatalf("iterated on %v chunks, want %v", iteratedCount, chunkCount) + } +} + +// NewFCDSStore is a test helper function that constructs +// a new Store for testing purposes into which a specific MetaStore can be injected. +func NewFCDSStore(t *testing.T, path string, metaStore fcds.MetaStore) (s *fcds.Store, clean func()) { + t.Helper() + + s, err := fcds.New(path, chunk.DefaultSize, metaStore, fcds.WithCache(!*noCacheFlag)) + if err != nil { + os.RemoveAll(path) + t.Fatal(err) + } + return s, func() { + s.Close() + os.RemoveAll(path) + } +} + +// chunkCache reduces the work done by generating random chunks +// by getChunks function by keeping storing them for future reuse. +var chunkCache []chunk.Chunk + +// getChunk returns a number of chunks with random data for testing purposes. +// By calling it multiple times, it will return same chunks from the cache. +func getChunks(count int) []chunk.Chunk { + l := len(chunkCache) + if l == 0 { + chunkCache = make([]chunk.Chunk, count) + for i := 0; i < count; i++ { + chunkCache[i] = chunktesting.GenerateTestRandomChunk() + } + return chunkCache + } + if l < count { + for i := 0; i < count-l; i++ { + chunkCache = append(chunkCache, chunktesting.GenerateTestRandomChunk()) + } + return chunkCache + } + return chunkCache[:count] +} diff --git a/storage/localstore/export.go b/storage/localstore/export.go index b8826d976e..296cbc90a3 100644 --- a/storage/localstore/export.go +++ b/storage/localstore/export.go @@ -18,16 +18,20 @@ package localstore import ( "archive/tar" + "bufio" "context" + "encoding/binary" "encoding/hex" "fmt" "io" "io/ioutil" + "strings" "sync" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/shed" + "github.com/syndtr/goleveldb/leveldb" ) const ( @@ -38,6 +42,9 @@ const ( legacyExportVersion = "1" // current export format version currentExportVersion = "2" + // tags + tagsFilenamePrefix = "tags-" + exportTagsFileLimit = 1000 ) // Export writes a tar structured data to the writer of @@ -58,23 +65,81 @@ func (db *DB) Export(w io.Writer) (count int64, err error) { return 0, err } - err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + // tags export + var ( + tagsFileNumber int + tagsCounter int + tags []byte + + writeTags = func() (err error) { + l := len(tags) + if l == 0 { + return nil + } + + tagsCounter = 0 + tagsFileNumber++ + + if err := tw.WriteHeader(&tar.Header{ + Name: fmt.Sprintf("%s%v", tagsFilenamePrefix, tagsFileNumber), + Mode: 0644, + Size: int64(l), + }); err != nil { + return err + } + _, err = tw.Write(tags) + return err + } + ) + err = db.pinIndex.Iterate(func(item shed.Item) 
(stop bool, err error) { + tags = append(tags, encodeExportPin(item.Address, item.PinCounter)...) + tags = append(tags, '\n') + if tagsCounter == exportTagsFileLimit { + if err := writeTags(); err != nil { + return true, err + } + } + return false, nil + }, nil) + if err != nil { + return 0, err + } + if err := writeTags(); err != nil { + return 0, err + } + + exportchunk := func(addr chunk.Address, data []byte) error { hdr := &tar.Header{ - Name: hex.EncodeToString(item.Address), + Name: hex.EncodeToString(addr), Mode: 0644, - Size: int64(len(item.Data)), + Size: int64(len(data)), } if err := tw.WriteHeader(hdr); err != nil { - return false, err + return err } - if _, err := tw.Write(item.Data); err != nil { - return false, err + if _, err := tw.Write(data); err != nil { + return err } count++ - return false, nil + return nil + } + + // Export legacy (pre fcds) data index. + // This is required as a manual step in migrateDiwali migration. + err = db.retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) { + err = exportchunk(item.Address, item.Data) + return false, err }, nil) + if err != nil { + return 0, err + } + + err = db.data.Iterate(func(ch chunk.Chunk) (stop bool, err error) { + err = exportchunk(ch.Address(), ch.Data()) + return false, err + }) return count, err } @@ -94,7 +159,6 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { var wg sync.WaitGroup go func() { var ( - firstFile = true // if exportVersionFilename file is not present // assume legacy version version = legacyExportVersion @@ -107,22 +171,70 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { } select { case errC <- err: + return case <-ctx.Done(): + return } } - if firstFile { - firstFile = false - if hdr.Name == exportVersionFilename { - data, err := ioutil.ReadAll(tr) - if err != nil { + // get the export file format version + if hdr.Name == exportVersionFilename { + data, err := ioutil.ReadAll(tr) + if err != nil { + select { + case errC <- err: + return + case <-ctx.Done(): + return + } + } + version = string(data) + continue + } + // set pinned chunks + if strings.HasPrefix(hdr.Name, tagsFilenamePrefix) { + // All chunks are put before tag files are iterated on + // because of tagsFilenamePrefix starts with "t" + // which is ordered later in the tar file then + // hex characters of chunk addresses. + // + // Wait for chunks to be stored before continuing. 
+ wg.Wait() + + scanner := bufio.NewScanner(tr) + batch := new(leveldb.Batch) + for scanner.Scan() { + addr, counter := decodeExportPin(scanner.Bytes()) + if addr == nil { + continue + } + if err := db.setPin(batch, addr, counter); err != nil { select { case errC <- err: + return case <-ctx.Done(): + return } } - version = string(data) - continue } + + if err := scanner.Err(); err != nil { + select { + case errC <- err: + return + case <-ctx.Done(): + return + } + } + + if err := db.shed.WriteBatch(batch); err != nil { + select { + case errC <- err: + return + case <-ctx.Done(): + return + } + } + continue } if len(hdr.Name) != 64 { @@ -140,7 +252,9 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { if err != nil { select { case errC <- err: + return case <-ctx.Done(): + return } } key := chunk.Address(keybytes) @@ -204,3 +318,19 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { } } } + +func encodeExportPin(addr chunk.Address, counter uint64) (data []byte) { + data = make([]byte, 8, 8+len(addr)) + binary.BigEndian.PutUint64(data[:8], counter) + data = append(data, addr...) + return data +} + +func decodeExportPin(data []byte) (addr chunk.Address, counter uint64) { + if len(data) < 8 { + return nil, 0 + } + counter = binary.BigEndian.Uint64(data[:8]) + addr = chunk.Address(data[8:]) + return addr, counter +} diff --git a/storage/localstore/export_test.go b/storage/localstore/export_test.go index 3681159673..dc2d63ea8f 100644 --- a/storage/localstore/export_test.go +++ b/storage/localstore/export_test.go @@ -19,9 +19,12 @@ package localstore import ( "bytes" "context" + "errors" + "fmt" "testing" "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/shed" ) // TestExportImport constructs two databases, one to put and export @@ -31,9 +34,13 @@ func TestExportImport(t *testing.T) { db1, cleanup1 := newTestDB(t, nil) defer cleanup1() - var chunkCount = 100 + var ( + chunkCount = exportTagsFileLimit * 3 + pinnedCount = exportTagsFileLimit*2 + 10 + ) chunks := make(map[string][]byte, chunkCount) + pinned := make(map[string]uint64, pinnedCount) for i := 0; i < chunkCount; i++ { ch := generateTestRandomChunk() @@ -42,8 +49,44 @@ func TestExportImport(t *testing.T) { t.Fatal(err) } chunks[string(ch.Address())] = ch.Data() + if i < pinnedCount { + count := uint64(i%84) + 1 // arbitrary pin counter + for i := uint64(0); i < count; i++ { + err := db1.Set(context.Background(), chunk.ModeSetPin, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + pinned[string(ch.Address())] = count + } } + validtePins := func(t *testing.T, db *DB) { + t.Helper() + + var got int + err := db1.pinIndex.Iterate(func(item shed.Item) (stop bool, err error) { + count, ok := pinned[string(item.Address)] + if !ok { + return true, errors.New("chunk not pinned") + } + if count != item.PinCounter { + return true, fmt.Errorf("got pin count %v for chunk %x, want %v", item.PinCounter, item.Address, count) + } + got++ + return false, nil + }, nil) + if err != nil { + t.Fatal(err) + } + + if got != pinnedCount { + t.Fatalf("got pinned chunks %v, want %v", got, pinnedCount) + } + } + + validtePins(t, db1) + var buf bytes.Buffer c, err := db1.Export(&buf) @@ -77,4 +120,6 @@ func TestExportImport(t *testing.T) { t.Fatalf("chunk %s: got data %x, want %x", addr.Hex(), got, want) } } + + validtePins(t, db2) } diff --git a/storage/localstore/gc.go b/storage/localstore/gc.go index f8c9c5ef34..7e46437e62 100644 --- a/storage/localstore/gc.go +++ b/storage/localstore/gc.go 
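Before the garbage collector changes below, a standalone sketch of the pin record format that the export/import changes above write into the tags-N tar entries, one record per line (mirrors encodeExportPin and decodeExportPin; the address bytes are illustrative and not part of the patch):

package main

import (
	"encoding/binary"
	"fmt"
)

// A pin record is an 8 byte big-endian pin counter followed by the raw
// chunk address.
func encodePinRecord(addr []byte, counter uint64) []byte {
	data := make([]byte, 8, 8+len(addr))
	binary.BigEndian.PutUint64(data[:8], counter)
	return append(data, addr...)
}

func decodePinRecord(data []byte) (addr []byte, counter uint64) {
	if len(data) < 8 {
		return nil, 0
	}
	return data[8:], binary.BigEndian.Uint64(data[:8])
}

func main() {
	addr := make([]byte, 32) // illustrative 32 byte chunk address
	addr[31] = 0xaa

	rec := encodePinRecord(addr, 3)
	gotAddr, gotCounter := decodePinRecord(rec)
	fmt.Printf("record length=%d counter=%d last address byte=%x\n",
		len(rec), gotCounter, gotAddr[len(gotAddr)-1])
}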
@@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) @@ -108,6 +109,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { } metrics.GetOrRegisterGauge(metricName+".gcsize", nil).Update(int64(gcSize)) + var addrs []chunk.Address done = true err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { if gcSize-collectedCount <= target { @@ -118,8 +120,8 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { metrics.GetOrRegisterGauge(metricName+".accessts", nil).Update(item.AccessTimestamp) // delete from retrieve, pull, gc - db.retrievalDataIndex.DeleteInBatch(batch, item) - db.retrievalAccessIndex.DeleteInBatch(batch, item) + addrs = append(addrs, item.Address) + db.metaIndex.DeleteInBatch(batch, item) db.pullIndex.DeleteInBatch(batch, item) db.gcIndex.DeleteInBatch(batch, item) collectedCount++ @@ -143,6 +145,12 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { metrics.GetOrRegisterCounter(metricName+".writebatch.err", nil).Inc(1) return 0, false, err } + for _, a := range addrs { + if err := db.data.Delete(a); err != nil { + metrics.GetOrRegisterCounter(metricName+".deletechunk.err", nil).Inc(1) + return 0, false, err + } + } return collectedCount, done, nil } @@ -162,18 +170,12 @@ func (db *DB) removeChunksInExcludeIndexFromGC() (err error) { var gcSizeChange int64 err = db.gcExcludeIndex.Iterate(func(item shed.Item) (stop bool, err error) { // Get access timestamp - retrievalAccessIndexItem, err := db.retrievalAccessIndex.Get(item) - if err != nil { - return false, err - } - item.AccessTimestamp = retrievalAccessIndexItem.AccessTimestamp - - // Get the binId - retrievalDataIndexItem, err := db.retrievalDataIndex.Get(item) + metaIndexItem, err := db.metaIndex.Get(item) if err != nil { return false, err } - item.BinID = retrievalDataIndexItem.BinID + item.AccessTimestamp = metaIndexItem.AccessTimestamp + item.BinID = metaIndexItem.BinID // Check if this item is in gcIndex and remove it ok, err := db.gcIndex.Has(item) diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index 7f6f9d7f13..ffe86860ed 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "errors" "os" + "path/filepath" "runtime/pprof" "sync" "time" @@ -28,6 +29,9 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/shed" + "github.com/ethersphere/swarm/storage/fcds" + fcdsleveldb "github.com/ethersphere/swarm/storage/fcds/leveldb" + fcdsmock "github.com/ethersphere/swarm/storage/fcds/mock" "github.com/ethersphere/swarm/storage/mock" ) @@ -58,12 +62,17 @@ type DB struct { shed *shed.DB tags *chunk.Tags + path string + // schema name of loaded data schemaName shed.StringField - // retrieval indexes - retrievalDataIndex shed.Index - retrievalAccessIndex shed.Index + // chunk data storage + data fcds.Storer + // bin index and timestamps index + metaIndex shed.Index + // legacy data index, used only in export for manual migration + retrievalDataIndex shed.Index // push syncing index pushIndex shed.Index // push syncing subscriptions triggers @@ -164,6 +173,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { } db = &DB{ + path: path, capacity: o.Capacity, baseKey: baseKey, 
tags: o.Tags, @@ -199,13 +209,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { } if schemaName == "" { // initial new localstore run - err := db.schemaName.Put(DbSchemaCurrent) - if err != nil { - return nil, err - } - } else { - // execute possible migrations - err = db.migrate(schemaName) + err := db.schemaName.Put(dbSchemaCurrent) if err != nil { return nil, err } @@ -216,45 +220,28 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if err != nil { return nil, err } - // Functions for retrieval data index. - var ( - encodeValueFunc func(fields shed.Item) (value []byte, err error) - decodeValueFunc func(keyItem shed.Item, value []byte) (e shed.Item, err error) - ) - if o.MockStore != nil { - encodeValueFunc = func(fields shed.Item) (value []byte, err error) { - b := make([]byte, 16) - binary.BigEndian.PutUint64(b[:8], fields.BinID) - binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) - err = o.MockStore.Put(fields.Address, fields.Data) - if err != nil { - return nil, err - } - return b, nil + + if o.MockStore == nil { + metaStore, err := fcdsleveldb.NewMetaStore(filepath.Join(path, "meta")) + if err != nil { + return nil, err } - decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) - e.BinID = binary.BigEndian.Uint64(value[:8]) - e.Data, err = o.MockStore.Get(keyItem.Address) - return e, err + db.data, err = fcds.New( + filepath.Join(path, "data"), + chunk.DefaultSize+8, // chunk data has additional 8 bytes prepended + metaStore, + fcds.WithCache(true), + ) + if err != nil { + return nil, err } } else { - encodeValueFunc = func(fields shed.Item) (value []byte, err error) { - b := make([]byte, 16) - binary.BigEndian.PutUint64(b[:8], fields.BinID) - binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) - value = append(b, fields.Data...) - return value, nil - } - decodeValueFunc = func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) - e.BinID = binary.BigEndian.Uint64(value[:8]) - e.Data = value[16:] - return e, nil - } + // Mock store is provided, use mock FCDS. + db.data = fcdsmock.New(o.MockStore) } - // Index storing actual chunk address, data and bin id. - db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{ + // Index storing bin id, store and access timestamp for a particular address. + // It is needed in order to update gc index keys for iteration order. 
+ db.metaIndex, err = db.shed.NewIndex("Address->BinID|StoreTimestamp|AccessTimestamp", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, @@ -262,15 +249,26 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { e.Address = key return e, nil }, - EncodeValue: encodeValueFunc, - DecodeValue: decodeValueFunc, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, 24) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + binary.BigEndian.PutUint64(b[16:24], uint64(fields.AccessTimestamp)) + return b, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.BinID = binary.BigEndian.Uint64(value[:8]) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[16:24])) + return e, nil + }, }) if err != nil { return nil, err } - // Index storing access timestamp for a particular address. - // It is needed in order to update gc index keys for iteration order. - db.retrievalAccessIndex, err = db.shed.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{ + // Index storing actual chunk address, data and bin id. + // Used only in export to provide migration functionality. + db.retrievalDataIndex, err = db.shed.NewIndex("Address->StoreTimestamp|BinID|Data", shed.IndexFuncs{ EncodeKey: func(fields shed.Item) (key []byte, err error) { return fields.Address, nil }, @@ -279,12 +277,16 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { return e, nil }, EncodeValue: func(fields shed.Item) (value []byte, err error) { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp)) - return b, nil + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + value = append(b, fields.Data...) 
+ return value, nil }, DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) + e.Data = value[16:] return e, nil }, }) @@ -457,6 +459,9 @@ func (db *DB) Close() (err error) { // TODO: use a logger to write a goroutine profile pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) } + if err := db.data.Close(); err != nil { + log.Error("close chunk data storage", "err", err) + } return db.shed.Close() } @@ -471,13 +476,12 @@ func (db *DB) po(addr chunk.Address) (bin uint8) { func (db *DB) DebugIndices() (indexInfo map[string]int, err error) { indexInfo = make(map[string]int) for k, v := range map[string]shed.Index{ - "retrievalDataIndex": db.retrievalDataIndex, - "retrievalAccessIndex": db.retrievalAccessIndex, - "pushIndex": db.pushIndex, - "pullIndex": db.pullIndex, - "gcIndex": db.gcIndex, - "gcExcludeIndex": db.gcExcludeIndex, - "pinIndex": db.pinIndex, + "metaIndex": db.metaIndex, + "pushIndex": db.pushIndex, + "pullIndex": db.pullIndex, + "gcIndex": db.gcIndex, + "gcExcludeIndex": db.gcExcludeIndex, + "pinIndex": db.pinIndex, } { indexSize, err := v.Count() if err != nil { @@ -491,6 +495,7 @@ func (db *DB) DebugIndices() (indexInfo map[string]int, err error) { } indexInfo["gcSize"] = int(val) + indexInfo["data"], err = db.data.Count() return indexInfo, err } diff --git a/storage/localstore/localstore_test.go b/storage/localstore/localstore_test.go index ace0a10dcb..6fae246df4 100644 --- a/storage/localstore/localstore_test.go +++ b/storage/localstore/localstore_test.go @@ -32,7 +32,6 @@ import ( "github.com/ethersphere/swarm/chunk" chunktesting "github.com/ethersphere/swarm/chunk/testing" "github.com/ethersphere/swarm/shed" - "github.com/syndtr/goleveldb/leveldb" ) func init() { @@ -243,17 +242,15 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim return func(t *testing.T) { t.Helper() - item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address())) + c, err := db.data.Get(chunk.Address()) if err != nil { t.Fatal(err) } - validateItem(t, item, chunk.Address(), chunk.Data(), storeTimestamp, 0) + validateItem(t, shed.Item{Address: c.Address(), Data: c.Data()}, chunk.Address(), chunk.Data(), 0, 0) - // access index should not be set - wantErr := leveldb.ErrNotFound - item, err = db.retrievalAccessIndex.Get(addressToItem(chunk.Address())) - if err != wantErr { - t.Errorf("got error %v, want %v", err, wantErr) + _, err = db.metaIndex.Get(addressToItem(chunk.Address())) + if err != nil { + t.Errorf("got error %v, want %v", err, nil) } } } @@ -264,18 +261,18 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, ac return func(t *testing.T) { t.Helper() - item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address())) + c, err := db.data.Get(ch.Address()) if err != nil { t.Fatal(err) } - validateItem(t, item, ch.Address(), ch.Data(), storeTimestamp, 0) + validateItem(t, shed.Item{Address: c.Address(), Data: c.Data()}, ch.Address(), ch.Data(), 0, 0) if accessTimestamp > 0 { - item, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address())) + item, err := db.metaIndex.Get(addressToItem(ch.Address())) if err != nil { t.Fatal(err) } - validateItem(t, item, ch.Address(), nil, 0, accessTimestamp) + validateItem(t, item, ch.Address(), nil, storeTimestamp, accessTimestamp) } } } @@ -376,6 +373,20 @@ func newItemsCountTest(i shed.Index, want 
int) func(t *testing.T) { } } +func newDataCountTest(db *DB, want int) func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + + got, err := db.data.Count() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Fatalf("got %v chunks in data, want %v", got, want) + } + } +} + // newIndexGCSizeTest retruns a test function that validates if DB.gcSize // value is the same as the number of items in DB.gcIndex. func newIndexGCSizeTest(db *DB) func(t *testing.T) { @@ -531,7 +542,7 @@ func TestSetNow(t *testing.T) { } } -func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex, pinIndex, retrievalDataIndex, retrievalAccessIndex int, indexInfo map[string]int) { +func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex, pinIndex, data, metaIndex int, indexInfo map[string]int) { t.Helper() if indexInfo["pushIndex"] != pushIndex { t.Fatalf("pushIndex count mismatch. got %d want %d", indexInfo["pushIndex"], pushIndex) @@ -553,12 +564,12 @@ func testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex t.Fatalf("pinIndex count mismatch. got %d want %d", indexInfo["pinIndex"], pinIndex) } - if indexInfo["retrievalDataIndex"] != retrievalDataIndex { - t.Fatalf("retrievalDataIndex count mismatch. got %d want %d", indexInfo["retrievalDataIndex"], retrievalDataIndex) + if indexInfo["data"] != data { + t.Fatalf("data count mismatch. got %d want %d", indexInfo["data"], data) } - if indexInfo["retrievalAccessIndex"] != retrievalAccessIndex { - t.Fatalf("retrievalAccessIndex count mismatch. got %d want %d", indexInfo["retrievalAccessIndex"], retrievalAccessIndex) + if indexInfo["metaIndex"] != metaIndex { + t.Fatalf("metaIndex count mismatch. got %d want %d", indexInfo["metaIndex"], metaIndex) } } @@ -585,8 +596,8 @@ func TestDBDebugIndexes(t *testing.T) { t.Fatal(err) } - // for reference: testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex, pinIndex, retrievalDataIndex, retrievalAccessIndex int, indexInfo map[string]int) - testIndexCounts(t, 1, 1, 0, 0, 0, 1, 0, indexCounts) + // for reference: testIndexCounts(t *testing.T, pushIndex, pullIndex, gcIndex, gcExcludeIndex, pinIndex, data, metaIndex int, indexInfo map[string]int) + testIndexCounts(t, 1, 1, 0, 0, 0, 1, 1, indexCounts) // set the chunk for pinning and expect the index count to grow err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address()) @@ -600,7 +611,7 @@ func TestDBDebugIndexes(t *testing.T) { } // assert that there's a pin and gc exclude entry now - testIndexCounts(t, 1, 1, 0, 1, 1, 1, 0, indexCounts) + testIndexCounts(t, 1, 1, 0, 1, 1, 1, 1, indexCounts) // set the chunk as accessed and expect the access index to grow err = db.Set(context.Background(), chunk.ModeSetAccess, ch.Address()) @@ -614,5 +625,4 @@ func TestDBDebugIndexes(t *testing.T) { // assert that there's a pin and gc exclude entry now testIndexCounts(t, 1, 1, 1, 1, 1, 1, 1, indexCounts) - } diff --git a/storage/localstore/migration.go b/storage/localstore/migration.go index 318f593fda..88c6c07d47 100644 --- a/storage/localstore/migration.go +++ b/storage/localstore/migration.go @@ -30,24 +30,57 @@ import ( var errMissingCurrentSchema = errors.New("could not find current db schema") var errMissingTargetSchema = errors.New("could not find target db schema") +// BreakingMigrationError is returned from migration functions that require +// manual migration steps. 
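The error type introduced just below is how the new forky migration signals that it cannot run automatically: because that migration is marked breaking, getMigrations discards every step scheduled before it, and Migrate returns the BreakingMigrationError produced by migrateDiwali. A hedged sketch of how a caller embedding localstore might surface the manual instructions; the package, helper name and error handling here are illustrative only. Note that New no longer runs migrations itself, so Migrate has to be called explicitly after the store is opened.

package example

import (
	"errors"
	"fmt"
	"os"

	"github.com/ethersphere/swarm/storage/localstore"
)

// openAndMigrate opens a localstore and runs any pending schema migrations.
func openAndMigrate(dir string, baseKey []byte) (*localstore.DB, error) {
	db, err := localstore.New(dir, baseKey, nil)
	if err != nil {
		return nil, err
	}
	if err := db.Migrate(); err != nil {
		var bme *localstore.BreakingMigrationError
		if errors.As(err, &bme) {
			// a breaking migration: print the export/import instructions
			// so the operator can migrate or discard the data manually
			fmt.Fprintln(os.Stderr, bme.Manual)
		}
		return nil, err
	}
	return db, nil
}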
+type BreakingMigrationError struct { + Manual string +} + +// NewBreakingMigrationError returns a new BreakingMigrationError +// with instructions for manual operations. +func NewBreakingMigrationError(manual string) *BreakingMigrationError { + return &BreakingMigrationError{ + Manual: manual, + } +} + +func (e *BreakingMigrationError) Error() string { + return "breaking migration" +} + +// Migrate checks the schema name in storage dir and compares it +// with the expected schema name to construct a series of data migrations +// if they are required. +func (db *DB) Migrate() (err error) { + schemaName, err := db.schemaName.Get() + if err != nil { + return err + } + if schemaName == "" { + return nil + } + // execute possible migrations + return db.migrate(schemaName) +} + type migration struct { - name string // name of the schema - fn func(db *DB) error // the migration function that needs to be performed in order to get to the current schema name + name string // name of the schema + fn func(db *DB) error // the migration function that needs to be performed in order to get to the current schema name + breaking bool } // schemaMigrations contains an ordered list of the database schemes, that is // in order to run data migrations in the correct sequence var schemaMigrations = []migration{ - {name: DbSchemaPurity, fn: func(db *DB) error { return nil }}, - {name: DbSchemaHalloween, fn: func(db *DB) error { return nil }}, - {name: DbSchemaSanctuary, fn: func(db *DB) error { return nil }}, - {name: DbSchemaDiwali, fn: migrateSanctuary}, + {name: dbSchemaSanctuary, fn: func(db *DB) error { return nil }}, + {name: dbSchemaDiwali, fn: migrateSanctuary}, + {name: dbSchemaForky, fn: migrateDiwali, breaking: true}, } func (db *DB) migrate(schemaName string) error { - migrations, err := getMigrations(schemaName, DbSchemaCurrent, schemaMigrations) + migrations, err := getMigrations(schemaName, dbSchemaCurrent, schemaMigrations) if err != nil { - return fmt.Errorf("error getting migrations for current schema (%s): %v", schemaName, err) + return fmt.Errorf("get migrations for current schema %s: %w", schemaName, err) } // no migrations to run @@ -84,23 +117,31 @@ type migrationFn func(db *DB) error func getMigrations(currentSchema, targetSchema string, allSchemeMigrations []migration) (migrations []migration, err error) { foundCurrent := false foundTarget := false - if currentSchema == DbSchemaCurrent { + if currentSchema == dbSchemaCurrent { return nil, nil } for i, v := range allSchemeMigrations { - switch v.name { - case currentSchema: + if v.name == targetSchema { + foundTarget = true + } + if v.name == currentSchema { if foundCurrent { return nil, errors.New("found schema name for the second time when looking for migrations") } foundCurrent = true - log.Info("found current localstore schema", "currentSchema", currentSchema, "migrateTo", DbSchemaCurrent, "total migrations", len(allSchemeMigrations)-i) + log.Info("found current localstore schema", "currentSchema", currentSchema, "migrateTo", dbSchemaCurrent, "total migrations", len(allSchemeMigrations)-i) continue // current schema migration should not be executed (already has been when schema was migrated to) - case targetSchema: - foundTarget = true } if foundCurrent { - migrations = append(migrations, v) + if v.breaking { + // discard all migrations before a breaking one + migrations = []migration{v} + } else { + migrations = append(migrations, v) + } + } + if foundTarget { + break } } if !foundCurrent { @@ -181,3 +222,23 @@ func 
migrateSanctuary(db *DB) error { return db.shed.WriteBatch(batch) } + +func migrateDiwali(db *DB) error { + return NewBreakingMigrationError(fmt.Sprintf(` +The Swarm chunk storage layer has changed. + +You can choose either to manually migrate the data in your local store to the new data store or to discard the data altogether. + +Preserving data requires additional storage roughly the size of the data directory and may take a long time, depending on storage performance. + +To continue by discarding data, just remove the %[1]s directory and start the swarm binary again. + +To preserve data: + - export data + swarm db export %[1]s data.tar %[2]x + - remove data directory %[1]s + - import data + swarm db import %[1]s data.tar %[2]x + - start the swarm +`, db.path, db.baseKey)) +} diff --git a/storage/localstore/migration_test.go b/storage/localstore/migration_test.go index 1c3ca97b4d..b87ebef0af 100644 --- a/storage/localstore/migration_test.go +++ b/storage/localstore/migration_test.go @@ -17,13 +17,14 @@ package localstore import ( + "errors" "io" "io/ioutil" "log" "math/rand" "os" "path" - "strings" + "reflect" "testing" "github.com/ethersphere/swarm/chunk" @@ -32,19 +33,19 @@ import ( func TestOneMigration(t *testing.T) { defer func(v []migration, s string) { schemaMigrations = v - DbSchemaCurrent = s - }(schemaMigrations, DbSchemaCurrent) + dbSchemaCurrent = s + }(schemaMigrations, dbSchemaCurrent) - DbSchemaCurrent = DbSchemaSanctuary + dbSchemaCurrent = dbSchemaSanctuary ran := false shouldNotRun := false schemaMigrations = []migration{ - {name: DbSchemaSanctuary, fn: func(db *DB) error { + {name: dbSchemaSanctuary, fn: func(db *DB) error { shouldNotRun = true // this should not be executed return nil }}, - {name: DbSchemaDiwali, fn: func(db *DB) error { + {name: dbSchemaDiwali, fn: func(db *DB) error { ran = true return nil }}, @@ -71,21 +72,24 @@ func TestOneMigration(t *testing.T) { t.Fatal(err) } - DbSchemaCurrent = DbSchemaDiwali + dbSchemaCurrent = dbSchemaDiwali // start the existing localstore and expect the migration to run db, err = New(dir, baseKey, nil) if err != nil { t.Fatal(err) } + if err := db.Migrate(); err != nil { + t.Fatal(err) + } schemaName, err := db.schemaName.Get() if err != nil { t.Fatal(err) } - if schemaName != DbSchemaDiwali { - t.Errorf("schema name mismatch. got '%s', want '%s'", schemaName, DbSchemaDiwali) + if schemaName != dbSchemaDiwali { + t.Errorf("schema name mismatch. 
got '%s', want '%s'", schemaName, dbSchemaDiwali) } if !ran { @@ -105,20 +109,20 @@ func TestOneMigration(t *testing.T) { func TestManyMigrations(t *testing.T) { defer func(v []migration, s string) { schemaMigrations = v - DbSchemaCurrent = s - }(schemaMigrations, DbSchemaCurrent) + dbSchemaCurrent = s + }(schemaMigrations, dbSchemaCurrent) - DbSchemaCurrent = DbSchemaSanctuary + dbSchemaCurrent = dbSchemaSanctuary shouldNotRun := false executionOrder := []int{-1, -1, -1, -1} schemaMigrations = []migration{ - {name: DbSchemaSanctuary, fn: func(db *DB) error { + {name: dbSchemaSanctuary, fn: func(db *DB) error { shouldNotRun = true // this should not be executed return nil }}, - {name: DbSchemaDiwali, fn: func(db *DB) error { + {name: dbSchemaDiwali, fn: func(db *DB) error { executionOrder[0] = 0 return nil }}, @@ -157,13 +161,16 @@ func TestManyMigrations(t *testing.T) { t.Fatal(err) } - DbSchemaCurrent = "salvation" + dbSchemaCurrent = "salvation" // start the existing localstore and expect the migration to run db, err = New(dir, baseKey, nil) if err != nil { t.Fatal(err) } + if err := db.Migrate(); err != nil { + t.Fatal(err) + } schemaName, err := db.schemaName.Get() if err != nil { @@ -190,14 +197,194 @@ func TestManyMigrations(t *testing.T) { } } +// TestGetMigrations validates the migration selection based on +// current and target schema names. +func TestGetMigrations(t *testing.T) { + currentSchema := "current" + defaultTargetSchema := "target" + + for _, tc := range []struct { + name string + targetSchema string + migrations []migration + wantMigrations []migration + }{ + { + name: "empty", + targetSchema: "current", + migrations: []migration{ + {name: "current"}, + }, + }, + { + name: "single", + migrations: []migration{ + {name: "current"}, + {name: "target"}, + }, + wantMigrations: []migration{ + {name: "target"}, + }, + }, + { + name: "multiple", + migrations: []migration{ + {name: "current"}, + {name: "middle"}, + {name: "target"}, + }, + wantMigrations: []migration{ + {name: "middle"}, + {name: "target"}, + }, + }, + { + name: "between", + migrations: []migration{ + {name: "current"}, + {name: "target"}, + {name: "future"}, + }, + wantMigrations: []migration{ + {name: "target"}, + }, + }, + { + name: "between multiple", + migrations: []migration{ + {name: "current"}, + {name: "middle"}, + {name: "target"}, + {name: "future"}, + }, + wantMigrations: []migration{ + {name: "middle"}, + {name: "target"}, + }, + }, + { + name: "with previous", + migrations: []migration{ + {name: "previous"}, + {name: "current"}, + {name: "target"}, + }, + wantMigrations: []migration{ + {name: "target"}, + }, + }, + { + name: "with previous multiple", + migrations: []migration{ + {name: "previous"}, + {name: "current"}, + {name: "middle"}, + {name: "target"}, + }, + wantMigrations: []migration{ + {name: "middle"}, + {name: "target"}, + }, + }, + { + name: "breaking", + migrations: []migration{ + {name: "current"}, + {name: "target", breaking: true}, + }, + wantMigrations: []migration{ + {name: "target", breaking: true}, + }, + }, + { + name: "breaking multiple", + migrations: []migration{ + {name: "current"}, + {name: "middle"}, + {name: "breaking", breaking: true}, + {name: "target"}, + }, + wantMigrations: []migration{ + {name: "breaking", breaking: true}, + {name: "target"}, + }, + }, + { + name: "breaking with previous", + migrations: []migration{ + {name: "previous"}, + {name: "current"}, + {name: "target", breaking: true}, + }, + wantMigrations: []migration{ + {name: "target", breaking: 
true}, + }, + }, + { + name: "breaking multiple breaks", + migrations: []migration{ + {name: "current"}, + {name: "middle", breaking: true}, + {name: "target", breaking: true}, + }, + wantMigrations: []migration{ + {name: "target", breaking: true}, + }, + }, + { + name: "breaking multiple with middle", + migrations: []migration{ + {name: "current"}, + {name: "breaking", breaking: true}, + {name: "middle"}, + {name: "target", breaking: true}, + }, + wantMigrations: []migration{ + {name: "target", breaking: true}, + }, + }, + { + name: "breaking multiple between", + migrations: []migration{ + {name: "current"}, + {name: "breaking", breaking: true}, + {name: "middle"}, + {name: "target", breaking: true}, + {name: "future"}, + }, + wantMigrations: []migration{ + {name: "target", breaking: true}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + targetSchema := tc.targetSchema + if targetSchema == "" { + targetSchema = defaultTargetSchema + } + got, err := getMigrations( + currentSchema, + targetSchema, + tc.migrations, + ) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(got, tc.wantMigrations) { + t.Errorf("got migrations %v, want %v", got, tc.wantMigrations) + } + }) + } +} + // TestMigrationFailFrom checks that local store boot should fail when the schema we're migrating from cannot be found func TestMigrationFailFrom(t *testing.T) { defer func(v []migration, s string) { schemaMigrations = v - DbSchemaCurrent = s - }(schemaMigrations, DbSchemaCurrent) + dbSchemaCurrent = s + }(schemaMigrations, dbSchemaCurrent) - DbSchemaCurrent = "koo-koo-schema" + dbSchemaCurrent = "koo-koo-schema" shouldNotRun := false schemaMigrations = []migration{ @@ -236,12 +423,16 @@ func TestMigrationFailFrom(t *testing.T) { t.Fatal(err) } - DbSchemaCurrent = "foo" + dbSchemaCurrent = "foo" // start the existing localstore and expect the migration to run db, err = New(dir, baseKey, nil) - if !strings.Contains(err.Error(), errMissingCurrentSchema.Error()) { - t.Fatalf("expected errCannotFindSchema but got %v", err) + if err != nil { + t.Fatal(err) + } + + if err := db.Migrate(); !errors.Is(err, errMissingCurrentSchema) { + t.Fatalf("got error %v, want %v", err, errMissingCurrentSchema) } if shouldNotRun { @@ -253,10 +444,10 @@ func TestMigrationFailFrom(t *testing.T) { func TestMigrationFailTo(t *testing.T) { defer func(v []migration, s string) { schemaMigrations = v - DbSchemaCurrent = s - }(schemaMigrations, DbSchemaCurrent) + dbSchemaCurrent = s + }(schemaMigrations, dbSchemaCurrent) - DbSchemaCurrent = "langur" + dbSchemaCurrent = "langur" shouldNotRun := false schemaMigrations = []migration{ @@ -295,12 +486,16 @@ func TestMigrationFailTo(t *testing.T) { t.Fatal(err) } - DbSchemaCurrent = "foo" + dbSchemaCurrent = "foo" // start the existing localstore and expect the migration to run db, err = New(dir, baseKey, nil) - if !strings.Contains(err.Error(), errMissingTargetSchema.Error()) { - t.Fatalf("expected errMissingTargetSchema but got %v", err) + if err != nil { + t.Fatal(err) + } + + if err := db.Migrate(); !errors.Is(err, errMissingTargetSchema) { + t.Fatalf("got error %v, want %v", err, errMissingTargetSchema) } if shouldNotRun { @@ -350,8 +545,8 @@ func TestMigrateSanctuaryFixture(t *testing.T) { t.Fatal(err) } - if schemaName != DbSchemaCurrent { - t.Fatalf("schema name mismatch, want '%s' got '%s'", DbSchemaCurrent, schemaName) + if schemaName != dbSchemaSanctuary { + t.Fatalf("schema name mismatch, want '%s' got '%s'", dbSchemaSanctuary, schemaName) } err = db.Close() diff --git 
a/storage/localstore/mode_get.go b/storage/localstore/mode_get.go index bccebd4dd8..354ebfc4f0 100644 --- a/storage/localstore/mode_get.go +++ b/storage/localstore/mode_get.go @@ -58,19 +58,19 @@ func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) ( // get returns Item from the retrieval index // and updates other indexes. func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err error) { - item := addressToItem(addr) - - out, err = db.retrievalDataIndex.Get(item) + c, err := db.data.Get(addr) if err != nil { return out, err } + out.Address = addr + out.Data = c.Data() switch mode { // update the access timestamp and gc index case chunk.ModeGetRequest: db.updateGCItems(out) case chunk.ModeGetPin: - pinnedItem, err := db.pinIndex.Get(item) + pinnedItem, err := db.pinIndex.Get(addressToItem(addr)) if err != nil { return out, err } @@ -133,9 +133,11 @@ func (db *DB) updateGC(item shed.Item) (err error) { // update accessTimeStamp in retrieve, gc - i, err := db.retrievalAccessIndex.Get(item) + i, err := db.metaIndex.Get(item) switch err { case nil: + item.BinID = i.BinID + item.StoreTimestamp = i.StoreTimestamp item.AccessTimestamp = i.AccessTimestamp case leveldb.ErrNotFound: // no chunk accesses @@ -152,10 +154,9 @@ func (db *DB) updateGC(item shed.Item) (err error) { // update access timestamp item.AccessTimestamp = now() // update retrieve access index - db.retrievalAccessIndex.PutInBatch(batch, item) + db.metaIndex.PutInBatch(batch, item) // add new entry to gc index db.gcIndex.PutInBatch(batch, item) - return db.shed.WriteBatch(batch) } diff --git a/storage/localstore/mode_get_multi.go b/storage/localstore/mode_get_multi.go index 393f8d119e..f95644c376 100644 --- a/storage/localstore/mode_get_multi.go +++ b/storage/localstore/mode_get_multi.go @@ -61,11 +61,16 @@ func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.A // and updates other indexes. func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) { out = make([]shed.Item, len(addrs)) - for i, addr := range addrs { - out[i].Address = addr + for i, a := range addrs { + c, err := db.data.Get(a) + if err != nil { + return nil, err + } + out[i].Address = a + out[i].Data = c.Data() } - err = db.retrievalDataIndex.Fill(out) + err = db.metaIndex.Fill(out) if err != nil { return nil, err } diff --git a/storage/localstore/mode_has.go b/storage/localstore/mode_has.go index 0d06b28076..bcbe374e7d 100644 --- a/storage/localstore/mode_has.go +++ b/storage/localstore/mode_has.go @@ -31,7 +31,7 @@ func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) { metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(metricName, time.Now()) - has, err := db.retrievalDataIndex.Has(addressToItem(addr)) + has, err := db.data.Has(addr) if err != nil { metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) } @@ -46,9 +46,14 @@ func (db *DB) HasMulti(ctx context.Context, addrs ...chunk.Address) ([]bool, err metrics.GetOrRegisterCounter(metricName, nil).Inc(1) defer totalTimeMetric(metricName, time.Now()) - have, err := db.retrievalDataIndex.HasMulti(addressesToItems(addrs...)...) 
- if err != nil { - metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + have := make([]bool, len(addrs)) + for i, a := range addrs { + has, err := db.data.Has(a) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + return nil, err + } + have[i] = has } - return have, err + return have, nil } diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 33f4c4e525..c4bdf7efa7 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -141,6 +141,14 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err return nil, err } + for i, ch := range chs { + if !exist[i] { + if err := db.data.Put(ch); err != nil { + return nil, err + } + } + } + err = db.shed.WriteBatch(batch) if err != nil { return nil, err @@ -161,12 +169,17 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err // The batch can be written to the database. // Provided batch and binID map are updated. func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - i, err := db.retrievalDataIndex.Get(item) + i, err := db.metaIndex.Get(item) switch err { case nil: exists = true - item.StoreTimestamp = i.StoreTimestamp item.BinID = i.BinID + item.StoreTimestamp = i.StoreTimestamp + item.AccessTimestamp = i.AccessTimestamp + if item.AccessTimestamp > 0 { + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + } case leveldb.ErrNotFound: // no chunk accesses exists = false @@ -183,12 +196,12 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she } } - gcSizeChange, err = db.setGC(batch, item) - if err != nil { - return false, 0, err - } + item.AccessTimestamp = now() - db.retrievalDataIndex.PutInBatch(batch, item) + db.gcIndex.PutInBatch(batch, item) + gcSizeChange++ + + db.metaIndex.PutInBatch(batch, item) return exists, gcSizeChange, nil } @@ -198,7 +211,7 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she // The batch can be written to the database. // Provided batch and binID map are updated. func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - exists, err = db.retrievalDataIndex.Has(item) + exists, err = db.data.Has(item.Address) if err != nil { return false, 0, err } @@ -226,7 +239,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed if err != nil { return false, 0, err } - db.retrievalDataIndex.PutInBatch(batch, item) + db.metaIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) if !anonymous { db.pushIndex.PutInBatch(batch, item) @@ -252,7 +265,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed // The batch can be written to the database. // Provided batch and binID map are updated. 
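A consequence of the put changes above: chunk payloads are written straight to the FCDS shards via db.data.Put, and only for chunks that do not already exist, while the index updates still go through the LevelDB write batch. A simplified in-package sketch of that ordering for a single uploaded chunk; putOne is a hypothetical helper, and the locking, tag handling and gc-size bookkeeping of the real put are omitted.

// putOne is illustrative only; compare with the real put above.
func putOne(db *DB, ch chunk.Chunk) error {
	batch := new(leveldb.Batch)
	binIDs := make(map[uint8]uint64)

	item := shed.Item{Address: ch.Address(), Data: ch.Data()}
	exists, _, err := db.putUpload(batch, binIDs, item)
	if err != nil {
		return err
	}
	if !exists {
		// the payload goes to the FCDS shard files, outside the LevelDB batch
		if err := db.data.Put(ch); err != nil {
			return err
		}
	}
	// metaIndex, pullIndex and pushIndex entries are committed atomically
	return db.shed.WriteBatch(batch)
}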
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - exists, err = db.retrievalDataIndex.Has(item) + exists, err = db.data.Has(item.Address) if err != nil { return false, 0, err } @@ -272,7 +285,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I if err != nil { return false, 0, err } - db.retrievalDataIndex.PutInBatch(batch, item) + db.metaIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) if db.putToGCCheck(item.Address) { @@ -291,31 +304,28 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I // setGC is a helper function used to add chunks to the retrieval access // index and the gc index in the cases that the putToGCCheck condition -// warrants a gc set. this is to mitigate index leakage in edge cases where +// warrants a gc set. This is to mitigate index leakage in edge cases where // a chunk is added to a node's localstore and given that the chunk is // already within that node's NN (thus, it can be added to the gc index -// safely) +// safely). func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { - if item.BinID == 0 { - i, err := db.retrievalDataIndex.Get(item) - if err != nil { - return 0, err - } - item.BinID = i.BinID - } - i, err := db.retrievalAccessIndex.Get(item) + i, err := db.metaIndex.Get(item) switch err { case nil: + item.BinID = i.BinID + item.StoreTimestamp = i.StoreTimestamp item.AccessTimestamp = i.AccessTimestamp - db.gcIndex.DeleteInBatch(batch, item) - gcSizeChange-- + if item.AccessTimestamp > 0 { + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + } case leveldb.ErrNotFound: // the chunk is not accessed before default: return 0, err } item.AccessTimestamp = now() - db.retrievalAccessIndex.PutInBatch(batch, item) + db.metaIndex.PutInBatch(batch, item) db.gcIndex.PutInBatch(batch, item) gcSizeChange++ diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index ce2277ae4a..2d1da34bd6 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -54,7 +54,7 @@ func TestModePutRequest(t *testing.T) { } for _, ch := range chunks { - newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)(t) + newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, 0)(t) } newItemsCountTest(db.gcIndex, tc.count)(t) @@ -73,7 +73,7 @@ func TestModePutRequest(t *testing.T) { } for _, ch := range chunks { - newRetrieveIndexesTestWithAccess(db, ch, storeTimestamp, wantTimestamp)(t) + newRetrieveIndexesTestWithAccess(db, ch, storeTimestamp, 0)(t) } newItemsCountTest(db.gcIndex, tc.count)(t) @@ -309,7 +309,7 @@ func TestModePut_sameChunk(t *testing.T) { return 0 } - newItemsCountTest(db.retrievalDataIndex, tc.count)(t) + newDataCountTest(db, tc.count)(t) newItemsCountTest(db.pullIndex, count(tcn.pullIndex))(t) newItemsCountTest(db.pushIndex, count(tcn.pushIndex))(t) } @@ -319,13 +319,9 @@ func TestModePut_sameChunk(t *testing.T) { } } -// TestModePutSync_addToGc validates ModePut* with PutSetCheckFunc stub results -// in the added chunk to show up in GC index -func TestModePut_addToGc(t *testing.T) { - retVal := true - // PutSetCheckFunc's output is toggled from the test case - opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }} - +// TestModePutSync_addToGC validates ModePut* with PutToGCCheck stub results +// in the added chunk to show up in GC index. 
+func TestModePut_addToGC(t *testing.T) { for _, m := range []struct { mode chunk.ModePut putToGc bool @@ -337,10 +333,10 @@ func TestModePut_addToGc(t *testing.T) { {mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed } { for _, tc := range multiChunkTestCases { - t.Run(tc.name, func(t *testing.T) { - retVal = m.putToGc - - db, cleanupFunc := newTestDB(t, opts) + t.Run(fmt.Sprintf("%s %s putToGc=%v", tc.name, m.mode, m.putToGc), func(t *testing.T) { + db, cleanupFunc := newTestDB(t, &Options{ + PutToGCCheck: func(_ []byte) bool { return m.putToGc }, + }) defer cleanupFunc() wantTimestamp := time.Now().UTC().UnixNano() @@ -367,18 +363,19 @@ func TestModePut_addToGc(t *testing.T) { newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp) newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t) } + + if m.putToGc { + newItemsCountTest(db.gcIndex, tc.count)(t) + newIndexGCSizeTest(db)(t) + } }) } } } -// TestModePutSync_addToGcExisting validates ModePut* with PutSetCheckFunc stub results -// in the added chunk to show up in GC index -func TestModePut_addToGcExisting(t *testing.T) { - retVal := true - // PutSetCheckFunc's output is toggled from the test case - opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }} - +// TestModePutSync_addToGCExisting validates ModePut* with PutToGCCheck stub results +// in the added chunk to show up in GC index. +func TestModePut_addToGCExisting(t *testing.T) { for _, m := range []struct { mode chunk.ModePut putToGc bool @@ -390,10 +387,10 @@ func TestModePut_addToGcExisting(t *testing.T) { {mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed } { for _, tc := range multiChunkTestCases { - t.Run(tc.name, func(t *testing.T) { - retVal = m.putToGc - - db, cleanupFunc := newTestDB(t, opts) + t.Run(fmt.Sprintf("%s %s putToGc=%v", tc.name, m.mode, m.putToGc), func(t *testing.T) { + db, cleanupFunc := newTestDB(t, &Options{ + PutToGCCheck: func(_ []byte) bool { return m.putToGc }, + }) defer cleanupFunc() wantStoreTimestamp := time.Now().UTC().UnixNano() @@ -434,6 +431,11 @@ func TestModePut_addToGcExisting(t *testing.T) { newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp) newGCIndexTest(db, ch, wantStoreTimestamp, wantAccessTimestamp, binIDs[po], wantErr)(t) } + + if m.putToGc { + newItemsCountTest(db.gcIndex, tc.count)(t) + newIndexGCSizeTest(db)(t) + } }) } } @@ -464,7 +466,7 @@ func TestPutDuplicateChunks(t *testing.T) { t.Error("second chunk should exist") } - newItemsCountTest(db.retrievalDataIndex, 1)(t) + newDataCountTest(db, 1)(t) got, err := db.Get(context.Background(), chunk.ModeGetLookup, ch.Address()) if err != nil { diff --git a/storage/localstore/mode_set.go b/storage/localstore/mode_set.go index 0d5cf229f1..2a786f64bd 100644 --- a/storage/localstore/mode_set.go +++ b/storage/localstore/mode_set.go @@ -54,6 +54,7 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) { defer db.batchMu.Unlock() batch := new(leveldb.Batch) + var removeChunks bool // variables that provide information for operations // to be done after write batch function successfully executes @@ -96,10 +97,11 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) { } gcSizeChange += c } + removeChunks = true case chunk.ModeSetPin: for _, addr := range addrs { - err := db.setPin(batch, addr) + err := db.setPin(batch, addr, 1) if err != 
nil { return err } @@ -125,6 +127,15 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) { if err != nil { return err } + + if removeChunks { + for _, a := range addrs { + if err := db.data.Delete(a); err != nil { + return err + } + } + } + for po := range triggerPullFeed { db.triggerPullSubscriptions(po) } @@ -138,14 +149,14 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun item := addressToItem(addr) - // need to get access timestamp here as it is not - // provided by the access function, and it is not - // a property of a chunk provided to Accessor.Put. - i, err := db.retrievalDataIndex.Get(item) + i, err := db.metaIndex.Get(item) switch err { case nil: - item.StoreTimestamp = i.StoreTimestamp item.BinID = i.BinID + item.StoreTimestamp = i.StoreTimestamp + item.AccessTimestamp = i.AccessTimestamp + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- case leveldb.ErrNotFound: db.pushIndex.DeleteInBatch(batch, item) item.StoreTimestamp = now() @@ -156,20 +167,7 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr chun default: return 0, err } - - i, err = db.retrievalAccessIndex.Get(item) - switch err { - case nil: - item.AccessTimestamp = i.AccessTimestamp - db.gcIndex.DeleteInBatch(batch, item) - gcSizeChange-- - case leveldb.ErrNotFound: - // the chunk is not accessed before - default: - return 0, err - } item.AccessTimestamp = now() - db.retrievalAccessIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) db.gcIndex.PutInBatch(batch, item) gcSizeChange++ @@ -191,20 +189,16 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS // provided by the access function, and it is not // a property of a chunk provided to Accessor.Put. - i, err := db.retrievalDataIndex.Get(item) + i, err := db.metaIndex.Get(item) if err != nil { if err == leveldb.ErrNotFound { - // chunk is not found, - // no need to update gc index - // just delete from the push index - // if it is there - db.pushIndex.DeleteInBatch(batch, item) return 0, nil } return 0, err } - item.StoreTimestamp = i.StoreTimestamp item.BinID = i.BinID + item.StoreTimestamp = i.StoreTimestamp + item.AccessTimestamp = i.AccessTimestamp switch mode { case chunk.ModeSetSyncPull: @@ -276,19 +270,12 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS db.pushIndex.DeleteInBatch(batch, item) } - i, err = db.retrievalAccessIndex.Get(item) - switch err { - case nil: - item.AccessTimestamp = i.AccessTimestamp + if item.AccessTimestamp > 0 { db.gcIndex.DeleteInBatch(batch, item) gcSizeChange-- - case leveldb.ErrNotFound: - // the chunk is not accessed before - default: - return 0, err } item.AccessTimestamp = now() - db.retrievalAccessIndex.PutInBatch(batch, item) + db.metaIndex.PutInBatch(batch, item) // Add in gcIndex only if this chunk is not pinned ok, err := db.pinIndex.Has(item) @@ -312,23 +299,18 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr chunk.Address) (gcSizeChange // need to get access timestamp here as it is not // provided by the access function, and it is not // a property of a chunk provided to Accessor.Put. 
- i, err := db.retrievalAccessIndex.Get(item) + i, err := db.metaIndex.Get(item) switch err { case nil: + item.BinID = i.BinID + item.StoreTimestamp = i.StoreTimestamp item.AccessTimestamp = i.AccessTimestamp case leveldb.ErrNotFound: default: return 0, err } - i, err = db.retrievalDataIndex.Get(item) - if err != nil { - return 0, err - } - item.StoreTimestamp = i.StoreTimestamp - item.BinID = i.BinID - db.retrievalDataIndex.DeleteInBatch(batch, item) - db.retrievalAccessIndex.DeleteInBatch(batch, item) + db.metaIndex.DeleteInBatch(batch, item) db.pullIndex.DeleteInBatch(batch, item) db.gcIndex.DeleteInBatch(batch, item) // a check is needed for decrementing gcSize @@ -344,7 +326,7 @@ func (db *DB) setRemove(batch *leveldb.Batch, addr chunk.Address) (gcSizeChange // setPin increments pin counter for the chunk by updating // pin index and sets the chunk to be excluded from garbage collection. // Provided batch is updated. -func (db *DB) setPin(batch *leveldb.Batch, addr chunk.Address) (err error) { +func (db *DB) setPin(batch *leveldb.Batch, addr chunk.Address, count uint64) (err error) { item := addressToItem(addr) // Get the existing pin counter of the chunk @@ -364,8 +346,8 @@ func (db *DB) setPin(batch *leveldb.Batch, addr chunk.Address) (err error) { existingPinCounter = pinnedChunk.PinCounter } - // Otherwise increase the existing counter by 1 - item.PinCounter = existingPinCounter + 1 + // Otherwise increase the existing counter by the pin count + item.PinCounter = existingPinCounter + count db.pinIndex.PutInBatch(batch, item) return nil diff --git a/storage/localstore/mode_set_test.go b/storage/localstore/mode_set_test.go index abe84cba73..606a2acda3 100644 --- a/storage/localstore/mode_set_test.go +++ b/storage/localstore/mode_set_test.go @@ -333,22 +333,23 @@ func TestModeSetRemove(t *testing.T) { t.Run("retrieve indexes", func(t *testing.T) { for _, ch := range chunks { - wantErr := leveldb.ErrNotFound - _, err := db.retrievalDataIndex.Get(addressToItem(ch.Address())) + wantErr := chunk.ErrChunkNotFound + _, err := db.data.Get(ch.Address()) if err != wantErr { t.Errorf("got error %v, want %v", err, wantErr) } // access index should not be set - _, err = db.retrievalAccessIndex.Get(addressToItem(ch.Address())) + _, err = db.metaIndex.Get(addressToItem(ch.Address())) + wantErr = leveldb.ErrNotFound if err != wantErr { t.Errorf("got error %v, want %v", err, wantErr) } } - t.Run("retrieve data index count", newItemsCountTest(db.retrievalDataIndex, 0)) + t.Run("retrieve data index count", newDataCountTest(db, 0)) - t.Run("retrieve access index count", newItemsCountTest(db.retrievalAccessIndex, 0)) + t.Run("retrieve access index count", newItemsCountTest(db.metaIndex, 0)) }) for _, ch := range chunks { diff --git a/storage/localstore/schema.go b/storage/localstore/schema.go index 512869f3bf..ba0e956096 100644 --- a/storage/localstore/schema.go +++ b/storage/localstore/schema.go @@ -22,50 +22,50 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" ) -// The DB schema we want to use. The actual/current DB schema might differ -// until migrations are run. -var DbSchemaCurrent = DbSchemaDiwali +// dbSchemaCurrent is the schema name of the current implementation. +// The actual/current DB schema might differ until migrations are run. +var dbSchemaCurrent = dbSchemaForky -// There was a time when we had no schema at all. 
-const DbSchemaNone = "" - -// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5 -const DbSchemaPurity = "purity" - -// "halloween" is here because we had a screw in the garbage collector index. -// Because of that we had to rebuild the GC index to get rid of erroneous -// entries and that takes a long time. This schema is used for bookkeeping, -// so rebuild index will run just once. -const DbSchemaHalloween = "halloween" +const ( + // dbSchemaSanctuary is the first storage/localstore schema. + dbSchemaSanctuary = "sanctuary" + // dbSchemaDiwali migration simply renames the pullIndex in localstore. + dbSchemaDiwali = "diwali" + // dbSchemaForky migration implements FCDS storage and requires manual import and export. + dbSchemaForky = "forky" +) -const DbSchemaSanctuary = "sanctuary" +// IsLegacyDatabase returns true if legacy database is in the data directory. +func IsLegacyDatabase(datadir string) bool { -// the "diwali" migration simply renames the pullIndex in localstore -const DbSchemaDiwali = "diwali" + // "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5 + const dbSchemaPurity = "purity" -// returns true if legacy database is in the datadir -func IsLegacyDatabase(datadir string) bool { + // "halloween" is here because we had a screw in the garbage collector index. + // Because of that we had to rebuild the GC index to get rid of erroneous + // entries and that takes a long time. This schema is used for bookkeeping, + // so rebuild index will run just once. + const dbSchemaHalloween = "halloween" - var ( - legacyDbSchemaKey = []byte{8} - ) + var legacyDBSchemaKey = []byte{8} db, err := leveldb.OpenFile(datadir, &opt.Options{OpenFilesCacheCapacity: 128}) if err != nil { - log.Error("got an error while trying to open leveldb path", "path", datadir, "err", err) + log.Error("open leveldb", "path", datadir, "err", err) return false } defer db.Close() - data, err := db.Get(legacyDbSchemaKey, nil) + data, err := db.Get(legacyDBSchemaKey, nil) if err != nil { if err == leveldb.ErrNotFound { // if we haven't found anything under the legacy db schema key- we are not on legacy return false } - log.Error("got an unexpected error fetching legacy name from the database", "err", err) + log.Error("get legacy name from", "err", err) } - log.Trace("checking if database scheme is legacy", "schema name", string(data)) - return string(data) == DbSchemaHalloween || string(data) == DbSchemaPurity + schema := string(data) + log.Trace("checking if database scheme is legacy", "schema name", schema) + return schema == dbSchemaHalloween || schema == dbSchemaPurity } diff --git a/storage/localstore/subscription_pull_test.go b/storage/localstore/subscription_pull_test.go index 993fefe77d..cf679aee7b 100644 --- a/storage/localstore/subscription_pull_test.go +++ b/storage/localstore/subscription_pull_test.go @@ -569,7 +569,7 @@ func readPullSubscriptionBin(ctx context.Context, db *DB, bin uint8, ch <-chan c if !bytes.Equal(got.Address, addr) { err = fmt.Errorf("got chunk bin id %v in bin %v %v, want %v", i, bin, got.Address.Hex(), addr.Hex()) } else { - want, err := db.retrievalDataIndex.Get(shed.Item{ + want, err := db.metaIndex.Get(shed.Item{ Address: addr, }) if err != nil { diff --git a/storage/localstore/subscription_push.go b/storage/localstore/subscription_push.go index 1df1fabc9f..d5fe42eaa1 100644 --- a/storage/localstore/subscription_push.go +++ b/storage/localstore/subscription_push.go @@ -72,13 +72,13 @@ func (db *DB) SubscribePush(ctx 
context.Context) (c <-chan chunk.Chunk, stop fun var count int err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) { // get chunk data - dataItem, err := db.retrievalDataIndex.Get(item) + c, err := db.data.Get(item.Address) if err != nil { return true, err } select { - case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data).WithTagID(item.Tag): + case chunks <- chunk.NewChunk(c.Address(), c.Data()).WithTagID(item.Tag): count++ // set next iteration start item // when its chunk is successfully sent to channel diff --git a/swarm.go b/swarm.go index 774664b684..9c8eeacbe3 100644 --- a/swarm.go +++ b/swarm.go @@ -198,9 +198,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e // check that we are not in the old database schema // if so - fail and exit - isLegacy := localstore.IsLegacyDatabase(config.ChunkDbPath) - - if isLegacy { + if localstore.IsLegacyDatabase(config.ChunkDbPath) { return nil, errors.New("Legacy database format detected! Please read the migration announcement at: https://github.com/ethersphere/swarm/blob/master/docs/Migration-v0.3-to-v0.4.md") } @@ -234,6 +232,9 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e if err != nil { return nil, err } + if err := localStore.Migrate(); err != nil { + return nil, err + } lstore := chunk.NewValidatorStore( localStore, storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash)),