Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fs-repo-11-to-12 special flatfs handling #152

Merged
merged 8 commits into from
Feb 17, 2022
1 change: 1 addition & 0 deletions fs-repo-11-to-12/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 // indirect
github.com/ipfs/go-ds-flatfs v0.4.5
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
Expand Down
4 changes: 1 addition & 3 deletions fs-repo-11-to-12/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

log "github.com/ipfs/fs-repo-migrations/tools/stump"

Expand Down Expand Up @@ -42,8 +41,7 @@ var migrationPrefixes = []ds.Key{

// Migration implements the migration described above.
type Migration struct {
loadPluginsOnce sync.Once
dstore ds.Batching
dstore ds.Batching
}

// Versions returns the current version string for this migration.
Expand Down
21 changes: 20 additions & 1 deletion fs-repo-11-to-12/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ const (
workingRepo = "repotest_copy"
)

// TestGenericMigration runs the shared migration test with the FlatFS fast
// path switched off. The previous value of EnableFlatFSFastPath is put back
// when the test function returns.
func TestGenericMigration(t *testing.T) {
	// The argument is evaluated when the defer statement runs, so it captures
	// the setting as it was before this test changed it.
	defer func(previous bool) {
		EnableFlatFSFastPath = previous
	}(EnableFlatFSFastPath)

	EnableFlatFSFastPath = false
	testMigrationBase(t)
}

func TestFlatFSMigration(t *testing.T) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
origSetting := EnableFlatFSFastPath
defer func() {
EnableFlatFSFastPath = origSetting
}()
EnableFlatFSFastPath = true
testMigrationBase(t)
}

