Skip to content

Commit

Permalink
Check whether a stream is active before throwing unconsumed exception
Browse files Browse the repository at this point in the history
Fixes #5450
  • Loading branch information
NinoFloris committed Nov 25, 2023
1 parent 01f92e0 commit fe53868
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/Npgsql/Internal/PgReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ internal void EndRead()
return;
}

if (FieldOffset != FieldSize)
if (FieldOffset != FieldSize && !IsStreamActive())
ThrowNotConsumedExactly();

_fieldConsumed = true;
Expand All @@ -501,7 +501,7 @@ internal ValueTask EndReadAsync()
if (_fieldBufferRequirement is { Kind: SizeKind.UpperBound })
return ConsumeAsync(FieldRemaining);

if (FieldOffset != FieldSize)
if (FieldOffset != FieldSize && !IsStreamActive())
ThrowNotConsumedExactly();

_fieldConsumed = true;
Expand Down Expand Up @@ -560,9 +560,10 @@ internal async ValueTask Consume(bool async, int? count = null, CancellationToke
public void Consume(int? count = null) => Consume(async: false, count).GetAwaiter().GetResult();
public ValueTask ConsumeAsync(int? count = null, CancellationToken cancellationToken = default) => Consume(async: true, count, cancellationToken);

bool IsStreamActive() => _userActiveStream is { IsDisposed: false };
internal void ThrowIfStreamActive()
{
if (_userActiveStream is { IsDisposed: false})
if (IsStreamActive())
ThrowHelper.ThrowInvalidOperationException("A stream is already open for this reader");
}

Expand Down
29 changes: 29 additions & 0 deletions test/Npgsql.Tests/ReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,35 @@ public async Task GetTextReader_in_middle_of_column_throws([Values] bool async)

#endregion GetChars / GetTextReader

[Test, IssueLink("https://github.com/npgsql/npgsql/issues/5450")]
public async Task EndRead_StreamActive([Values]bool async)
{
if (IsMultiplexing)
return;

const int columnLength = 1;

await using var conn = await OpenConnectionAsync();
var buffer = conn.Connector!.ReadBuffer;
buffer.FilledBytes += columnLength;
var reader = buffer.PgReader;
reader.Init(columnLength, DataFormat.Binary, resumable: false);
if (async)
await reader.StartReadAsync(Size.Unknown, CancellationToken.None);
else
reader.StartRead(Size.Unknown);

await using (var _ = reader.GetStream())
{
if (async)
Assert.DoesNotThrowAsync(async () => await reader.EndReadAsync());
else
Assert.DoesNotThrow(() => reader.EndRead());
}

reader.Commit(resuming: false);
}

[Test, Description("Tests that everything goes well when a type handler generates a NpgsqlSafeReadException")]
public async Task SafeReadException()
{
Expand Down

0 comments on commit fe53868

Please sign in to comment.