Skip to content

Commit

Permalink
booster-http: implement IPFS HTTP gateway (#1225)
Browse files Browse the repository at this point in the history
* feat: implement http api gateway

* feat: use go-libipfs lib (instead of copying to extern)

* feat: bump booster-bitswap info minor version

* feat: http gateway metrics

* fix: TestHttpInfo

* feat: by default only serve blocks and CARs, with option to serve original files (jpg, mov etc)
  • Loading branch information
dirkmc committed Apr 5, 2023
1 parent 5b51a00 commit d8825f7
Show file tree
Hide file tree
Showing 23 changed files with 1,382 additions and 137 deletions.
18 changes: 15 additions & 3 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/filecoin-project/boost/api"
bclient "github.com/filecoin-project/boost/api/client"
cliutil "github.com/filecoin-project/boost/cli/util"
"github.com/filecoin-project/boost/cmd/booster-bitswap/filters"
"github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore"
"github.com/filecoin-project/boost/cmd/lib/filters"
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/go-jsonrpc"
Expand Down Expand Up @@ -137,7 +137,19 @@ var runCmd = &cli.Command{
}
defer bcloser()

remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
bitswapBlockMetrics := remoteblockstore.BlockMetrics{
GetRequestCount: metrics.BitswapRblsGetRequestCount,
GetFailResponseCount: metrics.BitswapRblsGetFailResponseCount,
GetSuccessResponseCount: metrics.BitswapRblsGetSuccessResponseCount,
BytesSentCount: metrics.BitswapRblsBytesSentCount,
HasRequestCount: metrics.BitswapRblsHasRequestCount,
HasFailResponseCount: metrics.BitswapRblsHasFailResponseCount,
HasSuccessResponseCount: metrics.BitswapRblsHasSuccessResponseCount,
GetSizeRequestCount: metrics.BitswapRblsGetSizeRequestCount,
GetSizeFailResponseCount: metrics.BitswapRblsGetSizeFailResponseCount,
GetSizeSuccessResponseCount: metrics.BitswapRblsGetSizeSuccessResponseCount,
}
remoteStore := remoteblockstore.NewRemoteBlockstore(bapi, bitswapBlockMetrics)
// Create the server API
port := cctx.Int("port")
repoDir, err := homedir.Expand(cctx.String(FlagRepo.Name))
Expand Down
237 changes: 237 additions & 0 deletions cmd/booster-http/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package main

// Copied from
// https://github.com/ipfs/go-libipfs/blob/c76138366f1a7c416c481a89b3e71ed29e81a641/examples/gateway/common/blocks.go

import (
"context"
"errors"
"fmt"
gopath "path"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
bsfetcher "github.com/ipfs/go-fetcher/impl/blockservice"
blockstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipfs/go-libipfs/files"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-namesys"
"github.com/ipfs/go-namesys/resolve"
ipfspath "github.com/ipfs/go-path"
"github.com/ipfs/go-path/resolver"
"github.com/ipfs/go-unixfs"
ufile "github.com/ipfs/go-unixfs/file"
uio "github.com/ipfs/go-unixfs/io"
"github.com/ipfs/go-unixfsnode"
iface "github.com/ipfs/interface-go-ipfs-core"
nsopts "github.com/ipfs/interface-go-ipfs-core/options/namesys"
ifacepath "github.com/ipfs/interface-go-ipfs-core/path"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/schema"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
mc "github.com/multiformats/go-multicodec"
)

type BlocksGateway struct {
blockStore blockstore.Blockstore
blockService blockservice.BlockService
dagService format.DAGService
resolver resolver.Resolver

// Optional routing system to handle /ipns addresses.
namesys namesys.NameSystem
routing routing.ValueStore
}

func NewBlocksGateway(blockService blockservice.BlockService, routing routing.ValueStore) (*BlocksGateway, error) {
// Setup the DAG services, which use the CAR block store.
dagService := merkledag.NewDAGService(blockService)

// Setup the UnixFS resolver.
fetcherConfig := bsfetcher.NewFetcherConfig(blockService)
fetcherConfig.PrototypeChooser = dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) {
if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok {
return tlnkNd.LinkTargetNodePrototype(), nil
}
return basicnode.Prototype.Any, nil
})
fetcher := fetcherConfig.WithReifier(unixfsnode.Reify)
resolver := resolver.NewBasicResolver(fetcher)

