Skip to content

Commit

Permalink
Added shutdown() -> EventLoopFuture<Void> to the `ByteBufferLambdaH…
Browse files Browse the repository at this point in the history
…andler` (#122)

* Added `syncShutdown() throws` to the `ByteBufferLambdaHandler`

* Updated to use a `ShutdownContext`

* Review comments addressed.
  • Loading branch information
fabianfett authored Jun 17, 2020
1 parent 7728066 commit 437a60d
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 9 deletions.
24 changes: 24 additions & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,27 @@ extension Lambda {
}
}
}

// MARK: - ShutdownContext

extension Lambda {
/// Lambda runtime shutdown context.
/// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument.
public final class ShutdownContext {
/// `Logger` to log with
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public let logger: Logger

/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop

internal init(logger: Logger, eventLoop: EventLoop) {
self.eventLoop = eventLoop
self.logger = logger
}
}
}
13 changes: 13 additions & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ public protocol ByteBufferLambdaHandler {
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`
func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?>

/// The method to clean up your resources.
/// Concrete Lambda handlers implement this method to shutdown their `HTTPClient`s and database connections.
///
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
}

public extension ByteBufferLambdaHandler {
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(Void())
}
}

private enum CodecError: Error {
Expand Down
37 changes: 29 additions & 8 deletions Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extension Lambda {

private var state = State.idle {
willSet {
assert(self.eventLoop.inEventLoop, "State may only be changed on the `Lifecycle`'s `eventLoop`")
self.eventLoop.assertInEventLoop()
precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)")
}
}
Expand Down Expand Up @@ -71,22 +71,43 @@ extension Lambda {
///
/// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with.
public func start() -> EventLoopFuture<Void> {
assert(self.eventLoop.inEventLoop, "Start must be called on the `EventLoop` the `Lifecycle` has been initialized with.")
self.eventLoop.assertInEventLoop()

logger.info("lambda lifecycle starting with \(self.configuration)")
self.state = .initializing
// triggered when the Lambda has finished its last run
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
finishedPromise.futureResult.always { _ in
self.markShutdown()
}.cascade(to: self.shutdownPromise)

var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, factory: self.factory).map { handler in

let startupFuture = runner.initialize(logger: logger, factory: self.factory)
startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result<Int, Error>)> in
// after the startup future has succeeded, we have a handler that we can use
// to `run` the lambda.
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
self.state = .active(runner, handler)
self.run(promise: finishedPromise)
return finishedPromise.futureResult.mapResult { (handler, $0) }
}
.flatMap { (handler, runnerResult) -> EventLoopFuture<Int> in
// after the lambda finishPromise has succeeded or failed we need to
// shutdown the handler
let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop)
return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
// if, we had an error shuting down the lambda, we want to concatenate it with
// the runner result
logger.error("Error shutting down handler: \(error)")
throw RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
}.flatMapResult { (_) -> Result<Int, Error> in
// we had no error shutting down the lambda. let's return the runner's result
runnerResult
}
}.always { _ in
// triggered when the Lambda has finished its last run or has a startup failure.
self.markShutdown()
}.cascade(to: self.shutdownPromise)

return startupFuture.map { _ in }
}

// MARK: - Private
Expand Down
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private extension Lambda.Context {
}

// TODO: move to nio?
private extension EventLoopFuture {
extension EventLoopFuture {
// callback does not have side effects, failing with original result
func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture<Value> {
self.flatMapError { error in
Expand Down
1 change: 1 addition & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ internal extension Lambda {
case invocationMissingHeader(String)
case noBody
case json(Error)
case shutdownError(shutdownError: Error, runnerResult: Result<Int, Error>)
}
}

Expand Down
142 changes: 142 additions & 0 deletions Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import AWSLambdaRuntimeCore
import Logging
import NIO
import NIOHTTP1
import XCTest

class LambdaLifecycleTest: XCTestCase {
func testShutdownFutureIsFulfilledWithStartUpError() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let testError = TestError("kaboom")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.eventLoop.makeFailedFuture(testError)
})

// eventLoop.submit in this case returns an EventLoopFuture<EventLoopFuture<ByteBufferHandler>>
// which is why we need `wait().wait()`
XCTAssertThrowsError(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) { error in
XCTAssertEqual(testError, error as? TestError)
}

XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
XCTAssertEqual(testError, error as? TestError)
}
}

struct CallbackLambdaHandler: ByteBufferLambdaHandler {
let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture<ByteBuffer?>)
let shutdown: (Lambda.ShutdownContext) -> EventLoopFuture<Void>

init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture<ByteBuffer?>), shutdown: @escaping (Lambda.ShutdownContext) -> EventLoopFuture<Void>) {
self.handler = handler
self.shutdown = shutdown
}

func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?> {
self.handler(context, event)
}

func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
self.shutdown(context)
}
}

func testShutdownIsCalledWhenLambdaShutsdown() {
let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

var count = 0
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in
count += 1
return context.eventLoop.makeSucceededFuture(Void())
}

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.eventLoop.makeSucceededFuture(handler)
})

XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait())
XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), error as? Lambda.RuntimeError)
}
XCTAssertEqual(count, 1)
}

func testLambdaResultIfShutsdownIsUnclean() {
let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

var count = 0
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in
count += 1
return context.eventLoop.makeFailedFuture(TestError("kaboom"))
}

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.eventLoop.makeSucceededFuture(handler)
})

XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait())
XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
guard case Lambda.RuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else {
XCTFail("Unexpected error"); return
}

XCTAssertEqual(shutdownError as? TestError, TestError("kaboom"))
XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
}
XCTAssertEqual(count, 1)
}
}

struct BadBehavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.failure(.internalServerError)
}

func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report a response")
return .failure(.internalServerError)
}

func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}

func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
}

0 comments on commit 437a60d

Please sign in to comment.