Skip to content

Commit

Permalink
Transcode stream refactor (stashapp#609)
Browse files Browse the repository at this point in the history
* Remove forceMkv and forceHEVC
* Add HLS support and refactor
* Add new streaming endpoints
  • Loading branch information
WithoutPants authored Jul 23, 2020
1 parent e0e25c7 commit ec05b4b
Show file tree
Hide file tree
Showing 40 changed files with 768 additions and 373 deletions.
2 changes: 0 additions & 2 deletions graphql/documents/data/config.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ fragment ConfigGeneralData on ConfigGeneralResult {
previewPreset
maxTranscodeSize
maxStreamingTranscodeSize
forceMkv
forceHevc
username
password
maxSessionAge
Expand Down
2 changes: 0 additions & 2 deletions graphql/documents/data/scene.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ fragment SceneData on Scene {
...SceneMarkerData
}

is_streamable

gallery {
...GalleryData
}
Expand Down
10 changes: 9 additions & 1 deletion graphql/documents/queries/scene.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,12 @@ query ParseSceneFilenames($filter: FindFilterType!, $config: SceneParserInput!)
tag_ids
}
}
}
}

query SceneStreams($id: ID!) {
sceneStreams(id: $id) {
url
mime_type
label
}
}
3 changes: 3 additions & 0 deletions graphql/schema/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ type Query {

findScenesByPathRegex(filter: FindFilterType): FindScenesResultType!

"""Return valid stream paths"""
sceneStreams(id: ID): [SceneStreamEndpoint!]!

parseSceneFilenames(filter: FindFilterType, config: SceneParserInput!): SceneParserResultType!

"""A function which queries SceneMarker objects"""
Expand Down
8 changes: 0 additions & 8 deletions graphql/schema/types/config.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ input ConfigGeneralInput {
maxTranscodeSize: StreamingResolutionEnum
"""Max streaming transcode size"""
maxStreamingTranscodeSize: StreamingResolutionEnum
"""Force MKV as supported format"""
forceMkv: Boolean!
"""Force HEVC as a supported codec"""
forceHevc: Boolean!
"""Username"""
username: String
"""Password"""
Expand Down Expand Up @@ -71,10 +67,6 @@ type ConfigGeneralResult {
maxTranscodeSize: StreamingResolutionEnum
"""Max streaming transcode size"""
maxStreamingTranscodeSize: StreamingResolutionEnum
"""Force MKV as supported format"""
forceMkv: Boolean!
"""Force HEVC as a supported codec"""
forceHevc: Boolean!
"""Username"""
username: String!
"""Password"""
Expand Down
9 changes: 7 additions & 2 deletions graphql/schema/types/scene.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type Scene {

file: SceneFileType! # Resolver
paths: ScenePathsType! # Resolver
is_streamable: Boolean! # Resolver

scene_markers: [SceneMarker!]!
gallery: Gallery
Expand Down Expand Up @@ -138,4 +137,10 @@ type SceneParserResult {
type SceneParserResultType {
count: Int!
results: [SceneParserResult!]!
}
}

type SceneStreamEndpoint {
url: String!
mime_type: String
label: String
}
7 changes: 0 additions & 7 deletions pkg/api/resolver_model_scene.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/stashapp/stash/pkg/api/urlbuilders"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/models"
"github.com/stashapp/stash/pkg/utils"
)
Expand Down Expand Up @@ -81,12 +80,6 @@ func (r *sceneResolver) Paths(ctx context.Context, obj *models.Scene) (*models.S
}, nil
}

func (r *sceneResolver) IsStreamable(ctx context.Context, obj *models.Scene) (bool, error) {
// ignore error
ret, _ := manager.IsStreamable(obj)
return ret, nil
}

func (r *sceneResolver) SceneMarkers(ctx context.Context, obj *models.Scene) ([]*models.SceneMarker, error) {
qb := models.NewSceneMarkerQueryBuilder()
return qb.FindBySceneID(obj.ID, nil)
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/resolver_mutation_configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func (r *mutationResolver) ConfigureGeneral(ctx context.Context, input models.Co
if input.MaxStreamingTranscodeSize != nil {
config.Set(config.MaxStreamingTranscodeSize, input.MaxStreamingTranscodeSize.String())
}
config.Set(config.ForceMKV, input.ForceMkv)
config.Set(config.ForceHEVC, input.ForceHevc)

if input.Username != nil {
config.Set(config.Username, input.Username)
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/resolver_query_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func makeConfigGeneralResult() *models.ConfigGeneralResult {
PreviewPreset: config.GetPreviewPreset(),
MaxTranscodeSize: &maxTranscodeSize,
MaxStreamingTranscodeSize: &maxStreamingTranscodeSize,
ForceMkv: config.GetForceMKV(),
ForceHevc: config.GetForceHEVC(),
Username: config.GetUsername(),
Password: config.GetPasswordHash(),
MaxSessionAge: config.GetMaxSessionAge(),
Expand Down
31 changes: 31 additions & 0 deletions pkg/api/resolver_query_scene.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package api

import (
"context"
"errors"
"strconv"

"github.com/stashapp/stash/pkg/api/urlbuilders"
"github.com/stashapp/stash/pkg/manager"
"github.com/stashapp/stash/pkg/models"
)

func (r *queryResolver) SceneStreams(ctx context.Context, id *string) ([]*models.SceneStreamEndpoint, error) {
// find the scene
qb := models.NewSceneQueryBuilder()
idInt, _ := strconv.Atoi(*id)
scene, err := qb.Find(idInt)

if err != nil {
return nil, err
}

if scene == nil {
return nil, errors.New("nil scene")
}

baseURL, _ := ctx.Value(BaseURLCtxKey).(string)
builder := urlbuilders.NewSceneURLBuilder(baseURL, scene.ID)

return manager.GetSceneStreamPaths(scene, builder.GetStreamURL())
}
165 changes: 93 additions & 72 deletions pkg/api/routes_scene.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package api

import (
"context"
"io"
"net/http"
"os"
"strconv"
"strings"

Expand All @@ -24,8 +22,15 @@ func (rs sceneRoutes) Routes() chi.Router {

r.Route("/{sceneId}", func(r chi.Router) {
r.Use(SceneCtx)
r.Get("/stream", rs.Stream)
r.Get("/stream.mp4", rs.Stream)

// streaming endpoints
r.Get("/stream", rs.StreamDirect)
r.Get("/stream.mkv", rs.StreamMKV)
r.Get("/stream.webm", rs.StreamWebM)
r.Get("/stream.m3u8", rs.StreamHLS)
r.Get("/stream.ts", rs.StreamTS)
r.Get("/stream.mp4", rs.StreamMp4)

r.Get("/screenshot", rs.Screenshot)
r.Get("/preview", rs.Preview)
r.Get("/webp", rs.Webp)
Expand All @@ -42,110 +47,126 @@ func (rs sceneRoutes) Routes() chi.Router {

// region Handlers

func (rs sceneRoutes) Stream(w http.ResponseWriter, r *http.Request) {

scene := r.Context().Value(sceneKey).(*models.Scene)

container := ""
func getSceneFileContainer(scene *models.Scene) ffmpeg.Container {
var container ffmpeg.Container
if scene.Format.Valid {
container = scene.Format.String
container = ffmpeg.Container(scene.Format.String)
} else { // container isn't in the DB
// shouldn't happen, fallback to ffprobe
tmpVideoFile, err := ffmpeg.NewVideoFile(manager.GetInstance().FFProbePath, scene.Path)
if err != nil {
logger.Errorf("[transcode] error reading video file: %s", err.Error())
return
return ffmpeg.Container("")
}

container = string(ffmpeg.MatchContainer(tmpVideoFile.Container, scene.Path))
container = ffmpeg.MatchContainer(tmpVideoFile.Container, scene.Path)
}

// detect if not a streamable file and try to transcode it instead
return container
}

func (rs sceneRoutes) StreamDirect(w http.ResponseWriter, r *http.Request) {
scene := r.Context().Value(sceneKey).(*models.Scene)

filepath := manager.GetInstance().Paths.Scene.GetStreamPath(scene.Path, scene.Checksum)
videoCodec := scene.VideoCodec.String
audioCodec := ffmpeg.MissingUnsupported
if scene.AudioCodec.Valid {
audioCodec = ffmpeg.AudioCodec(scene.AudioCodec.String)
}
hasTranscode, _ := manager.HasTranscode(scene)
if ffmpeg.IsValidCodec(videoCodec) && ffmpeg.IsValidCombo(videoCodec, ffmpeg.Container(container)) && ffmpeg.IsValidAudioForContainer(audioCodec, ffmpeg.Container(container)) || hasTranscode {
manager.RegisterStream(filepath, &w)
http.ServeFile(w, r, filepath)
manager.WaitAndDeregisterStream(filepath, &w, r)
manager.RegisterStream(filepath, &w)
http.ServeFile(w, r, filepath)
manager.WaitAndDeregisterStream(filepath, &w, r)
}

func (rs sceneRoutes) StreamMKV(w http.ResponseWriter, r *http.Request) {
// only allow mkv streaming if the scene container is an mkv already
scene := r.Context().Value(sceneKey).(*models.Scene)

container := getSceneFileContainer(scene)
if container != ffmpeg.Matroska {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("not an mkv file"))
return
}

// needs to be transcoded
rs.streamTranscode(w, r, ffmpeg.CodecMKVAudio)
}

func (rs sceneRoutes) StreamWebM(w http.ResponseWriter, r *http.Request) {
rs.streamTranscode(w, r, ffmpeg.CodecVP9)
}

func (rs sceneRoutes) StreamMp4(w http.ResponseWriter, r *http.Request) {
rs.streamTranscode(w, r, ffmpeg.CodecH264)
}

func (rs sceneRoutes) StreamHLS(w http.ResponseWriter, r *http.Request) {
scene := r.Context().Value(sceneKey).(*models.Scene)

videoFile, err := ffmpeg.NewVideoFile(manager.GetInstance().FFProbePath, scene.Path)
if err != nil {
logger.Errorf("[stream] error reading video file: %s", err.Error())
return
}

// start stream based on query param, if provided
r.ParseForm()
startTime := r.Form.Get("start")
logger.Debug("Returning HLS playlist")

encoder := ffmpeg.NewEncoder(manager.GetInstance().FFMPEGPath)
// getting the playlist manifest only
w.Header().Set("Content-Type", ffmpeg.MimeHLS)
var str strings.Builder

var stream io.ReadCloser
var process *os.Process
mimeType := ffmpeg.MimeWebm
ffmpeg.WriteHLSPlaylist(*videoFile, r.URL.String(), &str)

if audioCodec == ffmpeg.MissingUnsupported {
//ffmpeg fails if it trys to transcode a non supported audio codec
stream, process, err = encoder.StreamTranscodeVideo(*videoFile, startTime, config.GetMaxStreamingTranscodeSize())
} else {
copyVideo := false // try to be smart if the video to be transcoded is in a Matroska container
// mp4 has always supported audio so it doesn't need to be checked
// while mpeg_ts has seeking issues if we don't reencode the video

if config.GetForceMKV() { // If MKV is forced as supported and video codec is also supported then only transcode audio
if ffmpeg.Container(container) == ffmpeg.Matroska {
switch videoCodec {
case ffmpeg.H264, ffmpeg.Vp9, ffmpeg.Vp8:
copyVideo = true
case ffmpeg.Hevc:
if config.GetForceHEVC() {
copyVideo = true
}

}
}
}
requestByteRange := utils.CreateByteRange(r.Header.Get("Range"))
if requestByteRange.RawString != "" {
logger.Debugf("Requested range: %s", requestByteRange.RawString)
}

if copyVideo { // copy video stream instead of transcoding it
stream, process, err = encoder.StreamMkvTranscodeAudio(*videoFile, startTime, config.GetMaxStreamingTranscodeSize())
mimeType = ffmpeg.MimeMkv
ret := requestByteRange.Apply([]byte(str.String()))
rangeStr := requestByteRange.ToHeaderValue(int64(str.Len()))
w.Header().Set("Content-Range", rangeStr)

} else {
stream, process, err = encoder.StreamTranscode(*videoFile, startTime, config.GetMaxStreamingTranscodeSize())
}
}
w.Write(ret)
}

func (rs sceneRoutes) StreamTS(w http.ResponseWriter, r *http.Request) {
rs.streamTranscode(w, r, ffmpeg.CodecHLS)
}

func (rs sceneRoutes) streamTranscode(w http.ResponseWriter, r *http.Request, videoCodec ffmpeg.Codec) {
logger.Debugf("Streaming as %s", videoCodec.MimeType)
scene := r.Context().Value(sceneKey).(*models.Scene)

// needs to be transcoded

videoFile, err := ffmpeg.NewVideoFile(manager.GetInstance().FFProbePath, scene.Path)
if err != nil {
logger.Errorf("[stream] error transcoding video file: %s", err.Error())
logger.Errorf("[stream] error reading video file: %s", err.Error())
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", mimeType)
// start stream based on query param, if provided
r.ParseForm()
startTime := r.Form.Get("start")

logger.Infof("[stream] transcoding video file to %s", mimeType)
var stream *ffmpeg.Stream

// handle if client closes the connection
notify := r.Context().Done()
go func() {
<-notify
logger.Info("[stream] client closed the connection. Killing stream process.")
process.Kill()
}()
audioCodec := ffmpeg.MissingUnsupported
if scene.AudioCodec.Valid {
audioCodec = ffmpeg.AudioCodec(scene.AudioCodec.String)
}

options := ffmpeg.GetTranscodeStreamOptions(*videoFile, videoCodec, audioCodec)
options.StartTime = startTime
options.MaxTranscodeSize = config.GetMaxStreamingTranscodeSize()

encoder := ffmpeg.NewEncoder(manager.GetInstance().FFMPEGPath)
stream, err = encoder.GetTranscodeStream(options)

_, err = io.Copy(w, stream)
if err != nil {
logger.Errorf("[stream] error serving transcoded video file: %s", err.Error())
logger.Errorf("[stream] error transcoding video file: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}

stream.Serve(w, r)
}

func (rs sceneRoutes) Screenshot(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/urlbuilders/scene.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewSceneURLBuilder(baseURL string, sceneID int) SceneURLBuilder {
}

func (b SceneURLBuilder) GetStreamURL() string {
return b.BaseURL + "/scene/" + b.SceneID + "/stream.mp4"
return b.BaseURL + "/scene/" + b.SceneID + "/stream"
}

func (b SceneURLBuilder) GetStreamPreviewURL() string {
Expand Down
Loading

0 comments on commit ec05b4b

Please sign in to comment.