Introduction
Microsoft Azure Service Bus is a managed enterprise integration message broker. Service Bus can decouple applications and services. Service Bus offers a platform for asynchronous data and state transfer.
Data is transferred between different applications and services using messages. A message is in binary format and can contain JSON, XML, or just text.
You can use Service Bus to transfer data, increase scalability, implement workflows, manage topics and subscriptions, and more.
For more information, see Overview of Service Bus.
Namespace
A namespace is a container for all messaging components. You can have multiple queues and topics in a single namespace. Namespaces often serve as containers for various applications and services.
Queue
Messages are sent to and received from queues. Queues store messages until the receiving application is available to receive and process them.
Topic
You can also use topics to send and receive messages. While a queue is often used for point-to-point communication, topics are useful in publish/subscribe scenarios.
Get started
Install azureservicebus package
Adobe ColdFusion (2021 release) is modularized if you use the ZIP installer. In that case, the package for Azure Service Bus is not installed by default, so the first step is to install the Service Bus package in ColdFusion.
Note: If you are using the GUI installer, the packages are pre-installed.
The package for Service Bus is called azureservicebus.
To install the package azureservicebus, use the Package Manager page in the ColdFusion Administrator, or follow the steps below:
- Navigate to <CF_HOME>/cfusion/bin.
- Enter the command:
  - Windows: cfpm.bat
  - Linux: cfpm.sh
- Enter the command, install azureservicebus.
Wait for the Azure Service Bus package to be installed.
For more information, see ColdFusion Package Manager.
Add cloud service credentials and configuration
In ColdFusion (2021 release), there is a method getCloudService() that gives you a handle to create objects for accessing various cloud services.
The syntax of the service handle is as follows:
service=getCloudService(cloudCred,cloudConfig), where:
- cloudCred: Defines the credentials for the cloud service. It can be either a struct or a string (also known as a credential alias).
- cloudConfig: Defines the cloud service configuration details. It can be either a struct or a string (also known as a config alias).
After you've acquired the Azure credentials, you must declare them in one of the following ways. Only then can you use the credentials to create a Service Bus object, which you can then use to call the various Azure Service Bus methods.
ColdFusion Administrator
Set credentials
In the ColdFusion Administrator, click Data & Services > Cloud Credentials.
An alias is a named representation of a cloud service and its configuration details. You can set the config alias through ColdFusion Administrator.
A connection string includes the authorization information required for your application to access data in an Azure Storage account.
For more information, see Configure Azure Storage connection strings.
After entering the details, click Add Credential.
Set configuration options
In the ColdFusion Administrator, click Data & Services > Cloud Configuration.
Enter details such as the configuration alias, the vendor, and the name of the service.
After adding the configuration options, you may need to add a few more options. You can do so in the next screen. The following are a few options that you may need to add:
- Operation Timeout
- Retry Policy
For more information, see Cloud configuration options.
Create the object
Once you've created the aliases for the Service Bus credential and configuration options, you can create the object by using the getCloudService API. Include the following in your CFM page.
sbObject= getCloudService("sbCred", "sbConf")
Application.cfc
You can specify the Service Bus credentials and configuration options in Application.cfc. For example,
<cfscript>
    this.serviceBusCred = {
        "vendorName" : "AZURE",
        "connectionString" : "xxxxxxxxxx"
    }
    this.serviceBusConf = {
        "serviceName" : "SERVICE_BUS"
    }
</cfscript>
Create the object
sbObject = getCloudService(this.serviceBusCred, this.serviceBusConf)
On CFM page
On a CFM page, you can specify the Service Bus credentials and configuration options in one of the four ways described below:
Credential alias and configuration alias
After you've created the aliases for the Service Bus credential and configuration options, you can use these aliases in the getCloudService handle as shown below:
<cfscript>
    // define the credential and the configuration aliases in the ColdFusion Admin
    sbObject = getCloudService("sbCred", "sbConf")
    // code below ...........
</cfscript>
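Credential alias and struct for configuration options
<cfscript>
    // Using credential alias and struct for configuration options
    // the credential alias "sbCred" is defined in the ColdFusion Admin
    sbConf = {
        "serviceName" : "AZURE_SERVICEBUS",
        "options" : {
            "operationTimeout" : "10s",
            "retryPolicy" : "default"
        }
    }
    sb = getCloudService("sbCred", sbConf)
    // code below ...........
</cfscript>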
Configuration alias and struct for credentials
<cfscript>
    // Using config alias and struct for service credentials
    // service bus credentials
    sbCred = {
        "vendorName" : "AZURE",
        "connectionString" : "xxxxxx"
    }
    sb = getCloudService(sbCred, "sbConf")
    // code below .....................................
</cfscript>
Structs for both credential and configuration options
<cfscript>
    // Using Structs for both cloud credential and config
    sbCred = {
        "vendorName" : "AZURE",
        "connectionString" : "xxxxxx"
    }
    sbConf = {
        "serviceName" : "AZURE_SERVICEBUS",
        "options" : {
            "operationTimeout" : "10s",
            "retryPolicy" : "default"
        }
    }
    sb = getCloudService(sbCred, sbConf)
    // code below ...................................................................
</cfscript>
Admin API
You can also add Service Bus credentials and configuration options by using the Admin APIs. The methods to add credentials and configuration are available in cloud.cfc.
The examples below demonstrate the usage of the methods addCredential(struct credential) and addServiceConfig(struct config).
Add credentials
<cfscript>
    // Create an object of administrator component and call the login method
    adminObj = createObject("component", "cfide.adminapi.administrator")
    adminObj.login("admin")
    // Create an object of cloud component
    cloudObj = createObject("component", "cfide.adminapi.cloud")
    // define credentials struct
    credentialStruct = {
        "vendorName" : "AZURE",
        "connectionString" : "xxxxxx"
    }
    // add credential credentialStruct
    try {
        cloudObj.addCredential(credentialStruct)
        writeOutput("Credentials added successfully")
    }
    catch(any e) {
        writeDump(e)
    }
</cfscript>
Add configuration
<cfscript>
    // Create an object of administrator component and call the login method
    adminObj = createObject("component", "cfide.adminapi.administrator")
    adminObj.login("admin")
    // Create an object of cloud component
    cloudObj = createObject("component", "cfide.adminapi.cloud")
    // define configuration struct
    configStruct = {
        "serviceName" : "AZURE_SERVICEBUS",
        "options" : {
            "operationTimeout" : "10s",
            "retryPolicy" : "default"
        }
    }
    // add config configStruct
    try {
        cloudObj.addServiceConfig(configStruct)
        writeOutput("Configuration service added successfully")
    }
    catch(any e) {
        writeDump(e)
    }
</cfscript>
CFSetup
You can also set up Service Bus credentials and configuration via the CFSetup configuration utility.
Add cloud credential
- add cloudcredential credentialAlias=s3cred accesskeyid=<access> secretaccesskey=<secret> region=ap-southeast-1 vendorname=AWS
Set credential
- set cloudcredential snscred secretaccesskey=awssecret
Add cloud configuration
- add cloudconfiguration serviceName=S3 alias=s3Config
Set configuration
- set cloudconfiguration conf1 alias=conf2
ColdFusion Administrator
In ColdFusion Administrator > Settings, you can modify the thread pool settings that are used for handling Service Bus events.
- Core Pool Size: The minimum number of worker threads that must be kept alive.
- Maximum Pool Size: The maximum number of threads that can be available in the pool.
- Keep Alive Time: The time, in ms, that idle threads wait before they time out.
Create a topic
In Service Bus, topics can have multiple, independent subscriptions. A subscriber to a topic can receive a copy of each message sent to that Topic. Multiple receivers could be allocated to a subscription. A topic can have zero or more subscriptions, but a subscription can only belong to one topic.
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // topic metadata
    createTopicMetadata = {
        "autoDeleteOnIdle" = "45M",
        "requiresDuplicateDetection" = "yes",
        //"maxSize" = 1024,
        "entityStatus" = "Active",
        "userMetadata" = "my user metadata",
        "enableBatchedOperations" = "true",
        "enablePartitioning" = "yes"
    }
    //sb.deleteTopic("secondTopic")
    tempTopic = sb.createTopic("Topic_005", createTopicMetadata)
    //sb.deleteTopic("firstTopic")
    writeDump(tempTopic)
    //writeDump(sb.listTopics())
</cfscript>
Create a subscription
Applications receive messages from subscriptions in the same way that they receive them from queues by using a QueueClient or MessageSession object. A topic can have zero or more subscriptions, but a subscription can only belong to one topic. For more information, see Azure SB subscriptions.
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    subscriptionMetadata = {
        "subscriptionName" = "sub_1",
        "topicPath" = "Topic_005",
        "lockDuration" = "77s",
        "maxDeliveryCount" = 12,
        "enableDeadLetteringOnMessageExpiration" = "true",
        "enableDeadLetteringOnFilterEvaluationException" = "true"
    }
    mysub = sb.subscribe(subscriptionMetadata)
    writeDump(mysub)
</cfscript>
Send message to a queue
Once you’ve created a queue, you can send messages to the queue, using the function sendMessage. For more information, see the official docs.
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    sb.deleteQueue("Queue18")
    // send message
    sendMessageMetadata = {
        "messageBody" = "my message is here",
        "messageProperties" = {
            "myatt1" = "att1,att2"
        }
    }
    tempQ = sb.createQueue("Queue18")
    //writedump(tempQ)
    mypath = tempQ.getPath()
    sendMessageResponse = tempQ.sendMessage(sendMessageMetadata)
    writeDump(sendMessageResponse)
</cfscript>
Member function
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf);
    myQ = sb.listQueues().get("queueList")[1]
    sendMessage()

    public function sendMessage() {
        writeOutput("send message response")
        closure = function () {
            writeOutput("my function my choice")
        }
        sendMessageMetadata = {
            "messageBody" = "my message my choice",
            "messageProperties" = {
                "myatt1" = "att1,att2"
            }
        }
        sendMessageResponse = myQ.sendMessage(sendMessageMetadata)
        writeDump(sendMessageResponse)
    }
</cfscript>
List queues
To list all queues, use the function listQueues.
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // list all queues
    allQueues = sb.listQueues()
    writeDump(allQueues)
</cfscript>
Member function
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf);
    myQ = sb.listQueues().get("queueList")[1]
    writeDump(myQ.getDescription())
</cfscript>
Delete a queue
To delete a queue, use the function deleteQueue. This function accepts the name of the queue as a parameter.
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // delete the queue
    try {
        sb.deleteQueue("Queue8")
        writeOutput("Queue deleted successfully")
    }
    catch (any e) {
        writeOutput("Unable to delete queue")
    }
</cfscript>
Update a queue
To update a queue, use the function updateQueue. This function accepts the following parameters:
- Name of the queue
- Metadata struct
The metadata struct consists of the following values:
- "autoDeleteOnIdle" = "time",
- "enableDeadLetteringOnMessageExpiration" = false|true,
- "entityStatus" = "Disabled|Enabled",
- "userMetadata" = "some metadata",
- "enableBatchedOperations" = "no|yes"
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // define queue attributes
    lockDuration = 51
    qName = "Q_001"
    queueAttributes = {
        "autoDeleteOnIdle" = "240h",
        "enableDeadLetteringOnMessageExpiration" = true,
        "maxDeliveryCount" = 101,
        "requiresDuplicateDetection" = "YES", // cannot be changed w/ update.
        "lockDuration" = "#lockDuration#s",
        //"maxSizeInMB" = 2048, //check for default value
        "entityStatus" = "Active",
        "userMetadata" = "some user metadata : data val",
        "enableBatchedOperations" = true,
        "enablePartitioning" = false // cannot be changed w/ update.
    }
    queueAttributesUpdated = {
        "autoDeleteOnIdle" = "10M",
        "enableDeadLetteringOnMessageExpiration" = false,
        "entityStatus" = "Disabled",
        "userMetadata" = "some new user metadata",
        "enableBatchedOperations" = "no"
    }
    // create the queue
    sb.createQueue(qName, queueAttributes)
    try {
        sb.updateQueue(qName, queueAttributesUpdated)
        writeOutput("Queue updated successfully.")
    }
    catch (any e) {
        writeOutput("Could not update queue.")
    }
</cfscript>
Prefetch
When Prefetch is enabled in any of the official Service Bus clients, the receiver quietly acquires more messages, up to the PrefetchCount limit, beyond what the application initially asked for. For more information, see Service bus prefetch.
setPrefetchCount
EXAMPLE
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create a queue
    myQueue = sb.createQueue("Q_004")
    // set prefetch count
    try {
        setCount = sb.setPrefetchCount(myQueue, 50)
        writeOutput("Prefetch count set successfully.")
    }
    catch (any e) {
        writeOutput("Unable to set prefetch count.")
    }
</cfscript>
getPrefetchCount
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // get prefetch count
    getCount = sb.getPrefetchCount("Q_004")
    writeDump(getCount)
</cfscript>
Send messages to a queue
Use the function sendMessageBatch to send a batch of messages to a queue. The function accepts the following parameters:
- Name of the queue
- Message body to be sent to the queue
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    msgLockDurationInQ = 3
    pauseTimeInSecs = (msgLockDurationInQ + 3) * 1000
    msgCount = 5
    qName = "Q_batchOps"
    queueAttributes = {
        "maxDeliveryCount" = 100,
        "lockDuration" = "#msgLockDurationInQ#s"
    }
    // create a queue
    myQueue = sb.createQueue(qName, queueAttributes)
    // send messages in a batch
    msgBatch = arrayNew(1)
    for(n = 1; n <= msgCount; n++) {
        msgInstance = {
            "messageId" = "#n#",
            "messageBody" = "msg time stamp - #now()#"
        }
        msgBatch[n] = msgInstance
    }
    messageBatchResponse = sb.sendMessageBatch(qName, msgBatch)
    writeDump(messageBatchResponse)
</cfscript>
Schedule a message
You can submit messages to a queue or topic for delayed processing; for example, to schedule a job to become available for processing by a system at a certain time. For more information, see the official docs.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create a queue
    myQueue = sb.createQueue("Q_006")
    schedMessage = {
        "messageId" = "001",
        "messageBody" = "message body",
        "ScheduledEnqueueTimeUtc" = "02/02/2020"
    }
    scheduleResponse = sb.scheduleMessageToQueue(myQueue, schedMessage)
    writeDump(scheduleResponse)
</cfscript>
Browse a message
Message browsing, or peeking, enables a Service Bus client to enumerate all messages that reside in a queue or subscription, typically for diagnostic and debugging purposes. For more information, see the official docs.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    msgCount = 3
    msgLockDuration = 3
    qName = "Q_Peek_Ex"
    queueAttributes = {
        "maxDeliveryCount" = msgCount,
        "lockDuration" = "#msgLockDuration#s"
    }
    queue = sb.createQueue(qName, queueAttributes)
    peekProps = {
        "fromSequenceNumber" = 1,
        "messageCount" = msgCount - 1
    }
    writeOutput("peeking q msgs...<br>")
    writeDump(queue.peekMessage(peekProps))
</cfscript>
Update a topic
After you create a topic, you can update the topic using the updateTopic function. The function accepts the following parameters:
- Topic
- Topic metadata
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // define topic attributes
    topicName = "topic_001";
    autoDeleteOnIdle = 45
    topicAttributes = {
        "autoDeleteOnIdle" = "#autoDeleteOnIdle#M",
        //"requiresDuplicateDetection" = "yes", //cannot be modified. def.: false
        "maxSizeInMB" = 1024,
        "entityStatus" = "Active",
        "userMetadata" = "modified metadata",
        "enableBatchedOperations" = true,
        "enablePartitioning" = "no"
    }
    // create a topic
    topic = sb.createTopic(topicName) //create with default attributes
    writeOutput("created topic: " & topic.getPath() & "<br>")
    // update the topic
    try {
        sb.updateTopic(topicName, topicAttributes)
        writeOutput("Topic " & topicName & " updated successfully")
    }
    catch (any e) {
        writeOutput("Unable to update topic")
    }
</cfscript>
Delete a topic
To delete a topic that you created, use the function deleteTopic. The function accepts the topic path as an argument.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic_001"
    // delete the previously created topic- "topic_001"
    try {
        sb.deleteTopic(topicName)
        writeOutput("Topic " & topicName & " deleted successfully")
    }
    catch(any e) {
        writeOutput("Unable to delete the topic")
    }
</cfscript>
List topics
To list all topics in an account, use the function listTopics.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create two topics
    sb.createTopic("T1") // Topic 1
    sb.createTopic("T2") // Topic 2
    // list all topics
    listResponse = sb.listTopics()
    writeDump(listResponse)
</cfscript>
Unsubscribe from a topic
You can unsubscribe from a topic using the unsubscribe method.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic-for-sub";
    subscriptionName = "subscription-001"
    subscriptionAttribs = {
        "subscriptionName" = subscriptionName,
        "autoDeleteOnIdle" = "5M",
        "maxDeliveryCount" = 10,
        "lockDuration" = "10s",
        "defaultMessageTimeToLive" = "10M"
    }
    // create topic
    try {
        topic = sb.createTopic(topicName) //create w/ default attributes
        writeOutput("Topic created successfully")
    }
    catch(any e) {
        writeOutput("Unable to create topic")
    }
    // create subscription
    try {
        subscription = topic.subscribe(topicName, subscriptionAttribs) //create w/ the attributes defined above
        writeOutput("Subscribed to topic successfully")
    }
    catch(any e) {
        writeOutput("Unable to subscribe to topic")
    }
    // unsubscribe from topic
    //unsubscribe(String topicPath, String subscriptionName)
    try {
        sb.unsubscribe(topic, subscription)
        writeOutput("Unsubscribed successfully")
    }
    catch(any e) {
        writeOutput("Unable to unsubscribe")
    }
</cfscript>
Get subscription name
To retrieve a previously created subscription, use the function getSubscription. The function accepts the name of the topic and the subscription.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "testTopic"
    // create a topic
    topic = sb.createTopic(topicName)
    subscriptionName = "sub-00"
    // get a subscription
    subscription = topic.getSubscription(subscriptionName)
    writeDump(subscription)
</cfscript>
Register message handler
The RegisterMessageHandler method registers handlers. The handler is invoked when the event happens.
For more information, see the official docs.
Example
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // void registerMessageHandler(String topicPath, String subscriptionName, Map metadata)
    // create topic
    // topic metadata
    createTopicMetadata = {
        "autoDeleteOnIdle" = "45M",
        "requiresDuplicateDetection" = "yes",
        //"maxSize" = 1024,
        "entityStatus" = "Active",
        "userMetadata" = "my user metadata",
        "enableBatchedOperations" = "true",
        "enablePartitioning" = "yes"
    }
    topic = sb.createTopic("Topic_013", createTopicMetadata)
    // create a subscription
    subscriptionName = "Sub_013"
    subscriptionAttribs = {
        "subscriptionName" = subscriptionName,
        "autoDeleteOnIdle" = "5M",
        "maxDeliveryCount" = 10,
        "lockDuration" = "10s",
        "defaultMessageTimeToLive" = "10M"
    }
    subscription = topic.subscribe(subscriptionAttribs)
    messageHandlerOptions = {
        "autocomplete" = "yes",
        "maxautorenewduration" = "50M",
        "maxconcurrentcalls" = 30
    }
    renewMessageResponse = sb.registerMessageHandler(topic, subscription, messageHandlerOptions)
    writeDump(renewMessageResponse)
</cfscript>
Abandon message
Abandon subscription message
Discards all messages to a subscription.
Syntax
abandonSubscriptionMessage(String formattedSubscriptionPath, Map metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic_003";
    // create the topic
    topic = sb.createTopic(topicName)
    subscriptionName = "sub_003"
    subscription = topic.subscribe({
        "subscriptionName" = subscriptionName
    })
    updatedMessageProperties = {
        "DeadLetterReason" = "reason for this",
        "DeadLetterErrorDescription" = "collateral"
    }
    abandonResponse = sb.abandonSubscriptionMessage(subscription.getTopicName(), subscription.getSubscriptionName(), updatedMessageProperties)
    writeDump(abandonResponse)
</cfscript>
Abandon message
Abandons a message with a lock token. This will make the message available again for processing. Abandoning a message will increase the delivery count on the message.
Syntax
abandonMessage(String queuePath, Map metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "cfqa-abandon-msg"
    // create queue
    queue = sb.createQueue(qName)
    // send a message to the queue
    sentMessage = queue.sendMessage({ "messageBody" = "message to abandon" })
    writeOutput("receiving from the queue: #qName#, before abandoning message.<br>")
    receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE
    for(message in receivedMessage) {
        //cfdump(var=#message#, label="message received.")
        // abandon the message by its lock token
        // (assumption: the "lockToken" metadata key is the same one used in the deferMessage example)
        sb.abandonMessage(qName, { "lockToken" = message.lockToken })
    }
</cfscript>
Defer message
Defer message
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Syntax
deferMessage(String queuePath, Map metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "queue_002"
    msgCount = 3 // number of messages to send
    // create queue
    queue = sb.createQueue(qName, {"lockDuration": "2s"})
    // send messages to the queue
    msgBatch = arrayNew(1)
    for(n = 1; n <= msgCount; n++) {
        msgInstance = {
            "messageId" = "#n#",
            "messageBody" = "msg time stamp - #now()#"
        }
        msgBatch[n] = msgInstance
    }
    sb.sendMessageBatch(qName, msgBatch)
    receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE
    // defer message
    for(message in receivedMessage) {
        propertiesToModify = {"deferred_time" : dateTimeFormat(now())}
        sb.deferMessage(qName, {
            "lockToken" = message.lockToken,
            "propertiesToModify" = propertiesToModify
        })
    }
</cfscript>
Receive deferred message
Deferred messages remain in the main queue along with all other active messages (unlike dead-letter messages that live in a subqueue), but they can no longer be received using the regular Receive/ReceiveAsync functions. Deferred messages can be discovered via message browsing if an application loses track of them.
Syntax
receiveDeferredMessage(String queuePath, Map metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "queue_002"
    msgCount = 3 // number of messages to send
    // create queue
    queue = sb.createQueue(qName, {"lockDuration": "2s"})
    // send messages to the queue
    msgBatch = arrayNew(1)
    for(n = 1; n <= msgCount; n++) {
        msgInstance = {
            "messageId" = "#n#",
            "messageBody" = "msg time stamp - #now()#"
        }
        msgBatch[n] = msgInstance
    }
    sb.sendMessageBatch(qName, msgBatch)
    receivedMessage = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1) //READ_MODE_RECEIVEANDDELETE
    // defer message
    for(message in receivedMessage) {
        // remember the sequence number so that the deferred message can be retrieved later
        // (assumption: the received message struct exposes a sequenceNumber key)
        seqNo = message.sequenceNumber
        propertiesToModify = {"deferred_time" : dateTimeFormat(now())}
        sb.deferMessage(qName, {
            "lockToken" = message.lockToken,
            "propertiesToModify" = propertiesToModify
        })
    }
    // receive deferred message
    receivedDeferredMessage = sb.receiveDeferredMessage(qName, {
        "sequenceNumber" : seqNo
    })
    writeDump(receivedDeferredMessage)
</cfscript>
Dead letter operations
Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue. The dead-letter queue need not be explicitly created and cannot be deleted. The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that cannot be processed.
Dead letter message
Syntax
deadLetterMessage(String queuePath, Map metadata)
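The syntax above can be sketched in use as follows. This is an illustrative sketch, not a verified example: it assumes that the metadata struct accepts the lockToken key used in the deferMessage example and the DeadLetterReason and DeadLetterErrorDescription keys that appear in the abandonSubscriptionMessage example. The queue name and message text are hypothetical.

```cfml
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "queue_dlq_demo" // hypothetical queue name
    // create the queue and send one message to it
    queue = sb.createQueue(qName)
    queue.sendMessage({ "messageBody" = "message that cannot be processed" })
    // receive in PEEKLOCK mode so that each message carries a lock token
    receivedMessages = sb.receiveMessagesFromQueue(queue, "PEEKLOCK", 1)
    for(message in receivedMessages) {
        // ASSUMPTION: these metadata keys are borrowed from other examples on this page
        deadLetterMetadata = {
            "lockToken" = message.lockToken,
            "DeadLetterReason" = "validation failed",
            "DeadLetterErrorDescription" = "message body could not be parsed"
        }
        deadLetterResponse = sb.deadLetterMessage(qName, deadLetterMetadata)
        writeDump(deadLetterResponse)
    }
</cfscript>
```

If the keys differ in your release, dump the response or the caught exception to see which metadata the function expects.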
Dead letter subscription message
Syntax
deadLetterSubscriptionMessage(String formattedSubscriptionPath, Map metadata)
Rules
A filter, or subscription rule, specifies which of the messages sent to a topic are received by a topic subscription. Whenever a topic subscription is created, a default subscription rule is also created, which enables the subscription to receive all the messages sent to that topic.
Add a subscription rule
Syntax
addSubscriptionRule(String topicPath, String subscriptionName, Map metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create a topic
    topicName = "topic_001"
    topic = sb.createTopic(topicName)
    subscriptionName = "subscription_001"
    // subscribe to topic
    sb.subscribe(topicName, {
        "subscriptionName" = subscriptionName
    })
    // define a rule
    filterRuleTrue = {
        "type" = "true",
        "name" = "trueFilter"
    }
    sb.addSubscriptionRule(topicName, subscriptionName, filterRuleTrue)
</cfscript>
Remove a subscription rule
Syntax
removeSubscriptionRule(String topicPath, String subscriptionName, String ruleName)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create a topic
    topicName = "topic_001"
    topic = sb.createTopic(topicName)
    subscriptionName = "subscription_001"
    // subscribe to topic
    sb.subscribe(topicName, {
        "subscriptionName" = subscriptionName
    })
    // define and add a rule
    filterRuleTrue = {
        "type" = "true",
        "name" = "trueFilter"
    }
    sb.addSubscriptionRule(topicName, subscriptionName, filterRuleTrue)
    // remove the rule by name, as the syntax above expects a rule name
    removeRuleResponse = sb.removeSubscriptionRule(topicName, subscriptionName, filterRuleTrue.name)
</cfscript>
Retrieve a subscription rule
Syntax
getSubscriptionRules(String formattedSubscriptionPath)
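A minimal sketch of retrieving the rules of a subscription is given below. It assumes that the formatted subscription path follows the Azure convention of topic path, then /subscriptions/, then subscription name; the page does not confirm the format that ColdFusion expects, so treat the path construction as an assumption.

```cfml
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic_001"
    subscriptionName = "subscription_001"
    // create a topic and subscribe to it
    sb.createTopic(topicName)
    sb.subscribe(topicName, {
        "subscriptionName" = subscriptionName
    })
    // ASSUMPTION: the formatted path is <topic>/subscriptions/<subscription>
    formattedSubscriptionPath = topicName & "/subscriptions/" & subscriptionName
    // retrieve the rules; a new subscription should at least carry its default rule
    rules = sb.getSubscriptionRules(formattedSubscriptionPath)
    writeDump(rules)
</cfscript>
```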
Schedule messages
You can schedule a message to a queue or topic. The message is enqueued into the bus and, at the scheduled time, is delivered to the receiver. Scheduling is currently an asynchronous process, for better performance.
Schedule message to queue
Syntax
scheduleMessageToQueue(String queuePath, Map metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create a queue
    myQueue = sb.createQueue("Q_006")
    schedMessage = {
        "messageId" = "001",
        "messageBody" = "message body",
        "ScheduledEnqueueTimeUtc" = "02/02/2020"
    }
    scheduleResponse = sb.scheduleMessageToQueue(myQueue, schedMessage)
    writeDump(scheduleResponse)
</cfscript>
Schedule message to topic
Syntax
scheduleMessageToTopic(String topicPath, Map metadata)
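Scheduling to a topic presumably mirrors the queue case. The sketch below assumes that scheduleMessageToTopic accepts the same metadata keys as the scheduleMessageToQueue example (messageId, messageBody, ScheduledEnqueueTimeUtc); the topic name is hypothetical, and the date literal is copied from the queue example, so replace it with a time in the future.

```cfml
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "Topic_006" // hypothetical topic name
    // create a topic
    sb.createTopic(topicName)
    // ASSUMPTION: same metadata keys as in the scheduleMessageToQueue example
    schedMessage = {
        "messageId" = "001",
        "messageBody" = "message body",
        "ScheduledEnqueueTimeUtc" = "02/02/2020"
    }
    // pass the topic path as a string, as the syntax above indicates
    scheduleResponse = sb.scheduleMessageToTopic(topicName, schedMessage)
    writeDump(scheduleResponse)
</cfscript>
```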
Cancel scheduled message to topic
Syntax
cancelScheduledTopicMessage(String topicPath, long sequenceNumber)
Cancel scheduled message to queue
Syntax
cancelScheduledQueueMessage(String queuePath, long sequenceNumber)
<cfscript>
    // Note: qUtil, getMessageArray, and scheduler are user-defined helpers that are not shown here.
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    qName = "cancel-scheduled-1msg2"
    delayInSecs = 30
    msgCount = 1
    // create a queue
    queue = sb.createQueue(qName)
    if(!listFindNoCase(qUtil.getQueuesListStr(sb), qName))
        writeOutput("queue #qName# NOT created. This is NOT expected.<br>")
    else
        writeOutput("queue #qName# created successfully.<br>")
    messageArrayToSend = getMessageArray(msgCount)
    seqNo = scheduler.scheduleMessage(sb, qName, messageArrayToSend[1], delayInSecs)
    writeOutput("cancelling message with sequence no: #seqNo#<br>")
    cancelMessageResponse = sb.cancelScheduledQueueMessage(qName, seqNo)
    if(cancelMessageResponse["statusCode"] NEQ 200)
        writeOutput("error cancelling msg for #qName#, seqno. #seqNo#<br>")
</cfscript>
Message browsing
The Peek method enumerates all messages that exist in the queue or subscription log, in a sequence, from the lowest to the highest.
Peek message from queue
Syntax
peekMessageFromQueue(String queuePath, Struct metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    // create queue
    qName = "queue-005"
    queue = sb.createQueue(qName)
    peekProperties = {
        "fromSequenceNumber" = 1,
        "messageCount" = 10
    }
    peekResponse = sb.peekMessageFromQueue(queue.getPath(), peekProperties)
    receivedMsgArray = peekResponse.messages
    writeDump(receivedMsgArray)
</cfscript>
Peek messages from subscription
Syntax
peekMessageFromSubscription(String topicPath, String subscriptionName, Struct metadata)
<cfscript>
    sb = getCloudService(this.serviceBusCred, this.serviceBusConf)
    topicName = "topic-msg-peek"
    subscriptionName = "sub-msg-peek"
    msgCount = 3
    // create a topic and subscribe to it
    topic = sb.createTopic(topicName)
    sb.subscribe(topicName, {
        "subscriptionName" = subscriptionName
    })
    // set peek properties
    peekProps = {
        "fromSequenceNumber" = 1,
        "messageCount" = msgCount
    }
    peekResponse = sb.peekMessageFromSubscription(topicName, subscriptionName, peekProps)
    if(peekResponse["statusCode"] == 200) {
        receivedMessages = peekResponse.messages
        writeOutput("received message deserialized:<br>")
        writeDump(receivedMessages)
    }
    else
        writeOutput("message browsing failed")
</cfscript>