Skip to content

Commit

Permalink
Fix EndRead throwing exceptions when a stream is active (#5454)
Browse files Browse the repository at this point in the history
Fixes #5450
  • Loading branch information
NinoFloris committed Nov 28, 2023
1 parent 6cfd399 commit 42ea28f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
14 changes: 8 additions & 6 deletions src/Npgsql/Internal/PgReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void Rewind(int count)
/// <returns>The stream length, if any</returns>
async ValueTask DisposeUserActiveStream(bool async)
{
if (_userActiveStream is { IsDisposed: false })
if (StreamActive)
{
if (async)
await _userActiveStream.DisposeAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -486,7 +486,7 @@ internal void EndRead()
return;
}

if (FieldOffset != FieldSize)
if (FieldOffset != FieldSize && !StreamActive)
ThrowNotConsumedExactly();

_fieldConsumed = true;
Expand All @@ -501,7 +501,7 @@ internal ValueTask EndReadAsync()
if (_fieldBufferRequirement is { Kind: SizeKind.UpperBound })
return ConsumeAsync(FieldRemaining);

if (FieldOffset != FieldSize)
if (FieldOffset != FieldSize && !StreamActive)
ThrowNotConsumedExactly();

_fieldConsumed = true;
Expand Down Expand Up @@ -560,9 +560,11 @@ internal async ValueTask Consume(bool async, int? count = null, CancellationToke
public void Consume(int? count = null) => Consume(async: false, count).GetAwaiter().GetResult();
public ValueTask ConsumeAsync(int? count = null, CancellationToken cancellationToken = default) => Consume(async: true, count, cancellationToken);

[MemberNotNullWhen(true, nameof(_userActiveStream))]
bool StreamActive => _userActiveStream is { IsDisposed: false };
internal void ThrowIfStreamActive()
{
if (_userActiveStream is { IsDisposed: false})
if (StreamActive)
ThrowHelper.ThrowInvalidOperationException("A stream is already open for this reader");
}

Expand Down Expand Up @@ -612,7 +614,7 @@ void CommitSlow()
// Shut down any streaming and pooling going on on the column.
if (_requiresCleanup)
{
if (_userActiveStream is { IsDisposed: false })
if (StreamActive)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();

if (_pooledArray is not null)
Expand Down Expand Up @@ -693,7 +695,7 @@ async ValueTask CommitSlow()
// Shut down any streaming and pooling going on on the column.
if (_requiresCleanup)
{
if (_userActiveStream is { IsDisposed: false })
if (StreamActive)
await DisposeUserActiveStream(async: true).ConfigureAwait(false);

if (_pooledArray is not null)
Expand Down
29 changes: 29 additions & 0 deletions test/Npgsql.Tests/ReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1757,6 +1757,35 @@ public async Task GetTextReader_in_middle_of_column_throws([Values] bool async)

#endregion GetChars / GetTextReader

[Test, IssueLink("https://github.com/npgsql/npgsql/issues/5450")]
public async Task EndRead_StreamActive([Values]bool async)
{
if (IsMultiplexing)
return;

const int columnLength = 1;

await using var conn = await OpenConnectionAsync();
var buffer = conn.Connector!.ReadBuffer;
buffer.FilledBytes += columnLength;
var reader = buffer.PgReader;
reader.Init(columnLength, DataFormat.Binary, resumable: false);
if (async)
await reader.StartReadAsync(Size.Unknown, CancellationToken.None);
else
reader.StartRead(Size.Unknown);

await using (var _ = reader.GetStream())
{
if (async)
Assert.DoesNotThrowAsync(async () => await reader.EndReadAsync());
else
Assert.DoesNotThrow(() => reader.EndRead());
}

reader.Commit(resuming: false);
}

[Test, Description("Tests that everything goes well when a type handler generates a NpgsqlSafeReadException")]
public async Task SafeReadException()
{
Expand Down

0 comments on commit 42ea28f

Please sign in to comment.