Skip to content

Commit

Permalink
[#576] AmqpSettings IdleTimeout error
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Dec 27, 2023
1 parent 3e42f0c commit 54554af
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 15 deletions.
24 changes: 21 additions & 3 deletions src/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ internal void Init(IBufferManager bufferManager, AmqpSettings amqpSettings, IAsy
this.writer = new TransportWriter(transport, this.OnIoException);

// after getting the transport, move state to open pipe before starting the pump
uint idleTimeout = 0;
if (open == null)
{
open = new Open()
Expand All @@ -240,13 +241,30 @@ internal void Init(IBufferManager bufferManager, AmqpSettings amqpSettings, IAsy
HostName = amqpSettings.HostName ?? this.address.Host,
ChannelMax = this.channelMax,
MaxFrameSize = this.maxFrameSize,
IdleTimeOut = (uint)amqpSettings.IdleTimeout / 2
};

if (amqpSettings.IdleTimeout != -1)
{
idleTimeout = (uint)amqpSettings.IdleTimeout;
open.IdleTimeOut = idleTimeout / 2;
}
}
else if (open.HasField(4))
{
idleTimeout = open.IdleTimeOut;
if (idleTimeout > (uint.MaxValue / 2))
{
idleTimeout = uint.MaxValue;
}
else
{
idleTimeout *= 2;
}
}

if (open.IdleTimeOut > 0)
if (idleTimeout > 0)
{
this.heartBeat = new HeartBeat(this, open.IdleTimeOut * 2);
this.heartBeat = new HeartBeat(this, idleTimeout);
}

this.SendHeader();
Expand Down
3 changes: 2 additions & 1 deletion src/Framing/Open.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ public override string ToString()
return this.GetDebugString(
"open",
new object[] { "container-id", "host-name", "max-frame-size", "channel-max", "idle-time-out", "outgoing-locales", "incoming-locales", "offered-capabilities", "desired-capabilities", "properties" },
new object[] {containerId, hostName, maxFrameSize, channelMax, idleTimeOut, outgoingLocales, incomingLocales, offeredCapabilities, desiredCapabilities, properties});
new object[] { containerId, hostName, maxFrameSize, channelMax, idleTimeOut, outgoingLocales, incomingLocales, offeredCapabilities, desiredCapabilities, properties }
);
}
#endif
}
Expand Down
8 changes: 6 additions & 2 deletions src/Net/AmqpSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ public int MaxLinksPerSession
}

/// <summary>
/// Gets or sets the connection idle timeout. Half the value is set
/// as the value of open.idle-time-out field.
/// Gets or sets the connection idle timeout. One-half of the value is set
/// as the value of the open.idle-time-out field.
/// </summary>
/// <remarks>
/// A value of -1 means infinite (no idle timeout). Other negative values are
/// allowed but not recommended. They will be processed as a uint value instead.
/// </remarks>
public int IdleTimeout
{
get;
Expand Down
2 changes: 1 addition & 1 deletion src/Net/ConnectionFactoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected ConnectionFactoryBase()
{
MaxFrameSize = (int)Connection.DefaultMaxFrameSize,
ContainerId = Connection.MakeAmqpContainerId(),
IdleTimeout = int.MaxValue,
IdleTimeout = -1,
MaxSessionsPerConnection = Connection.DefaultMaxSessions,
MaxLinksPerSession = Connection.DefaultMaxLinksPerSession
};
Expand Down
79 changes: 71 additions & 8 deletions test/Common/ProtocolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void RemoteLinkHandleTest()
}

[TestMethod]
public void ConnectionRemoteIdleTimeoutTest()
public void ConnectionIdleTimeoutRemoteTest()
{
ManualResetEvent received = new ManualResetEvent(false);

Expand All @@ -313,15 +313,15 @@ public void ConnectionRemoteIdleTimeoutTest()
return TestOutcome.Continue;
});

string testName = "ConnectionRemoteIdleTimeoutTest";
string testName = "ConnectionIdleTimeoutRemoteTest";

