Skip to content

Commit

Permalink
[Playground] Log cancellation messages as warnings (#26790)
Browse files Browse the repository at this point in the history
* Log cancellation messages as warnings

* Log compilation errors as warnings
  • Loading branch information
TSultanov authored May 26, 2023
1 parent 0b43074 commit 4aebbc0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 21 deletions.
4 changes: 2 additions & 2 deletions playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (controller *playgroundController) GetSnippet(ctx context.Context, info *pb
func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.GetMetadataRequest) (*pb.GetMetadataResponse, error) {
commitTimestampInteger, err := strconv.ParseInt(BuildCommitTimestamp, 10, 64)
if err != nil {
logger.Errorf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
logger.Warnf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
commitTimestampInteger = 0
}

Expand All @@ -587,7 +587,7 @@ func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.Get
// verifyRouter verifies that controller is configured to work in router mode
func (controller *playgroundController) verifyRouter() error {
if controller.env.BeamSdkEnvs.ApacheBeamSdk != pb.Sdk_SDK_UNSPECIFIED {
return errors.New("runner mode")
return errors.New("server is in runner mode")
}
if controller.db == nil {
return errors.New("no database service")
Expand Down
63 changes: 44 additions & 19 deletions playground/backend/internal/code_processing/code_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package code_processing
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -31,7 +32,7 @@ import (
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
"beam.apache.org/playground/backend/internal/environment"
"beam.apache.org/playground/backend/internal/errors"
perrors "beam.apache.org/playground/backend/internal/errors"
"beam.apache.org/playground/backend/internal/executors"
"beam.apache.org/playground/backend/internal/fs_tool"
"beam.apache.org/playground/backend/internal/logger"
Expand Down Expand Up @@ -68,13 +69,23 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl

err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults)
if err != nil {
logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
var pipelineCanceledError perrors.PipelineCanceledError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else {
logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
}
return
}

err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
if err != nil {
logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
var pipelineCanceledError perrors.PipelineCanceledError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else {
logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
}
return
}

Expand All @@ -84,14 +95,28 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl

err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest)
if err != nil {
logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
var pipelineCanceledError perrors.PipelineCanceledError
var compilationError perrors.CompilationError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else if errors.As(err, &compilationError) {
logger.Warnf("%s: compilation error: %s", pipelineId, compilationError.Error())
} else {
logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
}
return
}

// Run/RunTest
err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions)
if err != nil {
logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
var pipelineCanceledError perrors.PipelineCanceledError
if errors.As(err, &pipelineCanceledError) {
logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
} else {
logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
}
return
}
}

Expand Down Expand Up @@ -206,7 +231,7 @@ func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
if processingErr != nil {
return processingErr
}
return err
return perrors.CompilationError{Reason: err.Error()}
} // Compile step is finished and code is compiled
if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return err
Expand Down Expand Up @@ -320,12 +345,12 @@ func GetProcessingOutput(ctx context.Context, cacheService cache.Cache, key uuid
value, err := cacheService.GetValue(ctx, key, subKey)
if err != nil {
logger.Errorf("%s: GetProcessingOutput(): cache.GetValue: error: %s", key, err.Error())
return "", errors.NotFoundError(errorTitle, "Error during getting output")
return "", perrors.NotFoundError(errorTitle, "Error during getting output")
}
stringValue, converted := value.(string)
if !converted {
logger.Errorf("%s: couldn't convert value to string: %s", key, value)
return "", errors.InternalError(errorTitle, "Error during getting output")
return "", perrors.InternalError(errorTitle, "Error during getting output")
}
return stringValue, nil
}
Expand All @@ -337,12 +362,12 @@ func GetProcessingStatus(ctx context.Context, cacheService cache.Cache, key uuid
value, err := cacheService.GetValue(ctx, key, cache.Status)
if err != nil {
logger.Errorf("%s: GetProcessingStatus(): cache.GetValue: error: %s", key, err.Error())
return pb.Status_STATUS_UNSPECIFIED, errors.NotFoundError(errorTitle, "Error during getting status")
return pb.Status_STATUS_UNSPECIFIED, perrors.NotFoundError(errorTitle, "Error during getting status")
}
statusValue, converted := value.(pb.Status)
if !converted {
logger.Errorf("%s: couldn't convert value to correct status enum: %s", key, value)
return pb.Status_STATUS_UNSPECIFIED, errors.InternalError(errorTitle, "Error during getting status")
return pb.Status_STATUS_UNSPECIFIED, perrors.InternalError(errorTitle, "Error during getting status")
}
return statusValue, nil
}
Expand All @@ -354,12 +379,12 @@ func GetLastIndex(ctx context.Context, cacheService cache.Cache, key uuid.UUID,
value, err := cacheService.GetValue(ctx, key, subKey)
if err != nil {
logger.Errorf("%s: GetLastIndex(): cache.GetValue: error: %s", key, err.Error())
return 0, errors.NotFoundError(errorTitle, "Error during getting pagination value")
return 0, perrors.NotFoundError(errorTitle, "Error during getting pagination value")
}
convertedValue, converted := value.(float64)
if !converted {
logger.Errorf("%s: couldn't convert value to float64. value: %s type %s", key, value, reflect.TypeOf(value))
return 0, errors.InternalError(errorTitle, "Error during getting pagination value")
return 0, perrors.InternalError(errorTitle, "Error during getting pagination value")
}
return int(convertedValue), nil
}
Expand All @@ -371,12 +396,12 @@ func GetGraph(ctx context.Context, cacheService cache.Cache, key uuid.UUID, erro
value, err := cacheService.GetValue(ctx, key, cache.Graph)
if err != nil {
logger.Errorf("%s: GetGraph(): cache.GetValue: error: %s", key, err.Error())
return "", errors.NotFoundError(errorTitle, "Error during getting graph")
return "", perrors.NotFoundError(errorTitle, "Error during getting graph")
}
stringValue, converted := value.(string)
if !converted {
logger.Errorf("%s: couldn't convert value to string. value: %s type %s", key, value, reflect.TypeOf(value))
return "", errors.InternalError(errorTitle, "Error during getting graph")
return "", perrors.InternalError(errorTitle, "Error during getting graph")
}
return stringValue, nil
}
Expand Down Expand Up @@ -414,14 +439,14 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uu
if err := finishByTimeout(pipelineId, cacheService); err != nil {
return false, fmt.Errorf("error during context timeout processing: %s", err.Error())
}
return false, fmt.Errorf("code processing context timeout")
return false, perrors.PipelineCanceledError{Reason: fmt.Sprintf("code processing context timeout")}
case context.Canceled:
if err := processCancel(cacheService, pipelineId); err != nil {
return false, fmt.Errorf("error during cancellation processing: %s", err.Error())
}
return false, fmt.Errorf("code processing was canceled")
return false, perrors.PipelineCanceledError{Reason: "code processing was canceled"}
default:
return false, fmt.Errorf("code processing cancelled: %s", contextErr.Error())
return false, fmt.Errorf("code processing cancelled due to unexpected reason: %s", contextErr.Error())
}
case ok := <-successChannel:
return ok, nil
Expand Down Expand Up @@ -554,15 +579,15 @@ func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {

// finishByTimeout is used in case of runCode method finished by timeout
func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
logger.Warnf("%s: code processing finishes because of timeout\n", pipelineId)

// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
}

// processErrorWithSavingOutput processes error with saving to cache received error output.
func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
logger.Warnf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)

if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error())
Expand Down
32 changes: 32 additions & 0 deletions playground/backend/internal/errors/lifecycle_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errors

type PipelineCanceledError struct {
Reason string
}

func (e PipelineCanceledError) Error() string {
return e.Reason
}

type CompilationError struct {
Reason string
}

func (e CompilationError) Error() string {
return e.Reason
}

0 comments on commit 4aebbc0

Please sign in to comment.