Redo binary exporter column reading
Fixes #5457
NinoFloris committed Nov 27, 2023
1 parent 7b515c7 commit 3e328b6
Showing 2 changed files with 127 additions and 74 deletions.
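
For context, the fix targets mixed per-column operations during a binary export — calling IsNull and then either Skip or Read for each column of every row. Below is a minimal usage sketch mirroring the MixedOperations test added in this commit; the connection string and COPY statement are placeholders, not part of the change:

using Npgsql;

var connectionString = "..."; // placeholder
using var conn = new NpgsqlConnection(connectionString);
conn.Open();

using var exporter = conn.BeginBinaryExport(
    "COPY (VALUES ('foo', 1), ('bar', NULL), (NULL, 2)) TO STDOUT BINARY");

while (exporter.StartRow() != -1)
{
    // Check each column's null-ness first, then consume it either by
    // skipping it or by reading its value.
    string? col1 = null;
    if (exporter.IsNull)
        exporter.Skip();
    else
        col1 = exporter.Read<string>();

    int? col2 = null;
    if (exporter.IsNull)
        exporter.Skip();
    else
        col2 = exporter.Read<int>();
}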
149 changes: 75 additions & 74 deletions src/Npgsql/NpgsqlBinaryExporter.cs
@@ -35,7 +35,7 @@ public sealed class NpgsqlBinaryExporter : ICancelable
/// <summary>
/// The number of columns, as returned from the backend in the CopyInResponse.
/// </summary>
internal int NumColumns { get; private set; }
int NumColumns { get; set; }

PgConverterInfo[] _columnInfoCache;

@@ -140,7 +140,6 @@ async Task ReadHeader(bool async)

async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken = default)
{

CheckDisposed();
if (_isConsumed)
return -1;
@@ -149,7 +148,10 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =

// Consume and advance any active column.
if (_column >= 0)
await Commit(async, resumableOp: false).ConfigureAwait(false);
{
await Commit(async).ConfigureAwait(false);
_column++;
}

// The very first row (i.e. _column == -1) is included in the header's CopyData message.
// Otherwise we need to read in a new CopyData row (the docs specify that there's a CopyData
@@ -210,29 +212,6 @@ public ValueTask<T> ReadAsync<T>(CancellationToken cancellationToken = default)
ValueTask<T> Read<T>(bool async, CancellationToken cancellationToken = default)
=> Read<T>(async, null, cancellationToken);

PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
{
var options = _connector.SerializerOptions;
PgTypeId? pgTypeId = null;
if (npgsqlDbType.HasValue)
{
pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
? options.GetCanonicalTypeId(name)
// Handle plugin types via lookup.
: GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
}
var info = options.GetTypeInfo(type, pgTypeId)
?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");
// Binary export has no type info so we only do caller-directed interpretation of data.
return info.Bind(new Field("?", info.PgTypeId!.Value, -1), DataFormat.Binary);

PgTypeId GetRepresentationalOrDefault(string dataTypeName)
{
var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
return options.ToCanonicalTypeId(type.GetRepresentationalType());
}
}

/// <summary>
/// Reads the current column, returns its value according to <paramref name="type"/> and
/// moves ahead to the next column.
@@ -269,39 +248,22 @@ public ValueTask<T> ReadAsync<T>(NpgsqlDbType type, CancellationToken cancellati

async ValueTask<T> Read<T>(bool async, NpgsqlDbType? type, CancellationToken cancellationToken)
{
CheckDisposed();
if (_column is BeforeRow)
ThrowHelper.ThrowInvalidOperationException("Not reading a row");
CheckOnRow();

using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);

// Allow one more read if the field is a db null.
// We cannot allow endless rereads otherwise it becomes quite unclear when a column advance happens.
if (PgReader is { Initialized: true, Resumable: true, FieldSize: -1 })
{
await Commit(async, resumableOp: false).ConfigureAwait(false);
return DbNullOrThrow();
}
if (!IsInitializedAndAtStart)
await MoveNextColumn(async, resumableOp: false).ConfigureAwait(false);

// We must commit the current column before reading the next one unless it was an IsNull call.
PgConverterInfo info;
bool asObject;
if (!PgReader.Initialized || !PgReader.Resumable || PgReader.CurrentRemaining != PgReader.FieldSize)
if (PgReader.FieldSize is (-1 or 0) and var fieldSize)
{
await Commit(async, resumableOp: false).ConfigureAwait(false);
info = GetInfo(type, out asObject);

// We need to get info after potential I/O as we don't know beforehand at what column we're at.
var columnLen = await ReadColumnLenIfNeeded(async, resumableOp: false).ConfigureAwait(false);
if (_column == NumColumns)
ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");

if (columnLen is -1)
// Commit, otherwise we'll have no way of knowing this column is finished.
await Commit(async).ConfigureAwait(false);
if (fieldSize is -1)
return DbNullOrThrow();

}
else
info = GetInfo(type, out asObject);

var info = GetInfo(type, out var asObject);

T result;
if (async)
@@ -323,6 +285,14 @@ async ValueTask<T> Read<T>(bool async, NpgsqlDbType? type, CancellationToken can

return result;

static T DbNullOrThrow()
{
// When T is a Nullable<T>, we support returning null
if (default(T) is null && typeof(T).IsValueType)
return default!;
throw new InvalidCastException("Column is null");
}

PgConverterInfo GetInfo(NpgsqlDbType? type, out bool asObject)
{
ref var cachedInfo = ref _columnInfoCache[_column];
@@ -331,12 +301,27 @@ PgConverterInfo GetInfo(NpgsqlDbType? type, out bool asObject)
return converterInfo;
}

T DbNullOrThrow()
PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
{
// When T is a Nullable<T>, we support returning null
if (default(T) is null && typeof(T).IsValueType)
return default!;
throw new InvalidCastException("Column is null");
var options = _connector.SerializerOptions;
PgTypeId? pgTypeId = null;
if (npgsqlDbType.HasValue)
{
pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
? options.GetCanonicalTypeId(name)
// Handle plugin types via lookup.
: GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
}
var info = options.GetTypeInfo(type, pgTypeId)
?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");
// Binary export has no type info so we only do caller-directed interpretation of data.
return info.Bind(new Field("?", info.PgTypeId!.Value, -1), DataFormat.Binary);

PgTypeId GetRepresentationalOrDefault(string dataTypeName)
{
var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
return options.ToCanonicalTypeId(type.GetRepresentationalType());
}
}
}

@@ -347,8 +332,11 @@ public bool IsNull
{
get
{
Commit(async: false, resumableOp: true);
return ReadColumnLenIfNeeded(async: false, resumableOp: true).GetAwaiter().GetResult() is -1;
CheckOnRow();
if (!IsInitializedAndAtStart)
return MoveNextColumn(async: false, resumableOp: true).GetAwaiter().GetResult() is -1;

return PgReader.FieldSize is -1;
}
}

@@ -365,45 +353,58 @@ public Task SkipAsync(CancellationToken cancellationToken = default)

async Task Skip(bool async, CancellationToken cancellationToken = default)
{
CheckDisposed();
CheckOnRow();

using var registration = _connector.StartNestedCancellableOperation(cancellationToken);

// We allow IsNull to have been called before skip.
if (PgReader.Initialized && PgReader is not { Resumable: true, FieldSize: -1 })
await Commit(async, resumableOp: false).ConfigureAwait(false);
await ReadColumnLenIfNeeded(async, resumableOp: false).ConfigureAwait(false);
if (!IsInitializedAndAtStart)
await MoveNextColumn(async, resumableOp: false).ConfigureAwait(false);

await PgReader.Consume(async, cancellationToken: cancellationToken).ConfigureAwait(false);

// Commit, otherwise we'll have no way of knowing this column is finished.
if (PgReader.FieldSize is -1 or 0)
await Commit(async).ConfigureAwait(false);
}

#endregion

#region Utilities

ValueTask Commit(bool async, bool resumableOp)
{
var resuming = PgReader is { Initialized: true, Resumable: true } && resumableOp;
if (!resuming)
_column++;
bool IsInitializedAndAtStart => PgReader.Initialized && (PgReader.FieldSize is -1 || PgReader.FieldOffset is 0);

ValueTask Commit(bool async)
{
if (async)
return PgReader.CommitAsync(resuming);
return PgReader.CommitAsync(resuming: false);

PgReader.Commit(resuming);
PgReader.Commit(resuming: false);
return new();
}

async ValueTask<int> ReadColumnLenIfNeeded(bool async, bool resumableOp)
async ValueTask<int> MoveNextColumn(bool async, bool resumableOp)
{
if (PgReader is { Initialized: true, Resumable: true, FieldSize: -1 })
return -1;
if (async)
await PgReader.CommitAsync(resuming: false).ConfigureAwait(false);
else
PgReader.Commit(resuming: false);

if (_column + 1 == NumColumns)
ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
_column++;
await _buf.Ensure(4, async).ConfigureAwait(false);
var columnLen = _buf.ReadInt32();
PgReader.Init(columnLen, DataFormat.Binary, resumableOp);
return PgReader.FieldSize;
}

void CheckOnRow()
{
CheckDisposed();
if (_column is BeforeRow)
ThrowHelper.ThrowInvalidOperationException("Not reading a row");
}

void CheckDisposed()
{
if (_isDisposed)
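As an aside on the reworked null handling above (see DbNullOrThrow): a NULL column can be read directly into a nullable type, which yields null, whereas reading it into a non-nullable value type throws InvalidCastException. A small sketch, assuming an already open NpgsqlConnection named conn and an illustrative single-column COPY:

using var exporter = conn.BeginBinaryExport(
    "COPY (VALUES (NULL::int)) TO STDOUT BINARY");
exporter.StartRow();

// A NULL column read as a nullable value type comes back as null.
int? maybe = exporter.Read<int?>();

// Reading that NULL column as a non-nullable value type instead would
// throw InvalidCastException ("Column is null"):
// int value = exporter.Read<int>();
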
52 changes: 52 additions & 0 deletions test/Npgsql.Tests/CopyTests.cs
@@ -510,6 +510,58 @@ public async Task Wrong_table_definition_binary_export()
Assert.That(await conn.ExecuteScalarAsync("SELECT 1"), Is.EqualTo(1));
}

[Test, IssueLink("https://github.com/npgsql/npgsql/issues/5457")]
public async Task MixedOperations()
{
if (IsMultiplexing)
Assert.Ignore("Multiplexing: fails");
using var conn = await OpenConnectionAsync();

var reader = conn.BeginBinaryExport("""
COPY (values ('foo', 1), ('bar', null), (null, 2)) TO STDOUT BINARY
""");
while(reader.StartRow() != -1)
{
string? col1 = null;
if (reader.IsNull)
reader.Skip();
else
col1 = reader.Read<string>();
int? col2 = null;
if (reader.IsNull)
reader.Skip();
else
col2 = reader.Read<int>();
}
}

[Test]
public async Task ReadMoreColumnsThanExist()
{
if (IsMultiplexing)
Assert.Ignore("Multiplexing: fails");
using var conn = await OpenConnectionAsync();

var reader = conn.BeginBinaryExport("""
COPY (values ('foo', 1), ('bar', null), (null, 2)) TO STDOUT BINARY
""");
while(reader.StartRow() != -1)
{
string? col1 = null;
if (reader.IsNull)
reader.Skip();
else
col1 = reader.Read<string>();
int? col2 = null;
if (reader.IsNull)
reader.Skip();
else
col2 = reader.Read<int>();

Assert.Throws<InvalidOperationException>(() => _ = reader.IsNull);
}
}

[Test, IssueLink("https://github.com/npgsql/npgsql/issues/2330")]
public async Task Wrong_format_binary_export()
{
