Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic helpers for HTTP/2 async pipelines #401

Merged
merged 18 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ let package = Package(
name: "NIOHTTP2Tests",
dependencies: [
"NIOHTTP2",
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio"),
Expand Down
42 changes: 42 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ extension InlineStreamMultiplexer {
internal func createStreamChannel(_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel<Output>(_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}
}

extension NIOHTTP2Handler {
Expand Down Expand Up @@ -201,3 +205,41 @@ extension NIOHTTP2Handler {
}
}
}

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `OutboundStreamOutput`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>
rnro marked this conversation as resolved.
Show resolved Hide resolved

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

public func createStreamChannel<OutboundStreamOutput>(_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
rnro marked this conversation as resolved.
Show resolved Hide resolved
return try await self.inlineStreamMultiplexer.createStreamChannel(streamStateInitializer).get()
}
}
}
16 changes: 16 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -990,9 +990,13 @@ extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = @Sendable (Channel) -> EventLoopFuture<Output>
rnro marked this conversation as resolved.
Show resolved Hide resolved
#else
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = (Channel) -> EventLoopFuture<Output>
#endif

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand Down Expand Up @@ -1076,4 +1080,16 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output>(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
239 changes: 222 additions & 17 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ internal class HTTP2CommonInboundStreamMultiplexer {
private var isReading = false
private var flushPending = false

init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer, targetWindowSize: Int, streamChannelOutboundBytesHighWatermark: Int, streamChannelOutboundBytesLowWatermark: Int) {
var streamChannelContinuation: (any ChannelContinuation)?

init(
mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer,
targetWindowSize: Int,
streamChannelOutboundBytesHighWatermark: Int,
streamChannelOutboundBytesLowWatermark: Int
) {
self.channel = channel
self.inboundStreamStateInitializer = inboundStreamStateInitializer
self.targetWindowSize = targetWindowSize
Expand Down Expand Up @@ -93,6 +102,13 @@ extension HTTP2CommonInboundStreamMultiplexer {
)

self.streams[streamID] = channel

// If we have an async sequence of inbound stream channels yield the channel to it
// This also implicitly performs the stream initialization step.
// Note that in this case the API is constructed such that `self.inboundStreamStateInitializer`
// does no actual work.
self.streamChannelContinuation?.yield(channel: channel.baseChannel)

channel.configureInboundStream(initializer: self.inboundStreamStateInitializer)
channel.receiveInboundFrame(frame)

Expand Down Expand Up @@ -178,6 +194,8 @@ extension HTTP2CommonInboundStreamMultiplexer {
for channel in self.pendingStreams.values {
channel.receiveStreamClosed(nil)
}
// there cannot be any more inbound streams now that the connection channel is inactive
self.streamChannelContinuation?.finish()
}

internal func propagateChannelWritabilityChanged(context: ChannelHandlerContext) {
Expand Down Expand Up @@ -265,31 +283,98 @@ extension HTTP2CommonInboundStreamMultiplexer {
}

extension HTTP2CommonInboundStreamMultiplexer {
internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
self.channel.eventLoop.execute {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
internal func _createStreamChannel<Output>(
_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) {
self.channel.eventLoop.assertInEventLoop()

let channel = MultiplexerAbstractChannel(
rnro marked this conversation as resolved.
Show resolved Hide resolved
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}

internal func createStreamChannel<Output>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) {
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
} else {
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
}
}
}

internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
internal func createStreamChannel<Output>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) -> EventLoopFuture<Output> {
let promise = self.channel.eventLoop.makePromise(of: Output.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
return promise.futureResult
}

internal func _createStreamChannel(
_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}

internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) {
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
} else {
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
}
}
}

internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) -> EventLoopFuture<Channel> {
let promise = self.channel.eventLoop.makePromise(of: Channel.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
return promise.futureResult
}

@available(*, deprecated, message: "The signature of 'streamStateInitializer' has changed to '(Channel) -> EventLoopFuture<Void>'")
internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void>) {
internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel, HTTP2StreamID
) -> EventLoopFuture<Void>) {
rnro marked this conversation as resolved.
Show resolved Hide resolved
self.channel.eventLoop.execute {
let streamID = self.nextStreamID()
let channel = MultiplexerAbstractChannel(
Expand Down Expand Up @@ -321,3 +406,123 @@ extension HTTP2CommonInboundStreamMultiplexer {
}
}
}

extension HTTP2CommonInboundStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.channel.eventLoop.assertInEventLoop()
self.streamChannelContinuation = streamChannels
}
}

/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
internal protocol ChannelContinuation {
func yield(channel: Channel)
func finish()
func finish(throwing error: Error)
}


/// `StreamChannelContinuation` is a wrapper for a generic `AsyncThrowingStream` which holds the inbound HTTP2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct StreamChannelContinuation<Output>: ChannelContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation
private let inboundStreamInititializer: (Channel) -> EventLoopFuture<Output>
rnro marked this conversation as resolved.
Show resolved Hide resolved

private init(
continuation: AsyncThrowingStream<Output, Error>.Continuation,
inboundStreamInititializer: @escaping (Channel) -> EventLoopFuture<Output>
) {
self.continuation = continuation
self.inboundStreamInititializer = inboundStreamInititializer
}

/// `initialize` creates a new `StreamChannelContinuation` object and returns it along with its backing `AsyncThrowingStream`.
/// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels.
///
/// - Parameters:
/// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(
with inboundStreamInititializer: @escaping (Channel) -> EventLoopFuture<Output>
rnro marked this conversation as resolved.
Show resolved Hide resolved
) -> (StreamChannelContinuation<Output>, NIOHTTP2InboundStreamChannels<Output>) {
var continuation: AsyncThrowingStream<Output, Error>.Continuation? = nil
let stream = AsyncThrowingStream { continuation = $0 }
rnro marked this conversation as resolved.
Show resolved Hide resolved
return (StreamChannelContinuation(continuation: continuation!, inboundStreamInititializer: inboundStreamInititializer), NIOHTTP2InboundStreamChannels(stream))
}

/// `yield` takes a channel, executes the stored `streamInitializer` upon it and then yields the *derived* type to
/// the wrapped `AsyncThrowingStream`.
func yield(channel: Channel) {
channel.eventLoop.assertInEventLoop()
self.inboundStreamInititializer(channel).whenSuccess { streamChannel in
rnro marked this conversation as resolved.
Show resolved Hide resolved
let yieldResult = self.continuation.yield(streamChannel)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
rnro marked this conversation as resolved.
Show resolved Hide resolved
preconditionFailure("Attempted to yield channel when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
channel.close(mode: .all, promise: nil)
assertionFailure("Attempted to yield channel to AsyncThrowingStream in terminated state.")
default:
channel.close(mode: .all, promise: nil)
assertionFailure("Attempt to yield channel to AsyncThrowingStream failed for unhandled reason.")
rnro marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}

/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as an `AsyncSequence`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator

init(_ iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}
}

public typealias Element = Output

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self.asyncThrowingStream.makeAsyncIterator())
}
}

rnro marked this conversation as resolved.
Show resolved Hide resolved
#if swift(>=5.7)
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: Sendable where Output: Sendable {}
#else
// This wasn't marked as sendable in 5.6 however it should be fine
// https://forums.swift.org/t/so-is-asyncstream-sendable-or-not/53148/2
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: @unchecked Sendable where Output: Sendable {}
#endif