Skip to content

Commit

Permalink
Added syncShutdown() throws to the ByteBufferLambdaHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett committed Jun 12, 2020
1 parent a5e2fd0 commit feb35c7
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 9 deletions.
11 changes: 11 additions & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ public protocol ByteBufferLambdaHandler {
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`
func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?>

/// The method to clean up your resources.
/// Concrete Lambda handlers implement this method to shutdown their `HTTPClient`s and database connections.
///
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
func syncShutdown() throws
}

public extension ByteBufferLambdaHandler {
func syncShutdown() throws {}
}

private enum CodecError: Error {
Expand Down
33 changes: 24 additions & 9 deletions Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extension Lambda {

private var state = State.idle {
willSet {
assert(self.eventLoop.inEventLoop, "State may only be changed on the `Lifecycle`'s `eventLoop`")
self.eventLoop.assertInEventLoop()
precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)")
}
}
Expand Down Expand Up @@ -71,22 +71,37 @@ extension Lambda {
///
/// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with.
public func start() -> EventLoopFuture<Void> {
assert(self.eventLoop.inEventLoop, "Start must be called on the `EventLoop` the `Lifecycle` has been initialized with.")
self.eventLoop.assertInEventLoop()

logger.info("lambda lifecycle starting with \(self.configuration)")
self.state = .initializing
// triggered when the Lambda has finished its last run
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
finishedPromise.futureResult.always { _ in
self.markShutdown()
}.cascade(to: self.shutdownPromise)

var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, factory: self.factory).map { handler in

let startupFuture = runner.initialize(logger: logger, factory: self.factory)
startupFuture.flatMap { handler in
// after the startup future has succeeded, we have a handler that we can use
// to `run` the lambda.
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
self.state = .active(runner, handler)
self.run(promise: finishedPromise)
}
return finishedPromise.futureResult.always { _ in
// If the lambda is terminated (e.g. LocalServer shutdown), we make sure
// developers have the chance to cleanup their resources.
do {
try handler.syncShutdown()
} catch {
logger.error("Error shutting down handler: \(error)")
}
}
}.always { _ in
// triggered when the Lambda has finished its last run or has a startup failure.
self.markShutdown()
}.cascade(to: self.shutdownPromise)

return startupFuture.map { _ in }
}

// MARK: - Private
Expand Down
110 changes: 110 additions & 0 deletions Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import AWSLambdaRuntimeCore
import Logging
import NIO
import NIOHTTP1
import XCTest

class LambdaLifecycleTest: XCTestCase {
func testShutdownFutureIsFulfilledWithStartUpError() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let testError = TestError("kaboom")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.makeFailedFuture(testError)
})

// eventLoop.submit in this case returns an EventLoopFuture<EventLoopFuture<ByteBufferHandler>>
// which is why we need `wait().wait()`
XCTAssertThrowsError(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) { error in
XCTAssertEqual(testError, error as? TestError)
}

XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
XCTAssertEqual(testError, error as? TestError)
}
}

func testSyncShutdownIsCalledWhenLambdaShutsdown() {
struct CallbackLambdaHandler: ByteBufferLambdaHandler {
let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture<ByteBuffer?>)
let shutdown: () throws -> Void

init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture<ByteBuffer?>), shutdown: @escaping () throws -> Void) {
self.handler = handler
self.shutdown = shutdown
}

func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?> {
self.handler(context, event)
}

func syncShutdown() throws {
try self.shutdown()
}
}

let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

var count = 0
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) {
count += 1
}

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.makeSucceededFuture(handler)
})

XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait())
XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), error as? Lambda.RuntimeError)
}
XCTAssertEqual(count, 1)
}
}

struct BadBehavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.failure(.internalServerError)
}

func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report a response")
return .failure(.internalServerError)
}

func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}

func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
}

0 comments on commit feb35c7

Please sign in to comment.