// Setup a name system so that we are able to resolve /ipns links.
var (
ns namesys.NameSystem
err error
)
if routing != nil {
ns, err = namesys.NewNameSystem(routing)
if err != nil {
return nil, err
}
}

return &BlocksGateway{
blockStore: blockService.Blockstore(),
blockService: blockService,
dagService: dagService,
resolver: resolver,
routing: routing,
namesys: ns,
}, nil
}

func (api *BlocksGateway) GetUnixFsNode(ctx context.Context, p ifacepath.Resolved) (files.Node, error) {
nd, err := api.resolveNode(ctx, p)
if err != nil {
return nil, err
}

return ufile.NewUnixfsFile(ctx, api.dagService, nd)
}

func (api *BlocksGateway) LsUnixFsDir(ctx context.Context, p ifacepath.Resolved) (<-chan iface.DirEntry, error) {
node, err := api.resolveNode(ctx, p)
if err != nil {
return nil, err
}

dir, err := uio.NewDirectoryFromNode(api.dagService, node)
if err != nil {
return nil, err
}

out := make(chan iface.DirEntry, uio.DefaultShardWidth)

go func() {
defer close(out)
for l := range dir.EnumLinksAsync(ctx) {
select {
case out <- api.processLink(ctx, l):
case <-ctx.Done():
return
}
}
}()

return out, nil
}

func (api *BlocksGateway) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return api.blockService.GetBlock(ctx, c)
}

func (api *BlocksGateway) GetIPNSRecord(ctx context.Context, c cid.Cid) ([]byte, error) {
if api.routing == nil {
return nil, routing.ErrNotSupported
}

// Fails fast if the CID is not an encoded Libp2p Key, avoids wasteful
// round trips to the remote routing provider.
if mc.Code(c.Type()) != mc.Libp2pKey {
return nil, errors.New("provided cid is not an encoded libp2p key")
}

// The value store expects the key itself to be encoded as a multihash.
id, err := peer.FromCid(c)
if err != nil {
return nil, err
}

return api.routing.GetValue(ctx, "/ipns/"+string(id))
}

func (api *BlocksGateway) GetDNSLinkRecord(ctx context.Context, hostname string) (ifacepath.Path, error) {
if api.namesys != nil {
p, err := api.namesys.Resolve(ctx, "/ipns/"+hostname, nsopts.Depth(1))
if err == namesys.ErrResolveRecursion {
err = nil
}
return ifacepath.New(p.String()), err
}

return nil, errors.New("not implemented")
}

func (api *BlocksGateway) IsCached(ctx context.Context, p ifacepath.Path) bool {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return false
}

has, _ := api.blockStore.Has(ctx, rp.Cid())
return has
}

func (api *BlocksGateway) ResolvePath(ctx context.Context, p ifacepath.Path) (ifacepath.Resolved, error) {
if _, ok := p.(ifacepath.Resolved); ok {
return p.(ifacepath.Resolved), nil
}

err := p.IsValid()
if err != nil {
return nil, err
}

ipath := ipfspath.Path(p.String())
if ipath.Segments()[0] == "ipns" {
ipath, err = resolve.ResolveIPNS(ctx, api.namesys, ipath)
if err != nil {
return nil, err
}
}

if ipath.Segments()[0] != "ipfs" {
return nil, fmt.Errorf("unsupported path namespace: %s", p.Namespace())
}

node, rest, err := api.resolver.ResolveToLastNode(ctx, ipath)
if err != nil {
return nil, err
}

root, err := cid.Parse(ipath.Segments()[1])
if err != nil {
return nil, err
}

return ifacepath.NewResolvedPath(ipath, node, root, gopath.Join(rest...)), nil
}

func (api *BlocksGateway) resolveNode(ctx context.Context, p ifacepath.Path) (format.Node, error) {
rp, err := api.ResolvePath(ctx, p)
if err != nil {
return nil, err
}

node, err := api.dagService.Get(ctx, rp.Cid())
if err != nil {
return nil, fmt.Errorf("get node: %w", err)
}
return node, nil
}

