Using Amqp.Net Lite with Azure Service Bus Event Hubs

The Azure Service Bus Event Hub supports the standard AMQP 1.0 protocol. The only thing specific to the Event Hub is the addressing scheme and message properties (the following assumes you created an Event Hub named "myeventhub").
  • Addressing
    • Send to event hub: attach.target.address = "myeventhub"
    • Send to event hub partition "0": attach.target.address = "myeventhub/Partitions/0"
    • Send to a publisher endpoint: attach.target.address = "myeventhub/Publishers/device1"
    • Receive from the default consumer group of event hub partition "0": attach.source.address = "myeventhub/ConsumerGroups/$default/Partitions/0"
  • Message properties: in addition to the standard AMQP message properties, the Event Hub defines the following properties stored in the message annotations section (the keys are AMQP symbols). Only the partition key property can be set by the sender.
    • “x-opt-partition-key”: (string) specifies a partition key used for message partitioning. When the target address is the event hub name, this value is used to compute a hash to select a partition to send the message to.
    • “x-opt-offset”: (string) specifies an opaque pointer of the message in the event stream.
    • “x-opt-sequence-number”: (long) specifies the sequence number of the message in the event stream.
    • “x-opt-enqueued-time”: (timestamp) specifies when the message was enqueued.
    • “x-opt-publisher”: (string) specifies the publisher name if the message was sent to a publisher endpoint.
  • Filter: specifies the position (in attach.source.filter-set) where the receiver wants to start receiving.
    • key: symbol(“apache.org:selector-filter:string”)
    • value: a described string: descriptor=symbol(“apache.org:selector-filter:string”), value is an expression.
”amqp.annotation.x-opt-offset > '100'”: start from message at offset 100 exclusively
”amqp.annotation.x-opt-offset >= '100'”: start from message at offset 100 inclusively
”amqp.annotation.x-opt-enqueuedtimeutc > 1234567”: the number is an AMQP timestamp.

Sample List

Azure Service Bus Event Hubs

Azure Service Bus Event Hub provides cloud-scale telemetry Event ingestion from web sites, apps, and devices. More information: http://azure.microsoft.com/en-us/services/event-hubs/

Sending message to Event Hubs


Sending to specific partition

// eventhub host, e.g. amqps://RootManageSharedAccessKey:[password]@[yournanmespace].servicebus.windows.net
static async Task SendToPartition(string eventhubHost, string eventhubname)
{
    // create address
    var address = new Address(eventhubHost);

    // create connection
    var connection = await address.ConnectAsync();

    // create session
    var session = new Session(connection);

    // partitions here can be update to any partition id in eventhub.
    var sendLink = new SenderLink(
        session,
        "send-link:" + eventhubname,
        string.Format(CultureInfo.InvariantCulture, "{0}/Partitions/0", eventhubname));

    // construct message
    var messageValue = Encoding.UTF8.GetBytes("i am a message.");

    // here, AMQP supports 3 types of body, here we use Data.
    for (int i = 0; i < 10; i++)
    {
        var message = new Message() { BodySection = new Data() { Binary = messageValue } };

        await sendLink.SendAsync(message);

        await Task.Delay(1000);
    }

    await connection.CloseAsync();
}


Sending to publisher

static async Task SendToPubliser(string eventhubHost, string eventhubname)
{
    // create address
    var address = new Address(eventhubHost);

    // create connection
    var connection = await address.ConnectAsync();

    // create session
    var session = new Session(connection);

    // device1 here is the publisher. 
    // alternatively, you could choose to assign the annotation like this:
    // message.MessageAnnotations = new MessageAnnotations();
    // message.MessageAnnotations[new Symbol("x-opt-publisher")] = "device1";
    var sendLink = new SenderLink(
        session,
        "send-link:" + eventhubname,
        string.Format(CultureInfo.InvariantCulture, "{0}/Publishers/device1", eventhubname));

    // construct message
    var messageValue = Encoding.UTF8.GetBytes("i am a message.");

    // here, AMQP supports 3 types of body, here we use Data.
    var message = new Message() { BodySection = new Data() { Binary = messageValue } };

    await sendLink.SendAsync(message);

    await connection.CloseAsync();
}


