Closed

Race condition on send/receive AMQP header

description

Hi,
I think I found an interesting race condition ...

I'm using the AMQP.Net Lite library as the client and ActiveMQ as the broker on my PC. In this setup everything works fine.
The client and broker perform the SASL handshake and then the AMQP header exchange. The trace is the following:

[02:46.457] SEND AMQP 3 1 0 0
[02:46.469] SEND sasl-init(mechanism:PLAIN,initial-response:0061646D696E0061646D696E,hostname:192.168.1.103)
[02:46.471] RECV AMQP 3 1 0 0
[02:46.475] RECV sasl-mechanisms(sasl-server-mechanisms:[ANONYMOUS,PLAIN])
[02:46.476] RECV sasl-outcome(code:0)
[02:46.999] SEND AMQP 0 1.0.0
[02:47.001] SEND (ch=0) open(container-id:4d295088-20fa-4cf3-b0f3-f20d77e94c74,host-name:192.168.1.103,max-frame-size:16384,channel-max:3)
[02:47.511] RECV AMQP 0 1 0 0

In the first part (the SASL handshake), the send and receive operations are executed synchronously in the same thread: send, then wait to receive from the broker.
The second part, the AMQP header exchange, happens in two distinct threads.

The SendHeader() method in the Connection class is called from the AddSession() method of the same class, inside a lock (on ThisLock), when the state is State.Start. After the header and open frames are sent, the state becomes State.OpenPipe. AddSession() is executed when we create a session associated with a connection. All of these operations run in the main/application thread.
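lock (this.ThisLock)
{
    this.ThrowIfClosed("AddSession");
    if (this.state == State.Start)
    {
        // First session on this connection: send the protocol header and open frame.
        this.SendHeader();
        this.SendOpen();
        this.state = State.OpenPipe;
    }
    // ... (rest of AddSession() omitted)
}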
The OnHeader() method is executed in the PumpThread, which receives frames from the broker. This method takes the same lock as above (on ThisLock). If the state is State.OpenPipe (so the header and open frames have already been sent by the client), all works fine; otherwise an illegal state exception is raised.
lock (this.ThisLock)
{
    if (this.state == State.OpenPipe)
    {
        this.state = State.OpenSent;
    }
    else
    {
        // Header received before the client sent its own header and open.
        throw new AmqpException(ErrorCode.IllegalState,
            Fx.Format(SRAmqp.AmqpIllegalOperationState, "OnHeader", this.state));
    }
}
Here is the race condition! The PumpThread can enter the lock before we execute AddSession() to send the header and open frames from the client. In that case the state is still State.Start rather than State.OpenPipe, and the exception is raised.
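
The failing order can be reproduced without threads. The following is a minimal sketch, not the library's code: MiniConnection, Current and the use of InvalidOperationException are hypothetical stand-ins for the Connection class and AmqpException, and the actual sending of frames is left out.

```csharp
using System;

// Only the three states involved in the excerpts above.
enum State { Start, OpenPipe, OpenSent }

// Hypothetical stand-in for the Connection class (not the library's type).
class MiniConnection
{
    readonly object thisLock = new object();
    public State Current { get; private set; } = State.Start;

    // Mirrors the AddSession() excerpt: header and open are "sent" here.
    public void AddSession()
    {
        lock (thisLock)
        {
            if (Current == State.Start)
            {
                Current = State.OpenPipe;
            }
        }
    }

    // Mirrors the OnHeader() excerpt: only State.OpenPipe is accepted.
    public void OnHeader()
    {
        lock (thisLock)
        {
            if (Current != State.OpenPipe)
            {
                throw new InvalidOperationException("OnHeader in state " + Current);
            }
            Current = State.OpenSent;
        }
    }
}

static class Demo
{
    static void Main()
    {
        // Order seen on the PC: the client sends first, then the header arrives.
        var ok = new MiniConnection();
        ok.AddSession();
        ok.OnHeader();
        Console.WriteLine(ok.Current); // prints "OpenSent"

        // Order seen on the board: the header arrives before AddSession().
        var bad = new MiniConnection();
        try { bad.OnHeader(); }
        catch (InvalidOperationException e) { Console.WriteLine(e.Message); } // prints "OnHeader in state Start"
    }
}
```

With real threads, which of the two orders occurs depends only on which thread takes the lock first, and that is why the result differs between the PC and the board.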

On my PC this situation doesn't happen, but I'm also trying the library with .Net Micro Framework on a FEZ Hydra (.Net Gadgeteer) board, and due to different thread scheduling and execution timing I always get the exception: the PumpThread always enters the lock before the main thread.

However, I think there is a real problem here.
The library needs to have sent the AMQP header and open frames before it receives the AMQP header from the broker. But the client and the broker send the AMQP header at the same time (neither of them waits to receive the AMQP header from the other peer). The problem is the internal state machine, which requires a change from State.Start to State.OpenPipe before the AMQP header from the broker is read.

NOTE: to make the tracing inside the OnHeader() method more precise, I moved the trace line inside the lock:
void OnHeader(ProtocolHeader header)
{
    //Trace.WriteLine(TraceLevel.Frame, "RECV AMQP {0}", header);
    lock (this.ThisLock)
    {
        Trace.WriteLine(TraceLevel.Frame, "RECV AMQP {0}", header);
        // ... (state check as shown above)
    }
}
With Trace.WriteLine outside the lock, the trace can be misleading. OnHeader() is called in the PumpThread, which writes the trace line but can then block on the lock because AddSession() is executing. In this case we could see the "RECV AMQP ..." trace line before the "SEND AMQP ..." one, even though the received header has not yet been processed to advance the state machine from State.OpenPipe to State.OpenSent.

Paolo.
Closed Sep 1, 2015 at 1:14 AM by xinchen

comments

ppatierno wrote Jun 21, 2014 at 1:49 PM

Hi,

I think I fixed the bug after reading the AMQP 1.0 specification more carefully, in particular the Connection State Diagram on page 43.
The library defines the HeaderSent, HeaderReceived and HeaderExchanged states but never uses them!
They matter in my situation, where the AMQP header arrives before we send ours to the broker.

I have changed the AddSession() and OnHeader() methods to consider all the states of the header exchange.

I want to test the changes more; tell me if you would like to see my code changes already to evaluate them.

I'm happy to help improve this library!

Paolo.

xinchen wrote Jun 23, 2014 at 5:59 PM

Excellent find! My idea was to get into OpenPipe mode before starting the pump so we have fewer states to handle. But opening the connection (sending the protocol header and open frame) was missing after the transport is established.

Thank you very much for helping us improve this library.

Xin

ppatierno wrote Jun 23, 2014 at 7:45 PM

I think it is better to have a complete state machine.

I changed AddSession() (Connection class) in the following way:
if ((this.state == State.Start) || (this.state == State.HeaderReceived))
{
    this.SendHeader();
    if (this.state == State.Start)
        this.state = State.HeaderSent;
    else if (this.state == State.HeaderReceived)
        this.state = State.HeaderExchanged;

    this.SendOpen();
    if (this.state == State.HeaderSent)
        this.state = State.OpenPipe;
    else if (this.state == State.HeaderExchanged)
        this.state = State.OpenSent;
}
and the OnHeader() method (Connection class) in the following way:
if (this.state == State.Start)
{
    this.state = State.HeaderReceived;
}
else if (this.state == State.HeaderSent)
{
    this.state = State.HeaderExchanged;
}
else if (this.state == State.OpenPipe)
{
    this.state = State.OpenSent;
}
else
{
    throw new AmqpException(ErrorCode.IllegalState,
        Fx.Format(SRAmqp.AmqpIllegalOperationState, "OnHeader", this.state));
}
The unit tests pass, although I want to test these changes more thoroughly.
Can you evaluate whether these changes are good or whether there is a better solution?
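
To check the transitions in isolation, the two methods can be reduced to pure functions over the state. This is only a sketch under assumptions: HeaderStateMachine, SendHeaderAndOpen and ReceiveHeader are hypothetical names, the enum lists only the states mentioned in this thread, and InvalidOperationException stands in for AmqpException.

```csharp
using System;

// Subset of the connection states named in this thread.
enum State { Start, HeaderSent, HeaderReceived, HeaderExchanged, OpenPipe, OpenSent }

// Hypothetical helper, not part of the library.
static class HeaderStateMachine
{
    // Local side sends the header and then open, as in the modified AddSession().
    public static State SendHeaderAndOpen(State s)
    {
        if (s != State.Start && s != State.HeaderReceived)
        {
            return s; // header and open were already sent
        }
        // After the header is sent.
        s = (s == State.Start) ? State.HeaderSent : State.HeaderExchanged;
        // After the open frame is sent.
        return (s == State.HeaderSent) ? State.OpenPipe : State.OpenSent;
    }

    // The remote header arrives, as in the modified OnHeader().
    public static State ReceiveHeader(State s)
    {
        switch (s)
        {
            case State.Start: return State.HeaderReceived;
            case State.HeaderSent: return State.HeaderExchanged;
            case State.OpenPipe: return State.OpenSent;
            default: throw new InvalidOperationException("OnHeader in state " + s);
        }
    }
}

static class Demo
{
    static void Main()
    {
        // The client sends first, then the broker's header arrives.
        State a = HeaderStateMachine.ReceiveHeader(
            HeaderStateMachine.SendHeaderAndOpen(State.Start));
        // The broker's header arrives first, then the client sends.
        State b = HeaderStateMachine.SendHeaderAndOpen(
            HeaderStateMachine.ReceiveHeader(State.Start));
        Console.WriteLine(a + " " + b); // prints "OpenSent OpenSent"
    }
}
```

Both orders end in State.OpenSent, which is the property the changes are meant to guarantee.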

It's a pleasure to help improve the library!

Paolo.

ppatierno wrote Jun 25, 2014 at 3:20 PM

Hi,
I saw that a new commit was made to fix this issue.
I haven't tried it yet, but from reading the code (statically) I don't understand how it fixes the race condition.

As in the previous version, it seems that if the OnHeader() method is called before AddSession(), the exception is thrown because the state isn't State.OpenPipe or State.OpenClosePipe; it is State.Start.

Paolo.

ppatierno wrote Jun 26, 2014 at 6:54 AM

Hi,
I understand now ... I hadn't looked at the OnConnect() method.
There, you execute StartIfNeeded() (which sends the header and open) before starting the pump thread. This way it's impossible for a header to be received before the client sends its own header :-)
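
As I read it, the ordering looks roughly like the sketch below. This is my own simplification, not the committed code: MiniConnection and Current are hypothetical, the bodies of StartIfNeeded() and OnConnect() are assumptions, and the pump thread is reduced to a single OnHeader() call.

```csharp
using System;
using System.Threading;

enum State { Start, OpenPipe, OpenSent }

// Hypothetical stand-in for the Connection class (not the library's type).
class MiniConnection
{
    readonly object thisLock = new object();
    State state = State.Start;

    public State Current
    {
        get { lock (thisLock) { return state; } }
    }

    // Idempotent: only the first call moves the state out of State.Start.
    public void StartIfNeeded()
    {
        lock (thisLock)
        {
            if (state == State.Start)
            {
                // The real code would send the protocol header and open frame here.
                state = State.OpenPipe;
            }
        }
    }

    void OnHeader()
    {
        lock (thisLock)
        {
            if (state != State.OpenPipe)
            {
                throw new InvalidOperationException("OnHeader in state " + state);
            }
            state = State.OpenSent;
        }
    }

    public void OnConnect()
    {
        // Header and open go out before the pump thread exists,
        // so OnHeader() can never observe State.Start.
        StartIfNeeded();
        var pump = new Thread(new ThreadStart(OnHeader));
        pump.Start();
        pump.Join(); // only so the demo can read the final state
    }

    public void AddSession()
    {
        StartIfNeeded(); // no effect here, the connection is already open
    }
}

static class Demo
{
    static void Main()
    {
        var conn = new MiniConnection();
        conn.OnConnect();
        conn.AddSession();
        Console.WriteLine(conn.Current); // prints "OpenSent"
    }
}
```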

Paolo.

xinchen wrote Jun 27, 2014 at 6:48 AM

Yes that fixed the state machine issue. More importantly, OnConnection should open the connection by sending the protocol header and the open frame. It should not be triggered by AddSession. AddSession also needs this check because it cannot send begin frame without opening the connection.

Xin

ppatierno wrote Jun 27, 2014 at 7:17 AM

Ok....
Just one point ... calling StartIfNeeded() inside AddSession() is "defensive programming": AddSession() can't be called without a Connection object, and if the connection has already been created, OnConnect() has been called, so the header and open frames have been sent.

However, why don't you want to handle the entire state machine with the HeaderSent, HeaderReceived and HeaderExchanged states?

Paolo.

xinchen wrote Jun 29, 2014 at 6:12 AM

If the pump were started as soon as the transport is connected, I would need to handle these state transitions. I think the connection state machine is a bit overcomplicated given the protocol header is for version negotiation, so I chose to go with something simpler.