Skip to content

Commit

Permalink
Change Process interface into object variable
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
  • Loading branch information
rht committed Jun 20, 2015
1 parent e943bb3 commit d63cc0d
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 66 deletions.
2 changes: 1 addition & 1 deletion cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
defer func() {
// We wait for the node to close first, as the node has children
// that it will wait for before closing, such as the API server.
node.Close()
node.Process.Close()

select {
case <-ctx.Context.Done():
Expand Down
4 changes: 2 additions & 2 deletions cmd/ipfs/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func addDefaultAssets(out io.Writer, repoRoot string) error {
if err != nil {
return err
}
defer nd.Close()
defer nd.Process.Close()

dirb := uio.NewDirectory(nd.DAG)

Expand Down Expand Up @@ -218,7 +218,7 @@ func initializeIpnsKeyspace(repoRoot string) error {
if err != nil {
return err
}
defer nd.Close()
defer nd.Process.Close()

err = nd.SetupOfflineRouting()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (i *cmdInvocation) close() {
// this is gross, and should be changed when we extract out the exec Context.
if i.node != nil {
log.Info("Shutting down node...")
i.node.Close()
i.node.Process.Close()
}
}

Expand Down
33 changes: 20 additions & 13 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ type IpfsNode struct {

IpnsFs *ipnsfs.Filesystem

goprocess.Process
Process goprocess.Process
ctx context.Context

mode mode
}
Expand All @@ -120,23 +121,24 @@ type Mounts struct {

type ConfigOption func(ctx context.Context) (*IpfsNode, error)

func NewIPFSNode(parent context.Context, option ConfigOption) (*IpfsNode, error) {
procctx := goprocessctx.WithContext(parent)
ctx := parent
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
node, err := option(ctx)
if err != nil {
return nil, err
}

proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(node.teardown)
node.Process = proc
node.ctx = ctx

success := false // flip to true after all sub-system inits succeed
defer func() {
if !success {
procctx.Close()
proc.Close()
}
}()

node, err := option(ctx)
if err != nil {
return nil, err
}
node.Process = procctx
ctxg.SetTeardown(node.teardown)

// Need to make sure it's perfectly clear 1) which variables are expected
// to be initialized at this point, and 2) which variables will be
// initialized after this point.
Expand Down Expand Up @@ -334,6 +336,11 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
return nil
}

// Context returns the IpfsNode context
func (n *IpfsNode) Context() context.Context {
return n.ctx
}

// teardown closes owned children. If any errors occur, this function returns
// the first error.
func (n *IpfsNode) teardown() error {
Expand All @@ -360,7 +367,7 @@ func (n *IpfsNode) teardown() error {
}

if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
closers = append(closers, dht)
closers = append(closers, dht.Process())
}

if n.PeerHost != nil {
Expand Down
10 changes: 4 additions & 6 deletions core/corehttp/corehttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
core "github.com/ipfs/go-ipfs/core"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
)
Expand Down Expand Up @@ -78,20 +79,17 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
var serverError error
serverExited := make(chan struct{})

node.Children().Add(1)
defer node.Children().Done()

go func() {
node.Process.Go(func(p goprocess.Process) {
serverError = http.Serve(lis, handler)
close(serverExited)
}()
})

// wait for server to exit.
select {
case <-serverExited:

// if node being closed before server exits, close server
case <-node.Closing():
case <-node.Process.Closing():
log.Infof("server at %s terminating...", addr)

lis.Close()
Expand Down
4 changes: 2 additions & 2 deletions core/coreunix/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

func Cat(n *core.IpfsNode, pstr string) (io.Reader, error) {
p := path.FromString(pstr)
dagNode, err := n.Resolver.ResolvePath(n.ContextGroup.Context(), p)
dagNode, err := n.Resolver.ResolvePath(n.Context(), p)
if err != nil {
return nil, err
}
return uio.NewDagReader(n.ContextGroup.Context(), dagNode, n.DAG)
return uio.NewDagReader(n.Context(), dagNode, n.DAG)
}
2 changes: 1 addition & 1 deletion fuse/ipns/mount_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func Mount(ipfs *core.IpfsNode, ipnsmp, ipfsmp string) (mount.Mount, error) {
return nil, err
}

return mount.NewMount(ipfs, fsys, ipnsmp, allow_other)
return mount.NewMount(ipfs.Process, fsys, ipnsmp, allow_other)
}
2 changes: 1 addition & 1 deletion fuse/readonly/mount_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ func Mount(ipfs *core.IpfsNode, mountpoint string) (mount.Mount, error) {
cfg := ipfs.Repo.Config()
allow_other := cfg.Mounts.FuseAllowOther
fsys := NewFileSystem(ipfs)
return mount.NewMount(ipfs, fsys, mountpoint, allow_other)
return mount.NewMount(ipfs.Process, fsys, mountpoint, allow_other)
}
10 changes: 6 additions & 4 deletions p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type mocknet struct {
linkDefaults LinkOptions

proc goprocess.Process // for Context closing
ctx context.Context
sync.RWMutex
}

Expand All @@ -42,6 +43,7 @@ func New(ctx context.Context) Mocknet {
hosts: map[peer.ID]*bhost.BasicHost{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
proc: goprocessctx.WithContext(ctx),
ctx: ctx,
}
}

Expand All @@ -62,15 +64,15 @@ func (mn *mocknet) GenPeer() (host.Host, error) {
}

func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
n, err := newPeernet(mn.cg.Context(), mn, k, a)
n, err := newPeernet(mn.ctx, mn, k, a)
if err != nil {
return nil, err
}

h := bhost.New(n)
log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a)

mn.cg.AddChild(n.cg)
mn.proc.AddChild(n.proc)

mn.Lock()
mn.nets[n.peer] = n
Expand Down Expand Up @@ -298,11 +300,11 @@ func (mn *mocknet) ConnectAll() error {
}

func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) {
return mn.Net(a).DialPeer(mn.cg.Context(), b)
return mn.Net(a).DialPeer(mn.ctx, b)
}

func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) {
return a.DialPeer(mn.cg.Context(), b.LocalPeer())
return a.DialPeer(mn.ctx, b.LocalPeer())
}

func (mn *mocknet) DisconnectPeers(p1, p2 peer.ID) error {
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
notifs: make(map[inet.Notifiee]struct{}),
}

n.cg.SetTeardown(n.teardown)
n.proc.SetTeardown(n.teardown)
return n, nil
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func (pn *peernet) allConns() []*conn {

// Close calls the ContextCloser func
func (pn *peernet) Close() error {
return pn.cg.Close()
return pn.proc.Close()
}

func (pn *peernet) Peerstore() peer.Peerstore {
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Swarm struct {
notifs map[inet.Notifiee]ps.Notifiee

proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
}

Expand All @@ -69,6 +70,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local: local,
peers: peers,
proc: goprocessctx.WithContext(ctx),
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
Expand Down Expand Up @@ -110,6 +112,11 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc
}

// Context returns the context of the swarm
func (s *Swarm) Context() context.Context {
return s.ctx
}

// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.proc.Close()
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
log.Debugf("Swarm Listening at %s", maddr)
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
list, err := conn.Listen(s.Context(), maddr, s.local, sk)
if err != nil {
return err
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
return
}
}
}(s.cg.Context(), sl)
}(s.Context(), sl)

