Skip to content

Commit

Permalink
feat: fs-repo-11-to-12 special flatfs handling
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Feb 15, 2022
1 parent 9dcf004 commit 359523d
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 0 deletions.
1 change: 1 addition & 0 deletions fs-repo-11-to-12/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 // indirect
github.com/ipfs/go-ds-flatfs v0.4.5
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
Expand Down
58 changes: 58 additions & 0 deletions fs-repo-11-to-12/migration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package mg11

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"unsafe"

ipfslite "github.com/hsanjuan/ipfs-lite"
migrate "github.com/ipfs/fs-repo-migrations/tools/go-migrate"
Expand All @@ -18,6 +21,9 @@ import (
loader "github.com/ipfs/go-ipfs/plugin/loader"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
format "github.com/ipfs/go-ipld-format"

"github.com/ipfs/go-datastore/mount"
flatfs "github.com/ipfs/go-ds-flatfs"
)

// locks the repo
Expand Down Expand Up @@ -83,6 +89,58 @@ func (m *Migration) open(opts migrate.Options) error {
return nil
}

func getUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}

func IsDefaultConfig(dstore ds.Datastore) (dsPath string, v1 *flatfs.ShardIdV1, err error) {
errNotDefault := errors.New("not the default config")
defer func() {
if err := recover(); err != nil {
err = errNotDefault
}
}()

mds, ok := dstore.(*mount.Datastore)
if !ok {
return "", nil, errNotDefault
}

mnts, ok := getUnexportedField(reflect.ValueOf(mds).Elem().FieldByName("mounts")).([]mount.Mount)
if !ok {
return "", nil, errNotDefault
}

if len(mnts) != 2 {
return "", nil, errNotDefault
}

var blkDs ds.Datastore
if mnts[0].Prefix.Equal(blocksPrefix) {
blkDs = mnts[0].Datastore
} else if mnts[1].Prefix.Equal(blocksPrefix) {
blkDs = mnts[1].Datastore
} else {
return "", nil, errNotDefault
}

if reflect.TypeOf(blkDs).String() != "*measure.measure" {
return "", nil, errNotDefault
}

fsds, ok := getUnexportedField(reflect.ValueOf(blkDs).Elem().FieldByName("backend")).(*flatfs.Datastore)
if !ok {
return "", nil, errNotDefault
}
fsdsPath := reflect.ValueOf(fsds).Elem().FieldByName("path").String()

shard, err := flatfs.ParseShardFunc(fsds.ShardStr())
if err != nil {
return "", nil, errNotDefault
}
return fsdsPath, shard, nil
}

// Create a file to store the list of migrated CIDs. If it exists, it is
// opened for appending only.
func createBackupFile(path, name string) (*os.File, error) {
Expand Down
64 changes: 64 additions & 0 deletions fs-repo-11-to-12/migration/swapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package mg11

import (
"errors"
flatfs "github.com/ipfs/go-ds-flatfs"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"

log "github.com/ipfs/fs-repo-migrations/tools/stump"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
query "github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
)
Expand Down Expand Up @@ -177,10 +180,71 @@ func (cswap *CidSwapper) prepareWorker(resultsCh <-chan query.Result) (uint64, u
return sw.swapped, errored
}

func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.ShardIdV1, swapCh <-chan Swap, reverting bool) (uint64, uint64) {
var swapped, errored uint64

const flatfsExtension = ".data"
prefix := ktds.PrefixTransform{Prefix: blocksPrefix}

getPath := func(basePath string, key ds.Key) (string, string) {
child := prefix.InvertKey(key)
noslash := child.String()[1:]
dir := filepath.Join(fsdsPath, fsdsShard.Func()(noslash))
file := filepath.Join(dir, noslash+flatfsExtension)

return dir, file
}

// Process keys from the results channel
for sw := range swapCh {
if reverting {
old := sw.Old
sw.Old = sw.New
sw.New = old
}

_, oldPath := getPath(fsdsPath, sw.Old)
newDir, newPath := getPath(fsdsPath, sw.New)

_, err := os.Stat(oldPath)
if err != nil {
log.Error("could not swap %s->%s. Could not find %s even though it was in the backup file %s. Skipping.", sw.Old, sw.New, sw.Old, err.Error())
continue
}

if err := os.Mkdir(newDir, 0755); err != nil && !os.IsExist(err) {
log.Error("could not swap %s->%s. Skipping.", sw.Old, sw.New, err.Error())
continue
}

if err := os.Rename(oldPath, newPath); err != nil {
log.Error("could not swap %s->%s. Skipping.", sw.Old, sw.New, err.Error())
errored++
continue
}
swapped++

if cswap.SwapCh != nil {
cswap.SwapCh <- Swap{Old: sw.Old, New: sw.New}
}
}

return swapped, errored
}

// unswap worker takes notifications from unswapCh (as they would be sent by
// the swapWorker) and undoes them. It ignores NotFound errors so that reverts
// can succeed even if they failed half-way.
func (cswap *CidSwapper) swapWorker(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
fsdsPath, fsDsShard, err := IsDefaultConfig(cswap.Store)
if err != nil {
return cswap.swapWorkerDS(swapCh, reverting)
}

return cswap.swapWorkerFlatFS(fsdsPath, fsDsShard, swapCh, reverting)
}

func (cswap *CidSwapper) swapWorkerDS(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
var errored uint64

swker := &swapWorker{
Expand Down
1 change: 1 addition & 0 deletions fs-repo-11-to-12/vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ github.com/ipfs/go-datastore/sync
## explicit
github.com/ipfs/go-ds-badger
# github.com/ipfs/go-ds-flatfs v0.4.5
## explicit
github.com/ipfs/go-ds-flatfs
# github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-ds-leveldb
Expand Down

0 comments on commit 359523d

Please sign in to comment.