Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Streamlined concurrency, generic operations and fanout|join are available #39

Merged
merged 1 commit into from
Apr 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion web3swift/Concurrency/Classes/Web3+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class OperationDispatcher {
private var provider: Web3Provider
private var queue: OperationQueue
public var policy: DispatchPolicy
// private var pendingRequests: SynchronizedArray<Request> = SynchronizedArray<Request>()
private var pendingRequests = [Request]()
private var schedulingOperation: Operation? = nil
private var lockQueue: DispatchQueue
Expand Down
94 changes: 88 additions & 6 deletions web3swift/Concurrency/Classes/Web3+ConversionOperations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,36 @@ import Foundation
import Result
import BigInt

final class ResultUnwrapOperation: Web3Operation {
override func main() {
if (error != nil) {
return self.processError(self.error!)
}
guard let _ = self.next else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard inputData != nil else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard let input = inputData! as? [String: Any] else {return processError(Web3Error.dataError)}
let result = ResultUnwrapper.getResponse(input)
switch result {
case .failure(let error):
return processError(error)
case .success(let payload):
return processSuccess(payload as AnyObject)
}
}
}

final class ConversionOperation<T>: Web3Operation {
override func main() {
if (error != nil) {
return self.processError(self.error!)
}
guard let _ = self.next else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard inputData != nil else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard let input = inputData! as? T else {return processError(Web3Error.dataError)}
return processSuccess(input as AnyObject)
}
}

final class BigUIntConversionOperation: Web3Operation {

override func main() {
Expand Down Expand Up @@ -180,19 +210,71 @@ final class StringConversionOperation: Web3Operation {
}
}

final class FlattenOperation: Web3Operation {
final class JoinOperation: Web3Operation {
convenience init(_ web3Instance: web3, queue: OperationQueue? = nil, operations: [Web3Operation]) {
self.init(web3Instance, queue: queue, inputData: operations as AnyObject)
}

override func main() {
if (error != nil) {
return self.processError(self.error!)
}
guard let _ = self.next else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard inputData != nil else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard let input = inputData! as? [AnyObject?] else {return processError(Web3Error.dataError)}
let notNilElements = input.filter { (el) -> Bool in
return el != nil
guard let operations = inputData! as? [Web3Operation] else {return processError(Web3Error.dataError)}
var resultsArray = [AnyObject]()
let lockQueue = DispatchQueue.init(label: "joinQueue")
var expectedOperations = operations.count
var earlyReturn = false

let joiningCallback = { (res: Result<AnyObject, Web3Error>) -> () in
switch res {
case .success(let result):
lockQueue.sync() {
expectedOperations = expectedOperations - 1
guard let ev = result as? [AnyObject] else {
if (!earlyReturn) {
earlyReturn = true
return self.processError(Web3Error.dataError)
} else {
return
}
}
resultsArray.append(contentsOf: ev)
guard let currentQueue = OperationQueue.current else {
if (!earlyReturn) {
earlyReturn = true
return self.processError(Web3Error.dataError)
} else {
return
}
}

if expectedOperations == 0 {
if (!earlyReturn) {
earlyReturn = true
currentQueue.underlyingQueue?.async(execute: {
self.processSuccess(resultsArray as AnyObject)
})
} else {
return
}
}
}
case .failure(let error):
lockQueue.sync() {
if (!earlyReturn) {
earlyReturn = true
return self.processError(error)
} else {
return
}
}
}
}
return processSuccess(notNilElements as AnyObject)

for op in operations {
op.next = OperationChainingType.callback(joiningCallback, self.expectedQueue)
}
self.expectedQueue.addOperations(operations, waitUntilFinished: false)
}
}
64 changes: 8 additions & 56 deletions web3swift/Concurrency/Classes/Web3+EventOperations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final class ParseBlockForEventsOperation: Web3Operation {
if (error != nil) {
return self.processError(self.error!)
}
guard let _ = self.next else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard let completion = self.next else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard inputData != nil else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard let input = inputData! as? [AnyObject] else {return processError(Web3Error.inputError("Invalid input supplied"))}
guard input.count == 4 else {return processError(Web3Error.inputError("Invalid input supplied"))}
Expand All @@ -40,58 +40,7 @@ final class ParseBlockForEventsOperation: Web3Operation {
let filter = input[2] as? EventFilter
guard let blockNumber = input[3] as? String else {return processError(Web3Error.inputError("Invalid input supplied"))}
let getBlockOperation = GetBlockByNumberOperation.init(self.web3, queue: self.expectedQueue, blockNumber: blockNumber, fullTransactions: false)
var resultsArray = [EventParserResultProtocol]()
let lockQueue = DispatchQueue.init(label: "org.bankexfoundation.LockQueue")
var expectedOperations = 0
var earlyReturn = false
let joiningCallback = { (res: Result<AnyObject, Web3Error>) -> () in
switch res {
case .success(let result):
lockQueue.sync() {
expectedOperations = expectedOperations - 1
guard let ev = result as? [EventParserResultProtocol] else {
if (!earlyReturn) {
earlyReturn = true
return self.processError(Web3Error.dataError)
} else {
return
}
}
resultsArray.append(contentsOf: ev)
guard let currentQueue = OperationQueue.current else {
if (!earlyReturn) {
earlyReturn = true
return self.processError(Web3Error.dataError)
} else {
return
}
}

if expectedOperations == 0 {
if (!earlyReturn) {
earlyReturn = true
currentQueue.underlyingQueue?.async(execute: {
let allEvents = resultsArray.flatMap({ (ev) -> EventParserResultProtocol in
return ev
})
self.processSuccess(allEvents as AnyObject)
})
} else {
return
}
}
}
case .failure(let error):
lockQueue.sync() {
if (!earlyReturn) {
earlyReturn = true
return self.processError(error)
} else {
return
}
}
}
}
let resultsArray = [EventParserResultProtocol]()

let blockCallback = { (res: Result<AnyObject, Web3Error>) -> () in
switch res {
Expand Down Expand Up @@ -120,11 +69,13 @@ final class ParseBlockForEventsOperation: Web3Operation {
self.processError(Web3Error.dataError)
return
}
parseOperation.next = OperationChainingType.callback(joiningCallback, self.expectedQueue)
allOps.append(parseOperation)
}
expectedOperations = allOps.count
self.expectedQueue.addOperations(allOps, waitUntilFinished: true)
let joinOperation = JoinOperation(self.web3, queue: self.expectedQueue, operations: allOps)
let conversionOp = ConversionOperation<[EventParserResultProtocol]>(self.web3, queue: self.expectedQueue)
joinOperation.next = OperationChainingType.operation(conversionOp)
conversionOp.next = completion
self.expectedQueue.addOperation(joinOperation)
case .failure(let error):
return self.processError(error)
}
Expand All @@ -134,6 +85,7 @@ final class ParseBlockForEventsOperation: Web3Operation {
}
}


final class ParseTransactionForEventsOperation: Web3Operation {

convenience init?(_ web3Instance: web3, queue: OperationQueue? = nil, contract: ContractProtocol, eventName: String, filter: EventFilter? = nil, transactionHash: Data) {
Expand Down
2 changes: 1 addition & 1 deletion web3swift/Web3/Classes/Web3+Instance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class web3: Web3OptionsInheritable {
provider = prov
if queue == nil {
self.queue = OperationQueue.init()
self.queue.maxConcurrentOperationCount = 16
self.queue.maxConcurrentOperationCount = 32
self.queue.underlyingQueue = DispatchQueue.global(qos: .userInteractive)

} else {
Expand Down
27 changes: 27 additions & 0 deletions web3swiftTests/web3swiftTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,33 @@ class web3swiftTests: XCTestCase {
XCTAssert(!fail)
}

func testConcurrency13()
{
let semaphore = DispatchSemaphore(value: 0)
var fail = true;
let web3 = Web3.InfuraMainnetWeb3()
let contract = web3.contract(Web3.Utils.erc20ABI, at: nil, abiVersion: 2)
guard let operation = ParseBlockForEventsOperation.init(web3, queue: web3.queue, contract: contract!.contract, eventName: "Transfer", filter: nil, block: "latest") else {return XCTFail()}
let callback = { (res: Result<AnyObject, Web3Error>) -> () in
switch res {
case .success(let result):
print(result)
fail = false
case .failure(let error):
print(error)
XCTFail()
fatalError()
}
semaphore.signal()
}
operation.next = OperationChainingType.callback(callback, web3.queue)
web3.queue.addOperation(operation)


let _ = semaphore.wait(timeout: .distantFuture)
XCTAssert(!fail)
}

func testPerformanceExample() {
// This is an example of a performance test case.
self.measure {
Expand Down