diff --git a/cmd/booster-http/http_test.go b/cmd/booster-http/http_test.go index efe4c151c..54e7b7feb 100644 --- a/cmd/booster-http/http_test.go +++ b/cmd/booster-http/http_test.go @@ -52,10 +52,8 @@ func TestHttpGzipResponse(t *testing.T) { defer f.Close() //Create CID - var cids []cid.Cid cid, err := cid.Parse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") require.NoError(t, err) - cids = append(cids, cid) // Crate pieceInfo deal := piecestore.DealInfo{ @@ -73,12 +71,11 @@ func TestHttpGzipResponse(t *testing.T) { mockHttpServer.EXPECT().UnsealSectorAt(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(f, nil) mockHttpServer.EXPECT().IsUnsealed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(true, nil) - mockHttpServer.EXPECT().PiecesContainingMultihash(gomock.Any(), gomock.Any()).AnyTimes().Return(cids, nil) mockHttpServer.EXPECT().GetPieceInfo(gomock.Any()).AnyTimes().Return(&pieceInfo, nil) //Create a client and make request with Encoding header client := new(http.Client) - request, err := http.NewRequest("GET", "http://localhost:7777/piece?payloadCid=bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi&format=piece", nil) + request, err := http.NewRequest("GET", "http://localhost:7777/piece/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", nil) require.NoError(t, err) request.Header.Add("Accept-Encoding", "gzip") @@ -103,61 +100,6 @@ func TestHttpGzipResponse(t *testing.T) { require.NoError(t, err) } -func TestHttpResponseRedirects(t *testing.T) { - - // Create a new mock Http server with custom functions - ctrl := gomock.NewController(t) - mockHttpServer := mocks_booster_http.NewMockHttpServerApi(ctrl) - httpServer := NewHttpServer("", 7777, false, mockHttpServer) - httpServer.Start(context.Background()) - - // Create mock unsealed file for piece/car - f, _ := os.Open(testFile) - defer f.Close() - - //Create CID - var cids []cid.Cid - cid, err := cid.Parse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") - require.NoError(t, err) - cids = append(cids, cid) - - // Crate pieceInfo - deal := piecestore.DealInfo{ - DealID: 1234567, - SectorID: 0, - Offset: 1233, - Length: 123, - } - var deals []piecestore.DealInfo - - pieceInfo := piecestore.PieceInfo{ - PieceCID: cid, - Deals: append(deals, deal), - } - - mockHttpServer.EXPECT().UnsealSectorAt(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(f, nil) - mockHttpServer.EXPECT().IsUnsealed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(true, nil) - mockHttpServer.EXPECT().PiecesContainingMultihash(gomock.Any(), gomock.Any()).AnyTimes().Return(cids, nil) - mockHttpServer.EXPECT().GetPieceInfo(gomock.Any()).AnyTimes().Return(&pieceInfo, nil) - - // Create a client with check against redirects - client := &http.Client{ - CheckRedirect: func(req *http.Request, via []*http.Request) error { - return http.ErrUseLastResponse - }, - } - request, err := http.NewRequest("GET", "http://localhost:7777/piece?payloadCid=bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi&format=piece", nil) - require.NoError(t, err) - - response, err := client.Do(request) - require.NoError(t, err) - require.Equal(t, 200, response.StatusCode) - - // Stop the server - err = httpServer.Stop() - require.NoError(t, err) -} - func TestHttpInfo(t *testing.T) { var v apiVersion diff --git a/cmd/booster-http/mocks/mock_booster_http.go b/cmd/booster-http/mocks/mock_booster_http.go index 46f224ed5..7b3e914b1 100644 --- a/cmd/booster-http/mocks/mock_booster_http.go +++ b/cmd/booster-http/mocks/mock_booster_http.go @@ -13,7 +13,6 @@ import ( abi "github.com/filecoin-project/go-state-types/abi" gomock "github.com/golang/mock/gomock" cid "github.com/ipfs/go-cid" - multihash "github.com/multiformats/go-multihash" ) // MockHttpServerApi is a mock of HttpServerApi interface. @@ -39,21 +38,6 @@ func (m *MockHttpServerApi) EXPECT() *MockHttpServerApiMockRecorder { return m.recorder } -// GetMaxPieceOffset mocks base method. -func (m *MockHttpServerApi) GetMaxPieceOffset(pieceCid cid.Cid) (uint64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetMaxPieceOffset", pieceCid) - ret0, _ := ret[0].(uint64) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetMaxPieceOffset indicates an expected call of GetMaxPieceOffset. -func (mr *MockHttpServerApiMockRecorder) GetMaxPieceOffset(pieceCid interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMaxPieceOffset", reflect.TypeOf((*MockHttpServerApi)(nil).GetMaxPieceOffset), pieceCid) -} - // GetPieceInfo mocks base method. func (m *MockHttpServerApi) GetPieceInfo(pieceCID cid.Cid) (*piecestore.PieceInfo, error) { m.ctrl.T.Helper() @@ -84,21 +68,6 @@ func (mr *MockHttpServerApiMockRecorder) IsUnsealed(ctx, sectorID, offset, lengt return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockHttpServerApi)(nil).IsUnsealed), ctx, sectorID, offset, length) } -// PiecesContainingMultihash mocks base method. -func (m *MockHttpServerApi) PiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PiecesContainingMultihash", ctx, mh) - ret0, _ := ret[0].([]cid.Cid) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// PiecesContainingMultihash indicates an expected call of PiecesContainingMultihash. -func (mr *MockHttpServerApiMockRecorder) PiecesContainingMultihash(ctx, mh interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PiecesContainingMultihash", reflect.TypeOf((*MockHttpServerApi)(nil).PiecesContainingMultihash), ctx, mh) -} - // UnsealSectorAt mocks base method. func (m *MockHttpServerApi) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset, length abi.UnpaddedPieceSize) (mount.Reader, error) { m.ctrl.T.Helper() diff --git a/cmd/booster-http/run.go b/cmd/booster-http/run.go index f9f336cd9..91b3d9500 100644 --- a/cmd/booster-http/run.go +++ b/cmd/booster-http/run.go @@ -29,7 +29,6 @@ import ( "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" "github.com/ipfs/go-cid" - "github.com/multiformats/go-multihash" "github.com/urfave/cli/v2" ) @@ -227,14 +226,6 @@ type serverApi struct { var _ HttpServerApi = (*serverApi)(nil) -func (s serverApi) PiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) { - return s.bapi.BoostDagstorePiecesContainingMultihash(ctx, mh) -} - -func (s serverApi) GetMaxPieceOffset(pieceCid cid.Cid) (uint64, error) { - return s.bapi.PiecesGetMaxOffset(s.ctx, pieceCid) -} - func (s serverApi) GetPieceInfo(pieceCID cid.Cid) (*piecestore.PieceInfo, error) { return s.bapi.PiecesGetPieceInfo(s.ctx, pieceCID) } diff --git a/cmd/booster-http/server.go b/cmd/booster-http/server.go index ca7b69d44..5271a3265 100644 --- a/cmd/booster-http/server.go +++ b/cmd/booster-http/server.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "context" "encoding/json" "errors" @@ -9,7 +8,6 @@ import ( "io" "net" "net/http" - "net/url" "strings" "time" @@ -24,10 +22,6 @@ import ( "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/index" - "github.com/multiformats/go-multihash" - "github.com/multiformats/go-varint" "go.opencensus.io/stats" ) @@ -40,10 +34,6 @@ var ErrNotFound = errors.New("not found") // non-zero last modified time. var lastModified = time.UnixMilli(1) -const carSuffix = ".car" -const pieceCidParam = "pieceCid" -const payloadCidParam = "payloadCid" - type apiVersion struct { Version string `json:"Version"` } @@ -60,8 +50,6 @@ type HttpServer struct { } type HttpServerApi interface { - PiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) - GetMaxPieceOffset(pieceCid cid.Cid) (uint64, error) GetPieceInfo(pieceCID cid.Cid) (*piecestore.PieceInfo, error) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) @@ -72,7 +60,7 @@ func NewHttpServer(path string, port int, allowIndexing bool, api HttpServerApi) } func (s *HttpServer) pieceBasePath() string { - return s.path + "/piece" + return s.path + "/piece/" } func (s *HttpServer) Start(ctx context.Context) { @@ -80,7 +68,7 @@ func (s *HttpServer) Start(ctx context.Context) { listenAddr := fmt.Sprintf(":%d", s.port) handler := http.NewServeMux() - handler.HandleFunc(s.pieceBasePath(), s.handlePieceRequest) + handler.HandleFunc(s.pieceBasePath(), s.handleByPieceCid) handler.HandleFunc("/", s.handleIndex) handler.HandleFunc("/index.html", s.handleIndex) handler.HandleFunc("/info", s.handleInfo) @@ -116,36 +104,11 @@ const idxPage = ` - Download a raw piece by payload CID - - - /piece?payloadCid=&format=piece - - - - - Download a CAR file by payload CID - - - /piece?payloadCid=&format=car - - - - - Download a raw piece by piece CID - - - /piece?pieceCid=&format=piece - - - - - Download a CAR file by piece CID + Download a raw piece by its piece CID - /piece?payloadCid=&format=car + /piece/ - @@ -157,54 +120,6 @@ func (s *HttpServer) handleIndex(w http.ResponseWriter, r *http.Request) { w.Write([]byte(idxPage)) //nolint:errcheck } -func (s *HttpServer) handlePieceRequest(w http.ResponseWriter, r *http.Request) { - q, err := url.ParseQuery(r.URL.RawQuery) - if err != nil { - msg := fmt.Sprintf("parsing query: %s", err.Error()) - writeError(w, r, http.StatusBadRequest, msg) - return - } - - isCar := false - - if len(q["format"]) == 1 { - if q["format"][0] == "car" { // Check if format value is car - isCar = true - } else if q["format"][0] != "piece" { // Check if format value is not piece - writeError(w, r, http.StatusBadRequest, "incorrect `format` query parameter") - return - } - } else if len(q["format"]) == 0 { - isCar = false - } else { // Error if more than 1 format value - writeError(w, r, http.StatusBadRequest, "single `format` query parameter is allowed") - return - } - - // Check provided cid and format and redirect the request appropriately - if len(q[payloadCidParam]) == 1 { - payloadCid, err := cid.Parse(q[payloadCidParam][0]) - if err != nil { - msg := fmt.Sprintf("parsing payload CID '%s': %s", q[payloadCidParam][0], err.Error()) - writeError(w, r, http.StatusBadRequest, msg) - stats.Record(r.Context(), metrics.HttpPayloadByCidRequestCount.M(1)) - return - } - s.handleByPayloadCid(payloadCid, isCar, w, r) - } else if len(q[pieceCidParam]) == 1 { - pieceCid, err := cid.Parse(q[pieceCidParam][0]) - if err != nil { - msg := fmt.Sprintf("parsing piece CID '%s': %s", q[pieceCidParam][0], err.Error()) - writeError(w, r, http.StatusBadRequest, msg) - stats.Record(r.Context(), metrics.HttpPieceByCidRequestCount.M(1)) - return - } - s.handleByPieceCid(pieceCid, isCar, w, r) - } else { - writeError(w, r, http.StatusBadRequest, "unsupported query") - } -} - func (s *HttpServer) handleInfo(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -214,74 +129,32 @@ func (s *HttpServer) handleInfo(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(v) //nolint:errcheck } -func (s *HttpServer) handleByPayloadCid(payloadCid cid.Cid, isCar bool, w http.ResponseWriter, r *http.Request) { +func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { startTime := time.Now() - ctx, span := tracing.Tracer.Start(r.Context(), "http.payload_cid") + ctx, span := tracing.Tracer.Start(r.Context(), "http.piece_cid") defer span.End() + stats.Record(ctx, metrics.HttpPieceByCidRequestCount.M(1)) - stats.Record(ctx, metrics.HttpPayloadByCidRequestCount.M(1)) - - // Find all the pieces that contain the payload cid - pieces, err := s.api.PiecesContainingMultihash(ctx, payloadCid.Hash()) - if err != nil { - if isNotFoundError(err) { - msg := fmt.Sprintf("getting piece that contains payload CID '%s': %s", payloadCid, err.Error()) - writeError(w, r, http.StatusNotFound, msg) - stats.Record(ctx, metrics.HttpPayloadByCid404ResponseCount.M(1)) - return - } - log.Errorf("getting piece that contains payload CID '%s': %s", payloadCid, err) - msg := fmt.Sprintf("server error getting piece that contains payload CID '%s'", payloadCid) - writeError(w, r, http.StatusInternalServerError, msg) - stats.Record(ctx, metrics.HttpPayloadByCid500ResponseCount.M(1)) + // Remove the path up to the piece cid + prefixLen := len(s.pieceBasePath()) + if len(r.URL.Path) <= prefixLen { + msg := fmt.Sprintf("path '%s' is missing piece CID", r.URL.Path) + writeError(w, r, http.StatusBadRequest, msg) + stats.Record(ctx, metrics.HttpPieceByCid400ResponseCount.M(1)) return } - // Just get the content of the first piece returned (if the client wants a - // different piece they can just call the /piece endpoint) - pieceCid := pieces[0] - content, err := s.getPieceContent(ctx, pieceCid) - if err == nil && isCar { - content, err = s.getCarContent(pieceCid, content) - } + pieceCidStr := r.URL.Path[prefixLen:] + pieceCid, err := cid.Parse(pieceCidStr) if err != nil { - if isNotFoundError(err) { - msg := fmt.Sprintf("getting content for payload CID %s in piece %s: %s", payloadCid, pieceCid, err) - writeError(w, r, http.StatusNotFound, msg) - stats.Record(ctx, metrics.HttpPayloadByCid404ResponseCount.M(1)) - return - } - log.Errorf("getting content for payload CID %s in piece %s: %s", payloadCid, pieceCid, err) - msg := fmt.Sprintf("server error getting content for payload CID %s in piece %s", payloadCid, pieceCid) - writeError(w, r, http.StatusInternalServerError, msg) - stats.Record(ctx, metrics.HttpPayloadByCid500ResponseCount.M(1)) + msg := fmt.Sprintf("parsing piece CID '%s': %s", pieceCidStr, err.Error()) + writeError(w, r, http.StatusBadRequest, msg) + stats.Record(ctx, metrics.HttpPieceByCid400ResponseCount.M(1)) return } - // Set an Etag based on the piece cid - etag := pieceCid.String() - if isCar { - etag += carSuffix - } - w.Header().Set("Etag", etag) - - serveContent(w, r, content, getContentType(isCar)) - - stats.Record(ctx, metrics.HttpPayloadByCid200ResponseCount.M(1)) - stats.Record(ctx, metrics.HttpPayloadByCidRequestDuration.M(float64(time.Since(startTime).Milliseconds()))) -} - -func (s *HttpServer) handleByPieceCid(pieceCid cid.Cid, isCar bool, w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - ctx, span := tracing.Tracer.Start(r.Context(), "http.piece_cid") - defer span.End() - stats.Record(ctx, metrics.HttpPieceByCidRequestCount.M(1)) - // Get a reader over the piece content, err := s.getPieceContent(ctx, pieceCid) - if err == nil && isCar { - content, err = s.getCarContent(pieceCid, content) - } if err != nil { if isNotFoundError(err) { writeError(w, r, http.StatusNotFound, err.Error()) @@ -297,28 +170,18 @@ func (s *HttpServer) handleByPieceCid(pieceCid cid.Cid, isCar bool, w http.Respo // Set an Etag based on the piece cid etag := pieceCid.String() - if isCar { - etag += carSuffix - } w.Header().Set("Etag", etag) - serveContent(w, r, content, getContentType(isCar)) + serveContent(w, r, content) stats.Record(ctx, metrics.HttpPieceByCid200ResponseCount.M(1)) stats.Record(ctx, metrics.HttpPieceByCidRequestDuration.M(float64(time.Since(startTime).Milliseconds()))) } -func getContentType(isCar bool) string { - if isCar { - return "application/vnd.ipld.car" - } - return "application/piece" -} - -func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, contentType string) { +func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker) { // Set the Content-Type header explicitly so that http.ServeContent doesn't // try to do it implicitly - w.Header().Set("Content-Type", contentType) + w.Header().Set("Content-Type", "application/piece") var writer http.ResponseWriter @@ -417,90 +280,6 @@ func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io. return pieceReader, nil } -func (s *HttpServer) getCarContent(pieceCid cid.Cid, pieceReader io.ReadSeeker) (io.ReadSeeker, error) { - maxOffset, err := s.api.GetMaxPieceOffset(pieceCid) - if err != nil { - if s.allowIndexing { - // If it's not possible to get the max piece offset it may be because - // the CAR file hasn't been indexed yet. So try to index it in real time. - alog("%s\tbuilding index for %s", color.New(color.FgBlue).Sprintf("INFO"), pieceCid) - maxOffset, err = getMaxPieceOffset(pieceReader) - } - if err != nil { - return nil, fmt.Errorf("getting max offset for piece %s: %w", pieceCid, err) - } - } - - // Seek to the max offset - _, err = pieceReader.Seek(int64(maxOffset), io.SeekStart) - if err != nil { - return nil, fmt.Errorf("seeking to offset %d in piece data: %w", maxOffset, err) - } - - // A section consists of - // - - // Get - cr := &countReader{r: bufio.NewReader(pieceReader)} - dataLength, err := varint.ReadUvarint(cr) - if err != nil { - return nil, fmt.Errorf("reading CAR section length: %w", err) - } - - // The number of bytes in the uvarint that records - dataLengthUvarSize := cr.count - - // Get the size of the (unpadded) CAR file - unpaddedCarSize := maxOffset + dataLengthUvarSize + dataLength - - // Seek to the end of the CAR to get its (padded) size - paddedCarSize, err := pieceReader.Seek(0, io.SeekEnd) - if err != nil { - return nil, fmt.Errorf("seeking to end of CAR: %w", err) - } - - // Seek back to the start of the CAR - _, err = pieceReader.Seek(0, io.SeekStart) - if err != nil { - return nil, fmt.Errorf("seeking to start of CAR: %w", err) - } - - lr := &limitSeekReader{ - Reader: io.LimitReader(pieceReader, int64(unpaddedCarSize)), - readSeeker: pieceReader, - unpaddedSize: int64(unpaddedCarSize), - paddedSize: paddedCarSize, - } - return lr, nil -} - -// getMaxPieceOffset generates a CAR file index from the reader, and returns -// the maximum offset in the index -func getMaxPieceOffset(reader io.ReadSeeker) (uint64, error) { - idx, err := car.GenerateIndex(reader, car.ZeroLengthSectionAsEOF(true), car.StoreIdentityCIDs(true)) - if err != nil { - return 0, fmt.Errorf("generating CAR index: %w", err) - } - - itidx, ok := idx.(index.IterableIndex) - if !ok { - return 0, fmt.Errorf("could not cast CAR file index %t to an IterableIndex", idx) - } - - var maxOffset uint64 - err = itidx.ForEach(func(m multihash.Multihash, offset uint64) error { - if offset > maxOffset { - maxOffset = offset - } - return nil - }) - if err != nil { - return 0, fmt.Errorf("getting max offset: %w", err) - } - - return maxOffset, nil -} - type limitSeekReader struct { io.Reader readSeeker io.ReadSeeker @@ -582,20 +361,6 @@ func (w *writeErrorWatcher) Write(bz []byte) (int, error) { return count, err } -// countReader just counts the number of bytes read -type countReader struct { - r *bufio.Reader - count uint64 -} - -func (c *countReader) ReadByte() (byte, error) { - b, err := c.r.ReadByte() - if err == nil { - c.count++ - } - return b, err -} - const timeFmt = "2006-01-02T15:04:05.000Z0700" func alog(l string, args ...interface{}) { diff --git a/go.mod b/go.mod index ed885ee5c..7e51bc339 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,6 @@ require ( github.com/multiformats/go-multiaddr v0.6.0 github.com/multiformats/go-multibase v0.1.1 github.com/multiformats/go-multihash v0.2.1 - github.com/multiformats/go-varint v0.0.6 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pressly/goose/v3 v3.5.3 github.com/prometheus/client_golang v1.12.1 @@ -284,6 +283,7 @@ require ( github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multicodec v0.5.0 // indirect github.com/multiformats/go-multistream v0.3.3 // indirect + github.com/multiformats/go-varint v0.0.6 // indirect github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c // indirect github.com/nkovacs/streamquote v1.0.0 // indirect github.com/nxadm/tail v1.4.8 // indirect