From 00a96c9d15fe9e1c27d702f2ffaff37e161740e6 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Fri, 16 May 2025 19:47:06 -0700 Subject: [PATCH 1/2] Create platform specific AsyncIO - Darwin: based on DispatchIO - Linux: based on epoll - Windows (not included in this commit): based on IOCP with OVERLAPPED --- Sources/Subprocess/AsyncBufferSequence.swift | 37 +- Sources/Subprocess/Buffer.swift | 47 +- Sources/Subprocess/CMakeLists.txt | 1 + Sources/Subprocess/Configuration.swift | 2 - Sources/Subprocess/Error.swift | 17 +- Sources/Subprocess/IO/AsyncIO.swift | 808 ++++++++++++++++++ Sources/Subprocess/IO/Input.swift | 4 +- Sources/Subprocess/IO/Output.swift | 40 +- .../Platforms/Subprocess+Darwin.swift | 26 + .../Platforms/Subprocess+Linux.swift | 14 + .../Platforms/Subprocess+Unix.swift | 141 --- .../Platforms/Subprocess+Windows.swift | 165 +--- .../Input+Foundation.swift | 50 +- .../_SubprocessCShims/include/process_shims.h | 5 + .../SubprocessTests+Unix.swift | 7 +- 15 files changed, 956 insertions(+), 408 deletions(-) create mode 100644 Sources/Subprocess/IO/AsyncIO.swift diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 39fb38b..147e09a 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -23,10 +23,10 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Failure = any Swift.Error public typealias Element = Buffer - #if os(Windows) - internal typealias DiskIO = FileDescriptor - #else + #if canImport(Darwin) internal typealias DiskIO = DispatchIO + #else + internal typealias DiskIO = FileDescriptor #endif @_nonSendable @@ -47,15 +47,16 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { return self.buffer.removeFirst() } // Read more data - let data = try await self.diskIO.read( - upToLength: readBufferSize + let data = try await AsyncIO.shared.read( + from: self.diskIO, + upTo: readBufferSize ) guard let data else { // We finished reading. Close the file descriptor now - #if os(Windows) - try self.diskIO.close() - #else + #if canImport(Darwin) self.diskIO.close() + #else + try self.diskIO.close() #endif return nil } @@ -132,17 +133,7 @@ extension AsyncBufferSequence { self.eofReached = true return nil } - #if os(Windows) - // Cast data to CodeUnit type - let result = buffer.withUnsafeBytes { ptr in - return Array( - UnsafeBufferPointer( - start: ptr.bindMemory(to: Encoding.CodeUnit.self).baseAddress!, - count: ptr.count / MemoryLayout.size - ) - ) - } - #else + #if canImport(Darwin) // Unfortunately here we _have to_ copy the bytes out because // DispatchIO (rightfully) reuses buffer, which means `buffer.data` // has the same address on all iterations, therefore we can't directly @@ -157,7 +148,13 @@ extension AsyncBufferSequence { UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount) ) } - + #else + // Cast data to CodeUnitg type + let result = buffer.withUnsafeBytes { ptr in + return ptr.withMemoryRebound(to: Encoding.CodeUnit.self) { codeUnitPtr in + return Array(codeUnitPtr) + } + } #endif return result.isEmpty ? nil : result } diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 94b8f52..292fac4 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -17,18 +17,8 @@ extension AsyncBufferSequence { /// A immutable collection of bytes public struct Buffer: Sendable { - #if os(Windows) - internal let data: [UInt8] - - internal init(data: [UInt8]) { - self.data = data - } - - internal static func createFrom(_ data: [UInt8]) -> [Buffer] { - return [.init(data: data)] - } - #else - // We need to keep the backingData alive while _ContiguousBufferView is alive + #if canImport(Darwin) + // We need to keep the backingData alive while Slice is alive internal let backingData: DispatchData internal let data: DispatchData._ContiguousBufferView @@ -45,7 +35,17 @@ extension AsyncBufferSequence { } return slices.map{ .init(data: $0, backingData: data) } } - #endif + #else + internal let data: [UInt8] + + internal init(data: [UInt8]) { + self.data = data + } + + internal static func createFrom(_ data: [UInt8]) -> [Buffer] { + return [.init(data: data)] + } + #endif // canImport(Darwin) } } @@ -92,26 +92,23 @@ extension AsyncBufferSequence.Buffer { // MARK: - Hashable, Equatable extension AsyncBufferSequence.Buffer: Equatable, Hashable { - #if os(Windows) - // Compiler generated conformances - #else + #if canImport(Darwin) public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool { - return lhs.data.elementsEqual(rhs.data) + return lhs.data == rhs.data } public func hash(into hasher: inout Hasher) { - self.data.withUnsafeBytes { ptr in - hasher.combine(bytes: ptr) - } + hasher.combine(self.data) } #endif + // else Compiler generated conformances } // MARK: - DispatchData.Block #if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) extension DispatchData { /// Unfortunately `DispatchData.Region` is not available on Linux, hence our own wrapper - internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection { + internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection, Hashable { typealias Element = UInt8 internal let bytes: UnsafeBufferPointer @@ -127,6 +124,14 @@ extension DispatchData { return try body(UnsafeRawBufferPointer(self.bytes)) } + internal func hash(into hasher: inout Hasher) { + hasher.combine(bytes: UnsafeRawBufferPointer(self.bytes)) + } + + internal static func == (lhs: DispatchData._ContiguousBufferView, rhs: DispatchData._ContiguousBufferView) -> Bool { + return lhs.bytes.elementsEqual(rhs.bytes) + } + subscript(position: Int) -> UInt8 { _read { yield self.bytes[position] diff --git a/Sources/Subprocess/CMakeLists.txt b/Sources/Subprocess/CMakeLists.txt index ce78541..58bf205 100644 --- a/Sources/Subprocess/CMakeLists.txt +++ b/Sources/Subprocess/CMakeLists.txt @@ -17,6 +17,7 @@ target_sources(Subprocess PRIVATE Result.swift IO/Output.swift IO/Input.swift + IO/AsyncIO.swift Span+Subprocess.swift AsyncBufferSequence.swift API.swift diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 5396506..cf91529 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -595,7 +595,6 @@ internal struct TrackedFileDescriptor: ~Copyable { self.closeWhenDone = closeWhenDone } - #if os(Windows) consuming func consumeDiskIO() -> FileDescriptor { let result = self.fileDescriptor // Transfer the ownership out and therefor @@ -603,7 +602,6 @@ internal struct TrackedFileDescriptor: ~Copyable { self.closeWhenDone = false return result } - #endif internal mutating func safelyClose() throws { guard self.closeWhenDone else { diff --git a/Sources/Subprocess/Error.swift b/Sources/Subprocess/Error.swift index dde4468..b7a6ca5 100644 --- a/Sources/Subprocess/Error.swift +++ b/Sources/Subprocess/Error.swift @@ -41,6 +41,7 @@ extension SubprocessError { case failedToWriteToSubprocess case failedToMonitorProcess case streamOutputExceedsLimit(Int) + case asyncIOFailed(String) // Signal case failedToSendSignal(Int32) // Windows Only @@ -67,18 +68,20 @@ extension SubprocessError { return 5 case .streamOutputExceedsLimit(_): return 6 - case .failedToSendSignal(_): + case .asyncIOFailed(_): return 7 - case .failedToTerminate: + case .failedToSendSignal(_): return 8 - case .failedToSuspend: + case .failedToTerminate: return 9 - case .failedToResume: + case .failedToSuspend: return 10 - case .failedToCreatePipe: + case .failedToResume: return 11 - case .invalidWindowsPath(_): + case .failedToCreatePipe: return 12 + case .invalidWindowsPath(_): + return 13 } } @@ -108,6 +111,8 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible return "Failed to monitor the state of child process with underlying error: \(self.underlyingError!)" case .streamOutputExceedsLimit(let limit): return "Failed to create output from current buffer because the output limit (\(limit)) was reached." + case .asyncIOFailed(let reason): + return "An error occurred within the AsyncIO subsystem: \(reason). Underlying error: \(self.underlyingError!)" case .failedToSendSignal(let signal): return "Failed to send signal \(signal) to the child process." case .failedToTerminate: diff --git a/Sources/Subprocess/IO/AsyncIO.swift b/Sources/Subprocess/IO/AsyncIO.swift new file mode 100644 index 0000000..3f075e5 --- /dev/null +++ b/Sources/Subprocess/IO/AsyncIO.swift @@ -0,0 +1,808 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if canImport(System) +@preconcurrency import System +#else +@preconcurrency import SystemPackage +#endif + +/// Platform specific asynchronous read/write implementation + +// MARK: - Linux (epoll) +#if canImport(Glibc) || canImport(Android) || canImport(Musl) + +#if canImport(Glibc) +import Glibc +#elseif canImport(Android) +import Android +#elseif canImport(Musl) +import Musl +#endif + +import _SubprocessCShims +import Synchronization + +private typealias SignalStream = AsyncThrowingStream +private let _epollEventSize = 256 +private let _registration: Mutex< + [PlatformFileDescriptor : SignalStream.Continuation] +> = Mutex([:]) + +final class AsyncIO: Sendable { + + typealias OutputStream = AsyncThrowingStream + + private final class MonitorThreadContext { + let epollFileDescriptor: CInt + let shutdownFileDescriptor: CInt + + init( + epollFileDescriptor: CInt, + shutdownFileDescriptor: CInt + ) { + self.epollFileDescriptor = epollFileDescriptor + self.shutdownFileDescriptor = shutdownFileDescriptor + } + } + + private enum Event { + case read + case write + } + + private struct State { + let epollFileDescriptor: CInt + let shutdownFileDescriptor: CInt + let monitorThread: pthread_t + } + + static let shared: AsyncIO = AsyncIO() + + private let state: Result + + private init() { + // Create main epoll fd + let epollFileDescriptor = epoll_create1(CInt(EPOLL_CLOEXEC)) + guard epollFileDescriptor >= 0 else { + let error = SubprocessError( + code: .init(.asyncIOFailed("epoll_create1 failed")), + underlyingError: .init(rawValue: errno) + ) + self.state = .failure(error) + return + } + // Create shutdownFileDescriptor + let shutdownFileDescriptor = eventfd(0, CInt(EFD_NONBLOCK | EFD_CLOEXEC)) + guard shutdownFileDescriptor >= 0 else { + let error = SubprocessError( + code: .init(.asyncIOFailed("eventfd failed")), + underlyingError: .init(rawValue: errno) + ) + self.state = .failure(error) + return + } + + // Register shutdownFileDescriptor with epoll + var event = epoll_event( + events: EPOLLIN.rawValue, + data: epoll_data(fd: shutdownFileDescriptor) + ) + var rc = epoll_ctl( + epollFileDescriptor, + EPOLL_CTL_ADD, + shutdownFileDescriptor, + &event + ) + guard rc == 0 else { + let error = SubprocessError( + code: .init(.asyncIOFailed( + "failed to add shutdown fd \(shutdownFileDescriptor) to epoll list") + ), + underlyingError: .init(rawValue: errno) + ) + self.state = .failure(error) + return + } + + // Create thread data + let context = MonitorThreadContext( + epollFileDescriptor: epollFileDescriptor, + shutdownFileDescriptor: shutdownFileDescriptor + ) + let threadContext = Unmanaged.passRetained(context) + #if os(FreeBSD) || os(OpenBSD) + var thread: pthread_t? = nil + #else + var thread: pthread_t = pthread_t() + #endif + rc = pthread_create(&thread, nil, { args in + func reportError(_ error: SubprocessError) { + _registration.withLock { store in + for continuation in store.values { + continuation.finish(throwing: error) + } + } + } + + let unmanaged = Unmanaged.fromOpaque(args!) + let context = unmanaged.takeRetainedValue() + + var events: [epoll_event] = Array( + repeating: epoll_event(events: 0, data: epoll_data(fd: 0)), + count: _epollEventSize + ) + + // Enter the monitor loop + monitorLoop: while true { + let eventCount = epoll_wait( + context.epollFileDescriptor, + &events, + CInt(events.count), + -1 + ) + if eventCount < 0 { + if errno == EINTR || errno == EAGAIN { + continue // interrupted by signal; try again + } + // Report other errors + let error = SubprocessError( + code: .init(.asyncIOFailed( + "epoll_wait failed") + ), + underlyingError: .init(rawValue: errno) + ) + reportError(error) + break monitorLoop + } + + for index in 0 ..< Int(eventCount) { + let event = events[index] + let targetFileDescriptor = event.data.fd + // Breakout the monitor loop if we received shutdown + // from the shutdownFD + if targetFileDescriptor == context.shutdownFileDescriptor { + var buf: UInt64 = 0 + _ = _SubprocessCShims.read(context.shutdownFileDescriptor, &buf, MemoryLayout.size) + break monitorLoop + } + + // Notify the continuation + _registration.withLock { store in + if let continuation = store[targetFileDescriptor] { + continuation.yield(true) + } + } + } + } + + return nil + }, threadContext.toOpaque()) + guard rc == 0 else { + let error = SubprocessError( + code: .init(.asyncIOFailed("Failed to create monitor thread")), + underlyingError: .init(rawValue: rc) + ) + self.state = .failure(error) + return + } + + #if os(FreeBSD) || os(OpenBSD) + let monitorThread = thread! + #else + let monitorThread = thread + #endif + + let state = State( + epollFileDescriptor: epollFileDescriptor, + shutdownFileDescriptor: shutdownFileDescriptor, + monitorThread: monitorThread + ) + self.state = .success(state) + + atexit { + AsyncIO.shared.shutdown() + } + } + + private func shutdown() { + guard case .success(let currentState) = self.state else { + return + } + + var one: UInt64 = 1 + // Wake up the thread for shutdown + _ = _SubprocessCShims.write(currentState.shutdownFileDescriptor, &one, MemoryLayout.stride) + // Cleanup the monitor thread + pthread_join(currentState.monitorThread, nil) + } + + + private func registerFileDescriptor( + _ fileDescriptor: FileDescriptor, + for event: Event + ) -> SignalStream { + return SignalStream { continuation in + // If setup failed, nothing much we can do + switch self.state { + case .success(let state): + // Set file descriptor to be non blocking + let flags = fcntl(fileDescriptor.rawValue, F_GETFD) + guard flags != -1 else { + let error = SubprocessError( + code: .init(.asyncIOFailed( + "failed to get flags for \(fileDescriptor.rawValue)") + ), + underlyingError: .init(rawValue: errno) + ) + continuation.finish(throwing: error) + return + } + guard fcntl(fileDescriptor.rawValue, F_SETFL, flags | O_NONBLOCK) != -1 else { + let error = SubprocessError( + code: .init(.asyncIOFailed( + "failed to set \(fileDescriptor.rawValue) to be non-blocking") + ), + underlyingError: .init(rawValue: errno) + ) + continuation.finish(throwing: error) + return + } + // Register event + let targetEvent: EPOLL_EVENTS + switch event { + case .read: + targetEvent = EPOLLIN + case .write: + targetEvent = EPOLLOUT + } + + var event = epoll_event( + events: targetEvent.rawValue, + data: epoll_data(fd: fileDescriptor.rawValue) + ) + let rc = epoll_ctl( + state.epollFileDescriptor, + EPOLL_CTL_ADD, + fileDescriptor.rawValue, + &event + ) + if rc != 0 { + let error = SubprocessError( + code: .init(.asyncIOFailed( + "failed to add \(fileDescriptor.rawValue) to epoll list") + ), + underlyingError: .init(rawValue: errno) + ) + continuation.finish(throwing: error) + return + } + // Now save the continuation + _registration.withLock { storage in + storage[fileDescriptor.rawValue] = continuation + } + case .failure(let setupError): + continuation.finish(throwing: setupError) + return + } + } + } + + private func removeRegistration(for fileDescriptor: FileDescriptor) throws { + switch self.state { + case .success(let state): + let rc = epoll_ctl( + state.epollFileDescriptor, + EPOLL_CTL_DEL, + fileDescriptor.rawValue, + nil + ) + guard rc == 0 else { + throw SubprocessError( + code: .init(.asyncIOFailed( + "failed to remove \(fileDescriptor.rawValue) to epoll list") + ), + underlyingError: .init(rawValue: errno) + ) + } + _registration.withLock { store in + _ = store.removeValue(forKey: fileDescriptor.rawValue) + } + case .failure(let setupFailure): + throw setupFailure + } + } +} + +extension AsyncIO { + + protocol _ContiguousBytes { + var count: Int { get } + + func withUnsafeBytes( + _ body: (UnsafeRawBufferPointer + ) throws -> ResultType) rethrows -> ResultType + } + + func read( + from diskIO: borrowing TrackedPlatformDiskIO, + upTo maxLength: Int + ) async throws -> [UInt8]? { + return try await self.read(from: diskIO.fileDescriptor, upTo: maxLength) + } + + func read( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws -> [UInt8]? { + // If we are reading until EOF, start with readBufferSize + // and gradually increase buffer size + let bufferLength = maxLength == .max ? readBufferSize : maxLength + + var resultBuffer: [UInt8] = Array( + repeating: 0, count: bufferLength + ) + var readLength: Int = 0 + let signalStream = self.registerFileDescriptor(fileDescriptor, for: .read) + for try await _ in signalStream { + // Every iteration signals we are ready to read more data + while true { + let bytesRead = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in + // Get a pointer to the memory at the specified offset + let targetCount = bufferPointer.count - readLength + + let offsetAddress = bufferPointer.baseAddress!.advanced(by: readLength) + + // Read directly into the buffer at the offset + return _SubprocessCShims.read(fileDescriptor.rawValue, offsetAddress, targetCount) + } + if bytesRead > 0 { + // Read some data + readLength += bytesRead + if maxLength == .max { + // Grow resultBuffer if needed + guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { + continue + } + resultBuffer.append( + contentsOf: Array(repeating: 0, count: resultBuffer.count) + ) + } else if readLength >= maxLength { + // When we reached maxLength, return! + try self.removeRegistration(for: fileDescriptor) + return resultBuffer + } + } else if bytesRead == 0 { + // We reached EOF. Return whatever's left + try self.removeRegistration(for: fileDescriptor) + guard readLength > 0 else { + return nil + } + resultBuffer.removeLast(resultBuffer.count - readLength) + return resultBuffer + } else { + if errno == EAGAIN || errno == EWOULDBLOCK { + // No more data for now wait for the next signal + break + } else { + // Throw all other errors + try self.removeRegistration(for: fileDescriptor) + throw SubprocessError.UnderlyingError(rawValue: errno) + } + } + } + } + return resultBuffer + } + + func write( + _ array: [UInt8], + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + return try await self._write(array, to: diskIO) + } + + func _write( + _ bytes: Bytes, + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + let fileDescriptor = diskIO.fileDescriptor + let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) + var writtenLength: Int = 0 + for try await _ in signalStream { + while true { + let written = bytes.withUnsafeBytes { ptr in + let remainingLength = ptr.count - writtenLength + let startPtr = ptr.baseAddress!.advanced(by: writtenLength) + return _SubprocessCShims.write(fileDescriptor.rawValue, startPtr, remainingLength) + } + if written > 0 { + writtenLength += written + if writtenLength >= bytes.count { + // Wrote all data + try self.removeRegistration(for: fileDescriptor) + return writtenLength + } + } else { + if errno == EAGAIN || errno == EWOULDBLOCK { + // No more data for now wait for the next signal + break + } else { + // Throw all other errors + try self.removeRegistration(for: fileDescriptor) + throw SubprocessError.UnderlyingError(rawValue: errno) + } + } + } + } + return 0 + } + + #if SubprocessSpan + func write( + _ span: borrowing RawSpan, + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + let fileDescriptor = diskIO.fileDescriptor + let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) + var writtenLength: Int = 0 + for try await _ in signalStream { + while true { + let written = span.withUnsafeBytes { ptr in + let remainingLength = ptr.count - writtenLength + let startPtr = ptr.baseAddress!.advanced(by: writtenLength) + return _SubprocessCShims.write(fileDescriptor.rawValue, startPtr, remainingLength) + } + if written > 0 { + writtenLength += written + if writtenLength >= span.byteCount { + // Wrote all data + try self.removeRegistration(for: fileDescriptor) + return writtenLength + } + } else { + if errno == EAGAIN || errno == EWOULDBLOCK { + // No more data for now wait for the next signal + break + } else { + // Throw all other errors + try self.removeRegistration(for: fileDescriptor) + throw SubprocessError.UnderlyingError(rawValue: errno) + } + } + } + } + return 0 + } + #endif +} + +extension Array : AsyncIO._ContiguousBytes where Element == UInt8 {} + +#endif // canImport(Glibc) || canImport(Android) || canImport(Musl) + +// MARK: - macOS (DispatchIO) +#if canImport(Darwin) + +internal import Dispatch + + +final class AsyncIO: Sendable { + static let shared: AsyncIO = AsyncIO() + + private init() {} + + internal func read( + from diskIO: borrowing TrackedPlatformDiskIO, + upTo maxLength: Int + ) async throws -> DispatchData? { + return try await self.read( + from: diskIO.dispatchIO, + upTo: maxLength, + ) + } + + internal func read( + from dispatchIO: DispatchIO, + upTo maxLength: Int + ) async throws -> DispatchData? { + return try await withCheckedThrowingContinuation { continuation in + var buffer: DispatchData = .empty + dispatchIO.read( + offset: 0, + length: maxLength, + queue: .global() + ) { done, data, error in + if error != 0 { + continuation.resume( + throwing: SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + ) + ) + return + } + if let data = data { + if buffer.isEmpty { + buffer = data + } else { + buffer.append(data) + } + } + if done { + if !buffer.isEmpty { + continuation.resume(returning: buffer) + } else { + continuation.resume(returning: nil) + } + } + } + } + } + + #if SubprocessSpan + internal func write( + _ span: borrowing RawSpan, + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let dispatchData = span.withUnsafeBytes { + return DispatchData( + bytesNoCopy: $0, + deallocator: .custom( + nil, + { + // noop + } + ) + ) + } + self.write(dispatchData, to: diskIO) { writtenLength, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: writtenLength) + } + } + } + } + #endif // SubprocessSpan + + internal func write( + _ array: [UInt8], + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let dispatchData = array.withUnsafeBytes { + return DispatchData( + bytesNoCopy: $0, + deallocator: .custom( + nil, + { + // noop + } + ) + ) + } + self.write(dispatchData, to: diskIO) { writtenLength, error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: writtenLength) + } + } + } + } + + internal func write( + _ dispatchData: DispatchData, + to diskIO: borrowing TrackedPlatformDiskIO, + queue: DispatchQueue = .global(), + completion: @escaping (Int, Error?) -> Void + ) { + diskIO.dispatchIO.write( + offset: 0, + data: dispatchData, + queue: queue + ) { done, unwritten, error in + guard done else { + // Wait until we are done writing or encountered some error + return + } + + let unwrittenLength = unwritten?.count ?? 0 + let writtenLength = dispatchData.count - unwrittenLength + guard error != 0 else { + completion(writtenLength, nil) + return + } + completion( + writtenLength, + SubprocessError( + code: .init(.failedToWriteToSubprocess), + underlyingError: .init(rawValue: error) + ) + ) + } + } +} + +#endif + +// MARK: - Windows (I/O Completion Ports) TODO +#if os(Windows) + +internal import Dispatch +import WinSDK + +final class AsyncIO: Sendable { + + protocol _ContiguousBytes: Sendable { + var count: Int { get } + + func withUnsafeBytes( + _ body: (UnsafeRawBufferPointer + ) throws -> ResultType) rethrows -> ResultType + } + + static let shared = AsyncIO() + + private init() {} + + func read( + from diskIO: borrowing TrackedPlatformDiskIO, + upTo maxLength: Int + ) async throws -> [UInt8]? { + return try await self.read(from: diskIO.fileDescriptor, upTo: maxLength) + } + + func read( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws -> [UInt8]? { + return try await withCheckedThrowingContinuation { continuation in + DispatchQueue.global(qos: .userInitiated).async { + var totalBytesRead: Int = 0 + var lastError: DWORD? = nil + let values = [UInt8]( + unsafeUninitializedCapacity: maxLength + ) { buffer, initializedCount in + while true { + guard let baseAddress = buffer.baseAddress else { + initializedCount = 0 + break + } + let bufferPtr = baseAddress.advanced(by: totalBytesRead) + var bytesRead: DWORD = 0 + let readSucceed = ReadFile( + fileDescriptor.platformDescriptor, + UnsafeMutableRawPointer(mutating: bufferPtr), + DWORD(maxLength - totalBytesRead), + &bytesRead, + nil + ) + if !readSucceed { + // Windows throws ERROR_BROKEN_PIPE when the pipe is closed + let error = GetLastError() + if error == ERROR_BROKEN_PIPE { + // We are done reading + initializedCount = totalBytesRead + } else { + // We got some error + lastError = error + initializedCount = 0 + } + break + } else { + // We successfully read the current round + totalBytesRead += Int(bytesRead) + } + + if totalBytesRead >= maxLength { + initializedCount = min(maxLength, totalBytesRead) + break + } + } + } + if let lastError = lastError { + let windowsError = SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: lastError) + ) + continuation.resume(throwing: windowsError) + } else { + continuation.resume(returning: values) + } + } + } + } + + func write( + _ array: [UInt8], + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + return try await self._write(array, to: diskIO) + } + + #if SubprocessSpan + func write( + _ span: borrowing RawSpan, + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + // TODO: Remove this hack with I/O Completion Ports rewrite + struct _Box: @unchecked Sendable { + let ptr: UnsafeRawBufferPointer + } + let fileDescriptor = diskIO.fileDescriptor + return try await withCheckedThrowingContinuation { continuation in + span.withUnsafeBytes { ptr in + let box = _Box(ptr: ptr) + DispatchQueue.global().async { + let handle = HANDLE(bitPattern: _get_osfhandle(fileDescriptor.rawValue))! + var writtenBytes: DWORD = 0 + let writeSucceed = WriteFile( + handle, + box.ptr.baseAddress, + DWORD(box.ptr.count), + &writtenBytes, + nil + ) + if !writeSucceed { + let error = SubprocessError( + code: .init(.failedToWriteToSubprocess), + underlyingError: .init(rawValue: GetLastError()) + ) + continuation.resume(throwing: error) + } else { + continuation.resume(returning: Int(writtenBytes)) + } + } + } + } + } + #endif // SubprocessSpan + + func _write( + _ bytes: Bytes, + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + let fileDescriptor = diskIO.fileDescriptor + return try await withCheckedThrowingContinuation { continuation in + DispatchQueue.global().async { + let handle = HANDLE(bitPattern: _get_osfhandle(fileDescriptor.rawValue))! + var writtenBytes: DWORD = 0 + let writeSucceed = bytes.withUnsafeBytes { ptr in + return WriteFile( + handle, + ptr.baseAddress, + DWORD(ptr.count), + &writtenBytes, + nil + ) + } + if !writeSucceed { + let error = SubprocessError( + code: .init(.failedToWriteToSubprocess), + underlyingError: .init(rawValue: GetLastError()) + ) + continuation.resume(throwing: error) + } else { + continuation.resume(returning: Int(writtenBytes)) + } + } + } + } +} + +extension Array : AsyncIO._ContiguousBytes where Element == UInt8 {} + +#endif + diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index 58bfe4d..3f473e4 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -224,7 +224,7 @@ public final actor StandardInputWriter: Sendable { public func write( _ array: [UInt8] ) async throws -> Int { - return try await self.diskIO.write(array) + return try await AsyncIO.shared.write(array, to: self.diskIO) } /// Write a `RawSpan` to the standard input of the subprocess. @@ -232,7 +232,7 @@ public final actor StandardInputWriter: Sendable { /// - Returns number of bytes written #if SubprocessSpan public func write(_ span: borrowing RawSpan) async throws -> Int { - return try await self.diskIO.write(span) + return try await AsyncIO.shared.write(span, to: self.diskIO) } #endif diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index 454eca4..563a73a 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -142,14 +142,12 @@ public struct BytesOutput: OutputProtocol { internal func captureOutput( from diskIO: consuming TrackedPlatformDiskIO? ) async throws -> [UInt8] { - #if os(Windows) - let result = try await diskIO?.fileDescriptor.read(upToLength: self.maxSize) ?? [] - try diskIO?.safelyClose() - return result - #else - let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize) + let result = try await AsyncIO.shared.read(from: diskIO!, upTo: self.maxSize) try diskIO?.safelyClose() + #if canImport(Darwin) return result?.array() ?? [] + #else + return result ?? [] #endif } @@ -264,14 +262,14 @@ extension OutputProtocol { if OutputType.self == Void.self { return () as! OutputType } - #if os(Windows) - let result = try await diskIO?.fileDescriptor.read(upToLength: self.maxSize) - try diskIO?.safelyClose() - return try self.output(from: result ?? []) - #else - let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize) + // Force unwrap is safe here because only `OutputType.self == Void` would + // have nil `TrackedPlatformDiskIO` + let result = try await AsyncIO.shared.read(from: diskIO!, upTo: self.maxSize) try diskIO?.safelyClose() + #if canImport(Darwin) return try self.output(from: result ?? .empty) + #else + return try self.output(from: result ?? []) #endif } } @@ -293,34 +291,34 @@ extension OutputProtocol where OutputType == Void { #if SubprocessSpan extension OutputProtocol { - #if os(Windows) - internal func output(from data: [UInt8]) throws -> OutputType { + #if canImport(Darwin) + internal func output(from data: DispatchData) throws -> OutputType { guard !data.isEmpty else { let empty = UnsafeRawBufferPointer(start: nil, count: 0) let span = RawSpan(_unsafeBytes: empty) return try self.output(from: span) } - return try data.withUnsafeBufferPointer { ptr in - let span = RawSpan(_unsafeBytes: UnsafeRawBufferPointer(ptr)) + return try data.withUnsafeBytes { ptr in + let bufferPtr = UnsafeRawBufferPointer(start: ptr, count: data.count) + let span = RawSpan(_unsafeBytes: bufferPtr) return try self.output(from: span) } } #else - internal func output(from data: DispatchData) throws -> OutputType { + internal func output(from data: [UInt8]) throws -> OutputType { guard !data.isEmpty else { let empty = UnsafeRawBufferPointer(start: nil, count: 0) let span = RawSpan(_unsafeBytes: empty) return try self.output(from: span) } - return try data.withUnsafeBytes { ptr in - let bufferPtr = UnsafeRawBufferPointer(start: ptr, count: data.count) - let span = RawSpan(_unsafeBytes: bufferPtr) + return try data.withUnsafeBufferPointer { ptr in + let span = RawSpan(_unsafeBytes: UnsafeRawBufferPointer(ptr)) return try self.output(from: span) } } - #endif // os(Windows) + #endif // canImport(Darwin) } #endif diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 7cd32c7..02640ab 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -524,4 +524,30 @@ internal func monitorProcessTermination( } } +internal typealias TrackedPlatformDiskIO = TrackedDispatchIO + +extension TrackedFileDescriptor { + internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { + // Transferring out the ownership of fileDescriptor means we don't have go close here + let shouldClose = self.closeWhenDone + let closeFd = self.fileDescriptor + let dispatchIO: DispatchIO = DispatchIO( + type: .stream, + fileDescriptor: self.platformDescriptor(), + queue: .global(), + cleanupHandler: { error in + // Close the file descriptor + if shouldClose { + try? closeFd.close() + } + } + ) + let result: TrackedPlatformDiskIO = .init( + dispatchIO, closeWhenDone: self.closeWhenDone + ) + self.closeWhenDone = false + return result + } +} + #endif // canImport(Darwin) diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index bf1e5b8..47fbe96 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -423,4 +423,18 @@ private func _setupMonitorSignalHandler() { setup } +internal typealias TrackedPlatformDiskIO = TrackedFileDescriptor + +extension TrackedFileDescriptor { + internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { + // Transferring out the ownership of fileDescriptor means we don't have go close here + let result: TrackedPlatformDiskIO = .init( + self.fileDescriptor, + closeWhenDone: self.closeWhenDone + ) + self.closeWhenDone = false + return result + } +} + #endif // canImport(Glibc) || canImport(Android) || canImport(Musl) diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index bb5da2f..122e5c5 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -375,146 +375,5 @@ extension FileDescriptor { } internal typealias PlatformFileDescriptor = CInt -internal typealias TrackedPlatformDiskIO = TrackedDispatchIO - -extension TrackedFileDescriptor { - internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { - let dispatchIO: DispatchIO = DispatchIO( - type: .stream, - fileDescriptor: self.platformDescriptor(), - queue: .global(), - cleanupHandler: { error in - // Close the file descriptor - if self.closeWhenDone { - try? self.safelyClose() - } - } - ) - return .init(dispatchIO, closeWhenDone: self.closeWhenDone) - } -} - -// MARK: - TrackedDispatchIO extensions -extension DispatchIO { - internal func read(upToLength maxLength: Int) async throws -> DispatchData? { - return try await withCheckedThrowingContinuation { continuation in - var buffer: DispatchData = .empty - self.read( - offset: 0, - length: maxLength, - queue: .global() - ) { done, data, error in - if error != 0 { - continuation.resume( - throwing: SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - ) - ) - return - } - if let data = data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) - } - } - if done { - if !buffer.isEmpty { - continuation.resume(returning: buffer) - } else { - continuation.resume(returning: nil) - } - } - } - } - } -} - -extension TrackedDispatchIO { - #if SubprocessSpan - internal func write( - _ span: borrowing RawSpan - ) async throws -> Int { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let dispatchData = span.withUnsafeBytes { - return DispatchData( - bytesNoCopy: $0, - deallocator: .custom( - nil, - { - // noop - } - ) - ) - } - self.write(dispatchData) { writtenLength, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - #endif // SubprocessSpan - - internal func write( - _ array: [UInt8] - ) async throws -> Int { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let dispatchData = array.withUnsafeBytes { - return DispatchData( - bytesNoCopy: $0, - deallocator: .custom( - nil, - { - // noop - } - ) - ) - } - self.write(dispatchData) { writtenLength, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - - internal func write( - _ dispatchData: DispatchData, - queue: DispatchQueue = .global(), - completion: @escaping (Int, Error?) -> Void - ) { - self.dispatchIO.write( - offset: 0, - data: dispatchData, - queue: queue - ) { done, unwritten, error in - guard done else { - // Wait until we are done writing or encountered some error - return - } - - let unwrittenLength = unwritten?.count ?? 0 - let writtenLength = dispatchData.count - unwrittenLength - guard error != 0 else { - completion(writtenLength, nil) - return - } - completion( - writtenLength, - SubprocessError( - code: .init(.failedToWriteToSubprocess), - underlyingError: .init(rawValue: error) - ) - ) - } - } -} #endif // canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 3dfb00c..723a662 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1004,168 +1004,15 @@ extension FileDescriptor { } } -extension FileDescriptor { - internal func read(upToLength maxLength: Int) async throws -> [UInt8]? { - return try await withCheckedThrowingContinuation { continuation in - self.readUntilEOF( - upToLength: maxLength - ) { result in - switch result { - case .failure(let error): - continuation.resume(throwing: error) - case .success(let bytes): - continuation.resume(returning: bytes.isEmpty ? nil : bytes) - } - } - } - } - - internal func readUntilEOF( - upToLength maxLength: Int, - resultHandler: @Sendable @escaping (Swift.Result<[UInt8], any (Error & Sendable)>) -> Void - ) { - DispatchQueue.global(qos: .userInitiated).async { - var totalBytesRead: Int = 0 - var lastError: DWORD? = nil - let values = [UInt8]( - unsafeUninitializedCapacity: maxLength - ) { buffer, initializedCount in - while true { - guard let baseAddress = buffer.baseAddress else { - initializedCount = 0 - break - } - let bufferPtr = baseAddress.advanced(by: totalBytesRead) - var bytesRead: DWORD = 0 - let readSucceed = ReadFile( - self.platformDescriptor, - UnsafeMutableRawPointer(mutating: bufferPtr), - DWORD(maxLength - totalBytesRead), - &bytesRead, - nil - ) - if !readSucceed { - // Windows throws ERROR_BROKEN_PIPE when the pipe is closed - let error = GetLastError() - if error == ERROR_BROKEN_PIPE { - // We are done reading - initializedCount = totalBytesRead - } else { - // We got some error - lastError = error - initializedCount = 0 - } - break - } else { - // We successfully read the current round - totalBytesRead += Int(bytesRead) - } - - if totalBytesRead >= maxLength { - initializedCount = min(maxLength, totalBytesRead) - break - } - } - } - if let lastError = lastError { - let windowsError = SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: lastError) - ) - resultHandler(.failure(windowsError)) - } else { - resultHandler(.success(values)) - } - } - } -} - extension TrackedFileDescriptor { internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { - // TrackedPlatformDiskIO is a typealias of TrackedFileDescriptor on Windows (they're the same type) - // Just return the same object so we don't create a copy and try to double-close the fd. - return self - } - - internal func readUntilEOF( - upToLength maxLength: Int, - resultHandler: @Sendable @escaping (Swift.Result<[UInt8], any (Error & Sendable)>) -> Void - ) { - self.fileDescriptor.readUntilEOF( - upToLength: maxLength, - resultHandler: resultHandler + // Transferring out the ownership of fileDescriptor means we don't have go close here + let result: TrackedPlatformDiskIO = .init( + self.fileDescriptor, + closeWhenDone: self.closeWhenDone ) - } - -#if SubprocessSpan - internal func write( - _ span: borrowing RawSpan - ) async throws -> Int { - let fileDescriptor = self.fileDescriptor - return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - span.withUnsafeBytes { ptr in - // TODO: Use WriteFileEx for asyc here - Self.write( - ptr, - to: fileDescriptor - ) { writtenLength, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - } -#endif - - internal func write( - _ array: [UInt8] - ) async throws -> Int { - try await withCheckedThrowingContinuation { continuation in - // TODO: Figure out a better way to asynchronously write - let fd = self.fileDescriptor - DispatchQueue.global(qos: .userInitiated).async { - array.withUnsafeBytes { - Self.write( - $0, - to: fd - ) { writtenLength, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - } - } - - internal static func write( - _ ptr: UnsafeRawBufferPointer, - to fileDescriptor: FileDescriptor, - completion: @escaping (Int, Swift.Error?) -> Void - ) { - let handle = HANDLE(bitPattern: _get_osfhandle(fileDescriptor.rawValue))! - var writtenBytes: DWORD = 0 - let writeSucceed = WriteFile( - handle, - ptr.baseAddress, - DWORD(ptr.count), - &writtenBytes, - nil - ) - if !writeSucceed { - let error = SubprocessError( - code: .init(.failedToWriteToSubprocess), - underlyingError: .init(rawValue: GetLastError()) - ) - completion(Int(writtenBytes), error) - } else { - completion(Int(writtenBytes), nil) - } + self.closeWhenDone = false + return result } } diff --git a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift index a1db8ad..8312c56 100644 --- a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift @@ -111,7 +111,7 @@ extension StandardInputWriter { public func write( _ data: Data ) async throws -> Int { - return try await self.diskIO.write(data) + return try await AsyncIO.shared.write(data, to: self.diskIO) } /// Write a AsyncSequence of Data to the standard input of the subprocess. @@ -128,35 +128,12 @@ extension StandardInputWriter { } } -#if os(Windows) -extension TrackedFileDescriptor { - internal func write( - _ data: Data - ) async throws -> Int { - let fileDescriptor = self.fileDescriptor - return try await withCheckedThrowingContinuation { continuation in - // TODO: Figure out a better way to asynchronously write - DispatchQueue.global(qos: .userInitiated).async { - data.withUnsafeBytes { - Self.write( - $0, - to: fileDescriptor - ) { writtenLength, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - } - } -} -#else -extension TrackedDispatchIO { + +#if canImport(Darwin) +extension AsyncIO { internal func write( - _ data: Data + _ data: Data, + to diskIO: borrowing TrackedPlatformDiskIO ) async throws -> Int { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let dispatchData = data.withUnsafeBytes { @@ -170,7 +147,7 @@ extension TrackedDispatchIO { ) ) } - self.write(dispatchData) { writtenLength, error in + self.write(dispatchData, to: diskIO) { writtenLength, error in if let error = error { continuation.resume(throwing: error) } else { @@ -180,6 +157,17 @@ extension TrackedDispatchIO { } } } -#endif // os(Windows) +#else +extension Data : AsyncIO._ContiguousBytes { } + +extension AsyncIO { + internal func write( + _ data: Data, + to diskIO: borrowing TrackedPlatformDiskIO + ) async throws -> Int { + return try await self._write(data, to: diskIO) + } +} +#endif // canImport(Darwin) #endif // SubprocessFoundation diff --git a/Sources/_SubprocessCShims/include/process_shims.h b/Sources/_SubprocessCShims/include/process_shims.h index 35cbd2f..0ae4a5a 100644 --- a/Sources/_SubprocessCShims/include/process_shims.h +++ b/Sources/_SubprocessCShims/include/process_shims.h @@ -21,6 +21,11 @@ #include #endif +#if TARGET_OS_LINUX +#include +#include +#endif // TARGET_OS_LINUX + #if __has_include() vm_size_t _subprocess_vm_size(void); #endif diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index a50c4e8..25a464e 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -668,14 +668,11 @@ extension SubprocessUnixTests { var platformOptions = PlatformOptions() platformOptions.supplementaryGroups = Array(expectedGroups) let idResult = try await Subprocess.run( - .name("swift"), + .path("/usr/bin/swift"), arguments: [getgroupsSwift.string], platformOptions: platformOptions, - output: .string, - error: .string, + output: .string ) - let error = try #require(idResult.standardError) - try #require(error == "") #expect(idResult.terminationStatus.isSuccess) let ids = try #require( idResult.standardOutput From f15e03d64d8bb05e56747595591ecb1327cd76d4 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 2 Jul 2025 22:09:00 -0700 Subject: [PATCH 2/2] Refactor captureOutput() to minimize force unwrap --- Sources/Subprocess/IO/AsyncIO.swift | 20 +++++++++++++++++++- Sources/Subprocess/IO/Output.swift | 27 ++++++++++++++++++--------- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/Sources/Subprocess/IO/AsyncIO.swift b/Sources/Subprocess/IO/AsyncIO.swift index 3f075e5..2e7b129 100644 --- a/Sources/Subprocess/IO/AsyncIO.swift +++ b/Sources/Subprocess/IO/AsyncIO.swift @@ -352,8 +352,14 @@ extension AsyncIO { ) var readLength: Int = 0 let signalStream = self.registerFileDescriptor(fileDescriptor, for: .read) + /// Outer loop: every iteration signals we are ready to read more data for try await _ in signalStream { - // Every iteration signals we are ready to read more data + /// Inner loop: repeatedly call `.read()` and read more data until: + /// 1. We reached EOF (read length is 0), in which case return the result + /// 2. We read `maxLength` bytes, in which case return the result + /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In + /// this case we `break` out of the inner loop and wait `.read()` to be + /// ready by `await`ing the next signal in the outer loop. while true { let bytesRead = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in // Get a pointer to the memory at the specified offset @@ -417,7 +423,13 @@ extension AsyncIO { let fileDescriptor = diskIO.fileDescriptor let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) var writtenLength: Int = 0 + /// Outer loop: every iteration signals we are ready to read more data for try await _ in signalStream { + /// Inner loop: repeatedly call `.write()` and write more data until: + /// 1. We've written bytes.count bytes. + /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In + /// this case we `break` out of the inner loop and wait `.write()` to be + /// ready by `await`ing the next signal in the outer loop. while true { let written = bytes.withUnsafeBytes { ptr in let remainingLength = ptr.count - writtenLength @@ -454,7 +466,13 @@ extension AsyncIO { let fileDescriptor = diskIO.fileDescriptor let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) var writtenLength: Int = 0 + /// Outer loop: every iteration signals we are ready to read more data for try await _ in signalStream { + /// Inner loop: repeatedly call `.write()` and write more data until: + /// 1. We've written bytes.count bytes. + /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In + /// this case we `break` out of the inner loop and wait `.write()` to be + /// ready by `await`ing the next signal in the outer loop. while true { let written = span.withUnsafeBytes { ptr in let remainingLength = ptr.count - writtenLength diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index 563a73a..983b473 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -140,10 +140,10 @@ public struct BytesOutput: OutputProtocol { public let maxSize: Int internal func captureOutput( - from diskIO: consuming TrackedPlatformDiskIO? + from diskIO: consuming TrackedPlatformDiskIO ) async throws -> [UInt8] { - let result = try await AsyncIO.shared.read(from: diskIO!, upTo: self.maxSize) - try diskIO?.safelyClose() + let result = try await AsyncIO.shared.read(from: diskIO, upTo: self.maxSize) + try diskIO.safelyClose() #if canImport(Darwin) return result?.array() ?? [] #else @@ -255,17 +255,26 @@ extension OutputProtocol { internal func captureOutput( from diskIO: consuming TrackedPlatformDiskIO? ) async throws -> OutputType { - if let bytesOutput = self as? BytesOutput { - return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType - } - if OutputType.self == Void.self { return () as! OutputType } + // `diskIO` is only `nil` for any types that conform to `OutputProtocol` + // and have `Void` as ``OutputType` (i.e. `DiscardedOutput`). Since we + // made sure `OutputType` is not `Void` on the line above, `diskIO` + // must not be nil; otherwise, this is a programmer error. + guard var diskIO else { + fatalError( + "Internal Inconsistency Error: diskIO must not be nil when OutputType is not Void" + ) + } + + if let bytesOutput = self as? BytesOutput { + return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType + } // Force unwrap is safe here because only `OutputType.self == Void` would // have nil `TrackedPlatformDiskIO` - let result = try await AsyncIO.shared.read(from: diskIO!, upTo: self.maxSize) - try diskIO?.safelyClose() + let result = try await AsyncIO.shared.read(from: diskIO, upTo: self.maxSize) + try diskIO.safelyClose() #if canImport(Darwin) return try self.output(from: result ?? .empty) #else