Skip to content

Commit

Permalink
Use pipelining to begin a transaction. Fixes #1286
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Jan 24, 2024
1 parent 957dd42 commit e400950
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 12 deletions.
35 changes: 30 additions & 5 deletions src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -586,15 +586,13 @@ public async Task<bool> TryResetConnectionAsync(ConnectionSettings cs, MySqlConn
Log.SendingPipelinedResetConnectionRequest(m_logger, Id, ServerVersion.OriginalString);

// send both packets at once
await m_payloadHandler!.ByteHandler.WriteBytesAsync(m_pipelinedResetConnectionBytes!, ioBehavior).ConfigureAwait(false);
await SendRawAsync(m_pipelinedResetConnectionBytes, ioBehavior, cancellationToken).ConfigureAwait(false);

// read two OK replies
m_payloadHandler.SetNextSequenceNumber(1);
payload = await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
payload = await ReceiveReplyAsync(1, ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, SupportsDeprecateEof, SupportsSessionTrack);

m_payloadHandler.SetNextSequenceNumber(1);
payload = await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
payload = await ReceiveReplyAsync(1, ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, SupportsDeprecateEof, SupportsSessionTrack);

return true;
Expand Down Expand Up @@ -896,6 +894,12 @@ public async ValueTask<PayloadData> ReceiveReplyAsync(IOBehavior ioBehavior, Can
return payload;
}

public ValueTask<PayloadData> ReceiveReplyAsync(int expectedSequenceNumber, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
m_payloadHandler!.SetNextSequenceNumber(expectedSequenceNumber);
return ReceiveReplyAsync(ioBehavior, cancellationToken);
}

// Continues a conversation with the server by sending a reply to a packet received with 'Receive' or 'ReceiveReply'.
public async ValueTask SendReplyAsync(PayloadData payload, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
Expand All @@ -916,6 +920,27 @@ public async ValueTask SendReplyAsync(PayloadData payload, IOBehavior ioBehavior
}
}

// Sends raw bytes over the wire without formatting them into a payload. The caller is expected to set the packet header(s) correctly.
public async ValueTask SendRawAsync(ReadOnlyMemory<byte> data, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (CreateExceptionForInvalidState() is { } exception)
{
Log.FailedInSendReplyAsync(m_logger, exception, Id);
throw exception;
}

try
{
// send both packets at once
await m_payloadHandler!.ByteHandler.WriteBytesAsync(data, ioBehavior).ConfigureAwait(false);
}
catch (Exception ex)
{
SetFailed(ex);
throw;
}
}

public static void ThrowIfStatementContainsDelimiter(MySqlException exception, IMySqlCommand command)
{
// check if the command used "DELIMITER"
Expand Down
29 changes: 22 additions & 7 deletions src/MySqlConnector/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,31 @@ private async ValueTask<MySqlTransaction> BeginTransactionAsync(IsolationLevel i
// get the bytes for both payloads concatenated together (suitable for pipelining)
var startTransactionPayload = GetStartTransactionPayload(isolationLevel, isReadOnly, m_session!.SupportsQueryAttributes);

// send the two packets separately
await m_session.SendAsync(new Protocol.PayloadData(startTransactionPayload.Slice(4, startTransactionPayload.Span[0])), ioBehavior, cancellationToken).ConfigureAwait(false);
if (GetInitializedConnectionSettings() is { UseCompression: false, Pipelining: not false })
{
// send the two packets together
await m_session.SendRawAsync(startTransactionPayload, ioBehavior, cancellationToken).ConfigureAwait(false);

var payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, m_session.SupportsDeprecateEof, m_session.SupportsSessionTrack);
// read the two OK replies
var payload = await m_session.ReceiveReplyAsync(1, ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, m_session.SupportsDeprecateEof, m_session.SupportsSessionTrack);

payload = await m_session.ReceiveReplyAsync(1, ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, m_session.SupportsDeprecateEof, m_session.SupportsSessionTrack);
}
else
{
// send the two packets separately
await m_session.SendAsync(new Protocol.PayloadData(startTransactionPayload.Slice(4, startTransactionPayload.Span[0])), ioBehavior, cancellationToken).ConfigureAwait(false);

await m_session.SendAsync(new Protocol.PayloadData(startTransactionPayload.Slice(8 + startTransactionPayload.Span[0], startTransactionPayload.Span[startTransactionPayload.Span[0] + 4])), ioBehavior, cancellationToken).ConfigureAwait(false);
var payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, m_session.SupportsDeprecateEof, m_session.SupportsSessionTrack);

payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, m_session.SupportsDeprecateEof, m_session.SupportsSessionTrack);
await m_session.SendAsync(new Protocol.PayloadData(startTransactionPayload.Slice(8 + startTransactionPayload.Span[0], startTransactionPayload.Span[startTransactionPayload.Span[0] + 4])), ioBehavior, cancellationToken).ConfigureAwait(false);

payload = await m_session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
OkPayload.Verify(payload.Span, m_session.SupportsDeprecateEof, m_session.SupportsSessionTrack);
}

var transaction = new MySqlTransaction(this, isolationLevel);
CurrentTransaction = transaction;
Expand Down

0 comments on commit e400950

Please sign in to comment.