Skip to content

Commit

Permalink
Retrypolicy (Azure#110)
Browse files Browse the repository at this point in the history
Implemented an abstract RetryPolicy class with two implementations: NoRetry and RetryExponential
(Issue Azure#7)
  • Loading branch information
nemakam authored and mentat9 committed Jun 10, 2019
1 parent 900870e commit 1ad05fd
Show file tree
Hide file tree
Showing 36 changed files with 759 additions and 197 deletions.
30 changes: 26 additions & 4 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,22 @@ class AmqpClient : IInnerSenderReceiver
MessageSender innerSender;
MessageReceiver innerReceiver;

internal AmqpClient(ServiceBusConnection servicebusConnection, string entityPath, MessagingEntityType entityType, ReceiveMode mode = ReceiveMode.ReceiveAndDelete)
internal AmqpClient(
ServiceBusConnection servicebusConnection,
string entityPath,
MessagingEntityType entityType,
RetryPolicy retryPolicy,
ReceiveMode mode = ReceiveMode.ReceiveAndDelete)
{
this.ServiceBusConnection = servicebusConnection;
this.EntityPath = entityPath;
this.MessagingEntityType = entityType;
this.ReceiveMode = mode;
this.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(servicebusConnection.SasKeyName, servicebusConnection.SasKey);
this.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(
servicebusConnection.SasKeyName,
servicebusConnection.SasKey);
this.CbsTokenProvider = new TokenProviderAdapter(this.TokenProvider, servicebusConnection.OperationTimeout);
this.RetryPolicy = retryPolicy;
}

public MessageSender InnerSender
Expand Down Expand Up @@ -71,6 +79,8 @@ public MessageReceiver InnerReceiver

internal ICbsTokenProvider CbsTokenProvider { get; }

internal RetryPolicy RetryPolicy { get; }

protected object ThisLock { get; } = new object();

TokenProvider TokenProvider { get; }
Expand All @@ -85,12 +95,24 @@ public async Task CloseAsync()

MessageSender CreateMessageSender()
{
return new AmqpMessageSender(this.EntityPath, this.MessagingEntityType, this.ServiceBusConnection, this.CbsTokenProvider);
return new AmqpMessageSender(
this.EntityPath,
this.MessagingEntityType,
this.ServiceBusConnection,
this.CbsTokenProvider,
this.RetryPolicy);
}

MessageReceiver CreateMessageReceiver()
{
return new AmqpMessageReceiver(this.EntityPath, this.MessagingEntityType, this.ReceiveMode, this.ServiceBusConnection.PrefetchCount, this.ServiceBusConnection, this.CbsTokenProvider);
return new AmqpMessageReceiver(
this.EntityPath,
this.MessagingEntityType,
this.ReceiveMode,
this.ServiceBusConnection.PrefetchCount,
this.ServiceBusConnection,
this.CbsTokenProvider,
this.RetryPolicy);
}
}
}
24 changes: 20 additions & 4 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,29 @@ sealed class AmqpMessageReceiver : MessageReceiver
string sessionId;
DateTime lockedUntilUtc;

internal AmqpMessageReceiver(string entityName, MessagingEntityType? entityType, ReceiveMode mode, int prefetchCount, ServiceBusConnection serviceBusConnection, ICbsTokenProvider cbsTokenProvider)
: this(entityName, entityType, mode, prefetchCount, serviceBusConnection, cbsTokenProvider, null)
internal AmqpMessageReceiver(
string entityName,
MessagingEntityType? entityType,
ReceiveMode mode,
int prefetchCount,
ServiceBusConnection serviceBusConnection,
ICbsTokenProvider cbsTokenProvider,
RetryPolicy retryPolicy)
: this(entityName, entityType, mode, prefetchCount, serviceBusConnection, cbsTokenProvider, null, retryPolicy)
{
}

