Skip to content

Commit

Permalink
Generic helpers for HTTP/2 async pipelines (#401)
Browse files Browse the repository at this point in the history
* Store HTTP/2 inbound streams as async sequence

Motivation:

In preparation for exposing APIs which surface HTTP/2 connections and
streams using structured concurrency this PR introduces a store for
inbound HTTP/2 streams.

Modifications:

Define types and methods for storing generic types which wrap `Channels`
corresponding to inbound HTTP/2 streams in an async sequence.

Result:

The new types are not yet exposed, this work introduces part of the
framework for future functionality.

* Expose (as spi) helpers for H2 async pipelines

Motivation:

This PR is the first step in exposing APIs which surface HTTP/2 connections and
streams using structured concurrency. This PR exposes the most abstract
spelling of this concept, assuming no particular types/forms for the
types involved in the stream channel types.

Modifications:

* Store a generic `streamInitializer` on the common inbound initializer
  which is used to initialize inbound streams before yielding them to
the continuation of streams.
* Expose pipeline configuration functions which assume HTTP/2 but
  nothing else about sream channel types.
* Provide internal functions for creating streams without configuring
  them. Configuring leads to activation so it can be helpful to allow
that step to be performed manually once any provided initialization
closures have beenn executed.

Result:

Adopters of the new SPI should be able to create outbound and deal with
inbound HTTP/2 stream channels using async streams.

Outside of SPI there should be no changes.
  • Loading branch information
rnro committed Jul 3, 2023
1 parent 87e96ed commit 83c04db
Show file tree
Hide file tree
Showing 17 changed files with 787 additions and 100 deletions.
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ let package = Package(
name: "NIOHTTP2Tests",
dependencies: [
"NIOHTTP2",
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "NIOHTTP1", package: "swift-nio"),
Expand Down
42 changes: 42 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ extension InlineStreamMultiplexer {
internal func createStreamChannel(_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel<Output>(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
}
}

extension NIOHTTP2Handler {
Expand Down Expand Up @@ -201,3 +205,41 @@ extension NIOHTTP2Handler {
}
}
}

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `OutboundStreamOutput`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
}
}
16 changes: 16 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -990,9 +990,13 @@ extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = @Sendable (Channel) -> EventLoopFuture<Output>
#else
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = (Channel) -> EventLoopFuture<Output>
#endif

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand Down Expand Up @@ -1076,4 +1080,16 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output>(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
257 changes: 240 additions & 17 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ internal class HTTP2CommonInboundStreamMultiplexer {
private var isReading = false
private var flushPending = false

init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer, targetWindowSize: Int, streamChannelOutboundBytesHighWatermark: Int, streamChannelOutboundBytesLowWatermark: Int) {
var streamChannelContinuation: (any ChannelContinuation)?

init(
mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer,
targetWindowSize: Int,
streamChannelOutboundBytesHighWatermark: Int,
streamChannelOutboundBytesLowWatermark: Int
) {
self.channel = channel
self.inboundStreamStateInitializer = inboundStreamStateInitializer
self.targetWindowSize = targetWindowSize
Expand Down Expand Up @@ -93,6 +102,13 @@ extension HTTP2CommonInboundStreamMultiplexer {
)

self.streams[streamID] = channel

// If we have an async sequence of inbound stream channels yield the channel to it
// This also implicitly performs the stream initialization step.
// Note that in this case the API is constructed such that `self.inboundStreamStateInitializer`
// does no actual work.
self.streamChannelContinuation?.yield(channel: channel.baseChannel)

channel.configureInboundStream(initializer: self.inboundStreamStateInitializer)
channel.receiveInboundFrame(frame)

Expand Down Expand Up @@ -178,6 +194,8 @@ extension HTTP2CommonInboundStreamMultiplexer {
for channel in self.pendingStreams.values {
channel.receiveStreamClosed(nil)
}
// there cannot be any more inbound streams now that the connection channel is inactive
self.streamChannelContinuation?.finish()
}

internal func propagateChannelWritabilityChanged(context: ChannelHandlerContext) {
Expand Down Expand Up @@ -265,31 +283,97 @@ extension HTTP2CommonInboundStreamMultiplexer {
}

extension HTTP2CommonInboundStreamMultiplexer {
internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
self.channel.eventLoop.execute {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
internal func _createStreamChannel<Output>(
_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) {
self.channel.eventLoop.assertInEventLoop()

let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}

internal func createStreamChannel<Output>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) {
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
} else {
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
}
}
}

internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
internal func createStreamChannel<Output>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) -> EventLoopFuture<Output> {
let promise = self.channel.eventLoop.makePromise(of: Output.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
return promise.futureResult
}

internal func _createStreamChannel(
_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) {
let channel = MultiplexerAbstractChannel(
allocator: self.channel.allocator,
parent: self.channel,
multiplexer: multiplexer,
streamID: nil,
targetWindowSize: Int32(self.targetWindowSize),
outboundBytesHighWatermark: self.streamChannelOutboundBytesHighWatermark,
outboundBytesLowWatermark: self.streamChannelOutboundBytesLowWatermark,
inboundStreamStateInitializer: .excludesStreamID(nil)
)
self.pendingStreams[channel.channelID] = channel
channel.configure(initializer: streamStateInitializer, userPromise: promise)
}

internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) {
if self.channel.eventLoop.inEventLoop {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
} else {
self.channel.eventLoop.execute {
self._createStreamChannel(multiplexer, promise, streamStateInitializer)
}
}
}

internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
let promise = self.channel.eventLoop.makePromise(of: Channel.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
return promise.futureResult
}

@available(*, deprecated, message: "The signature of 'streamStateInitializer' has changed to '(Channel) -> EventLoopFuture<Void>'")
internal func createStreamChannel(multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer, promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void>) {
internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Channel>?,
_ streamStateInitializer: @escaping (Channel, HTTP2StreamID) -> EventLoopFuture<Void>
) {
self.channel.eventLoop.execute {
let streamID = self.nextStreamID()
let channel = MultiplexerAbstractChannel(
Expand Down Expand Up @@ -321,3 +405,142 @@ extension HTTP2CommonInboundStreamMultiplexer {
}
}
}

extension HTTP2CommonInboundStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.channel.eventLoop.assertInEventLoop()
self.streamChannelContinuation = streamChannels
}
}

/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
internal protocol ChannelContinuation {
func yield(channel: Channel)
func finish()
func finish(throwing error: Error)
}


/// `StreamChannelContinuation` is a wrapper for a generic `AsyncThrowingStream` which holds the inbound HTTP2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct StreamChannelContinuation<Output>: ChannelContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation
private let inboundStreamInititializer: NIOHTTP2Handler.StreamInitializerWithOutput<Output>

private init(
continuation: AsyncThrowingStream<Output, Error>.Continuation,
inboundStreamInititializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) {
self.continuation = continuation
self.inboundStreamInititializer = inboundStreamInititializer
}

/// `initialize` creates a new `StreamChannelContinuation` object and returns it along with its backing `AsyncThrowingStream`.
/// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels.
///
/// - Parameters:
/// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(
with inboundStreamInititializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<Output>
) -> (StreamChannelContinuation<Output>, NIOHTTP2InboundStreamChannels<Output>) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (StreamChannelContinuation(continuation: continuation, inboundStreamInititializer: inboundStreamInititializer), NIOHTTP2InboundStreamChannels(stream))
}

