Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/2 pipeline NIOAsyncChannel pipeline config #403

Merged
merged 3 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import NIOCore
@_spi(AsyncChannel) import NIOCore

internal struct InlineStreamMultiplexer {
private let context: ChannelHandlerContext
Expand Down Expand Up @@ -238,8 +238,36 @@ extension NIOHTTP2Handler {
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}


/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
/// - parameters:
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for outbound stream channels.
rnro marked this conversation as resolved.
Show resolved Hide resolved
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer`
/// or `HTTP2Frame.Payload` if there are none.
rnro marked this conversation as resolved.
Show resolved Hide resolved
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for outbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `initializer`
/// or `HTTP2Frame.Payload` if there are none.
rnro marked this conversation as resolved.
Show resolved Hide resolved
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel<Inbound, Outbound>(
rnro marked this conversation as resolved.
Show resolved Hide resolved
inboundType: Inbound.Type = Inbound.self,
outboundType: Outbound.Type = Outbound.self,
rnro marked this conversation as resolved.
Show resolved Hide resolved
initializer: @escaping NIOHTTP2Handler.StreamInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
inboundType: Inbound.self,
outboundType: Outbound.self
)
}
}
}
}
}
4 changes: 4 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -990,11 +990,15 @@ extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of all `connectionInitializer` callbacks.
public typealias ConnectionInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = @Sendable (Channel) -> EventLoopFuture<Output>
#else
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of all `connectionInitializer` callbacks.
public typealias ConnectionInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = (Channel) -> EventLoopFuture<Output>
#endif
Expand Down
205 changes: 200 additions & 5 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import NIOCore
@_spi(AsyncChannel) import NIOCore
import NIOTLS

/// The supported ALPN protocol tokens for NIO's HTTP/2 abstraction layer.
Expand Down Expand Up @@ -212,7 +212,8 @@ extension Channel {
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, as it allows that pipeline to evolve without breaking your code.
/// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams.
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
Expand All @@ -222,7 +223,8 @@ extension Channel {
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert this handler.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<Output>(
Expand Down Expand Up @@ -258,6 +260,73 @@ extension Channel {
}
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and wraps any created inbound stream channels in `NIOAsyncChannel`s.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions
/// over inbound and outbound streams whilst handling back-pressure.
///
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
/// and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<StreamInbound, StreamOutbound>(
rnro marked this conversation as resolved.
Show resolved Hide resolved
mode: NIOHTTP2Handler.ParserMode,
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>> {
if self.eventLoop.inEventLoop {
return self.eventLoop.makeCompletedFuture {
return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
)
}
} else {
return self.eventLoop.submit {
return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
)
}
}
}

/// Configures a channel to perform a HTTP/2 secure upgrade.
///
/// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to
Expand Down Expand Up @@ -418,6 +487,76 @@ extension Channel {
}.map { _ in () }
}
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and wraps both the connection channel and any
/// created inbound stream channels in `NIOAsyncChannel`s.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions
/// over inbound and outbound streams whilst handling back-pressure.
///
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - connectionInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the HTTP/2 connection channel.
/// This type must match the `InboundOut` type of the final handler in the connection channel.
/// - connectionOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the HTTP/2 connection channel.
/// This type must match the `OutboundIn` type of the final handler in the connection channel.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - connectionInitializer: A closure that will be called once the `NIOHTTP2Handler` has been added to the pipeline.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
/// and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<ConnectionInbound, ConnectionOutbound, StreamInbound, StreamOutbound>(
mode: NIOHTTP2Handler.ParserMode,
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
connectionInboundType: ConnectionInbound.Type,
connectionOutboundType: ConnectionOutbound.Type,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<(
NIOAsyncChannel<ConnectionInbound, ConnectionOutbound>,
NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>
)> {
return try self.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
).flatMap { multiplexer in
return connectionInitializer(self).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: self,
inboundType: ConnectionInbound.self,
outboundType: ConnectionOutbound.self
)
}.map { connectionAsyncChannel in
return (connectionAsyncChannel, multiplexer)
rnro marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

extension ChannelPipeline.SynchronousOperations {
Expand Down Expand Up @@ -476,8 +615,8 @@ extension ChannelPipeline.SynchronousOperations {
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert this handler.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
/// inserted into this pipeline, which can be used to initiate new streams.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<Output>(
Expand All @@ -504,4 +643,60 @@ extension ChannelPipeline.SynchronousOperations {
return try handler.syncAsyncStreamMultiplexer(continuation: continuation, inboundStreamChannels: inboundStreamChannels)
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and wraps any created inbound stream channels in `NIOAsyncChannel`s.
///
/// This operation **must** be called on the event loop.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions
/// over inbound and outbound streams whilst handling back-pressure.
///
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
/// and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<StreamInbound, StreamOutbound>(
mode: NIOHTTP2Handler.ParserMode,
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>> {
return try configureAsyncHTTP2Pipeline(
rnro marked this conversation as resolved.
Show resolved Hide resolved
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position
) { channel in
inboundStreamInitializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
inboundType: StreamInbound.self,
outboundType: StreamOutbound.self
)
}
}
}
}