Skip to content

Commit

Permalink
feat: Funqy Amazon Lambda support more Amazon events
Browse files Browse the repository at this point in the history
- Refactor, so that the implementation is less invasive.
  Using the JacksonInputReader and JacksonOutputWriter for
  the advanced event handling.
- Fix issue in Kinesis event handling
- Adjust and add tests
  • Loading branch information
holomekc committed Jul 7, 2024
1 parent 53b8a33 commit a3e52d9
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 348 deletions.
4 changes: 4 additions & 0 deletions devtools/gradle/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
version = 999-SNAPSHOT
systemProp.http.proxyHost=localhost
systemProp.http.proxyPort=3128
systemProp.https.proxyHost=localhost
systemProp.https.proxyPort=3128
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public void init(List<FunctionBuildItem> functions,

@BuildStep
@Record(RUNTIME_INIT)
public RuntimeComplete choose(FunqyConfig config, FunqyAmazonConfig amazonConfig,
public RuntimeComplete choose(FunqyConfig config,
FunqyAmazonConfig amazonConfig,
FunqyLambdaBindingRecorder recorder) {
recorder.chooseInvoker(config, amazonConfig);
return new RuntimeComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public Uni<Void> snsFunction(SNSEvent.SNSRecord msg) {

@Funq("cloudevents-function")
public Uni<Void> cloudEventsFunction(CloudEvent msg) {
// Due to jackson deserialization the base64 decoding already happened.
if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) {
return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
}
Expand All @@ -47,6 +48,7 @@ public Uni<Void> cloudEventsFunction(CloudEvent msg) {

@Funq("kinesis-function")
public Uni<Void> kinesisFunction(KinesisEvent.Record msg) {
// Due to jackson deserialization the base64 decoding already happened.
if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) {
return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1719318377.0,
"Keys": {
"Id": {
"N": "1"
Expand Down Expand Up @@ -33,6 +34,7 @@
"eventID": "2",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1719318377.0,
"NewImage": {
"Message": {
"S": "fail"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;

import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder;
import io.quarkus.amazon.lambda.runtime.JacksonInputReader;
import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter;
import io.quarkus.amazon.lambda.runtime.LambdaInputReader;
import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
import io.quarkus.funqy.lambda.config.FunqyAmazonConfig;
import io.quarkus.funqy.lambda.event.AwsModule;
import io.quarkus.funqy.lambda.event.EventDeserializer;
import io.quarkus.funqy.lambda.event.AwsEventInputReader;
import io.quarkus.funqy.lambda.event.AwsEventOutputWriter;
import io.quarkus.funqy.lambda.event.EventProcessor;
import io.quarkus.funqy.lambda.model.FunqyMethod;
import io.quarkus.funqy.runtime.FunctionConstructor;
Expand All @@ -45,35 +46,28 @@ public class FunqyLambdaBindingRecorder {

private static FunctionInvoker invoker;
private static BeanContainer beanContainer;
private static LambdaInputReader reader;
private static LambdaOutputWriter writer;
private static EventProcessor eventProcessor;
private static FunqyAmazonBuildTimeConfig amazonBuildTimeConfig;

public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) {
beanContainer = bc;
FunctionConstructor.CONTAINER = bc;
// We create a copy, because we register a custom deserializer for everything.
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
EventDeserializer eventDeserializer = new EventDeserializer(buildTimeConfig);
final SimpleModule simpleModule = new AwsModule();
simpleModule.addDeserializer(Object.class, eventDeserializer);
objectMapper.registerModule(simpleModule);
amazonBuildTimeConfig = buildTimeConfig;
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper;

for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) {
ObjectReader reader = null;
JavaType javaInputType = null;
if (invoker.hasInput()) {
javaInputType = objectMapper.constructType(invoker.getInputType());
reader = objectMapper.readerFor(javaInputType);
JavaType javaInputType = objectMapper.constructType(invoker.getInputType());
ObjectReader reader = objectMapper.readerFor(javaInputType);
invoker.getBindingContext().put(ObjectReader.class.getName(), reader);
}
ObjectWriter writer = null;
JavaType javaOutputType = null;
if (invoker.hasOutput()) {
javaOutputType = objectMapper.constructType(invoker.getOutputType());
writer = objectMapper.writerFor(javaOutputType);
JavaType javaOutputType = objectMapper.constructType(invoker.getOutputType());
ObjectWriter writer = objectMapper.writerFor(javaOutputType);
invoker.getBindingContext().put(ObjectWriter.class.getName(), writer);
}
invoker.getBindingContext().put(EventProcessor.class.getName(),
new EventProcessor(objectMapper, eventDeserializer,
new FunqyMethod(reader, writer, javaInputType, javaOutputType),
buildTimeConfig));
}
}

Expand All @@ -92,8 +86,34 @@ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) {
} else {
invoker = FunctionRecorder.registry.invokers().iterator().next();
}
eventProcessor = (EventProcessor) invoker.getBindingContext().get(EventProcessor.class.getName());
eventProcessor.init(amazonConfig);

ObjectReader objectReader = null;
if (invoker.hasInput()) {
objectReader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());

if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
// We create a copy, because the mapper will be reconfigured for the advanced event handling,
// and we do not want to adjust the ObjectMapper, which is available in arc context.
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
reader = new AwsEventInputReader(objectMapper, objectReader, amazonBuildTimeConfig);
} else {
reader = new JacksonInputReader(objectReader);
}

}
if (invoker.hasOutput()) {
ObjectWriter objectWriter = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName());

if (!amazonBuildTimeConfig.advancedEventHandling().enabled()) {
writer = new JacksonOutputWriter(objectWriter);
}
}
if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
writer = new AwsEventOutputWriter(objectMapper);

eventProcessor = new EventProcessor(objectReader, amazonBuildTimeConfig, amazonConfig);
}
}

/**
Expand All @@ -109,7 +129,16 @@ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) {
* Is thrown in case the (de)serialization fails
*/
public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
eventProcessor.handle(inputStream, outputStream, FunqyLambdaBindingRecorder::dispatch, context);
Object input = null;
if (invoker.hasInput()) {
input = reader.readValue(inputStream);
}
FunqyServerResponse response = dispatch(input, context);

Object value = response.getOutput().await().indefinitely();
if (value != null) {
writer.writeValue(outputStream, value);
}
}

@SuppressWarnings("rawtypes")
Expand All @@ -119,34 +148,43 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) {

@Override
protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception {
throw new RuntimeException("Unreachable");
FunqyServerResponse response = dispatch(input, context);
return response.getOutput().await().indefinitely();
}

@Override
protected LambdaInputReader getInputReader() {
throw new RuntimeException("Unreachable");
return reader;
}

@Override
protected LambdaOutputWriter getOutputWriter() {
throw new RuntimeException("Unreachable");
return writer;
}

@Override
protected boolean isStream() {
return true;
return false;
}

@Override
protected void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context)
throws Exception {
handle(input, output, context);
throw new RuntimeException("Unreachable!");
}
};
loop.startPollLoop(context);

}

private static FunqyServerResponse dispatch(Object input, Context context) throws IOException {
if (eventProcessor != null) {
return eventProcessor.handle(input, FunqyLambdaBindingRecorder::dispatch, context);
} else {
return dispatch(input);
}
}

private static FunqyServerResponse dispatch(Object input) {
ManagedContext requestContext = beanContainer.requestContext();
requestContext.activate();
Expand Down
Loading

0 comments on commit a3e52d9

Please sign in to comment.