Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async stream initializers order matches existing #415

Merged
merged 8 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ extension InlineStreamMultiplexer {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

internal func createStreamChannel<Output>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
internal func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
}
}
Expand Down Expand Up @@ -239,7 +239,7 @@ extension NIOHTTP2Handler {
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<OutboundStreamOutput>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
public func createStreamChannel<OutboundStreamOutput: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<OutboundStreamOutput>) async throws -> OutboundStreamOutput {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think OutboundStreamOutput is unnecessarily verbose here: Output is fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Output should be enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally still think we should call this method createOutboundStream so that it aligns with the inboundStreamInitializer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name aligns with the equivalent method on the non-async multiplexer, I think that alignment is more important.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think that is more important because this here is the chance for a new name. I recently saw somebody giving feedback on exactly this API that they are confused what initializer runs when.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think createOutboundStream is misleading: it sounds like it's a unidirectional stream to me, which it isn't. (FWIW I have the same issue with our inbound stream initializers.)

return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1005,12 +1005,12 @@ extension NIOHTTP2Handler {
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<Any>
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<any Sendable>
#else
/// The type of all `inboundStreamInitializer` callbacks which need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
internal typealias StreamInitializerWithAnyOutput = (Channel) -> EventLoopFuture<Any>
internal typealias StreamInitializerWithAnyOutput = (Channel) -> EventLoopFuture<any Sendable>
#endif

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand Down
30 changes: 20 additions & 10 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,18 @@ extension HTTP2CommonInboundStreamMultiplexer {
// channel which in turn might cause further frames to be processed synchronously.
channel.receiveInboundFrame(frame)

let initializerProductPromise = self.streamChannelContinuation == nil ? nil : self.channel.eventLoop.makePromise(of: Any.self)
channel.configureInboundStream(initializer: self.inboundStreamStateInitializer, promise: initializerProductPromise)

// Configure the inbound stream.
// If we have an async sequence of inbound stream channels yield the channel to it
// but only once we are sure initialization and activation succeed
if let streamChannelContinuation = self.streamChannelContinuation {
initializerProductPromise!.futureResult.whenSuccess { value in
let promise = self.channel.eventLoop.makePromise(of: Any.self)
promise.futureResult.whenSuccess { value in
streamChannelContinuation.yield(any: value)
}

channel.configureInboundStream(initializer: self.inboundStreamStateInitializer, promise: promise)
} else {
channel.configureInboundStream(initializer: self.inboundStreamStateInitializer)
}

if !channel.inList {
Expand Down Expand Up @@ -290,7 +293,7 @@ extension HTTP2CommonInboundStreamMultiplexer {
}

extension HTTP2CommonInboundStreamMultiplexer {
internal func _createStreamChannel<Output>(
internal func _createStreamChannel<Output: Sendable>(
_ multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOChannelInitializerWithOutput<Output>
Expand All @@ -309,22 +312,29 @@ extension HTTP2CommonInboundStreamMultiplexer {
)
self.pendingStreams[channel.channelID] = channel

let anyInitializer: NIOChannelInitializerWithOutput<Any> = { channel in
streamStateInitializer(channel).map { $0 }
let anyInitializer: NIOChannelInitializerWithOutput<any Sendable> = { channel in
streamStateInitializer(channel).map { return $0 }
}

let anyPromise: EventLoopPromise<Any>?
if let promise = promise {
anyPromise = channel.baseChannel.eventLoop.makePromise(of: Any.self)
promise.completeWith(anyPromise!.futureResult.map { value in value as! Output })
anyPromise?.futureResult.whenComplete { result in
switch result {
case .success(let any):
promise.succeed(any as! Output)
FranzBusch marked this conversation as resolved.
Show resolved Hide resolved
case .failure(let error):
promise.fail(error)
}
}
} else {
anyPromise = nil
}

channel.configure(initializer: anyInitializer, userPromise: anyPromise)
}

internal func createStreamChannel<Output>(
internal func createStreamChannel<Output: Sendable>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
promise: EventLoopPromise<Output>?,
_ streamStateInitializer: @escaping NIOChannelInitializerWithOutput<Output>
Expand All @@ -337,7 +347,7 @@ extension HTTP2CommonInboundStreamMultiplexer {
}
}

internal func createStreamChannel<Output>(
internal func createStreamChannel<Output: Sendable>(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) -> EventLoopFuture<Output> {
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ extension ChannelPipeline.SynchronousOperations {
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func configureAsyncHTTP2Pipeline<Output>(
public func configureAsyncHTTP2Pipeline<Output: Sendable>(
mode: NIOHTTP2Handler.ParserMode,
configuration: NIOHTTP2Handler.Configuration = .init(),
position: ChannelPipeline.Position = .last,
Expand All @@ -608,7 +608,7 @@ extension ChannelPipeline.SynchronousOperations {
connectionConfiguration: configuration.connection,
streamConfiguration: configuration.stream,
inboundStreamInitializerWithAnyOutput: { channel in
inboundStreamInitializer(channel).map { $0 }
inboundStreamInitializer(channel).map { return $0 }
}
)

Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
// This variant is used in the async stream case.
// It uses `Any`s because when called from `configureInboundStream` it is passed the initializer stored on the handler
// which can't be a typed generic without changing the handler API.
internal func configure(initializer: (@escaping (Channel) -> EventLoopFuture<Any>), userPromise promise: EventLoopPromise<Any>?) {
internal func configure(initializer: (@escaping (Channel) -> EventLoopFuture<any Sendable>), userPromise promise: EventLoopPromise<Any>?) {
assert(self.streamDataType == .framePayload)
// We need to configure this channel. This involves doing four things:
// 1. Setting our autoRead state from the parent
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
/// channel that is created by the remote peer. For servers, these are channels created by
/// receiving a `HEADERS` frame from a client. For clients, these are channels created by
/// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use
/// ``createStreamChannel(promise:_:)-18bxc``.
/// ``createStreamChannel(promise:_:)-1jk0q``.
@available(*, deprecated, renamed: "init(mode:channel:targetWindowSize:outboundBufferSizeHighWatermark:outboundBufferSizeLowWatermark:inboundStreamInitializer:)")
public convenience init(mode: NIOHTTP2Handler.ParserMode, channel: Channel, targetWindowSize: Int = 65535, inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)? = nil) {
// We default to an 8kB outbound buffer size: this is a good trade off for avoiding excessive buffering while ensuring that decent
Expand Down Expand Up @@ -170,7 +170,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
/// channel that is created by the remote peer. For servers, these are channels created by
/// receiving a `HEADERS` frame from a client. For clients, these are channels created by
/// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use
/// ``createStreamChannel(promise:_:)-18bxc``.
/// ``createStreamChannel(promise:_:)-1jk0q``.
public convenience init(mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
targetWindowSize: Int = 65535,
Expand Down Expand Up @@ -199,7 +199,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
/// channel that is created by the remote peer. For servers, these are channels created by
/// receiving a `HEADERS` frame from a client. For clients, these are channels created by
/// receiving a `PUSH_PROMISE` frame from a server. To initiate a new outbound channel, use
/// ``createStreamChannel(promise:_:)-18bxc``.
/// ``createStreamChannel(promise:_:)-1jk0q``.
@available(*, deprecated, renamed: "init(mode:channel:targetWindowSize:outboundBufferSizeHighWatermark:outboundBufferSizeLowWatermark:inboundStreamInitializer:)")
public convenience init(mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
Expand Down
15 changes: 12 additions & 3 deletions Sources/NIOHTTP2/MultiplexerAbstractChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ extension MultiplexerAbstractChannel {
enum InboundStreamStateInitializer {
case includesStreamID(((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)?)
case excludesStreamID(((Channel) -> EventLoopFuture<Void>)?)
case returnsAny(((Channel) -> EventLoopFuture<Any>))
case returnsAny(((Channel) -> EventLoopFuture<any Sendable>))
}
}

Expand All @@ -90,12 +90,21 @@ extension MultiplexerAbstractChannel {
}
}

func configureInboundStream(initializer: InboundStreamStateInitializer, promise: EventLoopPromise<Any>?) {
func configureInboundStream(initializer: InboundStreamStateInitializer) {
switch initializer {
case .includesStreamID(let initializer):
self.baseChannel.configure(initializer: initializer, userPromise: nil)
case .excludesStreamID(let initializer):
self.baseChannel.configure(initializer: initializer, userPromise: nil)
case .returnsAny(let initializer):
self.baseChannel.configure(initializer: initializer, userPromise: nil)
}
}

func configureInboundStream(initializer: InboundStreamStateInitializer, promise: EventLoopPromise<Any>?) {
switch initializer {
case .includesStreamID, .excludesStreamID:
preconditionFailure("Configuration with a supplied `Any` promise is not supported with this initializer type.")
case .returnsAny(let initializer):
self.baseChannel.configure(initializer: initializer, userPromise: promise)
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -112,7 +121,7 @@ extension MultiplexerAbstractChannel {
}

// used for async multiplexer
func configure(initializer: @escaping NIOChannelInitializerWithOutput<Any>, userPromise promise: EventLoopPromise<Any>?) {
func configure(initializer: @escaping NIOChannelInitializerWithOutput<any Sendable>, userPromise promise: EventLoopPromise<Any>?) {
self.baseChannel.configure(initializer: initializer, userPromise: promise)
}

Expand Down
4 changes: 2 additions & 2 deletions Tests/NIOHTTP2Tests/HTTP2InlineStreamMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import NIOHTTP1
private extension Channel {
/// Adds a simple no-op ``HTTP2StreamMultiplexer`` to the pipeline.
func addNoOpInlineMultiplexer(mode: NIOHTTP2Handler.ParserMode, eventLoop: EventLoop) {
XCTAssertNoThrow(try self.pipeline.addHandler(NIOHTTP2Handler(mode: mode, eventLoop: eventLoop) { channel in
XCTAssertNoThrow(try self.pipeline.addHandler(NIOHTTP2Handler(mode: mode, eventLoop: eventLoop, inboundStreamInitializer: { channel in
self.eventLoop.makeSucceededFuture(())
}).wait())
})).wait())
}
}

Expand Down