Skip to content

Commit

Permalink
Merge pull request #344 from numtide/fix/no-cache
Browse files Browse the repository at this point in the history
fix: --no-cache
  • Loading branch information
brianmcgee committed Jul 5, 2024
2 parents be50beb + 42decbf commit 23e563b
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 98 deletions.
15 changes: 3 additions & 12 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,21 +283,12 @@ func Update(files []*walk.File) error {
bucket := tx.Bucket([]byte(pathsBucket))

for _, f := range files {
currentInfo, err := os.Stat(f.Path)
if err != nil {
return err
}

if !(f.Info.ModTime() == currentInfo.ModTime() && f.Info.Size() == currentInfo.Size()) {
stats.Add(stats.Formatted, 1)
}

entry := Entry{
Size: currentInfo.Size(),
Modified: currentInfo.ModTime(),
Size: f.Info.Size(),
Modified: f.Info.ModTime(),
}

if err = putEntry(bucket, f.RelPath, &entry); err != nil {
if err := putEntry(bucket, f.RelPath, &entry); err != nil {
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Format struct {
globalExcludes []glob.Glob

filesCh chan *walk.File
formattedCh chan *walk.File
processedCh chan *walk.File
}

Expand Down
218 changes: 133 additions & 85 deletions cli/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,97 +148,22 @@ func (f *Format) Run() (err error) {
// we use a multiple of batch size here as a rudimentary concurrency optimization based on the host machine
f.filesCh = make(chan *walk.File, BatchSize*runtime.NumCPU())

// create a channel for files that have been formatted
f.formattedCh = make(chan *walk.File, cap(f.filesCh))

// create a channel for files that have been processed
f.processedCh = make(chan *walk.File, cap(f.filesCh))

// start concurrent processing tasks in reverse order
if !f.NoCache {
eg.Go(f.updateCache(ctx))
}
eg.Go(f.updateCache(ctx))
eg.Go(f.detectFormatted(ctx))
eg.Go(f.applyFormatters(ctx))
eg.Go(f.walkFilesystem(ctx))

// wait for everything to complete
return eg.Wait()
}

// updateCache returns a task that drains f.processedCh until it is closed or
// ctx is cancelled.
//
// In normal mode, files are collected into batches of BatchSize and handed to
// cache.Update. In stdin mode (f.Stdin), each processed file is copied to
// stdout and then removed, and the cache is left untouched.
//
// After the channel has been drained the task returns ErrFailOnChange if
// f.FailOnChange is set and stats.Formatted is non-zero; otherwise it prints
// the stats (except in stdin mode) and returns nil.
func (f *Format) updateCache(ctx context.Context) func() error {
	return func() error {
		// used to batch updates for more efficient txs
		batch := make([]*walk.File, 0, BatchSize)

		// apply a batch
		processBatch := func() error {
			if f.Stdin {
				// do nothing
				return nil
			}
			if err := cache.Update(batch); err != nil {
				return err
			}
			batch = batch[:0]
			return nil
		}

	LOOP:
		for {
			select {
			// detect ctx cancellation
			case <-ctx.Done():
				return ctx.Err()
			// respond to processed files
			case file, ok := <-f.processedCh:
				if !ok {
					// channel has been closed, no further files to process
					break LOOP
				}

				if f.Stdin {
					// dump file into stdout
					// (named tmp rather than f so the receiver is not shadowed)
					tmp, err := os.Open(file.Path)
					if err != nil {
						return fmt.Errorf("failed to open %s: %w", file.Path, err)
					}

					_, copyErr := io.Copy(os.Stdout, tmp)
					// always release the descriptor, and do so before removing the file
					closeErr := tmp.Close()

					if copyErr != nil {
						return fmt.Errorf("failed to copy %s to stdout: %w", file.Path, copyErr)
					}
					if closeErr != nil {
						return fmt.Errorf("failed to close %s: %w", file.Path, closeErr)
					}
					if err = os.Remove(tmp.Name()); err != nil {
						return fmt.Errorf("failed to remove temp file %s: %w", file.Path, err)
					}

					stats.Add(stats.Formatted, 1)
					continue
				}

				// append to batch and process if we have enough
				batch = append(batch, file)
				if len(batch) == BatchSize {
					if err := processBatch(); err != nil {
						return err
					}
				}
			}
		}

		// final flush
		if err := processBatch(); err != nil {
			return err
		}

		// if fail on change has been enabled, check that no files were actually formatted, throwing an error if so
		if f.FailOnChange && stats.Value(stats.Formatted) != 0 {
			return ErrFailOnChange
		}

		// print stats to stdout unless we are processing stdin and printing the results to stdout
		if !f.Stdin {
			stats.Print()
		}

		return nil
	}
}

func (f *Format) walkFilesystem(ctx context.Context) func() error {
return func() error {
eg, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -368,9 +293,9 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
}
}

// pass each file to the processed channel
// pass each file to the formatted channel
for _, task := range tasks {
f.processedCh <- task.File
f.formattedCh <- task.File
}

return nil
Expand All @@ -392,7 +317,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
return func() error {
defer func() {
// close processed channel
close(f.processedCh)
close(f.formattedCh)
}()

// iterate the files channel
Expand All @@ -402,7 +327,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
if format.PathMatches(file.RelPath, f.globalExcludes) {
log.Debugf("path matched global excludes: %s", file.RelPath)
// mark it as processed and continue to the next
f.processedCh <- file
f.formattedCh <- file
continue
}

Expand All @@ -421,7 +346,7 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
}
log.Logf(f.OnUnmatched, "no formatter for path: %s", file.RelPath)
// mark it as processed and continue to the next
f.processedCh <- file
f.formattedCh <- file
} else {
// record the match
stats.Add(stats.Matched, 1)
Expand All @@ -444,6 +369,129 @@ func (f *Format) applyFormatters(ctx context.Context) func() error {
}
}

// detectFormatted returns a task that sits between the formatting stage and
// the cache update stage.
//
// It reads files from f.formattedCh, re-stats each one and compares the
// modification time and size against the file.Info captured earlier. When they
// differ the file is counted in stats.Formatted and file.Info is replaced with
// the fresh info. Every file is then forwarded on f.processedCh, which is
// closed when the task returns.
//
// The task returns nil once f.formattedCh is closed, ctx.Err() if ctx is
// cancelled, or a wrapped error if a file cannot be stat'ed.
func (f *Format) detectFormatted(ctx context.Context) func() error {
	return func() error {
		defer func() {
			// close processed channel so the downstream consumer knows we are done
			close(f.processedCh)
		}()

		for {
			select {

			// detect ctx cancellation
			case <-ctx.Done():
				return ctx.Err()
			// take the next file that has been formatted
			case file, ok := <-f.formattedCh:
				if !ok {
					// channel has been closed, no further files to process
					return nil
				}

				// look up current file info
				currentInfo, err := os.Stat(file.Path)
				if err != nil {
					return fmt.Errorf("failed to stat processed file: %w", err)
				}

				// check if the file has changed
				// time.Time must be compared with Equal: == also compares the location
				// and monotonic clock reading, which can differ for the same instant
				unchanged := file.Info.ModTime().Equal(currentInfo.ModTime()) &&
					file.Info.Size() == currentInfo.Size()

				if !unchanged {
					// record the change
					stats.Add(stats.Formatted, 1)
					// log the change for diagnostics
					log.Debugf("file has been changed: %s", file.Path)
					// update the file info
					file.Info = currentInfo
				}

				// mark as processed, but don't block forever if the pipeline has been
				// cancelled and nobody is reading from the channel any more
				select {
				case <-ctx.Done():
					return ctx.Err()
				case f.processedCh <- file:
				}
			}
		}
	}
}

// updateCache returns a task that drains f.processedCh until it is closed or
// ctx is cancelled.
//
// In normal mode, files are collected into batches of BatchSize and handed to
// cache.Update. When f.Stdin or f.NoCache is set no cache update is performed.
// In stdin mode, each processed file is also copied to stdout and then removed.
//
// After the channel has been drained the task returns ErrFailOnChange if
// f.FailOnChange is set and stats.Formatted is non-zero; otherwise it prints
// the stats (except in stdin mode) and returns nil.
func (f *Format) updateCache(ctx context.Context) func() error {
	return func() error {
		// used to batch updates for more efficient txs
		batch := make([]*walk.File, 0, BatchSize)

		// apply a batch
		processBatch := func() error {
			// pass the batch to the cache for updating
			if err := cache.Update(batch); err != nil {
				return err
			}
			batch = batch[:0]
			return nil
		}

		// if we are processing from stdin that means we are outputting to stdout, no caching involved
		// if f.NoCache is set the cache is not in use (the original comment mentions both an explicit
		// user opt-out and a failure to open the cache; that setup is not visible here)
		if f.Stdin || f.NoCache {
			// skip the cache, but still drop the batch so it cannot grow without bound
			processBatch = func() error {
				batch = batch[:0]
				return nil
			}
		}

	LOOP:
		for {
			select {
			// detect ctx cancellation
			case <-ctx.Done():
				return ctx.Err()
			// respond to processed files
			case file, ok := <-f.processedCh:
				if !ok {
					// channel has been closed, no further files to process
					break LOOP
				}

				if f.Stdin {
					// dump file into stdout
					// (named tmp rather than f so the receiver is not shadowed)
					tmp, err := os.Open(file.Path)
					if err != nil {
						return fmt.Errorf("failed to open %s: %w", file.Path, err)
					}

					_, copyErr := io.Copy(os.Stdout, tmp)
					// always release the descriptor, and do so before removing the file
					closeErr := tmp.Close()

					if copyErr != nil {
						return fmt.Errorf("failed to copy %s to stdout: %w", file.Path, copyErr)
					}
					if closeErr != nil {
						return fmt.Errorf("failed to close %s: %w", file.Path, closeErr)
					}
					if err = os.Remove(tmp.Name()); err != nil {
						return fmt.Errorf("failed to remove temp file %s: %w", file.Path, err)
					}

					continue
				}

				// append to batch and process if we have enough
				batch = append(batch, file)
				if len(batch) == BatchSize {
					if err := processBatch(); err != nil {
						return err
					}
				}
			}
		}

		// final flush
		if err := processBatch(); err != nil {
			return err
		}

		// if fail on change has been enabled, check that no files were actually formatted, throwing an error if so
		if f.FailOnChange && stats.Value(stats.Formatted) != 0 {
			return ErrFailOnChange
		}

		// print stats to stdout unless we are processing stdin and printing the results to stdout
		if !f.Stdin {
			stats.Print()
		}

		return nil
	}
}

func findUp(searchDir string, fileName string) (path string, dir string, err error) {
for _, dir := range eachDir(searchDir) {
path := filepath.Join(dir, fileName)
Expand Down
5 changes: 5 additions & 0 deletions cli/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ func TestFailOnChange(t *testing.T) {
test.WriteConfig(t, configPath, cfg)
_, err := cmd(t, "--fail-on-change", "--config-file", configPath, "--tree-root", tempDir)
as.ErrorIs(err, ErrFailOnChange)

// test with no cache
test.WriteConfig(t, configPath, cfg)
_, err = cmd(t, "--fail-on-change", "--config-file", configPath, "--tree-root", tempDir, "--no-cache")
as.ErrorIs(err, ErrFailOnChange)
}

func TestBustCacheOnFormatterChange(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion format/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (f *Formatter) Apply(ctx context.Context, tasks []*Task) error {

//

f.log.Infof("%v files processed in %v", len(tasks), time.Since(start))
f.log.Infof("%v file(s) processed in %v", len(tasks), time.Since(start))

return nil
}
Expand Down

0 comments on commit 23e563b

Please sign in to comment.