// TestMigration works on an IPFS repository as created by running steps.sh
// with ipfs v0.8.0 on using the $repotest folder:
//
Expand All @@ -28,7 +46,7 @@ const (
// added bafybeie4pduk2uwvr5dq36wnbhxspgox7dtqo3fprri4r2wpa7vrej5jqq b
// added Qmesmmf1EEG1orJb6XdK6DabxexsseJnCfw8pqWgonbkoj c/file3
// added QmT3zhz9ZZjEpbzWib95EQ5ESUQs4YasrMQwPScpNGLEXZ c
func TestMigration(t *testing.T) {
func testMigrationBase(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -176,6 +194,7 @@ func TestMigration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer m.dstore.Close()

// Check that the CIDv1s that we explicitly pinned or
// added to MFS are now retrievable as CIDv1-addressed nodes.
Expand Down
71 changes: 67 additions & 4 deletions fs-repo-11-to-12/migration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package mg11

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"sync"
"unsafe"

ipfslite "github.com/hsanjuan/ipfs-lite"
migrate "github.com/ipfs/fs-repo-migrations/tools/go-migrate"
Expand All @@ -18,6 +22,9 @@ import (
loader "github.com/ipfs/go-ipfs/plugin/loader"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
format "github.com/ipfs/go-ipld-format"

"github.com/ipfs/go-datastore/mount"
flatfs "github.com/ipfs/go-ds-flatfs"
)

// locks the repo
Expand All @@ -26,14 +33,18 @@ func (m *Migration) lock(opts migrate.Options) (io.Closer, error) {
return lock.Lock2(opts.Path)
}

var loadPluginsOnce sync.Once

// this is just setup so that we can open the datastore.
// Plugins are loaded once only.
func (m *Migration) setupPlugins(opts migrate.Options) error {
// Note: this means plugins cannot be loaded from multiple repos within the same binary
// however, this does not seem relevant for migrations
func setupPlugins(repoPath string) error {
var err error
var plugins *loader.PluginLoader
m.loadPluginsOnce.Do(func() {
loadPluginsOnce.Do(func() {
log.VLog(" - loading repo configurations")
plugins, err = loader.NewPluginLoader(opts.Path)
plugins, err = loader.NewPluginLoader(repoPath)
if err != nil {
err = fmt.Errorf("error loading plugins: %s", err)
return
Expand All @@ -56,7 +67,7 @@ func (m *Migration) setupPlugins(opts migrate.Options) error {
// user's IPFS configuration says that should be used. If we had a datastore,
// we close it and re-open it.
func (m *Migration) open(opts migrate.Options) error {
if err := m.setupPlugins(opts); err != nil {
if err := setupPlugins(opts.Path); err != nil {
return err
}

Expand All @@ -83,6 +94,58 @@ func (m *Migration) open(opts migrate.Options) error {
return nil
}

// getUnexportedField returns the value held in field as an interface{}, even
// when field was obtained from an unexported struct field. It does so by
// building a fresh reflect.Value of the same type at the field's address.
// field must be addressable, since its address is taken with UnsafeAddr.
func getUnexportedField(field reflect.Value) interface{} {
	addr := unsafe.Pointer(field.UnsafeAddr())
	readable := reflect.NewAt(field.Type(), addr).Elem()
	return readable.Interface()
}

func IsBasicFlatFSBlockstore(dstore ds.Datastore) (dsPath string, v1 *flatfs.ShardIdV1, err error) {
errNotSupportedFlatFSConfig := errors.New("not a supported FlatFS config")
defer func() {
if err := recover(); err != nil {
err = errNotSupportedFlatFSConfig
}
}()

mds, ok := dstore.(*mount.Datastore)
if !ok {
return "", nil, errNotSupportedFlatFSConfig
}

mnts, ok := getUnexportedField(reflect.ValueOf(mds).Elem().FieldByName("mounts")).([]mount.Mount)
if !ok {
return "", nil, errNotSupportedFlatFSConfig
}

if len(mnts) != 2 {
return "", nil, errNotSupportedFlatFSConfig
}

var blkDs ds.Datastore
if mnts[0].Prefix.Equal(blocksPrefix) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
blkDs = mnts[0].Datastore
} else if mnts[1].Prefix.Equal(blocksPrefix) {
blkDs = mnts[1].Datastore
} else {
return "", nil, errNotSupportedFlatFSConfig
}

if reflect.TypeOf(blkDs).String() != "*measure.measure" {
return "", nil, errNotSupportedFlatFSConfig
}

fsds, ok := getUnexportedField(reflect.ValueOf(blkDs).Elem().FieldByName("backend")).(*flatfs.Datastore)
if !ok {
return "", nil, errNotSupportedFlatFSConfig
}
fsdsPath := reflect.ValueOf(fsds).Elem().FieldByName("path").String()

shard, err := flatfs.ParseShardFunc(fsds.ShardStr())
if err != nil {
return "", nil, errNotSupportedFlatFSConfig
}
return fsdsPath, shard, nil
}

// Create a file to store the list of migrated CIDs. If it exists, it is
// opened for appending only.
func createBackupFile(path, name string) (*os.File, error) {
Expand Down
142 changes: 141 additions & 1 deletion fs-repo-11-to-12/migration/swapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package mg11

import (
"errors"
flatfs "github.com/ipfs/go-ds-flatfs"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"time"

log "github.com/ipfs/fs-repo-migrations/tools/stump"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
query "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
)
Expand All @@ -21,9 +25,12 @@ var SyncSize uint64 = 100 * 1024 * 1024 // 100MiB
// migration.
var NWorkers int = 1

// EnableFlatFSFastPath controls whether swapWorker may use the FlatFS-specific
// worker instead of the generic datastore one. It defaults to true and can be
// overridden with the IPFS_FS_MIGRATION_11_TO_12_ENABLE_FLATFS_FASTPATH
// environment variable, which init parses with strconv.ParseBool.
var EnableFlatFSFastPath bool = true

func init() {
workerEnvVar := "IPFS_FS_MIGRATION_11_TO_12_NWORKERS"
syncSizeEnvVar := "IPFS_FS_MIGRATION_11_TO_12_SYNC_SIZE_BYTES"
flatfsFastPathEnvVar := "IPFS_FS_MIGRATION_11_TO_12_ENABLE_FLATFS_FASTPATH"
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if nworkersStr, nworkerInEnv := os.LookupEnv(workerEnvVar); nworkerInEnv {
nworkers, err := strconv.Atoi(nworkersStr)
if err != nil {
Expand All @@ -45,6 +52,14 @@ func init() {
}
SyncSize = syncSize
}

if flatfsFastPathStr, flatfsFastPathInEnv := os.LookupEnv(flatfsFastPathEnvVar); flatfsFastPathInEnv {
enableFlatfsFastPath, err := strconv.ParseBool(flatfsFastPathStr)
if err != nil {
panic(err)
}
EnableFlatFSFastPath = enableFlatfsFastPath
}
}

// Swap holds the datastore keys for the original CID and for the
Expand Down Expand Up @@ -177,10 +192,135 @@ func (cswap *CidSwapper) prepareWorker(resultsCh <-chan query.Result) (uint64, u
return sw.swapped, errored
}

func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.ShardIdV1, swapCh <-chan Swap, reverting bool) (uint64, uint64) {
var swapped, errored uint64

const flatfsExtension = ".data"
prefix := ktds.PrefixTransform{Prefix: blocksPrefix}

getPath := func(basePath string, key ds.Key) (string, string) {
child := prefix.InvertKey(key)
noslash := child.String()[1:]
dir := filepath.Join(fsdsPath, fsdsShard.Func()(noslash))
file := filepath.Join(dir, noslash+flatfsExtension)

return dir, file
}

genericSwker := &swapWorker{
store: cswap.Store,
syncPrefix: cswap.Prefix,
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

// the frequency with which we log flatfs moves
const swapLogThreshold = 10000

// Process keys from the results channel
for sw := range swapCh {
if reverting {
old := sw.Old
sw.Old = sw.New
sw.New = old
}

if !sw.Old.Parent().Equal(sw.New.Parent()) {
log.Error("could not swap %s->%s. The namespaces changed. Skipping.", sw.Old, sw.New)
errored++
continue
}

if !sw.Old.Parent().Equal(blocksPrefix) {
err := genericSwker.swap(sw.Old, sw.New, reverting)

// The datastore does not have the block we are planning to
// migrate.
if err == ds.ErrNotFound {
log.Error("could not swap %s->%s. Could not find %s even though it was in the backup file. Skipping.", sw.Old, sw.New, sw.Old)
continue
} else if err != nil {
log.Error("swapping %s->%s: %s", sw.Old, sw.New, err)
errored++
continue
}

if cswap.SwapCh != nil {
cswap.SwapCh <- Swap{Old: sw.Old, New: sw.New}
}
continue
}

_, oldPath := getPath(fsdsPath, sw.Old)
newDir, newPath := getPath(fsdsPath, sw.New)

_, err := os.Stat(oldPath)
if err != nil {
log.Error("could not swap %s->%s. Could not find %s even though it was in the backup file %s. Skipping.", sw.Old, sw.New, sw.Old, err.Error())
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if err := os.Mkdir(newDir, 0755); err != nil && !os.IsExist(err) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
log.Error("could not swap %s->%s. Skipping.", sw.Old, sw.New, err.Error())
continue
}

if err := os.Rename(oldPath, newPath); err != nil {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
log.Error("could not swap %s->%s. Skipping.", sw.Old, sw.New, err.Error())
errored++
continue
}
swapped++

if swapped%swapLogThreshold == 0 {
log.Log("%v: Migration worker has moved %d flatfs files and %d in total", time.Now(), swapLogThreshold, swapped)
}

if cswap.SwapCh != nil {
cswap.SwapCh <- Swap{Old: sw.Old, New: sw.New}
}
}

// log the leftover flatfs moves that were not already logged
if rem := swapped % swapLogThreshold; rem != 0 {
log.Log("%v: Migration worker has moved %d flatfs files and %d in total", time.Now(), rem, swapped)
}

// handle generic worker sync
// final sync to added things
err := genericSwker.syncAndDelete()
if err != nil {
log.Error("error performing last sync: %s", err)
errored++
}
err = genericSwker.sync() // final sync for deletes.
if err != nil {
log.Error("error performing last sync for deletions: %s", err)
errored++
}

return genericSwker.swapped + swapped, errored
}

// swapWorker consumes swaps from swapCh and applies them, returning the
// number of swapped keys and the number of errors reported by the worker it
// delegates to.
//
// The generic datastore worker (swapWorkerDS) is used when the FlatFS fast
// path has been disabled through EnableFlatFSFastPath, when reverting (the
// FlatFS-specific code does not handle some reversion edge cases), and when
// the store is not a simple FlatFS setup. Otherwise the work is handed to
// swapWorkerFlatFS.
func (cswap *CidSwapper) swapWorker(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
	if !EnableFlatFSFastPath || reverting {
		return cswap.swapWorkerDS(swapCh, reverting)
	}

	// Fall back to the generic worker unless detection succeeded and produced
	// a usable shard function. The nil check matters because the fast path
	// calls methods on the shard, and a nil shard must never reach it even if
	// detection reports no error.
	fsdsPath, fsDsShard, err := IsBasicFlatFSBlockstore(cswap.Store)
	if err != nil || fsDsShard == nil {
		return cswap.swapWorkerDS(swapCh, reverting)
	}

	return cswap.swapWorkerFlatFS(fsdsPath, fsDsShard, swapCh, reverting)
}

func (cswap *CidSwapper) swapWorkerDS(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
var errored uint64

swker := &swapWorker{
Expand Down Expand Up @@ -288,7 +428,7 @@ func (sw *swapWorker) syncAndDelete() error {
}

func (sw *swapWorker) sync() error {
log.Log("Migration worker syncing after %d objects migrated", sw.swapped)
log.Log("%v: Generic migration worker syncing after %d objects migrated", time.Now(), sw.swapped)
err := sw.store.Sync(sw.syncPrefix)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions fs-repo-11-to-12/vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ github.com/ipfs/go-datastore/sync
## explicit
github.com/ipfs/go-ds-badger
# github.com/ipfs/go-ds-flatfs v0.4.5
## explicit
github.com/ipfs/go-ds-flatfs
# github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-ds-leveldb
Expand Down