Skip to content

Commit

Permalink
HTTP/2 pipeline NIOAsyncChannel pipeline config
Browse files Browse the repository at this point in the history
Motivation:

This continues the work to expose functionality which allows users to interact
with HTTP/2 connections via async abstractions and using structured
concurrency.

We build atop of previous work to configure pipelines to deal with
HTTP/2 (with no protocol negotiation) and to wrap connection/stream
channels with `NIOAsyncChannel`s. This will allow users to iterate over
streams and HTTP2Frames.

Modifications:

Provide functions which configure channels and pipelines with the HTTP2
handler, expose a multiplexer for dealing with streams and wrap
connection and stream channels with `NIOAsyncChannel`s.

Result:

Users will be able to create and interact with HTTP/2 connections via
`NIOAsyncChannel`s. Because HTTP/2 is a negotiated protocol and we do
not yet handle it, this is of limited utility.
  • Loading branch information
rnro committed Jul 3, 2023
1 parent 83c04db commit d46839c
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import NIOCore
@_spi(AsyncChannel) import NIOCore

internal struct InlineStreamMultiplexer {
private let context: ChannelHandlerContext
Expand Down Expand Up @@ -238,8 +238,36 @@ extension NIOHTTP2Handler {
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}


/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
/// - parameters:
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for outbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for outbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `initializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel<Inbound, Outbound>(
inboundType: Inbound.Type = Inbound.self,
outboundType: Outbound.Type = Outbound.self,
initializer: @escaping NIOHTTP2Handler.StreamInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
inboundType: Inbound.self,
outboundType: Outbound.self
)
}
}
}
}
}
4 changes: 4 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -990,11 +990,15 @@ extension NIOHTTP2Handler {
#if swift(>=5.7)
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of all `connectionInitializer` callbacks.
public typealias ConnectionInitializer = @Sendable (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = @Sendable (Channel) -> EventLoopFuture<Output>
#else
/// The type of all `inboundStreamInitializer` callbacks.
public typealias StreamInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of all `connectionInitializer` callbacks.
public typealias ConnectionInitializer = (Channel) -> EventLoopFuture<Void>
/// The type of `inboundStreamInitializer` callbacks which return non-void results.
public typealias StreamInitializerWithOutput<Output> = (Channel) -> EventLoopFuture<Output>
#endif
Expand Down
205 changes: 200 additions & 5 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import NIOCore
@_spi(AsyncChannel) import NIOCore
import NIOTLS

/// The supported ALPN protocol tokens for NIO's HTTP/2 abstraction layer.
Expand Down Expand Up @@ -198,7 +198,8 @@ extension Channel {
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, as it allows that pipeline to evolve without breaking your code.
/// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams.
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
Expand All @@ -208,7 +209,8 @@ extension Channel {
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert this handler.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<Output>(
Expand Down Expand Up @@ -244,6 +246,73 @@ extension Channel {
}
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and wraps any created inbound stream channels in `NIOAsyncChannel`s.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions
/// over inbound and outbound streams whilst handling back-pressure.
///
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
/// and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<StreamInbound, StreamOutbound>(
mode: NIOHTTP2Handler.ParserMode,
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>> {
if self.eventLoop.inEventLoop {
return self.eventLoop.makeCompletedFuture {
return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
)
}
} else {
return self.eventLoop.submit {
return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
)
}
}
}

/// Configures a channel to perform a HTTP/2 secure upgrade.
///
/// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to
Expand Down Expand Up @@ -394,6 +463,76 @@ extension Channel {
}
}
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and wraps both the connection channel and any
/// created inbound stream channels in `NIOAsyncChannel`s.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions
/// over inbound and outbound streams whilst handling back-pressure.
///
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - connectionInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the HTTP/2 connection channel.
/// This type must match the `InboundOut` type of the final handler in the connection channel.
/// - connectionOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the HTTP/2 connection channel.
/// This type must match the `OutboundIn` type of the final handler in the connection channel.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - connectionInitializer: A closure that will be called once the `NIOHTTP2Handler` has been added to the pipeline.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
/// and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<ConnectionInbound, ConnectionOutbound, StreamInbound, StreamOutbound>(
mode: NIOHTTP2Handler.ParserMode,
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
connectionInboundType: ConnectionInbound.Type,
connectionOutboundType: ConnectionOutbound.Type,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<(
NIOAsyncChannel<ConnectionInbound, ConnectionOutbound>,
NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>
)> {
return try self.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
).flatMap { multiplexer in
return connectionInitializer(self).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: self,
inboundType: ConnectionInbound.self,
outboundType: ConnectionOutbound.self
)
}.map { connectionAsyncChannel in
return (connectionAsyncChannel, multiplexer)
}
}
}
}

extension ChannelPipeline.SynchronousOperations {
Expand Down Expand Up @@ -445,8 +584,8 @@ extension ChannelPipeline.SynchronousOperations {
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert this handler.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
/// inserted into this pipeline, which can be used to initiate new streams.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<Output>(
Expand All @@ -473,4 +612,60 @@ extension ChannelPipeline.SynchronousOperations {
return try handler.syncAsyncStreamMultiplexer(continuation: continuation, inboundStreamChannels: inboundStreamChannels)
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and wraps any created inbound stream channels in `NIOAsyncChannel`s.
///
/// This operation **must** be called on the event loop.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions
/// over inbound and outbound streams whilst handling back-pressure.
///
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the
/// handshake.
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
/// and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<StreamInbound, StreamOutbound>(
mode: NIOHTTP2Handler.ParserMode,
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>> {
return try configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position
) { channel in
inboundStreamInitializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
inboundType: StreamInbound.self,
outboundType: StreamOutbound.self
)
}
}
}
}

0 comments on commit d46839c

Please sign in to comment.