Skip to content

Commit

Permalink
Queue storage extension support for QueueMessage & BinaryData (#1470, #…
Browse files Browse the repository at this point in the history
  • Loading branch information
liliankasem committed Jul 24, 2023
1 parent 1888c9a commit 93ceef1
Show file tree
Hide file tree
Showing 23 changed files with 798 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

### Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues <version>

- <entry>
- Add ability to bind a queue trigger to QueueMessage and BinaryData (#1470)
14 changes: 14 additions & 0 deletions extensions/Worker.Extensions.Storage.Queues/src/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.Functions.Worker.Storage.Queues
{
internal static class Constants
{
internal const string QueueExtensionName = "AzureStorageQueues";
internal const string QueueMessageText = "MessageText";

// Media content types
internal const string JsonContentType = "application/json";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure.Storage.Queues.Models;

namespace Microsoft.Azure.Functions.Worker.Storage.Queues
{
internal class QueueMessageJsonConverter : JsonConverter<QueueMessage>
{
public override bool CanConvert(Type objectType) => objectType == typeof(QueueMessage);

public override QueueMessage? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType is not JsonTokenType.StartObject)
{
throw new JsonException("JSON payload expected to start with StartObject token.");
}

string messageId = String.Empty;
string popReceipt = String.Empty;
string messageText = String.Empty;
long dequeueCount = 1;
DateTime? nextVisibleOn = null;
DateTime? insertedOn = null;
DateTime? expiresOn = null;

var startDepth = reader.CurrentDepth;

while (reader.Read())
{
if (reader.TokenType is JsonTokenType.EndObject && reader.CurrentDepth == startDepth)
{
return QueuesModelFactory.QueueMessage(
messageId,
popReceipt,
messageText,
dequeueCount,
nextVisibleOn,
insertedOn,
expiresOn
);
}

if (reader.TokenType is not JsonTokenType.PropertyName)
{
continue;
}

var propertyName = reader.GetString();
reader.Read();

switch (propertyName?.ToLowerInvariant())
{
case "messageid":
messageId = reader.GetString() ?? throw new JsonException("JSON payload must contain a MessageId.");
break;
case "popreceipt":
popReceipt = reader.GetString() ?? throw new JsonException("JSON payload must contain a PopReceipt.");
break;
case "messagetext":
messageText = reader.GetString() ?? throw new JsonException("JSOn payload must contain a MessageText.");
break;
case "dequeuecount":
dequeueCount = reader.GetInt64();
break;
case "nextvisibleon":
nextVisibleOn = reader.GetDateTime();
break;
case "insertedon":
insertedOn = reader.GetDateTime();
break;
case "expireson":
expiresOn = reader.GetDateTime();
break;
default:
break;
}
}

throw new JsonException("JSON payload expected to end with EndObject token.");
}

public override void Write(Utf8JsonWriter writer, QueueMessage value, JsonSerializerOptions options)
{
throw new JsonException($"Serialization is not supported by the {nameof(QueueMessageJsonConverter)}.");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.Storage.Queues", "5.1.2")]
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.Storage.Queues", "5.1.3")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

namespace Microsoft.Azure.Functions.Worker
{
[InputConverter(typeof(QueueMessageConverter))]
[InputConverter(typeof(QueueMessageBinaryDataConverter))]
[ConverterFallbackBehavior(ConverterFallbackBehavior.Default)]
public sealed class QueueTriggerAttribute : TriggerBindingAttribute
{
private readonly string _queueName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Globalization;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions;
using Microsoft.Azure.Functions.Worker.Storage.Queues;

namespace Microsoft.Azure.Functions.Worker
{
internal abstract class QueueConverterBase<T> : IInputConverter
{
public QueueConverterBase()
{
}

public bool CanConvert(ConverterContext context)
{
if (context is null)
{
throw new ArgumentNullException(nameof(context));
}

if (context.TargetType != typeof(T))
{
return false;
}

if (context.Source is not ModelBindingData bindingData)
{
return false;
}

if (bindingData.Source is not Constants.QueueExtensionName)
{
throw new InvalidBindingSourceException(bindingData.Source, Constants.QueueExtensionName);
}

return true;
}

public async ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
try
{
if (!CanConvert(context))
{
return ConversionResult.Unhandled();
}

var modelBindingData = (ModelBindingData)context.Source!;
var result = await ConvertCoreAsync(modelBindingData);
return ConversionResult.Success(result);
}
catch (JsonException ex)
{
string msg = String.Format(CultureInfo.CurrentCulture,
@"Binding parameters to complex objects uses JSON serialization.
1. Bind the parameter type as 'string' instead to get the raw values and avoid JSON deserialization, or
2. Change the queue payload to be valid json.");

return ConversionResult.Failed(new InvalidOperationException(msg, ex));
}
catch (Exception ex)
{
return ConversionResult.Failed(ex);
}
}

protected abstract ValueTask<T> ConvertCoreAsync(ModelBindingData data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.Functions.Worker.Storage.Queues;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="BinaryData" /> type parameters.
/// </summary>
[SupportsDeferredBinding]
[SupportedTargetType(typeof(BinaryData))]
internal sealed class QueueMessageBinaryDataConverter : QueueConverterBase<BinaryData>
{
public QueueMessageBinaryDataConverter() : base()
{
}

protected override ValueTask<BinaryData> ConvertCoreAsync(ModelBindingData data)
{
return new ValueTask<BinaryData>(ExtractQueueMessageContent(data));
}

private BinaryData ExtractQueueMessageContent(ModelBindingData modelBindingData)
{
if (modelBindingData.ContentType is not Constants.JsonContentType)
{
throw new InvalidContentTypeException(modelBindingData.ContentType, Constants.JsonContentType);
}

var content = modelBindingData.Content.ToObjectFromJson<JsonElement>();
var messageText = content.GetProperty(Constants.QueueMessageText).ToString()
?? throw new InvalidOperationException($"The '{Constants.QueueMessageText}' property is missing or null.");

return new BinaryData(messageText);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Text.Json;
using System.Threading.Tasks;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.Functions.Worker.Storage.Queues;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="QueueMessage" /> type parameters.
/// </summary>
[SupportsDeferredBinding]
[SupportedTargetType(typeof(QueueMessage))]
internal sealed class QueueMessageConverter : QueueConverterBase<QueueMessage>
{
private readonly JsonSerializerOptions _jsonOptions;

public QueueMessageConverter() : base()
{
_jsonOptions = new() { Converters = { new QueueMessageJsonConverter() } };
}

protected override ValueTask<QueueMessage> ConvertCoreAsync(ModelBindingData data)
{
return new ValueTask<QueueMessage>(ExtractQueueMessage(data));
}

private QueueMessage ExtractQueueMessage(ModelBindingData modelBindingData)
{
if (modelBindingData.ContentType is not Constants.JsonContentType)
{
throw new InvalidContentTypeException(modelBindingData.ContentType, Constants.JsonContentType);
}

return modelBindingData.Content.ToObjectFromJson<QueueMessage>(_jsonOptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
<Description>Azure Queue Storage extensions for .NET isolated functions</Description>

<!--Version information-->
<VersionPrefix>5.1.2</VersionPrefix>
<VersionPrefix>5.1.3</VersionPrefix>
<VersionSuffix>-preview1</VersionSuffix>

<!--Temporarily opting out of documentation. Pending documentation-->
<GenerateDocumentationFile>false</GenerateDocumentationFile>
Expand All @@ -16,6 +17,15 @@

<ItemGroup>
<ProjectReference Include="..\..\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\DotNetWorker.Core\DotNetWorker.Core.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Queues" Version="12.13.1" />
</ItemGroup>

<ItemGroup>
<SharedReference Include="..\..\Worker.Extensions.Shared\Worker.Extensions.Shared.csproj" />
</ItemGroup>

</Project>
40 changes: 40 additions & 0 deletions samples/WorkerBindingSamples/Queue/QueueSamples.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to <see cref="QueueMessage"/> and <see cref="BinaryData"/> types.
/// </summary>
public class QueueSamples
{
private readonly ILogger<QueueSamples> _logger;

public QueueSamples(ILogger<QueueSamples> logger)
{
_logger = logger;
}

/// <summary>
/// This function demonstrates binding to a single <see cref="QueueMessage"/>.
/// </summary>
[Function(nameof(QueueMessageFunction))]
public void QueueMessageFunction([QueueTrigger("input-queue")] QueueMessage message)
{
_logger.LogInformation(message.MessageText);
}

/// <summary>
/// This function demonstrates binding to a single <see cref="BinaryData"/>.
/// </summary>
[Function(nameof(QueueBinaryDataFunction))]
public void QueueBinaryDataFunction([QueueTrigger("input-queue-binarydata")] BinaryData message)
{
_logger.LogInformation(message.ToString());
}
}
}
2 changes: 1 addition & 1 deletion test/DotNetWorkerTests/DotNetWorkerTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\extensions\Worker.Extensions.Storage.Blobs\src\Worker.Extensions.Storage.Blobs.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Http.AspNetCore\src\Worker.Extensions.Http.AspNetCore.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Http\src\Worker.Extensions.Http.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker.ApplicationInsights\DotNetWorker.ApplicationInsights.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker\DotNetWorker.csproj" />
<ProjectReference Include="..\TestUtility\TestUtility.csproj" />
Expand Down

0 comments on commit 93ceef1

Please sign in to comment.