Skip to content

Commit

Permalink
Add reconnect logic for stdio pipes
Browse files Browse the repository at this point in the history
This change adds retry logic on the stdio relay if the server end of the named pipe
disconnects. This is a common case if containerd restarts, for example.
The current approach is to make an io.Writer wrapper that handles the
reconnection logic on a write failure if it can be determined that the error
is from a disconnect.

This exposes a new shim config option to tailor the retry timeout as well as
an annotation so it can be set per container.

Signed-off-by: Daniel Canter <dcanter@microsoft.com>
  • Loading branch information
dcantah committed Oct 17, 2021
1 parent 1b1197b commit 34e118f
Show file tree
Hide file tree
Showing 18 changed files with 516 additions and 252 deletions.
155 changes: 95 additions & 60 deletions cmd/containerd-shim-runhcs-v1/options/runhcs.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/containerd-shim-runhcs-v1/options/runhcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ message Options {
// logrus log levels: "trace", "debug", "info", "warn", "error", "fatal", "panic". This setting will override
// the `debug` field if both are specified, unless the level specified is also "debug", as these are equivalent.
string log_level = 16;

// IO_retry_timeout_in_sec is the timeout in seconds for how long to try to reconnect to an upstream IO provider if a connection is lost.
// The typical example is if containerd has restarted but is expected to come back online.
int32 IO_retry_timeout_in_sec = 17;
}

// ProcessDetails contains additional information about a process. This is the additional
Expand Down
1 change: 0 additions & 1 deletion cmd/containerd-shim-runhcs-v1/service_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func (s *service) startInternal(ctx context.Context, req *task.StartRequest) (*t

func (s *service) deleteInternal(ctx context.Context, req *task.DeleteRequest) (*task.DeleteResponse, error) {
// TODO: JTERRY75 we need to send this to the POD for isSandbox

t, err := s.getTask(req.ID)
if err != nil {
return nil, err
Expand Down
55 changes: 38 additions & 17 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -165,11 +166,6 @@ func newHcsTask(
owner := filepath.Base(os.Args[0])
isTemplate := oci.ParseAnnotationsSaveAsTemplate(ctx, s)

io, err := cmd.NewUpstreamIO(ctx, req.ID, req.Stdout, req.Stderr, req.Stdin, req.Terminal)
if err != nil {
return nil, err
}

var netNS string
if s.Windows != nil &&
s.Windows.Network != nil {
Expand All @@ -185,22 +181,36 @@ func newHcsTask(
shimOpts = v.(*runhcsopts.Options)
}

var ioRetryTimeoutInSec time.Duration
if timeoutStr := s.Annotations[oci.AnnotationIORetryTimeoutInSec]; timeoutStr != "" {
timeout, err := strconv.Atoi(timeoutStr)
if err != nil {
return nil, fmt.Errorf("failed to parse IO timeout setting: %w", err)
}
ioRetryTimeoutInSec = time.Duration(timeout)
}
io, err := cmd.NewUpstreamIO(ctx, req.ID, req.Stdout, req.Stderr, req.Stdin, req.Terminal, ioRetryTimeoutInSec)
if err != nil {
return nil, err
}

container, resources, err := createContainer(ctx, req.ID, owner, netNS, s, parent, shimOpts)
if err != nil {
return nil, err
}

ht := &hcsTask{
events: events,
id: req.ID,
isWCOW: oci.IsWCOW(s),
c: container,
cr: resources,
ownsHost: ownsParent,
host: parent,
closed: make(chan struct{}),
taskSpec: s,
isTemplate: isTemplate,
events: events,
id: req.ID,
isWCOW: oci.IsWCOW(s),
c: container,
cr: resources,
ownsHost: ownsParent,
host: parent,
closed: make(chan struct{}),
taskSpec: s,
isTemplate: isTemplate,
ioRetryTimeoutInSec: ioRetryTimeoutInSec,
}
ht.init = newHcsExec(
ctx,
Expand Down Expand Up @@ -278,7 +288,15 @@ func newClonedHcsTask(
return nil, fmt.Errorf("cloned task can only be created inside a windows host")
}

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var ioRetryTimeoutInSec time.Duration
if timeoutStr := s.Annotations[oci.AnnotationIORetryTimeoutInSec]; timeoutStr != "" {
timeout, err := strconv.Atoi(timeoutStr)
if err != nil {
return nil, fmt.Errorf("failed to parse IO timeout setting: %w", err)
}
ioRetryTimeoutInSec = time.Duration(timeout)
}
io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal, ioRetryTimeoutInSec)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -432,6 +450,9 @@ type hcsTask struct {

// taskSpec represents the spec/configuration for this task.
taskSpec *specs.Spec

// ioRetryTimeoutInSec is how long to keep trying to reconnect to the stdio pipes from containerd.
ioRetryTimeoutInSec time.Duration
}

func (ht *hcsTask) ID() string {
Expand All @@ -452,7 +473,7 @@ func (ht *hcsTask) CreateExec(ctx context.Context, req *task.ExecProcessRequest,
return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '' in task: '%s' must be running to create additional execs", ht.id)
}

io, err := cmd.NewUpstreamIO(ctx, req.ID, req.Stdout, req.Stderr, req.Stdin, req.Terminal)
io, err := cmd.NewUpstreamIO(ctx, req.ID, req.Stdout, req.Stderr, req.Stdin, req.Terminal, ht.ioRetryTimeoutInSec)
if err != nil {
return err
}
Expand Down
18 changes: 0 additions & 18 deletions internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,24 +131,6 @@ func CommandContext(ctx context.Context, host cow.ProcessHost, name string, arg
return cmd
}

// relayIO copies everything from r to w with io.Copy and returns io.Copy's
// byte count and error unchanged.
//
// When log is non-nil, a single "Cmd IO relay complete" entry is emitted,
// tagged with the file name and the number of bytes copied. The entry is
// logged at debug level on success, and at error level with the error
// attached when the copy fails.
func relayIO(w io.Writer, r io.Reader, log *logrus.Entry, name string) (int64, error) {
	copied, copyErr := io.Copy(w, r)
	if log == nil {
		// No logger supplied: behave exactly like io.Copy.
		return copied, copyErr
	}

	entry := log.WithFields(logrus.Fields{
		"file":  name,
		"bytes": copied,
	})
	if copyErr != nil {
		entry.WithError(copyErr).Log(logrus.ErrorLevel, "Cmd IO relay complete")
	} else {
		entry.Log(logrus.DebugLevel, "Cmd IO relay complete")
	}
	return copied, copyErr
}

// Start starts a command. The caller must ensure that if Start succeeds,
// Wait is eventually called to clean up resources.
func (c *Cmd) Start() error {
Expand Down
4 changes: 2 additions & 2 deletions internal/cmd/diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func ExecInUvm(ctx context.Context, vm *uvm.UtilityVM, req *CmdProcessRequest) (
if len(req.Args) == 0 {
return 0, errors.New("missing command")
}
np, err := NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
np, err := NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal, 0)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -46,7 +46,7 @@ func ExecInShimHost(ctx context.Context, req *CmdProcessRequest) (int, error) {
if len(req.Args) > 1 {
cmdArgsWithoutName = req.Args[1:]
}
np, err := NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
np, err := NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal, 0)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 34e118f

Please sign in to comment.