Skip to content

Commit

Permalink
Bind Event-Stream Encode Decode (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 authored Dec 27, 2022
1 parent 02161b8 commit 6d8ca16
Show file tree
Hide file tree
Showing 13 changed files with 666 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@
[submodule "aws-common-runtime/aws-c-sdkutils"]
path = aws-common-runtime/aws-c-sdkutils
url = https://github.com/awslabs/aws-c-sdkutils
[submodule "aws-common-runtime/aws-checksums"]
path = aws-common-runtime/aws-checksums
url = https://github.com/awslabs/aws-checksums
[submodule "aws-common-runtime/aws-c-event-stream"]
path = aws-common-runtime/aws-c-event-stream
url = https://github.com/awslabs/aws-c-event-stream
46 changes: 46 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ var awsCCalPlatformExcludes = [
"ecdsa-fuzz-corpus/windows/p256_sig_corpus.txt",
"ecdsa-fuzz-corpus/darwin/p256_sig_corpus.txt"] + excludesFromAll

var awsCChecksumsExcludes = [
"CMakeLists.txt",
"LICENSE",
"builder.json",
"README.md",
"format-check.sh",
"cmake",
"tests"]

#if os(macOS)
awsCCalPlatformExcludes.append("source/windows")
awsCCalPlatformExcludes.append("source/unix")
Expand All @@ -116,6 +125,23 @@ awsCCalPlatformExcludes.append("source/windows")
awsCCalPlatformExcludes.append("source/darwin")
#endif

// swift never uses Microsoft Visual C++ compiler
awsCChecksumsExcludes.append("source/intel/visualc")
#if arch(arm64)
// includes source/arm
// TODO: look at the compiler flag in C
awsCChecksumsExcludes.append("source/intel")
awsCChecksumsExcludes.append("source/generic")
#elseif arch(x86_64) || arch(i386)
// include src/intel/asm
awsCChecksumsExcludes.append("source/arm")
awsCChecksumsExcludes.append("source/generic")
#else
// includes source/generic
awsCChecksumsExcludes.append("source/arm")
awsCChecksumsExcludes.append("source/intel")
#endif

let awsCSdkUtilsPlatformExcludes = ["CODE_OF_CONDUCT.md"] + excludesFromAll

var awsCCompressionPlatformExcludes = ["source/huffman_generator/", "CODE_OF_CONDUCT.md",
Expand All @@ -130,6 +156,10 @@ var awsCHttpPlatformExcludes = [
"codebuild/linux-integration-tests.yml"] + excludesFromAll

let awsCAuthPlatformExcludes = ["CODE_OF_CONDUCT.md"] + excludesFromAll
let awsCEventStreamExcludes = [
"bin",
"CODE_OF_CONDUCT.md",
"clang-tidy/run-clang-tidy.sh"] + excludesFromAll

let cFlags = ["-g", "-fno-omit-frame-pointer"]
let cSettings: [CSetting] = [
Expand Down Expand Up @@ -198,6 +228,20 @@ packageTargets.append(contentsOf: [
exclude: awsCAuthPlatformExcludes,
cSettings: cSettings
),
.target(
name: "AwsChecksums",
dependencies: ["AwsCCommon"],
path: "aws-common-runtime/aws-checksums",
exclude: awsCChecksumsExcludes,
cSettings: cSettings
),
.target(
name: "AwsCEventStream",
dependencies: ["AwsChecksums", "AwsCCommon", "AwsCIo", "AwsCCal"],
path: "aws-common-runtime/aws-c-event-stream",
exclude: awsCEventStreamExcludes,
cSettings: cSettings
),
.target(
name: "AwsCommonRuntimeKit",
dependencies: [ "AwsCAuth",
Expand All @@ -206,6 +250,8 @@ packageTargets.append(contentsOf: [
"AwsCCompression",
"AwsCIo",
"AwsCCommon",
"AwsChecksums",
"AwsCEventStream",
.product(name: "Collections", package: "swift-collections")],
path: "Source/AwsCommonRuntimeKit",
swiftSettings: [
Expand Down
4 changes: 3 additions & 1 deletion Source/AwsCommonRuntimeKit/CommonRuntimeKit.swift
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import AwsCHttp
import AwsCEventStream
import AwsCAuth

public struct CommonRuntimeKit {

public static func initialize(allocator: Allocator = defaultAllocator) {
aws_auth_library_init(allocator.rawValue)
aws_event_stream_library_init(allocator.rawValue)
}

public static func cleanUp() {
aws_auth_library_clean_up()
aws_event_stream_library_clean_up()
}

private init() {}
Expand Down
34 changes: 32 additions & 2 deletions Source/AwsCommonRuntimeKit/crt/Utilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,17 @@ extension Data {
return bufferData
}
}


func withAWSByteBufPointer<Result>(_ body: (UnsafeMutablePointer<aws_byte_buf>) -> Result) -> Result {
let count = self.count
return self.withUnsafeBytes { rawBufferPointer -> Result in
var byteBuf = aws_byte_buf_from_array(rawBufferPointer.baseAddress, count)
return withUnsafeMutablePointer(to: &byteBuf) {
body($0)
}
}
}

public func encodeToHexString() -> String {
map { String(format: "%02x", $0) }.joined()
}
Expand All @@ -68,17 +78,37 @@ extension aws_date_time {
}
}

extension aws_byte_buf {
func toString() -> String {
return String(
data: toData(),
encoding: .utf8)!
}

func toData() -> Data {
return Data(bytes: self.buffer, count: self.len)
}
}

extension Date {
func toAWSDate() -> aws_date_time {
var date = aws_date_time()
aws_date_time_init_epoch_secs(&date, self.timeIntervalSince1970)
return date
}

var millisecondsSince1970: Int64 {
Int64((self.timeIntervalSince1970 * 1000.0).rounded())
}

init(millisecondsSince1970: Int64) {
self = Date(timeIntervalSince1970: TimeInterval(millisecondsSince1970) / 1000)
}
}

extension TimeInterval {
var millisecond: UInt64 {
UInt64(self*1000)
UInt64((self*1000).rounded())
}
}

Expand Down
87 changes: 87 additions & 0 deletions Source/AwsCommonRuntimeKit/event-stream/EventStreamHeader.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0.

import AwsCEventStream
import Foundation

public struct EventStreamHeader {
/// max header name length is 127 bytes (Int8.max)
public static let maxNameLength = AWS_EVENT_STREAM_HEADER_NAME_LEN_MAX

public static let maxValueLength = Int16.max

/// name.count can not be greater than EventStreamHeader.maxNameLength
public var name: String

/// value.count can not be greater than EventStreamHeader.maxValueLength for supported types.
public var value: EventStreamHeaderValue
}

public enum EventStreamHeaderValue: Equatable {
case bool(value: Bool)
case byte(value: Int8)
case int16(value: Int16)
case int32(value: Int32)
case int64(value: Int64)
/// Data length can not be greater than EventStreamHeader.maxValueLength
case byteBuf(value: Data)
/// String length can not be greater than EventStreamHeader.maxValueLength
case string(value: String)
/// Date is only precise up to milliseconds.
/// It will lose the sub-millisecond precision during encoding.
case timestamp(value: Date)
case uuid(value: UUID)
}

extension EventStreamHeaderValue {
static func parseRaw(rawValue: UnsafeMutablePointer<aws_event_stream_header_value_pair>) -> EventStreamHeaderValue {
let value: EventStreamHeaderValue
switch rawValue.pointee.header_value_type {
case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
value = .bool(
value: aws_event_stream_header_value_as_bool(rawValue) != 0)
case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
value = .bool(
value: aws_event_stream_header_value_as_bool(rawValue) != 0)
case AWS_EVENT_STREAM_HEADER_BYTE:
value = .byte(value: aws_event_stream_header_value_as_byte(rawValue))
case AWS_EVENT_STREAM_HEADER_INT16:
value = .int16(
value: aws_event_stream_header_value_as_int16(rawValue))
case AWS_EVENT_STREAM_HEADER_INT32:
value = .int32(
value: aws_event_stream_header_value_as_int32(rawValue))
case AWS_EVENT_STREAM_HEADER_INT64:
value = .int64(
value: aws_event_stream_header_value_as_int64(rawValue))
case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
value = .byteBuf(
value: aws_event_stream_header_value_as_bytebuf(rawValue).toData())
case AWS_EVENT_STREAM_HEADER_STRING:
value = .string(
value: aws_event_stream_header_value_as_string(rawValue).toString())
case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
value = .timestamp(
value: Date(
millisecondsSince1970: aws_event_stream_header_value_as_timestamp(rawValue)))
case AWS_EVENT_STREAM_HEADER_UUID:
let uuid = UUID(uuid: rawValue.pointee.header_value.static_val)
value = .uuid(value: uuid)
default:
fatalError("Unexpected header value type found.")
}
return value
}
}

extension EventStreamHeader: Equatable {
public static func == (lhs: EventStreamHeader, rhs: EventStreamHeader) -> Bool {
if case let EventStreamHeaderValue.timestamp(value1) = lhs.value,
case let EventStreamHeaderValue.timestamp(value2) = rhs.value {
return lhs.name == rhs.name &&
value1.millisecondsSince1970 == value2.millisecondsSince1970
}
return lhs.name == rhs.name &&
lhs.value == rhs.value
}
}
137 changes: 137 additions & 0 deletions Source/AwsCommonRuntimeKit/event-stream/EventStreamMessage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0.

import AwsCEventStream
import Foundation

public struct EventStreamMessage {
var headers: [EventStreamHeader] = [EventStreamHeader]()
var payload: Data = Data()
var allocator: Allocator = defaultAllocator

/// Get the binary format of this message (i.e. for sending across the wire manually)
/// - Returns: binary Data.
public func getEncoded() throws -> Data {
var rawValue = aws_event_stream_message()
var rawHeaders = aws_array_list()
defer {
aws_event_stream_headers_list_cleanup(&rawHeaders)
aws_event_stream_message_clean_up(&rawValue)
}

guard aws_event_stream_headers_list_init(&rawHeaders, allocator.rawValue) == AWS_OP_SUCCESS else {
throw CommonRunTimeError.crtError(.makeFromLastError())
}
try headers.forEach {
try addHeader(header: $0, rawHeaders: &rawHeaders)
}

guard payload.withAWSByteBufPointer({ byteBuff in
// TODO (optimization): we could avoid the extra copies of headers and data
// if there were an API in C that let us encode everything directly into a pre-allocated buffer
aws_event_stream_message_init(&rawValue, allocator.rawValue, &rawHeaders, byteBuff)
}) == AWS_OP_SUCCESS else {
throw CommonRunTimeError.crtError(.makeFromLastError())
}

return Data(
bytes: aws_event_stream_message_buffer(&rawValue),
count: Int(aws_event_stream_message_total_length(&rawValue)))
}
}

extension EventStreamMessage {
func addHeader(header: EventStreamHeader, rawHeaders: UnsafeMutablePointer<aws_array_list>) throws {
if header.name.count > EventStreamHeader.maxNameLength {
throw CommonRunTimeError.crtError(
.init(
code: AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN.rawValue))
}
let addCHeader: () throws -> Int32 = {
return try header.name.withCString { headerName in
switch header.value {
case .bool(let value):
return aws_event_stream_add_bool_header(
rawHeaders,
headerName,
UInt8(header.name.count),
Int8(value.uintValue))
case .byte(let value):
return aws_event_stream_add_byte_header(
rawHeaders,
headerName,
UInt8(header.name.count),
value)
case .int16(let value):
return aws_event_stream_add_int16_header(
rawHeaders,
headerName,
UInt8(header.name.count),
value)
case .int32(let value):
return aws_event_stream_add_int32_header(
rawHeaders,
headerName,
UInt8(header.name.count),
value)
case .int64(let value):
return aws_event_stream_add_int64_header(
rawHeaders,
headerName,
UInt8(header.name.count),
value)
case .byteBuf(var value):
if value.count > EventStreamHeader.maxValueLength {
throw CommonRunTimeError.crtError(
.init(
code: AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN.rawValue))
}
return value.withUnsafeMutableBytes {
let bytes = $0.bindMemory(to: UInt8.self).baseAddress!
return aws_event_stream_add_bytebuf_header(
rawHeaders,
headerName,
UInt8(header.name.count),
bytes,
UInt16($0.count),
1)
}
case .string(let value):
if value.count > EventStreamHeader.maxValueLength {
throw CommonRunTimeError.crtError(
.init(
code: AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN.rawValue))
}
return value.withCString {
aws_event_stream_add_string_header(
rawHeaders,
headerName,
UInt8(header.name.count),
$0,
UInt16(value.count),
1)
}
case .timestamp(let value):
return aws_event_stream_add_timestamp_header(
rawHeaders,
headerName,
UInt8(header.name.count),
Int64(value.millisecondsSince1970))
case .uuid(let value):
return withUnsafeBytes(of: value) {
let address = $0.baseAddress?.assumingMemoryBound(to: UInt8.self)
return aws_event_stream_add_uuid_header(
rawHeaders,
headerName,
UInt8(header.name.count),
address)
}
}
}
}

guard try addCHeader() == AWS_OP_SUCCESS else {
throw CommonRunTimeError.crtError(.makeFromLastError())
}
}
}
Loading

0 comments on commit 6d8ca16

Please sign in to comment.