Skip to content

Commit

Permalink
[Playground] Refuse RunCode requests if NUM_PARALLEL_JOBS is exceeded (
Browse files Browse the repository at this point in the history
…#26737)

* Refuse RunCode requests if NUM_PARALLEL_JOBS is exceeded

* Retry running if backend was overloaded

* Fix the embedded_run frontend integration test

* Ignore copy button when a snippet has not loaded yet

* Change run attempts and interval

---------

Co-authored-by: Alexey Inkin <alexey.inkin@akvelon.com>
Co-authored-by: Alexey Inkin <leha@inkin.ru>
  • Loading branch information
3 people authored May 26, 2023
1 parent 4aebbc0 commit 6865eef
Show file tree
Hide file tree
Showing 22 changed files with 1,186 additions and 966 deletions.
1 change: 0 additions & 1 deletion learning/tour-of-beam/frontend/lib/locator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ Future<void> _initializeRepositories() async {
final exampleClient = GrpcExampleClient(url: routerUrl);

GetIt.instance.registerSingleton<CodeClient>(codeClient);
GetIt.instance.registerSingleton(CodeRepository(client: codeClient));
GetIt.instance.registerSingleton<ExampleClient>(exampleClient);
GetIt.instance.registerSingleton(ExampleRepository(client: exampleClient));
}
Expand Down
2 changes: 1 addition & 1 deletion learning/tour-of-beam/frontend/lib/pages/tour/state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class TourNotifier extends ChangeNotifier with PageStateMixin<void> {

static PlaygroundController _createPlaygroundController(String initialSdkId) {
final playgroundController = PlaygroundController(
codeRepository: GetIt.instance.get<CodeRepository>(),
codeClient: GetIt.instance.get<CodeClient>(),
exampleCache: ExampleCache(
exampleRepository: GetIt.instance.get<ExampleRepository>(),
),
Expand Down
6 changes: 6 additions & 0 deletions playground/backend/cmd/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ type playgroundController struct {

// RunCode is running code from requests using a particular SDK
// - In case of incorrect sdk returns codes.InvalidArgument
// - In case of exceeded number of parallel jobs returns codes.ResourceExhausted
// - In case of error during preparing files/folders returns codes.Internal
// - In case of no errors saves playground.Status_STATUS_EXECUTING as cache.Status into cache and sets expiration time
// for all cache values which will be saved into cache during processing received code.
// Returns id of code processing (pipelineId)
func (controller *playgroundController) RunCode(ctx context.Context, info *pb.RunCodeRequest) (*pb.RunCodeResponse, error) {
// check if we can take a new RunCode request
if !utils.CheckNumOfTheParallelJobs(controller.env.ApplicationEnvs.WorkingDir(), controller.env.BeamSdkEnvs.NumOfParallelJobs()) {
logger.Warnf("RunCode(): number of parallel jobs is exceeded\n")
return nil, cerrors.ResourceExhaustedError("Error during preparing", "Number of parallel jobs is exceeded")
}
// check for correct sdk
if info.Sdk != controller.env.BeamSdkEnvs.ApacheBeamSdk {
logger.Errorf("RunCode(): request contains incorrect sdk: %s\n", info.Sdk)
Expand Down
6 changes: 6 additions & 0 deletions playground/backend/internal/errors/grpc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ func InternalError(title string, formatMessage string, args ...interface{}) erro
message := fmt.Sprintf(formatMessage, args...)
return status.Errorf(codes.Internal, "%s: %s", title, message)
}

// ResourceExhaustedError returns error with ResourceExhausted code error and message like "title: message"
func ResourceExhaustedError(title string, formatMessage string, args ...interface{}) error {
message := fmt.Sprintf(formatMessage, args...)
return status.Errorf(codes.ResourceExhausted, "%s: %s", title, message)
}
6 changes: 3 additions & 3 deletions playground/backend/internal/utils/system_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ func GetLivenessFunction() func(writer http.ResponseWriter, request *http.Reques
// GetReadinessFunction returns the function that checks the readiness of the server to process a new code processing request
func GetReadinessFunction(envs *environment.Environment) func(writer http.ResponseWriter, request *http.Request) {
return func(writer http.ResponseWriter, request *http.Request) {
if checkNumOfTheParallelJobs(envs.ApplicationEnvs.WorkingDir(), envs.BeamSdkEnvs.NumOfParallelJobs()) {
if CheckNumOfTheParallelJobs(envs.ApplicationEnvs.WorkingDir(), envs.BeamSdkEnvs.NumOfParallelJobs()) {
writer.WriteHeader(http.StatusOK)
} else {
writer.WriteHeader(http.StatusLocked)
}
}
}

// checkNumOfTheParallelJobs checks the number of currently working code executions.
// CheckNumOfTheParallelJobs checks the number of currently working code executions.
//
// It counts by the number of the /path/to/workingDir/executableFiles/{pipelineId} folders.
//
// If it is equals or more than numOfParallelJobs, then returns false.
// If it is less than numOfParallelJobs, then returns true.
func checkNumOfTheParallelJobs(workingDir string, numOfParallelJobs int) bool {
func CheckNumOfTheParallelJobs(workingDir string, numOfParallelJobs int) bool {
baseFileFolder := filepath.Join(workingDir, executableFiles)
_, err := os.Stat(baseFileFolder)
if os.IsNotExist(err) {
Expand Down
12 changes: 6 additions & 6 deletions playground/backend/internal/utils/system_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func Test_checkNumOfTheParallelJobs(t *testing.T) {
want bool
}{
{
// Test case with calling checkNumOfTheParallelJobs when there is no code processing folders.
// Test case with calling CheckNumOfTheParallelJobs when there is no code processing folders.
// As a result, want to receive true
name: "There is no code processing folder",
args: args{
Expand All @@ -69,7 +69,7 @@ func Test_checkNumOfTheParallelJobs(t *testing.T) {
want: true,
},
{
// Test case with calling checkNumOfTheParallelJobs when there is one code processing folder.
// Test case with calling CheckNumOfTheParallelJobs when there is one code processing folder.
// As a result, want to receive true
name: "Less than needed",
args: args{
Expand All @@ -85,7 +85,7 @@ func Test_checkNumOfTheParallelJobs(t *testing.T) {
want: true,
},
{
// Test case with calling checkNumOfTheParallelJobs when the number of the code processing folders is equals numOfParallelJobs.
// Test case with calling CheckNumOfTheParallelJobs when the number of the code processing folders is equals numOfParallelJobs.
// As a result, want to receive false
name: "There are enough code processing folders",
args: args{
Expand All @@ -101,7 +101,7 @@ func Test_checkNumOfTheParallelJobs(t *testing.T) {
want: false,
},
{
// Test case with calling checkNumOfTheParallelJobs when the number of the code processing folders is more than numOfParallelJobs.
// Test case with calling CheckNumOfTheParallelJobs when the number of the code processing folders is more than numOfParallelJobs.
// As a result, want to receive false
name: "More than needed",
args: args{
Expand All @@ -120,8 +120,8 @@ func Test_checkNumOfTheParallelJobs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.prepareFunc()
if got := checkNumOfTheParallelJobs(tt.args.workingDir, tt.args.numOfParallelJobs); got != tt.want {
t.Errorf("checkNumOfTheParallelJobs() = %v, want %v", got, tt.want)
if got := CheckNumOfTheParallelJobs(tt.args.workingDir, tt.args.numOfParallelJobs); got != tt.want {
t.Errorf("CheckNumOfTheParallelJobs() = %v, want %v", got, tt.want)
}
os.RemoveAll(executableFiles)
})
Expand Down
2 changes: 1 addition & 1 deletion playground/frontend/lib/controllers/factories.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PlaygroundController createPlaygroundController(
final controller = PlaygroundController(
examplesLoader: ExamplesLoader(),
exampleCache: exampleCache,
codeRepository: GetIt.instance.get<CodeRepository>(),
codeClient: GetIt.instance.get<CodeClient>(),
);

unawaited(_loadExamples(controller, descriptor));
Expand Down
3 changes: 0 additions & 3 deletions playground/frontend/lib/locator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ Future<void> _initializeRepositories() async {
);

GetIt.instance.registerSingleton<CodeClient>(codeClient);
GetIt.instance.registerSingleton(CodeRepository(
client: codeClient,
));

final exampleClient = GrpcExampleClient(url: routerUrl);
GetIt.instance.registerSingleton<ExampleClient>(exampleClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ class EmbeddedAppBarTitle extends StatelessWidget {
icon: SvgPicture.asset(Assets.copy),
onPressed: () {
final source = controller.source;
Clipboard.setData(ClipboardData(text: source));
if (source != null) {
Clipboard.setData(ClipboardData(text: source));
}
},
),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ export 'src/playground_components.dart';
export 'src/repositories/backend_urls.dart';
export 'src/repositories/code_client/code_client.dart';
export 'src/repositories/code_client/grpc_code_client.dart';
export 'src/repositories/code_repository.dart';
export 'src/repositories/example_client/example_client.dart';
export 'src/repositories/example_client/grpc_example_client.dart';
export 'src/repositories/example_repository.dart';
Expand Down
Loading

0 comments on commit 6865eef

Please sign in to comment.