Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fs-repo-11-to-12 special flatfs handling #152

Merged
merged 8 commits into from
Feb 17, 2022
4 changes: 1 addition & 3 deletions fs-repo-11-to-12/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

log "github.com/ipfs/fs-repo-migrations/tools/stump"

Expand Down Expand Up @@ -42,8 +41,7 @@ var migrationPrefixes = []ds.Key{

// Migration implements the migration described above.
type Migration struct {
loadPluginsOnce sync.Once
dstore ds.Batching
dstore ds.Batching
}

// Versions returns the current version string for this migration.
Expand Down
21 changes: 20 additions & 1 deletion fs-repo-11-to-12/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ const (
workingRepo = "repotest_copy"
)

// TestGenericMigration runs the shared migration test with the FlatFS fast
// path disabled, restoring the previous setting when the test returns.
func TestGenericMigration(t *testing.T) {
	// Arguments to a deferred call are evaluated immediately, so prev holds
	// the value that was in effect before this test overrides it.
	defer func(prev bool) {
		EnableFlatFSFastPath = prev
	}(EnableFlatFSFastPath)

	EnableFlatFSFastPath = false
	testMigrationBase(t)
}

func TestFlatFSMigration(t *testing.T) {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
origSetting := EnableFlatFSFastPath
defer func() {
EnableFlatFSFastPath = origSetting
}()
EnableFlatFSFastPath = true
testMigrationBase(t)
}

// testMigrationBase works on an IPFS repository as created by running steps.sh
// with ipfs v0.8.0 using the $repotest folder:
//
Expand All @@ -28,7 +46,7 @@ const (
// added bafybeie4pduk2uwvr5dq36wnbhxspgox7dtqo3fprri4r2wpa7vrej5jqq b
// added Qmesmmf1EEG1orJb6XdK6DabxexsseJnCfw8pqWgonbkoj c/file3
// added QmT3zhz9ZZjEpbzWib95EQ5ESUQs4YasrMQwPScpNGLEXZ c
func TestMigration(t *testing.T) {
func testMigrationBase(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -176,6 +194,7 @@ func TestMigration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer m.dstore.Close()

// Check that the CIDv1s that we explicitly pinned or
// added to MFS are now retrievable as CIDv1-addressed nodes.
Expand Down
13 changes: 9 additions & 4 deletions fs-repo-11-to-12/migration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"reflect"
"sync"
"unsafe"

ipfslite "github.com/hsanjuan/ipfs-lite"
Expand All @@ -32,14 +33,18 @@ func (m *Migration) lock(opts migrate.Options) (io.Closer, error) {
return lock.Lock2(opts.Path)
}

// loadPluginsOnce ensures the plugin-loading step in setupPlugins runs at most
// once for the lifetime of the process.
var loadPluginsOnce sync.Once

// this is just setup so that we can open the datastore.
// Plugins are loaded once only.
func (m *Migration) setupPlugins(opts migrate.Options) error {
// Note: this means plugins cannot be loaded from multiple repos within the same binary;
// however, this does not seem relevant for migrations.
func setupPlugins(repoPath string) error {
var err error
var plugins *loader.PluginLoader
m.loadPluginsOnce.Do(func() {
loadPluginsOnce.Do(func() {
log.VLog(" - loading repo configurations")
plugins, err = loader.NewPluginLoader(opts.Path)
plugins, err = loader.NewPluginLoader(repoPath)
if err != nil {
err = fmt.Errorf("error loading plugins: %s", err)
return
Expand All @@ -62,7 +67,7 @@ func (m *Migration) setupPlugins(opts migrate.Options) error {
// user's IPFS configuration says that should be used. If we had a datastore,
// we close it and re-open it.
func (m *Migration) open(opts migrate.Options) error {
if err := m.setupPlugins(opts); err != nil {
if err := setupPlugins(opts.Path); err != nil {
return err
}

Expand Down
31 changes: 27 additions & 4 deletions fs-repo-11-to-12/migration/swapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ var SyncSize uint64 = 100 * 1024 * 1024 // 100MiB
// migration.
var NWorkers int = 1

// EnableFlatFSFastPath controls whether swapWorker may use the FlatFS-specific
// swap worker. When false, the generic datastore swapper is always used. It
// defaults to true and can be overridden at startup through the
// IPFS_FS_MIGRATION_11_TO_12_ENABLE_FLATFS_FASTPATH environment variable,
// which is parsed with strconv.ParseBool.
var EnableFlatFSFastPath bool = true

func init() {
workerEnvVar := "IPFS_FS_MIGRATION_11_TO_12_NWORKERS"
syncSizeEnvVar := "IPFS_FS_MIGRATION_11_TO_12_SYNC_SIZE_BYTES"
flatfsFastPathEnvVar := "IPFS_FS_MIGRATION_11_TO_12_ENABLE_FLATFS_FASTPATH"
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if nworkersStr, nworkerInEnv := os.LookupEnv(workerEnvVar); nworkerInEnv {
nworkers, err := strconv.Atoi(nworkersStr)
if err != nil {
Expand All @@ -49,6 +52,14 @@ func init() {
}
SyncSize = syncSize
}

if flatfsFastPathStr, flatfsFastPathInEnv := os.LookupEnv(flatfsFastPathEnvVar); flatfsFastPathInEnv {
enableFlatfsFastPath, err := strconv.ParseBool(flatfsFastPathStr)
if err != nil {
panic(err)
}
EnableFlatFSFastPath = enableFlatfsFastPath
}
}

// Swap holds the datastore keys for the original CID and for the
Expand Down Expand Up @@ -201,6 +212,9 @@ func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.Sha
syncPrefix: cswap.Prefix,
}

// the frequency with which we log flatfs moves
const swapLogThreshold = 10000

// Process keys from the results channel
for sw := range swapCh {
if reverting {
Expand Down Expand Up @@ -256,7 +270,6 @@ func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.Sha
}
swapped++

const swapLogThreshold = 10000
if swapped%swapLogThreshold == 0 {
log.Log("%v: Migration worker has moved %d flatfs files and %d in total", time.Now(), swapLogThreshold, swapped)
}
Expand All @@ -266,6 +279,11 @@ func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.Sha
}
}

// log the leftover flatfs moves that were not already logged
if rem := swapped % swapLogThreshold; rem != 0 {
log.Log("%v: Migration worker has moved %d flatfs files and %d in total", time.Now(), rem, swapped)
}

// handle generic worker sync
// final sync to added things
err := genericSwker.syncAndDelete()
Expand All @@ -286,11 +304,16 @@ func (cswap *CidSwapper) swapWorkerFlatFS(fsdsPath string, fsdsShard *flatfs.Sha
// the swapWorker) and undoes them. It ignores NotFound errors so that reverts
// can succeed even if they failed half-way.
func (cswap *CidSwapper) swapWorker(swapCh <-chan Swap, reverting bool) (uint64, uint64) {
// Use the more generic datastore swapper if not using a simple FlatFS setup.
// Use the more generic datastore swapper if the FlatFS fast path has been explicitly disabled
// Also use it for reversion since the FlatFS specific code doesn't specifically
// handle some reversion edge cases.
if !EnableFlatFSFastPath || reverting {
return cswap.swapWorkerDS(swapCh, reverting)
}

// Use the more generic datastore swapper if not using a simple FlatFS setup.
fsdsPath, fsDsShard, err := IsBasicFlatFSBlockstore(cswap.Store)
if err != nil || reverting {
if err != nil {
return cswap.swapWorkerDS(swapCh, reverting)
}

Expand Down Expand Up @@ -405,7 +428,7 @@ func (sw *swapWorker) syncAndDelete() error {
}

func (sw *swapWorker) sync() error {
log.Log("%v: Migration worker syncing after %d objects migrated", time.Now(), sw.swapped)
log.Log("%v: Generic migration worker syncing after %d objects migrated", time.Now(), sw.swapped)
err := sw.store.Sync(sw.syncPrefix)
if err != nil {
return err
Expand Down