func (api *BlocksGateway) processLink(ctx context.Context, result unixfs.LinkResult) iface.DirEntry {
if result.Err != nil {
return iface.DirEntry{Err: result.Err}
}

link := iface.DirEntry{
Name: result.Link.Name,
Cid: result.Link.Cid,
}

switch link.Cid.Type() {
case cid.Raw:
link.Type = iface.TFile
link.Size = result.Link.Size
case cid.DagProtobuf:
link.Size = result.Link.Size
}

return link
}
101 changes: 101 additions & 0 deletions cmd/booster-http/gateway_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package main

import (
"fmt"
"github.com/ipfs/go-libipfs/gateway"
"mime"
"net/http"
"strings"
)

type gatewayHandler struct {
gwh http.Handler
supportedFormats map[string]struct{}
}

func newGatewayHandler(gw *BlocksGateway, supportedFormats []string) http.Handler {
headers := map[string][]string{}
gateway.AddAccessControlHeaders(headers)

fmtsMap := make(map[string]struct{}, len(supportedFormats))
for _, f := range supportedFormats {
fmtsMap[f] = struct{}{}
}

return &gatewayHandler{
gwh: gateway.NewHandler(gateway.Config{Headers: headers}, gw),
supportedFormats: fmtsMap,
}
}

func (h *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
responseFormat, _, err := customResponseFormat(r)
if err != nil {
webError(w, fmt.Errorf("error while processing the Accept header: %w", err), http.StatusBadRequest)
return
}

if _, ok := h.supportedFormats[responseFormat]; !ok {
if responseFormat == "" {
responseFormat = "unixfs"
}
webError(w, fmt.Errorf("unsupported response format: %s", responseFormat), http.StatusBadRequest)
return
}

h.gwh.ServeHTTP(w, r)
}

func webError(w http.ResponseWriter, err error, code int) {
http.Error(w, err.Error(), code)
}

// Unfortunately this function is not exported from go-libipfs so we need to copy it here.
// return explicit response format if specified in request as query parameter or via Accept HTTP header
func customResponseFormat(r *http.Request) (mediaType string, params map[string]string, err error) {
if formatParam := r.URL.Query().Get("format"); formatParam != "" {
// translate query param to a content type
switch formatParam {
case "raw":
return "application/vnd.ipld.raw", nil, nil
case "car":
return "application/vnd.ipld.car", nil, nil
case "tar":
return "application/x-tar", nil, nil
case "json":
return "application/json", nil, nil
case "cbor":
return "application/cbor", nil, nil
case "dag-json":
return "application/vnd.ipld.dag-json", nil, nil
case "dag-cbor":
return "application/vnd.ipld.dag-cbor", nil, nil
case "ipns-record":
return "application/vnd.ipfs.ipns-record", nil, nil
}
}
// Browsers and other user agents will send Accept header with generic types like:
// Accept:text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8
// We only care about explicit, vendor-specific content-types and respond to the first match (in order).
// TODO: make this RFC compliant and respect weights (eg. return CAR for Accept:application/vnd.ipld.dag-json;q=0.1,application/vnd.ipld.car;q=0.2)
for _, header := range r.Header.Values("Accept") {
for _, value := range strings.Split(header, ",") {
accept := strings.TrimSpace(value)
// respond to the very first matching content type
if strings.HasPrefix(accept, "application/vnd.ipld") ||
strings.HasPrefix(accept, "application/x-tar") ||
strings.HasPrefix(accept, "application/json") ||
strings.HasPrefix(accept, "application/cbor") ||
strings.HasPrefix(accept, "application/vnd.ipfs") {
mediatype, params, err := mime.ParseMediaType(accept)
if err != nil {
return "", nil, err
}
return mediatype, params, nil
}
}
}
// If none of special-cased content types is found, return empty string
// to indicate default, implicit UnixFS response should be prepared
return "", nil, nil
}
Loading

0 comments on commit d8825f7

Please sign in to comment.