Skip to content

Commit

Permalink
[#566] Support custom message format (no custom encoding)
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed May 1, 2023
1 parent c7596e6 commit ff3ea9e
Show file tree
Hide file tree
Showing 31 changed files with 881 additions and 59 deletions.
3 changes: 3 additions & 0 deletions csproj/Amqp.Net.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions csproj/Amqp.Net35.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions csproj/Amqp.Net40.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions csproj/Amqp.NetCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions csproj/Amqp.Uwp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>
Expand Down
27 changes: 27 additions & 0 deletions docs/articles/azure_eventhubs.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,30 @@ value: a described string: descriptor=symbol(“apache.org:selector-filter:strin
The offset filter is perferred as it is more performant than the enqueued-time filter. The enqueued-time filter is for rare cases when you lose the checkpoint data and have to go back a certain period of time in history. The following special offsets are defined.
'-1': beginning of the event stream.
'@latest': end of the even stream, in other words, all new events after the link is attached.

## Batching in message sender
Azure Event Hubs supports an extended message format (0x80013700) which allows a sender to pack multiple messages into one AMQP message.
It is intended to help applications publish messages more efficiently, especially with small messages and high-latency networks.
The envelop message is a standard AMQP 1.0 message with multiple Data sections, each of which contains one encoded payload message in its
binary value. On the service side, the payload messages are extracted and delivered to receivers individually.
The `MessageBatch` class in the test project illustrates how such batch messages can be created.
```
public class MessageBatch : Message
{
public const uint BatchFormat = 0x80013700;
public static MessageBatch Create<T>(IEnumerable<T> objects)
{
DataList dataList = new DataList();
foreach (var obj in objects)
{
ByteBuffer buffer = new ByteBuffer(1024, true);
var section = new AmqpValue<T>(obj);
AmqpSerializer.Serialize(buffer, section);
dataList.Add(new Data() { Buffer = buffer });
}
return new MessageBatch() { Format = BatchFormat, BodySection = dataList };
}
}
```
3 changes: 3 additions & 0 deletions nanoFramework/Amqp.nanoFramework/Amqp.nanoFramework.nfproj
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
<Compile Include="..\..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions netmf/Amqp.NetMF42.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
<Compile Include="..\src\CreditMode.cs">
<Link>CreditMode.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Handler\Event.cs">
<Link>Handler\Event.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions netmf/Amqp.NetMF43.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>
Expand Down
3 changes: 3 additions & 0 deletions netmf/Amqp.NetMF44.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
<Compile Include="..\src\Framing\Data.cs">
<Link>Framing\Data.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DataList.cs">
<Link>Framing\DataList.cs</Link>
</Compile>
<Compile Include="..\src\Framing\DeliveryAnnotations.cs">
<Link>Framing\DeliveryAnnotations.cs</Link>
</Compile>
Expand Down
2 changes: 1 addition & 1 deletion src/ByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void Complete(int size)
/// <summary>
/// Sets the read position.
/// </summary>
/// <param name="seekPosition">Position to set.</param>
/// <param name="seekPosition">The position relative to <see cref="Offset"/> of the buffer.</param>
public void Seek(int seekPosition)
{
Fx.Assert(seekPosition >= 0, "seekPosition must not be negative.");
Expand Down
10 changes: 10 additions & 0 deletions src/Framing/Data.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public ByteBuffer Buffer
set;
}

internal int Length
{
get { return this.Buffer == null ? 0 : this.Buffer.Length; }
}

internal override void EncodeValue(ByteBuffer buffer)
{
if (this.Buffer != null)
Expand All @@ -93,6 +98,11 @@ public byte[] Binary
set { this.binary = value; }
}

internal int Length
{
get { return this.Binary == null ? 0 : this.Binary.Length; }
}

internal override void EncodeValue(ByteBuffer buffer)
{
Encoder.WriteBinary(buffer, this.binary, true);
Expand Down
138 changes: 138 additions & 0 deletions src/Framing/DataList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------

namespace Amqp.Framing
{
using System;
using Amqp.Types;

/// <summary>
/// Represents one or more <see cref="Data"/> sections, typically used as a message body.
/// </summary>
/// <remarks>As a message body, the list is encoded as continuous Data sections without
/// the list encoding preamble (format code, size and count). If the list is empty, it is
/// equivalent to one Data section with empty binary data.</remarks>
public sealed class DataList : RestrictedDescribed
{
Data[] array;
int count;

/// <summary>
/// Initializes a Data object.
/// </summary>
public DataList()
: base(Codec.Data)
{
}

/// <summary>
/// Gets the number of elements.
/// </summary>
public int Count
{
get { return this.count; }
}

/// <summary>
/// Gets the <see cref="Data"/> element at index. Caller should check <see cref="Count"/>
/// before calling this method to ensure that index is valid.
/// </summary>
/// <param name="index">The zero-based index.</param>
/// <returns>The <see cref="Data"/> element at index.</returns>
public Data this[int index]
{
get { return this.array[index]; }
}

/// <summary>
/// Adds a <see cref="Data"/> section.
/// </summary>
/// <param name="data"></param>
public void Add(Data data)
{
if (this.array == null)
{
this.array = new Data[4];
}
else if (this.count == this.array.Length)
{
var temp = new Data[this.count * 2];
Array.Copy(this.array, temp, this.count);
this.array = temp;
}

this.array[this.count++] = data;
}

/// <inheritdoc cref="Object.GetHashCode()" />
public override int GetHashCode()
{
return base.GetHashCode();
}

/// <inheritdoc cref="Object.Equals(object)" />
public override bool Equals(object obj)
{
var data = obj as Data;
if (data != null)
{
if (data.Length == 0 && this.count == 0)
{
return true;
}
}

return base.Equals(obj);
}

internal Data[] ToArray()
{
if (this.array.Length == this.count)
{
return this.array;
}

var copy = new Data[this.count];
Array.Copy(this.array, copy, this.count);
return copy;
}

internal override void EncodeValue(ByteBuffer buffer)
{
if (this.count == 0)
{
// Encode this as an empty binary.
AmqpBitConverter.WriteUByte(buffer, FormatCode.Binary8);
AmqpBitConverter.WriteUByte(buffer, 0);
}
else
{
this.array[0].EncodeValue(buffer);
for (int i = 1; i < this.count; i++)
{
this.array[i].Encode(buffer);
}
}
}

internal override void DecodeValue(ByteBuffer buffer)
{
// Should never be called directly.
throw new InvalidOperationException();
}
}
}
19 changes: 10 additions & 9 deletions src/Listener/ListenerLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ListenerLink : Link
// receive
bool autoRestore;
int restored;
Delivery deliveryCurrent;
MessageDelivery deliveryCurrent;
Action<ListenerLink, Message, DeliveryState, object> onMessage;

/// <summary>
Expand Down Expand Up @@ -435,23 +435,21 @@ internal override void OnTransfer(Delivery delivery, Transfer transfer, ByteBuff
{
if (delivery != null)
{
this.deliveryCurrent = new MessageDelivery(delivery, transfer.MessageFormat);
buffer.AddReference();
delivery.Buffer = buffer;
this.deliveryCount++;
}
else
{
delivery = this.deliveryCurrent;
delivery = this.deliveryCurrent.Delivery;
AmqpBitConverter.WriteBytes(delivery.Buffer, buffer.Buffer, buffer.Offset, buffer.Length);
}

if (!transfer.More)
{
this.DeliverMessage(delivery);
}
else
{
this.deliveryCurrent = delivery;
this.DeliverMessage(this.deliveryCurrent);
this.deliveryCurrent = MessageDelivery.None;
}
}

Expand Down Expand Up @@ -489,10 +487,13 @@ static void ThrowIfNotNull(object obj, string name)
}
}

void DeliverMessage(Delivery delivery)
void DeliverMessage(MessageDelivery messageDelivery)
{
var container = ((ListenerConnection)this.Session.Connection).Listener.Container;
delivery.Message = container.CreateMessage(delivery.Buffer);
Delivery delivery = messageDelivery.Delivery;
var message = container.CreateMessage(delivery.Buffer);
message.Format = messageDelivery.MessageFormat;
delivery.Message = message;

IHandler handler = this.Session.Connection.Handler;
if (handler != null && handler.CanHandle(EventId.SendDelivery))
Expand Down

0 comments on commit ff3ea9e

Please sign in to comment.