Skip to content

Commit

Permalink
Async stream initializers order matches existing (#415)
Browse files Browse the repository at this point in the history
Motivation:

When adding in the SPI methods for exposing async sequences of HTTP/2
streams we moved the stream initialization to a subtly different
location so that it was easier to exfiltrate the outputs of those
initialization functions (such as protocol negotiation outputs).

In some cases this broke ordering expectations.

Modifications:

Yield the (optionally wrapped) channels as initializer outputs to the async stream

Result:

Changes only exist within SPI. Async inbound stream channel
initialization now matches previous behavior.
  • Loading branch information
rnro committed Sep 11, 2023
1 parent 92afa4f commit 8275314
Show file tree
Hide file tree
Showing 12 changed files with 598 additions and 387 deletions.
37 changes: 7 additions & 30 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ extension InlineStreamMultiplexer {
}

extension InlineStreamMultiplexer {
internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping NIOChannelInitializer) {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), promise: promise, streamStateInitializer)
}

internal func createStreamChannel(_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
internal func createStreamChannel(_ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel<Output>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
internal func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
}
}
Expand Down Expand Up @@ -207,7 +207,7 @@ extension NIOHTTP2Handler {
}

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
func setChannelContinuation(_ streamChannels: any AnyContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}
Expand All @@ -224,46 +224,23 @@ extension NIOHTTP2Handler {
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `OutboundStreamOutput`. This type may be `HTTP2Frame` or changed to any other type.
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
public func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}


/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
///
/// - Parameters:
/// - configuration: Configuration for the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func createStreamChannel<Inbound, Outbound>(
configuration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init(),
initializer: @escaping NIOChannelInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: configuration
)
}
}
}
}
}
59 changes: 50 additions & 9 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
private enum InboundStreamMultiplexerState {
case uninitializedLegacy
case uninitializedInline(StreamConfiguration, StreamInitializer, NIOHTTP2StreamDelegate?)
case uninitializedAsync(StreamConfiguration, StreamInitializerWithAnyOutput, NIOHTTP2StreamDelegate?)
case initialized(InboundStreamMultiplexer)
case deinitialized

internal var multiplexer: InboundStreamMultiplexer? {
switch self {
case .initialized(let inboundStreamMultiplexer):
return inboundStreamMultiplexer
case .uninitializedLegacy, .uninitializedInline, .deinitialized:
case .uninitializedLegacy, .uninitializedInline, .uninitializedAsync, .deinitialized:
return nil
}
}
Expand All @@ -153,6 +154,20 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
)
))

case .uninitializedAsync(let streamConfiguration, let inboundStreamInitializer, let streamDelegate):
self = .initialized(.inline(
InlineStreamMultiplexer(
context: context,
outboundView: .init(http2Handler: http2Handler),
mode: mode,
inboundStreamStateInitializer: .returnsAny(inboundStreamInitializer),
targetWindowSize: max(0, min(streamConfiguration.targetWindowSize, Int(Int32.max))),
streamChannelOutboundBytesHighWatermark: streamConfiguration.outboundBufferSizeHighWatermark,
streamChannelOutboundBytesLowWatermark: streamConfiguration.outboundBufferSizeLowWatermark,
streamDelegate: streamDelegate
)
))

case .initialized:
break //no-op
}
Expand Down Expand Up @@ -989,9 +1004,13 @@ extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<any Sendable>
#else
/// The type of all `inboundStreamInitializer` callbacks which need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
internal typealias StreamInitializerWithAnyOutput = (Channel) -> EventLoopFuture<any Sendable>
#endif

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand All @@ -1014,17 +1033,39 @@ extension NIOHTTP2Handler {
streamDelegate: NIOHTTP2StreamDelegate? = nil,
inboundStreamInitializer: @escaping StreamInitializer
) {
self.init(mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
self.init(
mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
)

self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate)
}

internal convenience init(
mode: ParserMode,
eventLoop: EventLoop,
connectionConfiguration: ConnectionConfiguration = .init(),
streamConfiguration: StreamConfiguration = .init(),
streamDelegate: NIOHTTP2StreamDelegate? = nil,
inboundStreamInitializerWithAnyOutput: @escaping StreamInitializerWithAnyOutput
) {
self.init(
mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
)
self.inboundStreamMultiplexerState = .uninitializedAsync(streamConfiguration, inboundStreamInitializerWithAnyOutput, streamDelegate)
}

/// Connection-level configuration.
///
/// The settings that will be used when establishing the connection. These will be sent to the peer as part of the
Expand Down Expand Up @@ -1101,7 +1142,7 @@ extension NIOHTTP2Handler {
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output>(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
Expand Down

0 comments on commit 8275314

Please sign in to comment.