Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dependency on SPI #419

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

@_spi(AsyncChannel) import NIOCore
import NIOCore

internal struct InlineStreamMultiplexer {
private let context: ChannelHandlerContext
Expand Down Expand Up @@ -211,36 +211,3 @@ extension InlineStreamMultiplexer {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
}
}
12 changes: 0 additions & 12 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1140,16 +1140,4 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
121 changes: 0 additions & 121 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -451,124 +451,3 @@ internal protocol AnyContinuation {
func finish()
func finish(throwing error: Error)
}


/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as a generic `AsyncSequence`.
/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator

init(_ iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}
}

public typealias Element = Output

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

private init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self.asyncThrowingStream.makeAsyncIterator())
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels {
/// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which inbound HTTP2 stream channels are yielded..
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct Continuation: AnyContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation

internal init(
continuation: AsyncThrowingStream<Output, Error>.Continuation
) {
self.continuation = continuation
}

/// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`.
///
/// It takes channels as as `Any` type to allow wrapping by the stream initializer.
func yield(any: Any) {
let yieldResult = self.continuation.yield(any as! Output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.")
default:
preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.")
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}


/// `initialize` creates a new `Continuation` object and returns it along with its backing `AsyncThrowingStream`.
/// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels.
///
/// - Parameters:
/// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2InboundStreamChannels<Output>, Continuation) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (.init(stream), Continuation(continuation: continuation))
}
}

#if swift(>=5.7)
// This doesn't compile on 5.6 but the omission of Sendable is sufficient in any case
@available(*, unavailable)
extension NIOHTTP2InboundStreamChannels.AsyncIterator: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: Sendable where Output: Sendable {}
#else
// This wasn't marked as sendable in 5.6 however it should be fine
// https://forums.swift.org/t/so-is-asyncstream-sendable-or-not/53148/2
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: @unchecked Sendable where Output: Sendable {}
#endif


#if swift(<5.9)
// this should be available in the std lib from 5.9 onwards
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncThrowingStream {
public static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}
#endif