Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rnro committed Jul 3, 2023
1 parent a186116 commit 664601a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,17 @@ extension NIOHTTP2Handler {

/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
/// - parameters:
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for outbound stream channels.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the created channel.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer`
/// or `HTTP2Frame.Payload` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for outbound stream channels.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the created channel.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `initializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
public func createStreamChannel<Inbound, Outbound>(
inboundType: Inbound.Type = Inbound.self,
outboundType: Outbound.Type = Outbound.self,
inboundType: Inbound.Type,
outboundType: Outbound.Type,
initializer: @escaping NIOHTTP2Handler.StreamInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
Expand Down
17 changes: 8 additions & 9 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,10 @@ extension Channel {
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
Expand Down Expand Up @@ -511,10 +511,10 @@ extension Channel {
/// This type must match the `OutboundIn` type of the final handler in the connection channel.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - connectionInitializer: A closure that will be called once the `NIOHTTP2Handler` has been added to the pipeline.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
Expand Down Expand Up @@ -547,12 +547,11 @@ extension Channel {
inboundStreamInitializer: inboundStreamInitializer
).flatMap { multiplexer in
return connectionInitializer(self).flatMapThrowing { _ in
return try NIOAsyncChannel(
let connectionAsyncChannel = try NIOAsyncChannel(
synchronouslyWrapping: self,
inboundType: ConnectionInbound.self,
outboundType: ConnectionOutbound.self
)
}.map { connectionAsyncChannel in
return (connectionAsyncChannel, multiplexer)
}
}
Expand Down Expand Up @@ -663,10 +662,10 @@ extension ChannelPipeline.SynchronousOperations {
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels.
/// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or `HTTP2Frame.Payload` if there are none.
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps
/// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams
Expand All @@ -683,7 +682,7 @@ extension ChannelPipeline.SynchronousOperations {
streamOutboundType: StreamOutbound.Type,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>> {
return try configureAsyncHTTP2Pipeline(
return try self.configureAsyncHTTP2Pipeline(
mode: mode,
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
Expand Down
80 changes: 77 additions & 3 deletions Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
serverRecorder.receivedFrames.assertFramePayloadsMatch(Array(repeating: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload, count: requestCount))
}

func testNIOAsyncChannelPipelineCommunicates() async throws {
func testNIOAsyncConnectionStreamChannelPipelineCommunicates() async throws {
let requestCount = 100

let (clientAsyncChannel, clientMultiplexer) = try await assertNoThrowWithValue(
Expand All @@ -148,7 +148,7 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
} inboundStreamInitializer: { channel -> EventLoopFuture<Void> in
self.serverChannel.eventLoop.makeSucceededVoidFuture()
channel.eventLoop.makeSucceededVoidFuture()
}.get()
)

Expand All @@ -164,7 +164,7 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
} inboundStreamInitializer: { channel -> EventLoopFuture<Void> in
self.serverChannel.eventLoop.makeSucceededVoidFuture()
channel.eventLoop.makeSucceededVoidFuture()
}.get()
)

Expand Down Expand Up @@ -217,6 +217,80 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the one HTTP/2 stream used.")
}
}

func testNIOAsyncStreamChannelPipelineCommunicates() async throws {
let requestCount = 100

let clientMultiplexer = try await assertNoThrowWithValue(
try await self.clientChannel.configureAsyncHTTP2Pipeline(
mode: .client,
connectionConfiguration: .init(),
streamConfiguration: .init(),
streamInboundType: HTTP2Frame.FramePayload.self,
streamOutboundType: HTTP2Frame.FramePayload.self
) { channel -> EventLoopFuture<Void> in
channel.eventLoop.makeSucceededVoidFuture()
}.get()
)

let serverMultiplexer = try await assertNoThrowWithValue(
try await self.serverChannel.configureAsyncHTTP2Pipeline(
mode: .server,
connectionConfiguration: .init(),
streamConfiguration: .init(),
streamInboundType: HTTP2Frame.FramePayload.self,
streamOutboundType: HTTP2Frame.FramePayload.self
) { channel -> EventLoopFuture<Void> in
channel.eventLoop.makeSucceededVoidFuture()
}.get()
)

try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel))

try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in
// server
group.addTask {
var serverInboundChannelCount = 0
for try await streamChannel in serverMultiplexer.inbound {
for try await receivedFrame in streamChannel.inboundStream {
receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload)

try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload)
streamChannel.outboundWriter.finish()

try await self.interactInMemory(self.clientChannel, self.serverChannel)
}
serverInboundChannelCount += 1
}
return serverInboundChannelCount
}

// client
for _ in 0 ..< requestCount {
let streamChannel = try await clientMultiplexer.createStreamChannel(
inboundType: HTTP2Frame.FramePayload.self,
outboundType: HTTP2Frame.FramePayload.self
) { channel -> EventLoopFuture<Void> in
channel.eventLoop.makeSucceededVoidFuture()
}
// Let's try sending some requests
try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload)
streamChannel.outboundWriter.finish()

try await self.interactInMemory(self.clientChannel, self.serverChannel)

for try await receivedFrame in streamChannel.inboundStream {
receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload)
}
}

try await assertNoThrow(try await self.clientChannel.finish())
try await assertNoThrow(try await self.serverChannel.finish())

let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!)
XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the one HTTP/2 stream used.")
}
}
}

#if swift(<5.9)
Expand Down

0 comments on commit 664601a

Please sign in to comment.