Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic helpers for HTTP/2 async pipelines #401

Merged
merged 18 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ extension InlineStreamMultiplexer {
internal func createStreamChannel(_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel() -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self))
}
}

extension NIOHTTP2Handler {
Expand Down Expand Up @@ -201,3 +205,46 @@ extension NIOHTTP2Handler {
}
}
}

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// The stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<Output> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<Output>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

public func createStreamChannel<Output>(_ streamStateInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel().flatMap { streamChannel in
let http2StreamChannel = streamChannel as! HTTP2StreamChannel
let promise = streamChannel.eventLoop.makePromise(of: Output.self)
http2StreamChannel.configure(initializer: streamStateInitializer, userPromise: promise)
return promise.futureResult
}.get()
rnro marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
12 changes: 12 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1076,4 +1076,16 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output>(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
192 changes: 178 additions & 14 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ internal class HTTP2CommonInboundStreamMultiplexer {
private var isReading = false
private var flushPending = false

init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer, targetWindowSize: Int, streamChannelOutboundBytesHighWatermark: Int, streamChannelOutboundBytesLowWatermark: Int) {
var streamChannels: (any ChannelContinuation)?
rnro marked this conversation as resolved.
Show resolved Hide resolved

init(
mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer,
targetWindowSize: Int,
streamChannelOutboundBytesHighWatermark: Int,
streamChannelOutboundBytesLowWatermark: Int
) {
self.channel = channel
self.inboundStreamStateInitializer = inboundStreamStateInitializer
self.targetWindowSize = targetWindowSize
Expand Down Expand Up @@ -93,6 +102,11 @@ extension HTTP2CommonInboundStreamMultiplexer {
)

self.streams[streamID] = channel

// If we have an async sequence of inbound stream channels yield the channel to it
// This also implicitly performs the stream initialization step
self.streamChannels?.yield(channel: channel.baseChannel)
rnro marked this conversation as resolved.
Show resolved Hide resolved

channel.configureInboundStream(initializer: self.inboundStreamStateInitializer)
channel.receiveInboundFrame(frame)

Expand Down Expand Up @@ -178,6 +192,8 @@ extension HTTP2CommonInboundStreamMultiplexer {
for channel in self.pendingStreams.values {
channel.receiveStreamClosed(nil)
}
// there cannot be any more inbound streams now that the connection channel is inactive
self.streamChannels?.finish()
}

internal func propagateChannelWritabilityChanged(context: ChannelHandlerContext) {
Expand Down Expand Up @@ -265,20 +281,59 @@ extension HTTP2CommonInboundStreamMultiplexer {
}

extension HTTP2CommonInboundStreamMultiplexer {
internal func _createStreamChannel(_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ promise: EventLoopPromise<Channel>?) {
let channel = MultiplexerAbstractChannel(
rnro marked this conversation as resolved.
Show resolved Hide resolved
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
promise?.succeed(channel.baseChannel)
}

internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise<Channel>?) {
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise)
} else {
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise)
}
}
}

internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer) -> EventLoopFuture<Channel> {
let promise = self.channel.eventLoop.makePromise(of: Channel.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise)
return promise.futureResult
}

internal func _createStreamChannel(_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}

internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
self.channel.eventLoop.execute {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
} else {
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
}
}
}

Expand Down Expand Up @@ -321,3 +376,112 @@ extension HTTP2CommonInboundStreamMultiplexer {
}
}
}

extension HTTP2CommonInboundStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.streamChannels = streamChannels
rnro marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
internal protocol ChannelContinuation {
func yield(channel: Channel)
func finish()
func finish(throwing error: Error)
}


/// `StreamChannelContinuation` is a wrapper for a generic `AsyncThrowingStream` which holds the inbound HTTP2 stream channels.
///
rnro marked this conversation as resolved.
Show resolved Hide resolved
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct StreamChannelContinuation<Output>: ChannelContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation
private let streamStateInitializer: (Channel) -> EventLoopFuture<Output>
rnro marked this conversation as resolved.
Show resolved Hide resolved

private init(
continuation: AsyncThrowingStream<Output, Error>.Continuation,
streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Output>
) {
self.continuation = continuation
self.streamStateInitializer = streamStateInitializer
}

/// `initialize` creates a new `StreamChannels` object and returns it along with its backing `AsyncThrowingStream`.
rnro marked this conversation as resolved.
Show resolved Hide resolved
///
/// - Parameters:
/// - streamInitializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example a `streamStateInitializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(
with streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Output>
rnro marked this conversation as resolved.
Show resolved Hide resolved
) -> (StreamChannelContinuation<Output>, NIOHTTP2InboundStreamChannels<Output>) {
var continuation: AsyncThrowingStream<Output, Error>.Continuation? = nil
let stream = AsyncThrowingStream { continuation = $0 }
rnro marked this conversation as resolved.
Show resolved Hide resolved
return (StreamChannelContinuation(continuation: continuation!, streamStateInitializer: streamStateInitializer), NIOHTTP2InboundStreamChannels(stream))
}

/// `yield` takes a channel, executes the stored `streamInitializer` upon it and then yields the *derived* type to
/// the wrapped `AsyncThrowingStream`.
func yield(channel: Channel) {
channel.eventLoop.assertInEventLoop()
self.streamStateInitializer(channel).whenSuccess{ streamChannel in
rnro marked this conversation as resolved.
Show resolved Hide resolved
let yieldResult = self.continuation.yield(streamChannel)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
rnro marked this conversation as resolved.
Show resolved Hide resolved
preconditionFailure("Attempted to yield channel when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
channel.close(mode: .all, promise: nil)
assertionFailure("Attempted to yield channel to AsyncThrowingStream in terminated state.")
default:
channel.close(mode: .all, promise: nil)
assertionFailure("Attempt to yield channel to AsyncThrowingStream failed for unhandled reason.")
rnro marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}

/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as an `AsyncSequence`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

internal var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator
rnro marked this conversation as resolved.
Show resolved Hide resolved

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}

init(_ iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}
rnro marked this conversation as resolved.
Show resolved Hide resolved
}

public typealias Element = Output

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self.asyncThrowingStream.makeAsyncIterator())
}

let asyncThrowingStream: AsyncThrowingStream<Output, Error>
rnro marked this conversation as resolved.
Show resolved Hide resolved

init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}
}