return nil
}
Expand Down
34 changes: 23 additions & 11 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type IpfsDHT struct {

Validator record.Validator // record validator funcs

Context context.Context
goprocess.Process
ctx context.Context
proc goprocess.Process
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
Expand All @@ -73,18 +73,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

procctx = goprocessctx.WithContext(ctx)
procctx.SetTeardown(func() error {
proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.Process = procctx
dht.Context = ctx
dht.proc = proc
dht.ctx = ctx

h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context, dht.self)
dht.AddChild(dht.providers)
dht.providers = NewProviderManager(dht.ctx, dht.self)
dht.proc.AddChild(dht.providers.proc)

dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
dht.birth = time.Now()
Expand All @@ -93,7 +93,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator["pk"] = record.PublicKeyValidator

if doPinging {
dht.Go(func() { dht.PingRoutine(time.Second * 10) })
dht.proc.Go(func(p goprocess.Process) {
dht.PingRoutine(time.Second * 10)
})
}
return dht
}
Expand Down Expand Up @@ -360,15 +362,25 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
for _, p := range peers {
ctx, cancel := context.WithTimeout(dht.Context, time.Second*5)
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)
}
cancel()
}
case <-dht.Closing():
case <-dht.proc.Closing():
return
}
}
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}

// Process return dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}
Loading

0 comments on commit d63cc0d

Please sign in to comment.