BasicSendReceive Test fails with Apache Qpid Proton

description

Hi.

I tried running the test suite on .NET (not .NETMF) against a "stock" Apache Qpid Proton setup.

After changing TcpTransport.cs slightly to exclude IPv6 addresses (if (ipAddress == null || ipAddress.AddressFamily != AddressFamily.InterNetwork)), I ran the TestMethod_BasicSendReceive() test method. Although all 200 messages are sent successfully, the receiving part fails (receiver.Receive() returns null after the 60-second timeout).
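For readers who want to reproduce the workaround, here is a minimal sketch of that kind of address filter. It is not the actual TcpTransport.cs code: the Ipv4Filter class and OnlyIpv4 method are hypothetical names used only for illustration, and only standard System.Net types are assumed.

```csharp
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper (not part of AMQP.Net Lite): keeps IPv4 addresses only.
static class Ipv4Filter
{
    // Skips null entries and anything that is not AddressFamily.InterNetwork,
    // which is the inverse of the "skip" condition quoted in the description.
    public static IPAddress[] OnlyIpv4(IPAddress[] candidates)
    {
        return candidates
            .Where(a => a != null && a.AddressFamily == AddressFamily.InterNetwork)
            .ToArray();
    }

    static void Main()
    {
        // Stand-in for the result of a DNS lookup that returns both families.
        IPAddress[] resolved = { IPAddress.IPv6Loopback, IPAddress.Loopback };

        foreach (IPAddress address in OnlyIpv4(resolved))
        {
            Console.WriteLine(address); // only the IPv4 entry is printed
        }
    }
}
```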

Any hints about broker configuration?
Closed Sep 1, 2015 at 2:16 AM by xinchen

comments

maiorfi wrote May 19, 2014 at 10:13 AM

Just tried on ActiveMQ too, with the same results.

As additional info, even though I can see the q1 queue created in the admin web console, it always shows 0 messages, although the sending procedure runs fine.

Thanks.

xinchen wrote May 19, 2014 at 7:48 PM

Can you add the following at the very beginning of the test code and share the debug output:

Trace.TraceLevel = TraceLevel.Frame;
Trace.TraceListener = (f, a) => System.Diagnostics.Trace.WriteLine(string.Format(f, a));

maiorfi wrote May 20, 2014 at 6:52 AM

Here it is:

SEND AMQP 3 1 0 0
SEND sasl-init(mechanism:PLAIN,initial-response:006775657374006775657374,hostname:localhost)
RECV AMQP 3 1 0 0
RECV sasl-mechanisms(sasl-server-mechanisms:System.Object[])
RECV sasl-outcome(code:0)
RECV AMQP 0 1 0 0
SEND AMQP 0 1.0.0
SEND (ch=0) open(container-id:2331d2bc-c4d3-488d-99f6-7bafb5cfbe41,host-name:localhost,max-frame-size:16384,channel-max:3)
SEND (ch=0) begin(next-outgoing-id:4294967293,incoming-window:2048,outgoing-window:2048,handle-max:7)
SEND (ch=0) attach(name:send-link,handle:0,role:False,source:source(),target:source(address:q1))
RECV (ch=0) open(container-id:e7a5be8a-8f3b-4150-b2a6-e946c6c26c45,max-frame-size:32768,channel-max:3,properties:Amqp.Types.Map)
RECV (ch=0) begin(remote-channel:0,next-outgoing-id:0,incoming-window:2048,outgoing-window:2048)
SEND (ch=0) attach(name:receive-link,handle:1,role:True,source:source(address:q1),target:source())
SEND (ch=0) flow(next-in-id:0,in-window:2048,next-out-id:4294967293,out-window:2048,handle:1,delivery-count:0,link-credit:50)
RECV (ch=0) attach(name:send-link,handle:0,role:True,source:source())
RECV (ch=0) detach(handle:0,error:error(condition:amqp:not-found))
SEND (ch=0) detach(handle:0,closed:True)
RECV (ch=0) attach(name:receive-link,handle:1,role:False,target:source(),initial-delivery-count:0)
RECV (ch=0) detach(handle:1,error:error(condition:amqp:not-found))
SEND (ch=0) detach(handle:1,closed:True)
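When reading a trace like this, it can help to know which credentials the sasl-init frame carried. A SASL PLAIN initial response is three NUL-separated fields (authorization id, user name, password), so the hex string can be decoded with a few lines. This is only a sketch; the SaslPlain class and Decode method are hypothetical names and not part of AMQP.Net Lite.

```csharp
using System;
using System.Text;

// Hypothetical helper for reading traces; not part of AMQP.Net Lite.
static class SaslPlain
{
    // Converts the hex string to bytes and splits on NUL:
    // [0] = authorization id (often empty), [1] = user name, [2] = password.
    public static string[] Decode(string hex)
    {
        byte[] bytes = new byte[hex.Length / 2];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = Convert.ToByte(hex.Substring(i * 2, 2), 16);
        }

        return Encoding.UTF8.GetString(bytes).Split('\0');
    }

    static void Main()
    {
        // initial-response value taken from the sasl-init line of the trace above.
        string[] parts = Decode("006775657374006775657374");
        Console.WriteLine("authzid='{0}' user='{1}' password='{2}'", parts[0], parts[1], parts[2]);
    }
}
```

For the trace above this yields an empty authorization id and "guest" for both user name and password.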

ppatierno wrote May 20, 2014 at 7:33 AM

It seems to be a problem accessing node "q1" (your queue). I have never played with the Qpid Proton Java broker, but this link could be useful (even though it talks about the Qpid Proton C++ broker):

https://issues.apache.org/jira/browse/QPID-5299

You are sure that node "q1" exists on the broker, but could there be an access rights problem? Are you using a user name and password to access the queue, and is the queue configured to be accessible to that user?

Paolo.

ppatierno wrote May 20, 2014 at 7:41 AM

The QPID-5299 issue seems to be fixed in Qpid 0.26... only for the C++ version?
However, it seems to be the problem: "amqp:not-found" is returned if the node doesn't exist (not your case) or if there is an ACL problem with access rights.
I don't think that the problem is in the library.

Paolo.

maiorfi wrote May 20, 2014 at 10:54 AM

Looking at the ACL file for the user I'm using (whose username is "client"), everything seems to be OK on that side:

Client side

Allow the 'client' user to publish requests to the request queue and create, consume from, and delete temporary reply queues.

ACL ALLOW-LOG client CREATE QUEUE temporary="true"
ACL ALLOW-LOG client CONSUME QUEUE temporary="true"
ACL ALLOW-LOG client DELETE QUEUE temporary="true"
ACL ALLOW-LOG client BIND EXCHANGE name="amq.direct" temporary="true"
ACL ALLOW-LOG client UNBIND EXCHANGE name="amq.direct" temporary="true"
ACL ALLOW-LOG client PUBLISH EXCHANGE name="amq.direct" routingKey="example.RequestQueue"

ppatierno wrote May 20, 2014 at 11:45 AM

I see that there are concepts like "exchange" and "binding"... they belong to AMQP 0.9.1 and no longer exist in AMQP 1.0... could that be the problem? It's also true that the protocol headers exchanged are correct and both peers use version 1.0.0.

With AMQP 0.9.1, you talk to an "exchange" and not to nodes/queues... you have a direct exchange.
Have you tried using "amq.direct" instead of "q1" when you create your Sender and Receiver Links?
It's also true that you would need to set a "routingKey" property inside the message with the value "example.RequestQueue", so that the exchange routes the message to the queue using the related binding.

Paolo.

ppatierno wrote May 20, 2014 at 1:23 PM

I have installed the Qpid Proton Java broker and configured the direct exchange "amq.direct", a binding and a queue ("myqueue" is bound to "amq.direct" using routing key "myqueuekey").
The amqp:not-found error was due to a nonexistent queue or exchange.

Now for me...
The ReceiverLink connection works fine (on exchange and on queue).
The SenderLink connection fails ...

With Service Bus the correct flow is the following:

SEND AMQP 3 1 0 0
SEND sasl-init(mechanism:PLAIN,initial-response:006F776E657200657079677642637A49674C546F7554772B756850327733457369334E47356857704D694F565045703058383D,hostname:amqpnetlitetest.servicebus.windows.net)
RECV AMQP 3 1 0 0
RECV sasl-mechanisms(sasl-server-mechanisms:System.Object[])
RECV sasl-outcome(code:0,additional-data:57656C636F6D6521)
SEND AMQP 0 1.0.0
SEND (ch=0) open(container-id:cc8228ec-5d3c-4ea5-972d-3c2b82ba889c,host-name:amqpnetlitetest.servicebus.windows.net,max-frame-size:16384,channel-max:3)
SEND (ch=0) begin(next-outgoing-id:4294967293,incoming-window:2048,outgoing-window:2048,handle-max:7)
RECV AMQP 0 1 0 0
RECV (ch=0) open(container-id:e2d1ea145d144e3ba3682e0adb69e248_G11,max-frame-size:16384,channel-max:3,idle-time-out:240000)
RECV (ch=0) begin(remote-channel:0,next-outgoing-id:1,incoming-window:2048,outgoing-window:2048,handle-max:7)
SEND (ch=0) attach(name:send-link,handle:0,role:False,source:source(),target:source(address:q1))
RECV (ch=0) attach(name:send-link,handle:0,role:True,source:source(),target:source(address:q1),max-message-size:262144,properties:Amqp.Types.Map)
RECV (ch=0) flow(next-in-id:4294967293,in-window:2048,next-out-id:1,out-window:2048,handle:0,delivery-count:0,link-credit:50,available:0,echo:False)

After "attach" we receive a "flow" message.
If I create a SenderLink to Qpid Proton the flow is:

SEND AMQP 3 1 0 0
SEND sasl-init(mechanism:PLAIN,initial-response:0061646D696E0061646D696E,hostname:localhost)
RECV AMQP 3 1 0 0
RECV sasl-mechanisms(sasl-server-mechanisms:System.Object[])
RECV sasl-outcome(code:0)
RECV AMQP 0 1 0 0
SEND AMQP 0 1.0.0
SEND (ch=0) open(container-id:826bc66a-61a1-42a9-a3e9-18942bddd38b,host-name:localhost,max-frame-size:16384,channel-max:3)
SEND (ch=0) begin(next-outgoing-id:4294967293,incoming-window:2048,outgoing-window:2048,handle-max:7)
RECV (ch=0) open(container-id:cf2bbc72-0962-48a0-bebe-83cc9b4ec181,max-frame-size:32768,channel-max:3,properties:Amqp.Types.Map)
RECV (ch=0) begin(remote-channel:0,next-outgoing-id:0,incoming-window:2048,outgoing-window:2048)
SEND (ch=0) attach(name:send-link,handle:0,role:False,source:source(),target:source(address:amq.direct))
RECV (ch=0) attach(name:send-link,handle:0,role:True,source:source(),target:source(address:amq.direct))
RECV (ch=0) close(error:error(condition:amqp:connection:forced,description:java.lang.NullPointerException))

Instead of "flow" I receive a "close".

The Qpid Proton console log follows:

00 53 12 c0 26 07 a1 09 73 65 6e 64 2d 6c 69 6e 6b
43 42 40 40 00 53 28 45 00 53 29 c0 0d 01 a1 0a 61
6d 71 2e 64 69 72 65 63 74
java.lang.NullPointerException
    at org.apache.qpid.amqp_1_0.type.UnsignedInteger.add(UnsignedInteger.java:124)
    at org.apache.qpid.amqp_1_0.transport.LinkEndpoint.sendFlow(LinkEndpoint.java:474)
    at org.apache.qpid.amqp_1_0.transport.LinkEndpoint.sendFlow(LinkEndpoint.java:463)
    at org.apache.qpid.amqp_1_0.transport.LinkEndpoint.sendFlowConditional(LinkEndpoint.java:445)
    at org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint.setCreditWindow(ReceivingLinkEndpoint.java:280)
    at org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint.setCreditWindow(ReceivingLinkEndpoint.java:274)
    at org.apache.qpid.server.protocol.v1_0.ReceivingLink_1_0.start(ReceivingLink_1_0.java:260)
    at org.apache.qpid.server.protocol.v1_0.Session_1_0.remoteLinkCreation(Session_1_0.java:317)
    at org.apache.qpid.amqp_1_0.transport.SessionEndpoint.receiveAttach(SessionEndpoint.java:254)
    at org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint.receiveAttach(ConnectionEndpoint.java:582)
    at org.apache.qpid.amqp_1_0.type.transport.Attach.invoke(Attach.java:351)
    at org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint.receive(ConnectionEndpoint.java:717)
    at org.apache.qpid.amqp_1_0.framing.FrameHandler.parse(FrameHandler.java:242)
    at org.apache.qpid.server.protocol.v1_0.ProtocolEngine_1_0_0_SASL.received(ProtocolEngine_1_0_0_SASL.java:350)
    at org.apache.qpid.server.protocol.v1_0.ProtocolEngine_1_0_0_SASL.received(ProtocolEngine_1_0_0_SASL.java:58)
    at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:131)
    at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:47)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:161)
    at java.lang.Thread.run(Unknown Source)
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:156)
    at java.lang.Thread.run(Unknown Source)

I hope that the AMQP.Net Lite team can help you (maiorfi) and me! ;-)

xinchen wrote May 26, 2014 at 12:37 AM

I fixed one issue in sender link. Now it should work with the Qpid Java broker. I ran the basic send and receive test case against the latest Qpid Java broker and it passed.

ppatierno wrote May 26, 2014 at 7:54 AM

Great! This weekend I saw that the problem was with the initial delivery count for SenderLink.

Thanks, Xin Chen !
Paolo.

ppatierno wrote May 26, 2014 at 7:55 AM

The important thing to add is that even though the Qpid Proton Java broker has the concepts of exchange and binding in its web user interface, to use AMQP.Net Lite we can simply create a queue, without creating a related binding for the amq.direct exchange.

Paolo.

maiorfi wrote May 26, 2014 at 9:48 AM

Now it seems to work much better, although I still have some subtle issues, maybe due to asynchronous handling of protocol messages.

Specifically, whenever I run a basic send test of the .NET library (targeting the Qpid Proton Java broker), I can see messages actually enqueued only if I Thread.Sleep about 100 ms after each Send method call.

I.e., this way no messages are enqueued:

Connection connection = new Connection(address);
Session session = new Session(connection);

SenderLink sender = new SenderLink(session, "send-link", "q1");

for (int i = 0; i < 5; ++i)
{
    Message message = new Message();
    message.Properties = new Properties() { GroupId = "abcdefg" };
    message.ApplicationProperties = new ApplicationProperties();
    message.ApplicationProperties["sn"] = i;
    sender.Send(message, null, null);
}

sender.Close();
session.Close();
connection.Close();

while this way the test works as expected:

Connection connection = new Connection(address);
Session session = new Session(connection);

SenderLink sender = new SenderLink(session, "send-link", "q1");

for (int i = 0; i < 5; ++i)
{
    Message message = new Message();
    message.Properties = new Properties() { GroupId = "abcdefg" };
    message.ApplicationProperties = new ApplicationProperties();
    message.ApplicationProperties["sn"] = i;
    sender.Send(message, null, null);

    Thread.Sleep(100);
}

sender.Close();
session.Close();
connection.Close();

Any idea?

Thanks!

ppatierno wrote May 26, 2014 at 10:19 AM

For me it works fine with the original 200-message loop!
Can you run the test in debug mode (but without breakpoints) and get the debug messages from the Output window?
As xinchen said, you have to set:

Trace.TraceLevel = TraceLevel.Frame;
Trace.TraceListener = (f, a) => System.Diagnostics.Trace.WriteLine(string.Format(f, a));

I put the above code in a TestInitialize() method like this:

#if !NETMF
    [TestInitialize]
#endif
    public void TestInitialize()
    {
        Trace.TraceLevel = TraceLevel.Verbose | TraceLevel.Frame;
        Trace.TraceListener = (f, a) => System.Diagnostics.Trace.WriteLine(System.String.Format(f, a));
    }

Paolo

maiorfi wrote May 26, 2014 at 1:37 PM

It was an issue caused by the test procedure ending before the messages were actually sent :)

With this code I now get roughly 10000 messages sent per second (with the Java Qpid Proton broker running on my dev machine)!

AutoResetEvent are = new AutoResetEvent(false);
int counter = 0;

const int MESSAGE_COUNT = 100000;

for (int i = 0; i < MESSAGE_COUNT; ++i)
{
    Message message = new Message(DateTime.Now);
    message.Properties = new Properties() { GroupId = "abcdefg" };
    message.ApplicationProperties = new ApplicationProperties();
    message.ApplicationProperties["sn"] = i;

    sender.Send(
        message,
        (msg, outcome, state) => { if (Interlocked.Increment(ref counter) == MESSAGE_COUNT) are.Set(); },
        null);
}

are.WaitOne();

ppatierno wrote May 26, 2014 at 2:06 PM

How is it possible that the test procedure ended before all messages were sent?

After the sender loop, the receiver loop is synchronous and ends only when all messages have been received. Only after this loop are all the Close() methods called, and then the test ends.

Am I wrong ?

Paolo.

maiorfi wrote May 26, 2014 at 2:52 PM

I split the test procedure into 2 parts (sending and receiving) in order to have a way to inspect enqueued messages through the broker admin interface :)

ppatierno wrote May 26, 2014 at 3:34 PM

So you split the single test method into two test methods, one for sending and another for receiving, and in this case the sending test method ended before all messages were sent, because the AMQP.Net Lite library sends messages asynchronously! :-)

maiorfi wrote May 26, 2014 at 3:34 PM

...after having split the basic test procedure into 2 parts (send and receive), I realized that it seems hard to receive more than roughly 2000 messages: I have a queue with >100000 messages, and after receiving a couple of thousand of them, receiver.Receive() returns a null message after a 60-second timeout.

xinchen wrote May 26, 2014 at 6:36 PM

RE: async send
My plan was to block session.Close() until all pending operations are completed (or a default timeout expires), but this hasn't been done yet.
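As an illustration of that idea only (this is not the library's implementation, and the PendingOperations class with its Begin, Complete and WaitForDrain methods is a hypothetical name), blocking until outstanding operations finish or a timeout expires could be sketched with a counter and a monitor:

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of AMQP.Net Lite: counts in-flight operations
// and lets a caller block until they have all completed or a timeout expires.
sealed class PendingOperations
{
    private readonly object gate = new object();
    private int pending;

    // Call when an asynchronous operation (e.g. a send) is started.
    public void Begin()
    {
        lock (gate) { pending++; }
    }

    // Call from the completion callback of that operation.
    public void Complete()
    {
        lock (gate)
        {
            pending--;
            if (pending == 0) Monitor.PulseAll(gate);
        }
    }

    // Returns true if all operations completed within the timeout.
    public bool WaitForDrain(TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        lock (gate)
        {
            while (pending > 0)
            {
                TimeSpan remaining = deadline - DateTime.UtcNow;
                if (remaining <= TimeSpan.Zero || !Monitor.Wait(gate, remaining))
                {
                    return pending == 0;
                }
            }

            return true;
        }
    }
}

static class Program
{
    static void Main()
    {
        var tracker = new PendingOperations();
        for (int i = 0; i < 5; i++)
        {
            tracker.Begin();
            // Simulates an acknowledgement arriving asynchronously a bit later.
            ThreadPool.QueueUserWorkItem(_ => { Thread.Sleep(50); tracker.Complete(); });
        }

        // A Close() method could wait like this before tearing down the link.
        bool drained = tracker.WaitForDrain(TimeSpan.FromSeconds(5));
        Console.WriteLine(drained ? "all operations completed" : "timed out");
    }
}
```

Until something like this exists in the library, waiting on the Send callback in the calling code, as in the AutoResetEvent example earlier in the thread, achieves the same effect.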

I will investigate the receive issue with more messages.

xinchen wrote May 30, 2014 at 6:39 AM

With commit 41ed29b66c0f I was able to receive all messages from the queue in the Qpid Java broker.

xinchen wrote May 30, 2014 at 7:34 AM

Let me know if you are still seeing issues. Thanks a lot!

maiorfi wrote May 30, 2014 at 9:56 AM

Great! Now it works perfectly.

Just for statistics, sending and receiving 100000 messages took less than 20 seconds with the Java Qpid Proton broker on localhost.

Thanks!

ppatierno wrote May 30, 2014 at 10:40 AM

Great ! Thanks !