Skip to content

Commit

Permalink
Add additional NIOAsyncChannel config
Browse files Browse the repository at this point in the history
Motivation:

Recent additions to pipeline configuration did not expose some of the
`NIOAsyncChannel` configuration.

Modifications:

Expose `backpressureStrategy`, `isOutboundHalfClosureEnabled`.

I think specific naming and object encapsulation for these parameters should be discussed
later before SPI is removed and we review the API as a whole.

Result:

Pipelie helpers for `NIOAsyncChannel` expose `backpressureStrategy`, `isOutboundHalfClosureEnabled`.
  • Loading branch information
rnro committed Jul 4, 2023
1 parent e0ffed7 commit 0647d28
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,11 @@ extension NIOHTTP2Handler {
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func createStreamChannel<Inbound, Outbound>(
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = false,
inboundType: Inbound.Type,
outboundType: Outbound.Type,
initializer: @escaping NIOHTTP2Handler.StreamInitializer
Expand All @@ -264,6 +268,8 @@ extension NIOHTTP2Handler {
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
backpressureStrategy: backpressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
inboundType: Inbound.self,
outboundType: Outbound.self
)
Expand Down
16 changes: 16 additions & 0 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ extension Channel {
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>> {
if self.eventLoop.inEventLoop {
Expand All @@ -308,6 +310,8 @@ extension Channel {
position: position,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
inboundStreamInitializer: inboundStreamInitializer
)
}
Expand All @@ -321,6 +325,8 @@ extension Channel {
position: position,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
inboundStreamInitializer: inboundStreamInitializer
)
}
Expand Down Expand Up @@ -529,8 +535,12 @@ extension Channel {
streamDelegate: NIOHTTP2StreamDelegate? = nil,
connectionInboundType: ConnectionInbound.Type,
connectionOutboundType: ConnectionOutbound.Type,
connectionBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
connectionIsOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<(
Expand All @@ -544,11 +554,15 @@ extension Channel {
streamDelegate: streamDelegate,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
inboundStreamInitializer: inboundStreamInitializer
).flatMap { multiplexer in
return connectionInitializer(self).flatMapThrowing { _ in
let connectionAsyncChannel = try NIOAsyncChannel(
synchronouslyWrapping: self,
backpressureStrategy: connectionBackpressureStrategy,
isOutboundHalfClosureEnabled: connectionIsOutboundHalfClosureEnabled,
inboundType: ConnectionInbound.self,
outboundType: ConnectionOutbound.self
)
Expand Down Expand Up @@ -680,6 +694,8 @@ extension ChannelPipeline.SynchronousOperations {
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>> {
return try self.configureAsyncHTTP2Pipeline(
Expand Down

0 comments on commit 0647d28

Please sign in to comment.