Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fs-repo-11-to-12 special flatfs handling #152

Merged
merged 8 commits into from
Feb 17, 2022
1 change: 1 addition & 0 deletions fs-repo-11-to-12/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 // indirect
github.com/ipfs/go-ds-flatfs v0.4.5
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
Expand Down
58 changes: 58 additions & 0 deletions fs-repo-11-to-12/migration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package mg11

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"unsafe"

ipfslite "github.com/hsanjuan/ipfs-lite"
migrate "github.com/ipfs/fs-repo-migrations/tools/go-migrate"
Expand All @@ -18,6 +21,9 @@ import (
loader "github.com/ipfs/go-ipfs/plugin/loader"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
format "github.com/ipfs/go-ipld-format"

"github.com/ipfs/go-datastore/mount"
flatfs "github.com/ipfs/go-ds-flatfs"
)

// locks the repo
Expand Down Expand Up @@ -83,6 +89,58 @@ func (m *Migration) open(opts migrate.Options) error {
return nil
}

func getUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}

func IsBasicFlatFSBlockstore(dstore ds.Datastore) (dsPath string, v1 *flatfs.ShardIdV1, err error) {
errNotDefault := errors.New("not the default config")
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if err := recover(); err != nil {
err = errNotDefault
}
}()

mds, ok := dstore.(*mount.Datastore)
if !ok {
return "", nil, errNotDefault
}

mnts, ok := getUnexportedField(reflect.ValueOf(mds).Elem().FieldByName("mounts")).([]mount.Mount)
if !ok {
return "", nil, errNotDefault
}

if len(mnts) != 2 {
return "", nil, errNotDefault
}

var blkDs ds.Datastore
if mnts[0].Prefix.Equal(blocksPrefix) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
blkDs = mnts[0].Datastore
} else if mnts[1].Prefix.Equal(blocksPrefix) {
blkDs = mnts[1].Datastore
} else {
return "", nil, errNotDefault
}

if reflect.TypeOf(blkDs).String() != "*measure.measure" {
return "", nil, errNotDefault
}

fsds, ok := getUnexportedField(reflect.ValueOf(blkDs).Elem().FieldByName("backend")).(*flatfs.Datastore)
if !ok {
return "", nil, errNotDefault
}
fsdsPath := reflect.ValueOf(fsds).Elem().FieldByName("path").String()

shard, err := flatfs.ParseShardFunc(fsds.ShardStr())
if err != nil {
return "", nil, errNotDefault
}
return fsdsPath, shard, nil
}

// Create a file to store the list of migrated CIDs. If it exists, it is
// opened for appending only.
func createBackupFile(path, name string) (*os.File, error) {
Expand Down
116 changes: 116 additions & 0 deletions fs-repo-11-to-12/migration/swapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package mg11

import (
"errors"
flatfs "github.com/ipfs/go-ds-flatfs"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"

log "github.com/ipfs/fs-repo-migrations/tools/stump"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
query "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
)
Expand Down Expand Up @@ -177,10 +180,123 @@ func (cswap *CidSwapper) prepareWorker(resultsCh <-chan query.Result) (uint64, u
return sw.swapped, errored
}

func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.ShardIdV1, swapCh <-chan Swap, reverting bool) (uint64, uint64) {
var swapped, errored uint64

const flatfsExtension = ".data"
prefix := ktds.PrefixTransform{Prefix: blocksPrefix}

getPath := func(basePath string, key ds.Key) (string, string) {
child := prefix.InvertKey(key)
noslash := child.String()[1:]
dir := filepath.Join(fsdsPath, fsdsShard.Func()(noslash))
file := filepath.Join(dir, noslash+flatfsExtension)

return dir, file
}

genericSwker := &swapWorker{
store: cswap.Store,
syncPrefix: cswap.Prefix,
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

// Process keys from the results channel
for sw := range swapCh {
if reverting {
old := sw.Old
sw.Old = sw.New
sw.New = old
}

if !sw.Old.Parent().Equal(sw.New.Parent()) {
log.Error("could not swap %s->%s. The namespaces changed. Skipping.", sw.Old, sw.New)
errored++
continue
}

if !sw.Old.Parent().Equal(blocksPrefix) {
err := genericSwker.swap(sw.Old, sw.New, reverting)

// The datastore does not have the block we are planning to
// migrate.
if err == ds.ErrNotFound {
log.Error("could not swap %s->%s. Could not find %s even though it was in the backup file. Skipping.", sw.Old, sw.New, sw.Old)
continue
} else if err != nil {
log.Error("swapping %s->%s: %s", sw.Old, sw.New, err)
errored++
continue
}

if cswap.SwapCh != nil {
cswap.SwapCh <- Swap{Old: sw.Old, New: sw.New}
}
continue
}

_, oldPath := getPath(fsdsPath, sw.Old)
newDir, newPath := getPath(fsdsPath, sw.New)

_, err := os.Stat(oldPath)
if err != nil {
log.Error("could not swap %s->%s. Could not find %s even though it was in the backup file %s. Skipping.", sw.Old, sw.New, sw.Old, err.Error())
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if err := os.Mkdir(newDir, 0755); err != nil && !os.IsExist(err) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
log.Error("could not swap %s->%s. Skipping.", sw.Old, sw.New, err.Error())
continue
}

if err := os.Rename(oldPath, newPath); err != nil {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
log.Error("could not swap %s->%s. Skipping.", sw.Old, sw.New, err.Error())
errored++
continue
}
swapped++

const swapLogThreshold = 10000
if swapped%swapLogThreshold == 0 {
log.Log("Migration worker has moved %d flatfs files", swapLogThreshold)
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

if cswap.SwapCh != nil {
cswap.SwapCh <- Swap{Old: sw.Old, New: sw.New}
}
}

// handle generic worker sync
// final sync to added things
err := genericSwker.syncAndDelete()
if err != nil {
log.Error("error performing last sync: %s", err)
errored++
}
err = genericSwker.sync() // final sync for deletes.
if err != nil {
log.Error("error performing last sync for deletions: %s", err)
errored++
}

return genericSwker.swapped + swapped, errored
}

// unswap worker takes notifications from unswapCh (as they would be sent by
// the swapWorker) and undoes them. It ignores NotFound errors so that reverts
// can succeed even if they failed half-way.
func (cswap *CidSwapper) swapWorker(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
// Use the more generic datastore swapper if not using a simple FlatFS setup.
// Also use it for reversion since the FlatFS specific code doesn't specifically
// handle some reversion edge cases.
fsdsPath, fsDsShard, err := IsBasicFlatFSBlockstore(cswap.Store)
if err != nil || reverting {
return cswap.swapWorkerDS(swapCh, reverting)
}

return cswap.swapWorkerFlatFS(fsdsPath, fsDsShard, swapCh, reverting)
}

func (cswap *CidSwapper) swapWorkerDS(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
var errored uint64

swker := &swapWorker{
Expand Down
1 change: 1 addition & 0 deletions fs-repo-11-to-12/vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ github.com/ipfs/go-datastore/sync
## explicit
github.com/ipfs/go-ds-badger
# github.com/ipfs/go-ds-flatfs v0.4.5
## explicit
github.com/ipfs/go-ds-flatfs
# github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-ds-leveldb
Expand Down