/// `yield` takes a channel, executes the stored `streamInitializer` upon it and then yields the *derived* type to
/// the wrapped `AsyncThrowingStream`.
func yield(channel: Channel) {
channel.eventLoop.assertInEventLoop()
self.inboundStreamInititializer(channel).whenSuccess { output in
let yieldResult = self.continuation.yield(output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield channel when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
channel.close(mode: .all, promise: nil)
preconditionFailure("Attempted to yield channel to AsyncThrowingStream in terminated state.")
default:
channel.close(mode: .all, promise: nil)
preconditionFailure("Attempt to yield channel to AsyncThrowingStream failed for unhandled reason.")
}
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}

/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as an `AsyncSequence`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator

init(_ iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}
}

public typealias Element = Output

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self.asyncThrowingStream.makeAsyncIterator())
}
}

@available(*, unavailable)
extension NIOHTTP2InboundStreamChannels.AsyncIterator: Sendable {}

#if swift(>=5.7)
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: Sendable where Output: Sendable {}
#else
// This wasn't marked as sendable in 5.6 however it should be fine
// https://forums.swift.org/t/so-is-asyncstream-sendable-or-not/53148/2
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: @unchecked Sendable where Output: Sendable {}
#endif


#if swift(<5.9)
// this should be available in the std lib from 5.9 onwards
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncThrowingStream {
public static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}
#endif

0 comments on commit 83c04db

Please sign in to comment.