Trace.WriteLine(TraceLevel.Information, "sync test");
{
Connection connection = new Connection(this.address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
sender.Send(new Message("test") { Properties = new Properties() { MessageId = testName } });
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance);
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);
Assert.IsTrue(h != null, "heart beat is not initialized");
Assert.IsTrue(received.WaitOne(5000), "Heartbeat not received");
connection.Close();
Expand All @@ -336,7 +336,7 @@ public void ConnectionRemoteIdleTimeoutTest()
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
await sender.SendAsync(new Message("test") { Properties = new Properties() { MessageId = testName } });
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance);
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);
Assert.IsTrue(h != null, "heart beat is not initialized");
await Task.Yield();
Assert.IsTrue(received.WaitOne(5000), "Heartbeat not received");
Expand All @@ -346,9 +346,9 @@ public void ConnectionRemoteIdleTimeoutTest()
}

[TestMethod]
public void ConnectionLocalIdleTimeoutTest()
public void ConnectionIdleTimeoutLocalTest()
{
string testName = "ConnectionLocalIdleTimeoutTest";
string testName = "ConnectionIdleTimeoutLocalTest";

Trace.WriteLine(TraceLevel.Information, "sync test");
{
Expand All @@ -357,7 +357,7 @@ public void ConnectionLocalIdleTimeoutTest()
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
sender.Send(new Message("test") { Properties = new Properties() { MessageId = testName } });
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance);
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);
Assert.IsTrue(h != null, "heart beat is not initialized");
Thread.Sleep(600);
Assert.IsTrue(!connection.IsClosed, "connection should not be closed");
Expand All @@ -376,13 +376,76 @@ public void ConnectionLocalIdleTimeoutTest()
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
await sender.SendAsync(new Message("test") { Properties = new Properties() { MessageId = testName } });
await Task.Delay(1200);
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance);
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);
Assert.IsTrue(h != null, "heart beat is not initialized");
Assert.IsTrue(connection.IsClosed, "connection not closed");
}).Unwrap().GetAwaiter().GetResult();
#endif
}

[TestMethod]
public void ConnectionIdleTimeoutNoHeartbeatTest()
{
string testName = "ConnectionIdleTimeoutNoHeartbeatTest";

Trace.WriteLine(TraceLevel.Information, "sync test");
{
Open open = new Open() { ContainerId = testName, HostName = "localhost" };
Connection connection = new Connection(this.address, null, open, null);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
sender.Send(new Message("test") { Properties = new Properties() { MessageId = testName } });
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);
Assert.IsTrue(h == null, "heart beat should not be initialized");
connection.Close();
}

#if !NETFX40
Trace.WriteLine(TraceLevel.Information, "async test");
Task.Factory.StartNew(async () =>
{
ConnectionFactory factory = new ConnectionFactory();
factory.AMQP.IdleTimeout = -1;
Connection connection = await factory.CreateAsync(this.address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
await sender.SendAsync(new Message("test") { Properties = new Properties() { MessageId = testName } });
var h = connection.GetType().GetField("heartBeat", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);
Assert.IsTrue(h == null, "heart beat should not be initialized");
await connection.CloseAsync();
}).Unwrap().GetAwaiter().GetResult();
#endif
}

#if !NETFX40
[TestMethod]
public void ConnectionIdleTimeoutNegativeValueTest()
{
string testName = "ConnectionIdleTimeoutNegativeValueTest";
uint? timeout = null;
int value = -1000;

this.testListener.RegisterTarget(TestPoint.Open, (stream, channel, fields) =>
{
timeout = (uint)fields[4];
return TestOutcome.Continue;
});

Task.Factory.StartNew(async () =>
{
ConnectionFactory factory = new ConnectionFactory();
factory.AMQP.IdleTimeout = value;
Connection connection = await factory.CreateAsync(this.address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "sender-" + testName, "any");
await sender.SendAsync(new Message("test") { Properties = new Properties() { MessageId = testName } });
await connection.CloseAsync();
}).Unwrap().GetAwaiter().GetResult();

Assert.IsTrue(timeout == (uint)value / 2);
}
#endif

[TestMethod]
public void ConnectionHeartBeatCloseTimeoutTest()
{
Expand Down

0 comments on commit 54554af

Please sign in to comment.