Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async stream initializers order matches existing #415

Merged
merged 8 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 7 additions & 30 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ extension InlineStreamMultiplexer {
}

extension InlineStreamMultiplexer {
internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) {
internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping NIOChannelInitializer) {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), promise: promise, streamStateInitializer)
}

internal func createStreamChannel(_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
internal func createStreamChannel(_ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel<Output>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
internal func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
}
}
Expand Down Expand Up @@ -207,7 +207,7 @@ extension NIOHTTP2Handler {
}

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
func setChannelContinuation(_ streamChannels: any AnyContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}
Expand All @@ -224,46 +224,23 @@ extension NIOHTTP2Handler {
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `OutboundStreamOutput`. This type may be `HTTP2Frame` or changed to any other type.
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
public func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}


/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
///
/// - Parameters:
/// - configuration: Configuration for the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func createStreamChannel<Inbound, Outbound>(
configuration: NIOAsyncChannel<Inbound, Outbound>.Configuration = .init(),
initializer: @escaping NIOChannelInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
configuration: configuration
)
}
}
}
}
}
59 changes: 50 additions & 9 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
private enum InboundStreamMultiplexerState {
case uninitializedLegacy
case uninitializedInline(StreamConfiguration, StreamInitializer, NIOHTTP2StreamDelegate?)
case uninitializedAsync(StreamConfiguration, StreamInitializerWithAnyOutput, NIOHTTP2StreamDelegate?)
case initialized(InboundStreamMultiplexer)
case deinitialized

internal var multiplexer: InboundStreamMultiplexer? {
switch self {
case .initialized(let inboundStreamMultiplexer):
return inboundStreamMultiplexer
case .uninitializedLegacy, .uninitializedInline, .deinitialized:
case .uninitializedLegacy, .uninitializedInline, .uninitializedAsync, .deinitialized:
return nil
}
}
Expand All @@ -153,6 +154,20 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
)
))

case .uninitializedAsync(let streamConfiguration, let inboundStreamInitializer, let streamDelegate):
self = .initialized(.inline(
InlineStreamMultiplexer(
context: context,
outboundView: .init(http2Handler: http2Handler),
mode: mode,
inboundStreamStateInitializer: .returnsAny(inboundStreamInitializer),
targetWindowSize: max(0, min(streamConfiguration.targetWindowSize, Int(Int32.max))),
streamChannelOutboundBytesHighWatermark: streamConfiguration.outboundBufferSizeHighWatermark,
streamChannelOutboundBytesLowWatermark: streamConfiguration.outboundBufferSizeLowWatermark,
streamDelegate: streamDelegate
)
))

case .initialized:
break //no-op
}
Expand Down Expand Up @@ -989,9 +1004,13 @@ extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<any Sendable>
#else
/// The type of all `inboundStreamInitializer` callbacks which need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
internal typealias StreamInitializerWithAnyOutput = (Channel) -> EventLoopFuture<any Sendable>
#endif

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand All @@ -1014,17 +1033,39 @@ extension NIOHTTP2Handler {
streamDelegate: NIOHTTP2StreamDelegate? = nil,
inboundStreamInitializer: @escaping StreamInitializer
) {
self.init(mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
self.init(
mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
)

self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate)
}

internal convenience init(
mode: ParserMode,
eventLoop: EventLoop,
connectionConfiguration: ConnectionConfiguration = .init(),
streamConfiguration: StreamConfiguration = .init(),
streamDelegate: NIOHTTP2StreamDelegate? = nil,
inboundStreamInitializerWithAnyOutput: @escaping StreamInitializerWithAnyOutput
) {
self.init(
mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
)
self.inboundStreamMultiplexerState = .uninitializedAsync(streamConfiguration, inboundStreamInitializerWithAnyOutput, streamDelegate)
}

/// Connection-level configuration.
///
/// The settings that will be used when establishing the connection. These will be sent to the peer as part of the
Expand Down Expand Up @@ -1101,7 +1142,7 @@ extension NIOHTTP2Handler {
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output>(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
Expand Down