Skip to content

Commit

Permalink
Add additional NIOAsyncChannel config (#406)
Browse files Browse the repository at this point in the history
* Add additional NIOAsyncChannel config

Motivation:

Recent additions to pipeline configuration did not expose some of the
`NIOAsyncChannel` configuration.

Modifications:

Expose `backpressureStrategy`, `isOutboundHalfClosureEnabled`.

I think specific naming and object encapsulation for these parameters should be discussed
later before SPI is removed and we review the API as a whole.

Result:

Pipelie helpers for `NIOAsyncChannel` expose `backpressureStrategy`, `isOutboundHalfClosureEnabled`.

* review comments, re-order parameters

* review comments

* param reordering, missing docs comments
  • Loading branch information
rnro committed Jul 4, 2023
1 parent f6f4319 commit 6e35856
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 10 deletions.
12 changes: 10 additions & 2 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ extension NIOHTTP2Handler {
/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
///
/// - Parameters:
/// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel.
/// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel.
/// - inboundType: The ``NIOAsyncChannel/inboundStream`` message type for the created channel.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -255,15 +257,21 @@ extension NIOHTTP2Handler {
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func createStreamChannel<Inbound, Outbound>(
inboundType: Inbound.Type,
outboundType: Outbound.Type,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = false,
inboundType: Inbound.Type = Inbound.self,
outboundType: Outbound.Type = Outbound.self,
initializer: @escaping NIOHTTP2Handler.StreamInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
backpressureStrategy: backpressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
inboundType: Inbound.self,
outboundType: Outbound.self
)
Expand Down
42 changes: 34 additions & 8 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ extension Channel {
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -294,8 +296,10 @@ extension Channel {
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type = StreamInbound.self,
streamOutboundType: StreamOutbound.Type = StreamOutbound.self,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>> {
if self.eventLoop.inEventLoop {
Expand All @@ -306,6 +310,8 @@ extension Channel {
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
Expand All @@ -319,6 +325,8 @@ extension Channel {
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
Expand Down Expand Up @@ -505,10 +513,14 @@ extension Channel {
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - connectionBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel`` wrapping the HTTP/2 connection channel.
/// - isConnectionOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel`` wrapping the HTTP/2 connection channel.
/// - connectionInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the HTTP/2 connection channel.
/// This type must match the `InboundOut` type of the final handler in the connection channel.
/// - connectionOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the HTTP/2 connection channel.
/// This type must match the `OutboundIn` type of the final handler in the connection channel.
/// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -527,10 +539,14 @@ extension Channel {
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
connectionInboundType: ConnectionInbound.Type,
connectionOutboundType: ConnectionOutbound.Type,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
connectionBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isConnectionOutboundHalfClosureEnabled: Bool = false,
connectionInboundType: ConnectionInbound.Type = ConnectionInbound.self,
connectionOutboundType: ConnectionOutbound.Type = ConnectionOutbound.self,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type = StreamInbound.self,
streamOutboundType: StreamOutbound.Type = StreamOutbound.self,
connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<(
Expand All @@ -542,13 +558,17 @@ extension Channel {
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
).flatMap { multiplexer in
return connectionInitializer(self).flatMapThrowing { _ in
let connectionAsyncChannel = try NIOAsyncChannel(
synchronouslyWrapping: self,
backpressureStrategy: connectionBackpressureStrategy,
isOutboundHalfClosureEnabled: isConnectionOutboundHalfClosureEnabled,
inboundType: ConnectionInbound.self,
outboundType: ConnectionOutbound.self
)
Expand Down Expand Up @@ -660,6 +680,8 @@ extension ChannelPipeline.SynchronousOperations {
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -678,8 +700,10 @@ extension ChannelPipeline.SynchronousOperations {
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type = StreamInbound.self,
streamOutboundType: StreamOutbound.Type = StreamOutbound.self,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>> {
return try self.configureAsyncHTTP2Pipeline(
Expand All @@ -692,6 +716,8 @@ extension ChannelPipeline.SynchronousOperations {
inboundStreamInitializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
backpressureStrategy: inboundStreamBackpressureStrategy,
isOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
inboundType: StreamInbound.self,
outboundType: StreamOutbound.self
)
Expand Down

0 comments on commit 6e35856

Please sign in to comment.