ColdFusion and Azure Service Bus

Introduction

Microsoft Azure Service Bus is a managed enterprise integration message broker. Service Bus can decouple applications and services. Service Bus offers a platform for asynchronous data and state transfer.

Data is transferred between different applications and services using messages. A message is in binary format and can contain JSON, XML, or just text.

You can use Service Bus to transfer data, increase scalability, implement workflows, manage topics and subscriptions, and more.

For more information, see Overview of Service Bus.

Namespace

A namespace is a container for all messaging components. You can have multiple queues and topics in a single namespace. Namespaces often serve as containers for various applications and services.

Message

Messages are sent to and received from queues. Queues store messages until the receiving application is available to receive and process them.

Topic

You can also use topics to send and receive messages. While a queue is often used for point-to-point communication, topics are useful in publish/subscribe scenarios.

Get started

Install azureservicebus package

Adobe ColdFusion (2021 release) is modularized. If you use the ZIP installer, the package for Azure Service Bus is not installed by default, so the first step is to install the Service Bus package in ColdFusion.

Note: If you are using the GUI installer, the packages are pre-installed.

The package for Service Bus is called azureservicebus.

To install the package azureservicebus, use the Package Manager page in the ColdFusion Administrator, or follow the steps below:

  1. Navigate to <CF_HOME>/cfusion/bin.

  2. Start the ColdFusion Package Manager by entering the command:

    • Windows: cfpm.bat
    • Linux: cfpm.sh

  3. At the Package Manager prompt, enter the command install azureservicebus.

    Wait for the Azure Service Bus package to get installed.

For more information, see ColdFusion Package Manager.

Add cloud service credentials and configuration

In ColdFusion (2021 release), there is a method getCloudService() that gives you a handle to create objects for accessing various cloud services.

The syntax of the service handle is as follows:

service=getCloudService(cloudCred,cloudConfig), where:

  • cloudCred: Defines the credentials for the cloud service. It can be either a struct or a string (also known as a credential alias).
  • cloudConfig: Defines the cloud service configuration details. It can be either a struct or a string (also known as a config alias).

After you've acquired the Azure credentials, you must declare these credentials in one of the following ways. Only then can you use these credentials to create a Service Bus object, which you can then use to make calls to the various Azure Service Bus methods.

ColdFusion Administrator

Set credentials

In the ColdFusion Administrator, click Data & Services > Cloud Credentials.

An alias is a named representation of a cloud service and its configuration details. You can set the credential alias through ColdFusion Administrator.

A connection string includes the authorization information required for your application to access an Azure Service Bus namespace.

For more information, see Configure Azure Storage connection strings.

After entering the details, click Add Credential.

Set configuration options

In the ColdFusion Administrator, click Data & Services > Cloud Configuration.

Enter details such as the configuration alias, the vendor, and the name of the service.

Add cloud configuration options

After adding the configuration options, you may need to add a few more options. You can do so in the next screen. The following are a few options that you may need to add:

  • Operation Timeout
  • Retry Policy

For more information, see Cloud configuration options.

Create the object

Once you've created the aliases for the Service Bus credential and configuration options, you can create the object by using the getCloudService API. Include the following in your CFM:

sbObject= getCloudService("sbCred", "sbConf")

Application.cfc

You can specify the Service Bus credentials and configuration options in Application.cfc. For example,

<cfscript>
    // Service Bus credentials: the connection string of the Service Bus namespace
    this.serviceBusCred = {
        "vendorName" : "AZURE",
        "connectionString" : "xxxxxxxxxx"
    }

    // Service Bus configuration
    this.serviceBusConf = {
        "serviceName" : "SERVICE_BUS"
    }
</cfscript>

Create the object

sbObject = getCloudService(this.serviceBusCred, this.serviceBusConf)

On CFM page

On a CFM page, you can specify the Service Bus credentials and configuration options in one of the following four ways:

Credential alias and configuration alias

After you've created the aliases for the Service Bus credential and configuration options, you can use these aliases in the getCloudService handle as shown below:

<cfscript>
  // define the credential and the configuration aliases in the ColdFusion Admin
  sbObject=getCloudService("sbCred","sbConf")
  // code below
</cfscript>

Credential alias and struct for configuration options

<cfscript>
    // Using credential alias and struct for service configuration
    sbConf = {
        "serviceName" : "AZURE_SERVICEBUS",
        "options" : {
            "operationTimeout" : "10s",
            "retryPolicy":"default"
         }
    }
    sb = getCloudService("sbCred", sbConf)
    // code below
</cfscript>

Configuration alias and struct for credentials

<cfscript> 
    // Using config alias and struct for service credentials 
    // service bus credentials 
     sbCred= { 
        "vendorName" : "AZURE", 
        "connectionString" : "xxxxxx" 
    } 
    sb= getCloudService(sbCred, "sbConf") 
    // code below
</cfscript>

Structs for both credential and configuration options

<cfscript> 
  // Using Structs for both cloud credential and config 
  sbCred = { 
        "vendorName" : "AZURE", 
        "connectionString" : "xxxxxx" 
    } 
    sbConf = { 
        "serviceName" : "AZURE_SERVICEBUS", 
        "options" : { 
            "operationTimeout" : "10s", 
            "retryPolicy":"default" 
         }     
        
    } 
  sb= getCloudService(sbCred , sbConf ) 
   
  // code below
</cfscript>

Admin API

You can also add Service Bus credentials and configuration options by using the Admin APIs. The methods to add credentials and configuration are available in cloud.cfc.

The examples below demonstrate the usage of the methods addCredential(struct credential) and addServiceConfig(struct config).

Add credentials

<cfscript> 
  // Create an object of administrator component and call the login method 
  adminObj = createObject("component","cfide.adminapi.administrator") 
  adminObj.login("admin") 
   
  // Create an object of cloud component 
  cloudObj = createObject("component","cfide.adminapi.cloud") 
   
  // define credentials struct 
  credentialStruct= { 
        "vendorName" : "AZURE", 
        "connectionString" : "xxxxxx" 
    } 
   
  // add credential credentialStruct 
  try{ 
    cloudObj.addCredential(credentialStruct) 
    writeOutput("Credentials added successfully") 
  } 
  catch(any e){ 
      writeDump(e) 
  } 
</cfscript>

Add configuration

<cfscript> 
  // Create an object of administrator component and call the login method 
  adminObj = createObject("component","cfide.adminapi.administrator") 
  adminObj.login("admin") 
   
  // Create an object of cloud component 
  cloudObj = createObject("component","cfide.adminapi.cloud") 
   
  // define configuration struct 
  configStruct= { 
        "serviceName" : "AZURE_SERVICEBUS", 
        "options" : { 
            "operationTimeout" : "10s", 
            "retryPolicy":"default" 
         }     
        
    } 
  // add config configStruct 
  try{ 
    cloudObj.addServiceConfig(configStruct) 
    writeOutput("Configuration service added successfully") 
  } 
  catch(any e){ 
    writeDump(e) 
  } 
</cfscript>

CFSetup

You can also set up Service Bus credentials and configuration via the CFSetup configuration utility. The commands below illustrate the general command syntax with AWS values.

Add cloud credential

  • add cloudcredential credentialAlias=s3cred accesskeyid=<access> secretaccesskey=<secret> region=ap-southeast-1 vendorname=AWS

Set credential

  • set cloudcredential snscred secretaccesskey=awssecret

Add cloud configuration

  • add cloudconfiguration serviceName=S3 alias=s3Config

Set configuration

  • set cloudconfiguration conf1 alias=conf2

ColdFusion Administrator

In ColdFusion Administrator > Settings, you can modify the thread pool settings that are used for handling Service Bus events.

  • Core Pool Size: The minimum number of worker threads that must be kept alive.
  • Maximum Pool Size: The maximum number of threads that can be available in the pool.
  • Keep Alive Time: The timeout, in milliseconds, that idle threads wait before they are terminated.

Create a topic

In Service Bus, topics can have multiple, independent subscriptions. A subscriber to a topic can receive a copy of each message sent to that topic. Multiple receivers can be allocated to a subscription. A topic can have zero or more subscriptions, but a subscription can only belong to one topic.

EXAMPLE

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // topic metadata 
    createTopicMetadata = { 
        "autoDeleteOnIdle" = "45M", 
        "requiresDuplicateDetection" = "yes", 
        //"maxSize" = 1024, 
        "entityStatus" = "Active", 
        "userMetadata" = "my user metadata", 
        "enableBatchedOperations" = "true", 
        "enablePartitioning" = "yes" 
    } 
    //sb.deleteTopic("secondTopic") 
    tempTopic = sb.createTopic("Topic_005", createTopicMetadata) 
    //sb.deleteTopic("firstTopic") 
    writeDump(tempTopic) 
    //writeDump(sb.listTopics()) 
</cfscript>

Create a subscription

Applications receive messages from subscriptions in the same way that they receive them from queues by using a QueueClient or MessageSession object. A topic can have zero or more subscriptions, but a subscription can only belong to one topic. For more information, see Azure SB subscriptions.

EXAMPLE

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    subscriptionMetadata={ 
        "subscriptionName"="sub_1", 
        "topicPath"="Topic_005", 
        "lockDuration" = "77s", 
        "maxDeliveryCount" = 12, 
        "enableDeadLetteringOnMessageExpiration"="true", 
        "enableDeadLetteringOnFilterEvaluationException"="true" 
    } 
    mysub=sb.subscribe(subscriptionMetadata) 
    writeDump(mysub) 
</cfscript>

Send message to a queue

Once you’ve created a queue, you can send messages to the queue, using the function sendMessage. For more information, see the official docs.

EXAMPLE

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    sb.deleteQueue("Queue18") 
    // send message 
    sendMessageMetadata = { 
        "messageBody" = "my message is here", 
        "messageProperties" = { 
            "myatt1" = "att1,att2" 
        } 
    } 
    tempQ = sb.createQueue("Queue18") 
    //writedump(tempQ) 
    mypath=tempQ.getPath() 
    sendMessageResponse = tempQ.sendMessage(sendMessageMetadata) 
    writeDump(sendMessageResponse) 
</cfscript>

Member function

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    myQ = sb.listQueues().get("queueList")[1]
    sendMessage()

    public function sendMessage() {
        writeoutput("send message response")
        closure = function (){
            writeoutput("my function my choice")
        }
        sendMessageMetadata = {
            "messageBody" = "my message my choice",
            "messageProperties" = {
                "myatt1" = "att1,att2"
            }
        }
        sendMessageResponse = myQ.sendMessage(sendMessageMetadata)
        writeDump(sendMessageResponse)
    }
</cfscript>

List queues

To list all queues, use the function listQueues.

EXAMPLE

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // list all queues 
    allQueues=sb.listQueues() 
    writeDump(allQueues) 
</cfscript>

Member function

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    myQ = sb.listQueues().get("queueList")[1]
    writeDump(myQ.getDescription())
</cfscript>

Delete a queue

To delete a queue, use the function deleteQueue. This function accepts the name of the queue as a parameter.

EXAMPLE

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    // delete the queue
    try{
        sb.deleteQueue("Queue8")
        writeOutput("Queue deleted successfully")
    }
    catch (any e){
        writeOutput("Unable to delete queue")
    }
</cfscript>

Update a queue

To update a queue, use the function updateQueue. This function accepts the following parameters:

  • Name of the queue
  • Metadata struct

The metadata struct consists of the following values:

  • "autoDeleteOnIdle" = "time",
  • "enableDeadLetteringOnMessageExpiration" = false|true,
  • "entityStatus" = "Active|Disabled",
  • "userMetadata" = "some metadata",
  • "enableBatchedOperations" = "no|yes"

EXAMPLE

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // define queue attributes 
    lockDuration = 51 
    qName = "Q_001" 
    queueAttributes = { 
        "autoDeleteOnIdle" = "240h", 
        "enableDeadLetteringOnMessageExpiration" = true, 
        "maxDeliveryCount" = 101, 
        "requiresDuplicateDetection" = "YES", // cannot be changed w/ update. 
        "lockDuration" = "#lockDuration#s", 
        //"maxSizeInMB" = 2048, //check for default value 
        "entityStatus" = "Active", 
        "userMetadata" = "some user metadata : data val", 
        "enableBatchedOperations" = true, 
        "enablePartitioning" = false // cannot be changed w/ update. 
    } 
    queueAttributesUpdated = { 
        "autoDeleteOnIdle" = "10M", 
        "enableDeadLetteringOnMessageExpiration" = false, 
        "entityStatus" = "Disabled", 
        "userMetadata" = "some new user metadata", 
        "enableBatchedOperations" = "no" 
    } 
    // create the queue 
    sb.createQueue(qName, queueAttributes) 
    try{ 
        sb.updateQueue(qName, queueAttributesUpdated) 
    writeOutput("Queue updated successfully.") 
    } 
    catch (any e){ 
        writeOutput("Could not update queue.") 
    } 
</cfscript>

Prefetch

When Prefetch is enabled in any of the official Service Bus clients, the receiver quietly acquires more messages, up to the PrefetchCount limit, beyond what the application initially asked for. For more information, see Service bus prefetch.

setPrefetchCount

EXAMPLE

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create a queue 
    myQueue=sb.createQueue("Q_004") 
    // set prefetch count 
    try{
        setCount=sb.setPrefetchCount(myQueue,50)
        writeOutput("Prefetch count set successfully.")
    }
    catch (any e){
        writeOutput("Unable to set prefetch count.")
    }
</cfscript>

getPrefetchCount

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // get prefetch count 
    getCount=sb.getPrefetchCount("Q_004") 
    writeDump(getCount) 
</cfscript>

Send messages to a queue

Use the function sendMessageBatch to send a batch of messages to a queue. The function accepts the following parameters:

  1. Name of the queue
  2. Array of messages to be sent to the queue

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    msgLockDurationInQ = 3 
    pauseTimeInSecs = (msgLockDurationInQ+3) * 1000 
    msgCount = 5 
    qName = "Q_batchOps" 
    queueAttributes = { 
        "maxDeliveryCount" = 100, 
        "lockDuration" = "#msgLockDurationInQ#s" 
    } 
    // create a queue 
    myQueue=sb.createQueue(qName,queueAttributes) 
    // send messages in a batch 
    msgBatch = arrayNew(1) 
    for(n=1; n <= msgCount; n++){
        msgInstance = {
            "messageId" = "#n#",
            "messageBody" = "msg time stamp - #now()#"
        }
        msgBatch[n] = msgInstance
    }
    messageBatchResponse=sb.sendMessageBatch(qName, msgBatch) 
    writeDump(messageBatchResponse) 
</cfscript>

Schedule a message

You can submit messages to a queue or topic for delayed processing; for example, to schedule a job to become available for processing by a system at a certain time. For more information, see the official docs.

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create a queue 
    myQueue=sb.createQueue("Q_006") 
    schedMessage={ 
        "messageId" = "001", 
        "messageBody" = "message body", 
        "ScheduledEnqueueTimeUtc"="02/02/2020" 
    } 
    scheduleResponse=sb.scheduleMessageToQueue(myQueue, schedMessage) 
    writeDump(scheduleResponse) 
</cfscript>

Browse a message

Message browsing, or peeking, enables a Service Bus client to enumerate all messages that reside in a queue or subscription, typically for diagnostic and debugging purposes. For more information, see the official docs.

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    msgCount = 3 
    msgLockDuration = 3 
    qName = "Q_Peek_Ex" 
    queueAttributes = { 
        "maxDeliveryCount" = msgCount, 
        "lockDuration" = "#msgLockDuration#s" 
    } 
    queue = sb.createQueue(qName, queueAttributes) 
    peekProps = { 
        "fromSequenceNumber" = 1, 
        "messageCount" = msgCount - 1 
    } 
    writeOutput("peeking q msgs...<br>") 
    writeDump(queue.peekMessage(peekProps)) 
</cfscript>

Update a topic

After you create a topic, you can update the topic using the updateTopic function. The function accepts the following parameters:

  • Topic
  • Topic metadata

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // define topic attributes 
    topicName = "topic_001"; 
    autoDeleteOnIdle = 45 
    topicAttributes = { 
        "autoDeleteOnIdle" = "#autoDeleteOnIdle#M", 
        //"requiresDuplicateDetection" = "yes", //cannot be modified. def.: false 
        "maxSizeInMB" = 1024, 
        "entityStatus" = "Active", 
        "userMetadata" = "modified metadata", 
        "enableBatchedOperations" = true, 
        "enablePartitioning" = "no" 
    } 
    // create a topic 
    topic = sb.createTopic(topicName) //create with default attributes 
    writeOutput("created topic: " & topic.getPath() & "<br>") 
    // update the topic 
    try{ 
        sb.updateTopic(topicName, topicAttributes) 
        writeOutput("Topic " & topicName & " updated successfully") 
    } 
    catch (any e){ 
        writeOutput("Unable to update topic") 
    } 
</cfscript>

Delete a topic

To delete a topic that you created, use the function deleteTopic. The function accepts the topic path as an argument.

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    topicName="topic_001" 
    // delete the previously created topic- "topic_001" 
    try{ 
        sb.deleteTopic(topicName) 
        writeOutput("Topic " & topicName & " deleted successfully") 
    } 
    catch(any e){
        writeOutput("Unable to delete the topic")
    }
</cfscript>

List topics

To list all topics in an account, use the function listTopics.

Example

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create two topics 
    sb.createTopic("T1") // Topic 1 
    sb.createTopic("T2") // Topic 2 
    // list all topics 
    listResponse=sb.listTopics() 
    writeDump(listResponse) 
</cfscript>

Unsubscribe from a topic

You can unsubscribe from a topic using the unsubscribe method.

Example

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic-for-sub"
    subscriptionName = "subscription-001"
    subscriptionAttribs = {
        "subscriptionName" = subscriptionName,
        "autoDeleteOnIdle" = "5M",
        "maxDeliveryCount" = 10,
        "lockDuration" = "10s",
        "defaultMessageTimeToLive" = "10M"
    }
    // create topic
    try{
        topic=sb.createTopic(topicName) //create w/ default attributes
        writeOutput("Topic created successfully")
    }
    catch(any e){
        writeOutput("Unable to create topic")
    }
    // create subscription
    try{
        subscription = topic.subscribe(subscriptionAttribs) //create w/ the attributes defined above
        writeOutput("Subscribed to topic successfully")
    }
    catch(any e){
        writeOutput("Unable to subscribe to topic")
    }
    // unsubscribe from the topic
    // unsubscribe(String topicPath, String subscriptionName)
    try{
        sb.unsubscribe(topicName, subscriptionName)
        writeOutput("Unsubscribed successfully")
    }
    catch(any e){
        writeOutput("Unable to unsubscribe")
    }
</cfscript>

Get subscription name

To retrieve a subscription that you created previously, use the function getSubscription. The function accepts the name of the topic and the name of the subscription.

Example

<cfscript> 
sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
topicName = "testTopic" 
// create a topic 
topic=sb.createTopic(topicName) 
subscriptionName = "sub-00" 
// get a subscription 
subscription = topic.getSubscription(subscriptionName) 
writeDump(subscription) 
</cfscript>

Register message handler

The RegisterMessageHandler method registers a message handler. The handler is invoked each time a message is received.

For more information, see the official docs.

Example

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    // void registerMessageHandler(String topicPath, String subscriptionName, Map metadata)
    // create topic
    // topic metadata
    createTopicMetadata = {
        "autoDeleteOnIdle" = "45M",
        "requiresDuplicateDetection" = "yes",
        //"maxSize" = 1024,
        "entityStatus" = "Active",
        "userMetadata" = "my user metadata",
        "enableBatchedOperations" = "true",
        "enablePartitioning" = "yes"
    }
    topic = sb.createTopic("Topic_013", createTopicMetadata)
    // create a subscription
    subscriptionName = "Sub_013"
    subscriptionAttribs = {
        "subscriptionName" = subscriptionName,
        "autoDeleteOnIdle" = "5M",
        "maxDeliveryCount" = 10,
        "lockDuration" = "10s",
        "defaultMessageTimeToLive" = "10M"
    }
    subscription = topic.subscribe(subscriptionAttribs)
    // message handler options
    messageHandlerOptions={
        "autocomplete"="yes",
        "maxautorenewduration"="50M",
        "maxconcurrentcalls"=30
    }
    renewMessageResponse=sb.registerMessageHandler(topic,subscription,messageHandlerOptions)
    writeDump(renewMessageResponse)
</cfscript>

Abandon message

Abandon subscription message

Abandons a message received from a subscription, which makes the message available again for processing.

Syntax

abandonSubscriptionMessage(String formattedSubscriptionPath, Map metadata)

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic_003"
    // create a topic with default attributes
    topic = sb.createTopic(topicName)
    subscriptionName = "sub_003"
    subscription = topic.subscribe({
        "subscriptionName" = subscriptionName
    })

    updatedMessageProperties = {
        "DeadLetterReason" = "reason for this",
        "DeadLetterErrorDescription" = "collateral"
    }
    abandonResponse = sb.abandonSubscriptionMessage(subscription.getTopicName(), subscription.getSubscriptionName(), updatedMessageProperties)
    writeDump(abandonResponse)
</cfscript>

Abandon message

Abandons a message with a lock token. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.

Syntax

abandonMessage(String queuePath, Map metadata)

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "cfqa-abandon-msg"
    // create queue
    queue = sb.createQueue(qName)
    // send a message to the queue
    sentMessage = queue.sendMessage({
        "messageBody" = "message to abandon"
    })
    // receive the message in PEEKLOCK mode so that it carries a lock token
    receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE
    for(message in receivedMessage){
        // the metadata key "lockToken" is assumed to match the deferMessage metadata
        abandonResponse = sb.abandonMessage(qName, {
            "lockToken" = message.lockToken
        })
        writeDump(abandonResponse)
    }
</cfscript>

Defer message

Defer message

When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.

Syntax

deferMessage(String queuePath, Map metadata)

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    qName="queue_002"
    msgCount = 3
    // create queue
    queue = sb.createQueue(qName, {"lockDuration": "2s"})
    // send messages to the queue
    msgBatch = arrayNew(1)
    for(n=1; n <= msgCount; n++){
        msgInstance = {
            "messageId" = "#n#",
            "messageBody" = "msg time stamp - #now()#"
        }
        msgBatch[n] = msgInstance
    }
    sb.sendMessageBatch(qName, msgBatch)
    receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE
    // defer message
    for(message in receivedMessage){
        propertiesToModify = {"deferred_time" : dateTimeFormat(now())}
        sb.deferMessage(qName, {
            "lockToken" = message.lockToken,
            "propertiesToModify" = propertiesToModify
        })
    }
</cfscript>

Receive deferred message

Deferred messages remain in the main queue along with all other active messages (unlike dead-letter messages that live in a subqueue), but they can no longer be received using the regular Receive/ReceiveAsync functions. Deferred messages can be discovered via message browsing if an application loses track of them.

Syntax

receiveDeferredMessage(String queuePath, Map metadata)

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    qName="queue_002"
    msgCount = 3
    // create queue
    queue = sb.createQueue(qName, {"lockDuration": "2s"})
    // send messages to the queue
    msgBatch = arrayNew(1)
    for(n=1; n <= msgCount; n++){
        msgInstance = {
            "messageId" = "#n#",
            "messageBody" = "msg time stamp - #now()#"
        }
        msgBatch[n] = msgInstance
    }
    sb.sendMessageBatch(qName, msgBatch)
    receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE
    // defer message
    seqNo = 0
    for(message in receivedMessage){
        // keep the sequence number; the property name sequenceNumber is an assumption
        seqNo = message.sequenceNumber
        propertiesToModify = {"deferred_time" : dateTimeFormat(now())}
        sb.deferMessage(qName, {
            "lockToken" = message.lockToken,
            "propertiesToModify" = propertiesToModify
        })
    }
    // receive deferred message
    receivedDeferredMessage = sb.receiveDeferredMessage(qName, {
        "sequenceNumber" : seqNo
    })
    writeDump(receivedDeferredMessage)
</cfscript>

Dead letter operations

Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue. The dead-letter queue need not be explicitly created and cannot be deleted. The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that cannot be processed.

Dead letter message

Syntax

deadLetterMessage(String queuePath, Map metadata)
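
The following is a minimal sketch of how a received message might be moved to the dead-letter queue, modeled on the deferMessage example. The queue name is hypothetical, and the metadata keys (lockToken, DeadLetterReason, DeadLetterErrorDescription) are assumptions that this page does not confirm for deadLetterMessage; verify them against your ColdFusion release before relying on them.

```cfml
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "queue_dlq_example" // hypothetical queue name
    queue = sb.createQueue(qName)
    queue.sendMessage({ "messageBody" = "message that cannot be processed" })
    // receive in PEEKLOCK mode so that the message carries a lock token
    receivedMessages = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1)
    for (message in receivedMessages) {
        try {
            // assumed metadata keys, see the note above
            deadLetterResponse = sb.deadLetterMessage(qName, {
                "lockToken" = message.lockToken,
                "DeadLetterReason" = "validation failed",
                "DeadLetterErrorDescription" = "payload could not be parsed"
            })
            writeDump(deadLetterResponse)
        }
        catch (any e) {
            writeDump(e)
        }
    }
</cfscript>
```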

Dead letter subscription message

Syntax

deadLetterSubscriptionMessage(String formattedSubscriptionPath, Map metadata)
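
As a rough sketch, the call could be wrapped in a small helper such as the one below. The helper name is hypothetical. The path format (topic, then /subscriptions/, then the subscription name) follows the usual Azure convention and is an assumption here, as are the metadata keys, which mirror the deferMessage example. The message argument is assumed to be a struct, received in PEEKLOCK mode, that carries a lockToken.

```cfml
<cfscript>
    // Hypothetical helper: dead-letters one message that was received from a subscription.
    function deadLetterFromSubscription(sb, topicName, subscriptionName, message) {
        // Assumption: formatted path is <topic>/subscriptions/<subscription>
        var formattedSubscriptionPath = topicName & "/subscriptions/" & subscriptionName
        // Assumption: metadata keys mirror the deferMessage metadata
        var metadata = {
            "lockToken" = message.lockToken,
            "propertiesToModify" = { "dead_lettered_at" : dateTimeFormat(now()) }
        }
        return sb.deadLetterSubscriptionMessage(formattedSubscriptionPath, metadata)
    }
</cfscript>
```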

Rules

A filter, or subscription rule, specifies which of the messages sent to a topic are received by a topic subscription. Whenever a topic subscription is created, a default subscription rule is also created, which lets the subscription receive all the messages sent to that topic.

Add a subscription rule

Syntax

addSubscriptionRule(String topicPath, String subscriptionName, Map metadata)

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create a topic 
    topicName = "topic_001" 
    topic = sb.createTopic(topicName) 
    subscriptionName = "subscription_001" 
    // subscribe to topic 
    sb.subscribe( topicName, { 
        "subscriptionName" = subscriptionName 
      } 
    ) 
    // define a rule
    filterRuleTrue = {
        "type" = "true",
        "name" = "trueFilter"
    }
    sb.addSubscriptionRule(topicName, subscriptionName, filterRuleTrue)
</cfscript>

Remove a subscription rule

Syntax

removeSubscriptionRule(String topicPath, String subscriptionName, String ruleName)

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create a topic 
    topicName = "topic_001" 
    topic = sb.createTopic(topicName) 
    subscriptionName = "subscription_001" 
    // subscribe to topic 
    sb.subscribe( topicName, { 
        "subscriptionName" = subscriptionName 
      } 
    ) 
    // define a rule
    filterRuleTrue = {
        "type" = "true",
        "name" = "trueFilter"
    }
    sb.addSubscriptionRule(topicName, subscriptionName, filterRuleTrue)
    // remove the rule by its name
    removeRuleResponse=sb.removeSubscriptionRule(topicName, subscriptionName, filterRuleTrue.name)
</cfscript>

Retrieve a subscription rule

Syntax

getSubscriptionRules(String formattedSubscriptionPath)
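
A minimal sketch of retrieving the rules of an existing subscription is shown below. The topic and subscription names are taken from the rule examples on this page. The path format (topic, then /subscriptions/, then the subscription name) follows the usual Azure convention and is an assumption, since this page does not spell out what a formatted subscription path looks like.

```cfml
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic_001"
    subscriptionName = "subscription_001"
    // Assumption: formatted path is <topic>/subscriptions/<subscription>
    formattedSubscriptionPath = topicName & "/subscriptions/" & subscriptionName
    try {
        rules = sb.getSubscriptionRules(formattedSubscriptionPath)
        writeDump(rules)
    }
    catch (any e) {
        writeDump(e)
    }
</cfscript>
```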

Schedule messages

You can schedule a message to a queue or topic. The message is enqueued at the scheduled time, after which it is delivered to the receiver. Scheduling is currently an asynchronous process, for better performance.

Schedule message to queue

Syntax

scheduleMessageToQueue(String queuePath, Map metadata)

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create a queue 
    myQueue=sb.createQueue("Q_006") 
    schedMessage={ 
        "messageId" = "001", 
        "messageBody" = "message body", 
        "ScheduledEnqueueTimeUtc"="02/02/2020" 
    } 
    scheduleResponse=sb.scheduleMessageToQueue(myQueue, schedMessage) 
    writeDump(scheduleResponse) 
</cfscript>

Schedule message to topic

Syntax

scheduleMessageToTopic(String topicPath, Map metadata)
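
A minimal sketch of scheduling a message to a topic follows, assuming that the metadata struct takes the same keys as in the scheduleMessageToQueue example. The topic name is hypothetical, and the date is written in the same format as in the queue example; whether other date formats are accepted is not confirmed here.

```cfml
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic_sched_example" // hypothetical topic name
    topic = sb.createTopic(topicName)
    // schedule for tomorrow, using the date format of the queue example
    enqueueDate = dateFormat(dateAdd("d", 1, now()), "mm/dd/yyyy")
    schedMessage = {
        "messageId" = "001",
        "messageBody" = "message body",
        "ScheduledEnqueueTimeUtc" = enqueueDate
    }
    try {
        scheduleResponse = sb.scheduleMessageToTopic(topicName, schedMessage)
        writeDump(scheduleResponse)
    }
    catch (any e) {
        writeDump(e)
    }
</cfscript>
```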

Cancel scheduled message to topic

Syntax

cancelScheduledTopicMessage(String topicPath, long sequenceNumber)
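
As a sketch, the cancel call could be wrapped as shown below. The helper name is hypothetical, and seqNo is assumed to be the sequence number that was assigned when the message was scheduled; how that number is read from the scheduling response is not covered here.

```cfml
<cfscript>
    // Hypothetical helper: cancels a message that was scheduled to a topic.
    function cancelTopicMessage(sb, topicName, seqNo) {
        try {
            return sb.cancelScheduledTopicMessage(topicName, seqNo)
        }
        catch (any e) {
            writeOutput("error cancelling msg for #topicName#, seqno. #seqNo#<br>")
            rethrow;
        }
    }
</cfscript>
```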

Cancel scheduled message to queue

Syntax

cancelScheduledQueueMessage(String queuePath, long sequenceNumber)

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "cancel-scheduled-1msg2"
    delayInSecs = 30
    msgCount = 1
    // create a queue
    queue = sb.createQueue(qName)
    // qUtil, getMessageArray, and scheduler are user-defined helpers that are not shown here
    if(!listFindNoCase(qUtil.getQueuesListStr(sb), qName))
        writeOutput("queue #qName# NOT created. This is NOT expected.<br>")
    else
        writeOutput("queue #qName# created successfully.<br>")
    messageArrayToSend = getMessageArray(msgCount)
    // schedule the message and keep the sequence number that is returned
    seqNo = scheduler.scheduleMessage(sb, qName, messageArrayToSend[1], delayInSecs)
    writeOutput("cancelling message with sequence no: #seqNo#<br>")
    cancelMessageResponse = sb.cancelScheduledQueueMessage(qName, seqNo)
    if(cancelMessageResponse["statusCode"] NEQ 200)
        writeOutput("error cancelling msg for #qName#, seqno. #seqNo#<br>")
</cfscript>

Message browsing

The Peek method enumerates all messages that exist in the queue or subscription log, in a sequence, from the lowest to the highest.

Peek message from queue

Syntax

peekMessageFromQueue(String queuePath, Struct metadata)

<cfscript> 
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf) 
    // create queue 
    qName="queue-005" 
    queue = sb.createQueue(qName) 
    peekProperties = { 
     "fromSequenceNumber" = 1, 
     "messageCount" = 10 
    }  
    peekResponse = sb.peekMessageFromQueue(queue.getPath(), peekProperties) 
    receivedMsgArray = peekResponse.messages 
    writeDump(receivedMsgArray)    
</cfscript>

Peek messages from subscription

Syntax

peekMessageFromSubscription(String topicPath, String subscriptionName, Struct metadata)

<cfscript>
    sb = getcloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic-msg-peek"
    subscriptionName = "sub-msg-peek"
    msgCount = 3
    // create a topic and a subscription
    topic = sb.createTopic(topicName)
    sb.subscribe(topicName, {
        "subscriptionName" = subscriptionName
    })
    // set peek properties
    peekProps = {
        "fromSequenceNumber" = 1,
        "messageCount" = msgCount
    }
    peekResponse = sb.peekMessageFromSubscription(topicName, subscriptionName, peekProps)

    if(peekResponse["statusCode"] == 200){
        receivedMessages = peekResponse.messages
        writeOutput("received messages:<br>")
        writeDump(receivedMessages)
    }
    else
        writeOutput("message browsing failed")
</cfscript>