Sending with Partition Key

static async Task SendWithPartitionKey(string eventhubHost, string eventhubname)
{
    // create address
    var address = new Address(eventhubHost);

    // create connection
    var connection = await address.ConnectAsync();

    // create session
    var session = new Session(connection);

    var sendLink = new SenderLink(
        session,
        "send-link:" + eventhubname,
        eventhubname);

    // construct message
    var messageValue = Encoding.UTF8.GetBytes("i am a message.");

    // here, AMQP supports 3 types of body, here we use Data.
    var message = new Message() { BodySection = new Data() { Binary = messageValue } };
    message.MessageAnnotations = new MessageAnnotations();
    message.MessageAnnotations[new Symbol("x-opt-partition-key")] = "partitionkey";

    await sendLink.SendAsync(message);

    await connection.CloseAsync();
}


Sending without partition info

static async Task SendWithoutPartition(string eventhubHost, string eventhubname)
{
    // create address
    var address = new Address(eventhubHost);

    // create connection
    var connection = await address.ConnectAsync();

    // create session
    var session = new Session(connection);

    // if no partition id is presented, the message distribution will be managed by Event Hub
    var sendLink = new SenderLink(
        session,
        "send-link:" + eventhubname,
        eventhubname);

    // construct message
    var messageValue = Encoding.UTF8.GetBytes("i am a message.");

    // here, AMQP supports 3 types of body, here we use Data.
    var message = new Message() { BodySection = new Data() { Binary = messageValue } };

    await sendLink.SendAsync(message);

    await connection.CloseAsync();
}


Receive from a partition

static async Task ReceiveFromPartition(string eventhubHost, string eventhubname, string partitionName)
{
    // create address
    var address = new Address(eventhubHost);

    // create connection
    var connection = await address.ConnectAsync();

    // create session
    var session = new Session(connection);

    // create a receiver from the default consumer group of the partition
    var receiveLink = new ReceiverLink(
        session,
        "receive-link:" + eventhubname,
        eventhubname + "/ConsumerGroups/$default/Partitions/" + partitionName);
    var message = await receiveLink.ReceiveAsync();
    var offset = message.MessageAnnotations[new Symbol("x-opt-offset")];
    var seqNumber = message.MessageAnnotations[new Symbol("x-opt-sequence-number")];
    var enqueuedTime = message.MessageAnnotations[new Symbol("x-opt-enqueued-time")];
    receiveLink.Accept(message);

    await connection.CloseAsync();
}


Receive by offset from a partition

static async Task ReceiveByOffsetFromPartition(string eventhubHost, string eventhubname, string partitionName, string offset)
{
    // create address
    var address = new Address(eventhubHost);

    // create connection
    var connection = await address.ConnectAsync();

    // create session
    var session = new Session(connection);

    // create a receiver from the default consumer group of the partition after the specified offset
    Map filters = new Map();
    filters.Add(
        new Symbol("apache.org:selector-filter:string"),
        new DescribedValue(
            new Symbol("apache.org:selector-filter:string"),
            "amqp.annotation.x-opt-offset > '" + offset + "'"));

    ReceiverLink receiver = new ReceiverLink(
        session,
        "receiver-" + eventhubname,
        new Source()
        {
            Address = eventhubname + "/ConsumerGroups/$default/Partitions/" + partitionName,
            FilterSet = filters
        });
    var message = await receiveLink.ReceiveAsync();
    var offset = message.MessageAnnotations[new Symbol("x-opt-offset")];
    var seqNumber = message.MessageAnnotations[new Symbol("x-opt-sequence-number")];
    var enqueuedTime = message.MessageAnnotations[new Symbol("x-opt-enqueued-time")];
    receiveLink.Accept(message);

    await connection.CloseAsync();
}

Last edited Mar 16, 2015 at 10:51 PM by xinchen, version 26