internal AmqpMessageReceiver(string entityName, MessagingEntityType? entityType, ReceiveMode mode, int prefetchCount, ServiceBusConnection serviceBusConnection, ICbsTokenProvider cbsTokenProvider, string sessionId, bool isSessionReceiver = false)
: base(mode, serviceBusConnection.OperationTimeout)
internal AmqpMessageReceiver(
string entityName,
MessagingEntityType? entityType,
ReceiveMode mode,
int prefetchCount,
ServiceBusConnection serviceBusConnection,
ICbsTokenProvider cbsTokenProvider,
string sessionId,
RetryPolicy retryPolicy,
bool isSessionReceiver = false)
: base(mode, serviceBusConnection.OperationTimeout, retryPolicy)
{
this.entityName = entityName;
this.EntityType = entityType;
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ sealed class AmqpMessageSender : MessageSender
{
int deliveryCount;

internal AmqpMessageSender(string entityName, MessagingEntityType? entityType, ServiceBusConnection serviceBusConnection, ICbsTokenProvider cbsTokenProvider)
: base(serviceBusConnection.OperationTimeout)
internal AmqpMessageSender(string entityName, MessagingEntityType? entityType, ServiceBusConnection serviceBusConnection, ICbsTokenProvider cbsTokenProvider, RetryPolicy retryPolicy)
: base(serviceBusConnection.OperationTimeout, retryPolicy)
{
this.Path = entityName;
this.EntityType = entityType;
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace Microsoft.Azure.ServiceBus.Amqp

class AmqpMessageSession : MessageSession
{
public AmqpMessageSession(string sessionId, DateTime lockedUntilUtc, MessageReceiver innerMessageReceiver)
: base(innerMessageReceiver.ReceiveMode, sessionId, lockedUntilUtc, innerMessageReceiver)
public AmqpMessageSession(string sessionId, DateTime lockedUntilUtc, MessageReceiver innerMessageReceiver, RetryPolicy retryPolicy)
: base(innerMessageReceiver.ReceiveMode, sessionId, lockedUntilUtc, innerMessageReceiver, retryPolicy)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ internal sealed class AmqpSubscriptionClient : AmqpClient, IInnerSubscriptionCli
ServiceBusConnection servicebusConnection,
string entityPath,
MessagingEntityType entityType,
RetryPolicy retryPolicy,
ReceiveMode mode = ReceiveMode.ReceiveAndDelete)
: base(servicebusConnection, entityPath, entityType, mode)
: base(servicebusConnection, entityPath, entityType, retryPolicy, mode)
{
}

Expand Down
2 changes: 2 additions & 0 deletions src/Microsoft.Azure.ServiceBus/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ static class Constants
public static readonly TimeSpan MinimumLockDuration = TimeSpan.FromSeconds(5);

public static readonly TimeSpan MaximumRenewBufferDuration = TimeSpan.FromSeconds(10);

public static readonly TimeSpan DefaultRetryDeltaBackoff = TimeSpan.FromSeconds(3);
}
}
68 changes: 54 additions & 14 deletions src/Microsoft.Azure.ServiceBus/Core/MessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ internal abstract class MessageReceiver : ClientEntity, IMessageReceiver
MessageReceivePump receivePump;
CancellationTokenSource receivePumpCancellationTokenSource;

protected MessageReceiver(ReceiveMode receiveMode, TimeSpan operationTimeout)
: base(nameof(MessageReceiver) + StringUtility.GetRandomString())
protected MessageReceiver(ReceiveMode receiveMode, TimeSpan operationTimeout, RetryPolicy retryPolicy)
: base(nameof(MessageReceiver) + StringUtility.GetRandomString(), retryPolicy ?? RetryPolicy.Default)
{
this.ReceiveMode = receiveMode;
this.operationTimeout = operationTimeout;
Expand Down Expand Up @@ -133,10 +133,15 @@ public async Task<IList<Message>> ReceiveAsync(int maxMessageCount, TimeSpan ser
{
MessagingEventSource.Log.MessageReceiveStart(this.ClientId, maxMessageCount);

IList<Message> messages;
IList<Message> messages = null;
try
{
messages = await this.OnReceiveAsync(maxMessageCount, serverWaitTime).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
messages = await this.OnReceiveAsync(maxMessageCount, serverWaitTime).ConfigureAwait(false);
}, serverWaitTime)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -166,10 +171,15 @@ public async Task<IList<Message>> ReceiveBySequenceNumberAsync(IEnumerable<long>

MessagingEventSource.Log.MessageReceiveBySequenceNumberStart(this.ClientId, count, sequenceNumbers);

IList<Message> messages;
IList<Message> messages = null;
try
{
messages = await this.OnReceiveBySequenceNumberAsync(sequenceNumbers).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
messages = await this.OnReceiveBySequenceNumberAsync(sequenceNumbers).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -196,7 +206,12 @@ public async Task CompleteAsync(IEnumerable<string> lockTokens)

try
{
await this.OnCompleteAsync(lockTokens).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
await this.OnCompleteAsync(lockTokens).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -214,7 +229,12 @@ public async Task AbandonAsync(string lockToken)
MessagingEventSource.Log.MessageAbandonStart(this.ClientId, 1, lockToken);
try
{
await this.OnAbandonAsync(lockToken).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
await this.OnAbandonAsync(lockToken).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -233,7 +253,12 @@ public async Task DeferAsync(string lockToken)

try
{
await this.OnDeferAsync(lockToken).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
await this.OnDeferAsync(lockToken).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -252,7 +277,12 @@ public async Task DeadLetterAsync(string lockToken)

try
{
await this.OnDeadLetterAsync(lockToken).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
await this.OnDeadLetterAsync(lockToken).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -269,10 +299,15 @@ public async Task<DateTime> RenewLockAsync(string lockToken)

MessagingEventSource.Log.MessageRenewLockStart(this.ClientId, 1, lockToken);

DateTime lockedUntilUtc;
DateTime lockedUntilUtc = DateTime.Now;
try
{
lockedUntilUtc = await this.OnRenewLockAsync(lockToken).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
lockedUntilUtc = await this.OnRenewLockAsync(lockToken).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -320,12 +355,17 @@ public async Task<Message> PeekBySequenceNumberAsync(long fromSequenceNumber)
/// <returns>A batch of messages peeked.</returns>
public async Task<IList<Message>> PeekBySequenceNumberAsync(long fromSequenceNumber, int messageCount)
{
IList<Message> messages;
IList<Message> messages = null;

MessagingEventSource.Log.MessagePeekStart(this.ClientId, fromSequenceNumber, messageCount);
try
{
messages = await this.OnPeekAsync(fromSequenceNumber, messageCount).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
messages = await this.OnPeekAsync(fromSequenceNumber, messageCount).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down
27 changes: 21 additions & 6 deletions src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ internal abstract class MessageSender : ClientEntity, IMessageSender
"StyleCop.CSharp.ReadabilityRules",
"SA1126:PrefixCallsCorrectly",
Justification = "This is not a method call, but a type.")]
protected MessageSender(TimeSpan operationTimeout)
: base(nameof(MessageSender) + StringUtility.GetRandomString())
protected MessageSender(TimeSpan operationTimeout, RetryPolicy retryPolicy)
: base(nameof(MessageSender) + StringUtility.GetRandomString(), retryPolicy ?? RetryPolicy.Default)
{
this.OperationTimeout = operationTimeout;
}
Expand All @@ -35,7 +35,12 @@ public async Task SendAsync(IList<Message> messageList)

try
{
await this.OnSendAsync(messageList).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
await this.OnSendAsync(messageList).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -64,11 +69,16 @@ public async Task<long> ScheduleMessageAsync(Message message, DateTimeOffset sch
message.ScheduledEnqueueTimeUtc = scheduleEnqueueTimeUtc.UtcDateTime;
MessageSender.ValidateMessage(message);
MessagingEventSource.Log.ScheduleMessageStart(this.ClientId, scheduleEnqueueTimeUtc);
long result;
long result = 0;

try
{
result = await this.OnScheduleMessageAsync(message).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
result = await this.OnScheduleMessageAsync(message).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand All @@ -86,7 +96,12 @@ public async Task CancelScheduledMessageAsync(long sequenceNumber)

try
{
await this.OnCancelScheduledMessageAsync(sequenceNumber).ConfigureAwait(false);
await this.RetryPolicy.RunOperation(
async () =>
{
await this.OnCancelScheduledMessageAsync(sequenceNumber).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
catch (Exception exception)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.ServiceBus/MessageSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace Microsoft.Azure.ServiceBus
internal abstract class MessageSession : MessageReceiver, IMessageSession
{
/// <summary>Represents a message session that allows grouping of related messages for processing in a single transaction.</summary>
protected MessageSession(ReceiveMode receiveMode, string sessionId, DateTime lockedUntilUtc, MessageReceiver innerReceiver)
: base(receiveMode, innerReceiver.OperationTimeout)
protected MessageSession(ReceiveMode receiveMode, string sessionId, DateTime lockedUntilUtc, MessageReceiver innerReceiver, RetryPolicy retryPolicy)
: base(receiveMode, innerReceiver.OperationTimeout, retryPolicy)
{
if (innerReceiver == null)
{
Expand Down
15 changes: 15 additions & 0 deletions src/Microsoft.Azure.ServiceBus/MessagingEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -915,5 +915,20 @@ void MessageReceiverPumpRenewMessageException(string clientId, long sequenceNumb
{
this.WriteEvent(76, clientId, sequenceNumber, exception);
}

[NonEvent]
public void RunOperationExceptionEncountered(Exception exception)
{
if (this.IsEnabled())
{
this.RunOperationExceptionEncountered(exception.ToString());
}
}

[Event(77, Level = EventLevel.Warning, Message = "RunOperation encountered an exception and will retry. Exception: {0}")]
void RunOperationExceptionEncountered(string exception)
{
this.WriteEvent(77, exception);
}
}
}
19 changes: 19 additions & 0 deletions src/Microsoft.Azure.ServiceBus/NoRetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus
{
using System;

public sealed class NoRetry : RetryPolicy
{
protected override bool OnShouldRetry(
TimeSpan remainingTime,
int currentRetryCount,
out TimeSpan retryInterval)
{
retryInterval = TimeSpan.Zero;
return false;
}
}
}

0 comments on commit 1ad05fd

Please sign in to comment.