Navigate to <CF_HOME>/cfusion/bin.
Pub/Sub is a publish/subscribe (Pub/Sub) service, a messaging platform that separates message senders and recipients. A Pub/Sub service has several important entities, including:
- Message: the data that moves through the service.
- Topic: a named entity that represents a feed of messages.
- Subscription: a named entity that represents an interest in receiving messages on a particular topic.
- Publisher (also called a producer): creates messages and sends (publishes) them to the messaging service on a specified topic.
- Subscriber (also called a consumer): receives messages on a specified subscription.
For more information, see GCP Pub/Sub.
-
-
Enter the command:
- Windows: cfpm.bat
- Non-Windows: ./cfpm.sh
-
Enter the command, install gcppubsub.
Wait for the package to get installed.
Install the package in a non-interactive mode
Open the command prompt, navigate to the coldFusion/cfusion/bin folder, and enter, cfpm install gcppubsub
For more information, see ColdFusion Package Manager.
Get credentials to access GCP PubSub
When you interact with GCP, you specify your GCP security credentials to verify your credentials and check whether you have permission to access the resources that you are requesting.
GCP uses the security credentials to authenticate and authorize your requests.
For more information, see GCP API Keys Overview.
Authentication for GCP services
To access GCP services, create a service account using Google cloud console (you may alternatively use gcloud CLI, REST calls or do it programmatically). Such accounts are managed by Google Cloud's Identity and Access Management (IAM).
For more information, see Authentication for GCP Services.
Add cloud service credentials and configuration
Use the method getCloudService() that gives you a handle to create objects for accessing various cloud services.
The syntax of the service handle is as follows:
service=getCloudService(struct cloudCred,struct cloudConfig), where:
- cloudCred: Defines the credentials for the cloud service. It could either be a struct or a string (also known as credential alias).
- cloudConfig: Defines the cloud service configuration details. It could either be a struct or a string (also known as config alias).
Add the configuration via Data & Services > Cloud Configuration.
Once you’ve created the credentials and configuration, on the cfm file, you can define the credentials and configuration as pubsubclient=getCloudService(cred, conf).
Add
add cloudconfiguration alias=pubsubdemo servicename=pubsub
Set
set cloudconfiguration pubsubdemo elementCountThreshold=102 requestByteThreshold=2048 delayThreshold=10m
The list below mentions the other parameters:
- publisher.endpoint
- enableMessageOrdering
- enableCompression
- compressionBytesThreshold
- enableBatching
- delayThreshold
- elementCountThreshold
- requestByteThreshold
- publisher.limitExceededBehavior
- publisher.maxOutstandingElementCount
- publisher.maxOutstandingRequestBytes
- publisher.threadCount
- publisher.threadNamePrefix
- publisher.initialRetryDelay
- publisher.maxRetryDelay
- publisher.retryDelayMultiplier
- publisher.initialRpcTimeout
- publisher.maxRpcTimeout
- publisher.rpcTimeoutMultiplier
- publisher.totalTimeout
- publisher.maxAttempts
- subscriber.endpoint
- maxAckExtensionPeriod
- minDurationPerAckExtensionPeriod
- maxDurationPerAckExtensionPeriod
- parallelPullCount
- useLegacyFlowControl
- subscriber.limitExceededBehavior
- subscriber.maxOutstandingElementCount
- subscriber.maxOutstandingRequestBytes
- subscriber.threadCount
- subscriber.threadNamePrefix
- system.threadCount
- system.threadNamePrefix
Usage Example,
set cloudconfiguration pubsubdemo publisher.endpoint=pubsub.googleapis.com subscriber.endpoint=us-east1-pubsub.googleapis.com:443 enableMessageOrdering=false enableCompression=False compressionBytesThreshold=1 delayThreshold='2m'
Show
- Cloud credential: show cloudcredential <cred_name>
- Cloud configuration: show cloudconfiguration <conf_name>
Delete
- Cloud credential: delete cloudcredential <cred_name>
- Cloud configuration: delete cloudconfiguration <conf_name>
All GCP resource names are case-sensitive.
Guidelines for naming a topic, subscription, schema, or snapshot
To see the guidelines on naming, see this doc.
Message format of a topic in PubSub
Message format to be sent to a GCP Pubsub topic.
{ "data": string/struct/CFC/query/array/binary data "attributes": { string: string, ... }, "messageId": string, "publishTime": string, "orderingKey": string }
Message format that you receive from GCP PubSub service when you pull messages from a subscription using the pullMessages method:
Message = { MessageID: "", Data: “Hello World!”, Attributes: { year: “2020”, author: “Martin Kleppman” }, // Attribute’s key-value pairs must be only strings. Other data types are not supported. OrderingKey: "", PublishTime: "", AttributesCount: 2, AckId: "", DeliveryAttempts: 1 }
Message format that you receive in the onSuccess message handler while receiving messages asynchronously using a registed message handler:
Message = { MessageID: "", Data: “Hello World!”, Attributes: { year: “2020”, author: “Martin Kleppmann” }, // Attribute’s key-value pairs must be only strings. Other data types are not supported. OrderingKey: "", PublishTime: "", AttributesCount: 2 }
Create a topic with attributes
Syntax
createTopic(topicName [, structAttributes])
Parameters
The topic attributes struct has the following key-value pairs:
Parameter | Description | Required |
---|---|---|
topicName | Name of the topic to be created. | Yes |
messageRetentionDuration |
Specifies how long the Pub/Sub topic retains messages after publication. After the message retention duration is over, Pub/Sub might discard the message regardless of its acknowledgment state. The acceptable range of values of a duration of a message in a topic is 10 minutes to 31 days. The duration parameters accepted are:
|
Yes |
schemaSettings |
Struct of the following:
|
Yes |
labels |
Topic label as key-value pairs. |
Yes |
messageStoragePolicy |
Specifies which regions your messages are stored in, you can configure a message storage policy on your topic. |
Yes |
kmsKeyName |
Specifies if the topic is encrypted with a CMEK(customer-managed encryption key). Pub/Sub encrypts messages with Google-managed keys by default. For more information, see this doc. |
Yes |
Returns
Topic id.
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "mytopic" topicAttributes = { messageRetentionDuration : "600s", schemaSettings : { schemaName : "my-schema", schemaEncoding : "JSON" // or "BINARY" }, labels : { "shape" : "circle", "color" : "green" }, messageStoragePolicy: ["asia-northeast1"], kmsKeyName = "projects/my-gcp-project/locations/asia-northeast1/keyRings/customKMSKey/cryptoKeys/customManagedKey" } Topic = pubsubClient.createTopic(topicName, topicAttributes) </cfscript>
List topics
Syntax
pubSubClient.listTopics(listAttributesStruct)
Returns the number of topics. By default, 100 topics are returned. The return type is a list.
Parameters
Parameter | Description | Required |
pageSize | The maximum number of subscriptions to return. | No |
nextPageToken | The value to return the next page of subsciptions. | No |
For next page in the list of topics, call listTopics passing the nextPageToken value in the attributes struct.
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } Pubsubclient = getCloudService(cred, conf) topics = pubsubclient.listTopics({pageSize: 100}) Writedump(topics) </cfscript>
Delete a topic
Syntax
deleteTopic(topicName)
Parameters
Parameter |
Description |
topicName |
Name of the topic to be deleted. |
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName="mytopic" pubsubclient.createTopic(topicName) pubsubclient.deleteTopic(topicName) </cfscript>
List topic subscriptions
Syntax
listTopicSubscriptions(topicName)
Parameters
Parameter |
Description |
topicName |
Name of the topic that has subscriptions. |
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName="mytopic" subName1="subscription_01" subName2="subscription_02" subscriptionMetadata1 = { "subscriptionName" : subName1 } subscriptionMetadata2 = { "subscriptionName" : subName2 } topic=pubsubclient.createTopic(topicName) // add a subscription subscription1=topic.subscribe(subscriptionMetadata1) subscription2=topic.subscribe(subscriptionMetadata2) subscriptions = topic.listSubscriptions() writeDump(subscriptions) </cfscript>
Update a topic
Syntax
updateTopic(topicName, topicAttributes)
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "mytopic" topicAttributes= { labels : { shape : "square", color : "blue" }, messageStoragePolicy: ["asia-east1-a", "europe-west1-c"], messageRetentionDuration: "600s", schemaSettings : { schemaName : "testSchema", schemaEncoding : "Binary" } } topic = pubsubclient.createTopic(topicName, topicAttributes) updatedTopicAttributes = { labels : { size : "10cm", }, messageRetentionDuration: "300s" } pubsubclient.updateTopic(topicName, updatedTopicAttributes) </cfscript>
Topic object methods
- Struct publish(Struct messageMetadata)
- Subscription subscribe(String subscriptionName)
- Subscription subscribe(Struct subscriptionMetadata)
- Subscription getSubscription(String subscriptionName)
- Subscription updateSubscription(Struct subscriptionMetadata)
- Struct detachSubscription(String subscriptionName)
- Struct deleteSubscription(String subscriptionName)
- Struct listSubscriptionsS)
- Struct listSubscriptions(Struct listMetadata)S
- Struct listSnapshots()
- Struct listSnapshots(Struct listMetadata)
- String getId()
- String getName()
- void setKmsKeyName(String keyName)
- void setMessageRetentionDuration(String duration)
- void setMessageStoragePolicy(Array messageStoragePolicy)
- void setLabels(Struct labels)
- void addLabels(Struct labels)
- String getKmsKeyName()
- String getMessageRetentionDuration()
- Array getMessageStoragePolicy()
- Struct getLabels()
- boolean containsLabels(String key)
- Struct getSchemaSettings()
- void delete()
- Struct getIAMPolicy(Struct policyMap)
- Struct setIAMPolicy(Struct policyMap)
- Struct addIAMPolicy(Struct policyMap)
- Struct testIAMPermissions(Struct permissionsMap)
- Struct publishAllOutstanding()
- Struct resumePublish(String orderingKey)
Subscription management
Detach a subscription
Description
When the user detaches a subscription, the subscription is no longer allowed to read any data from the topic, and all the stored messages on the subscription, unacknowledged and acknowledged, are dropped.
Syntax
detachSubscription(subscriptionName)
Returns
A struct of subscriptions.
Parameters
Parameter |
Description |
Required |
subscriptionName |
Name of the subscription to be detached from a topic. The subscription does not get deleted. It can be attached to another topic or re-attached to the original topic. |
Yes |
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "myTopic" subName="mySubscription" subscriptionAttributes= { "subscriptionName" : subName } topic = pubsubclient.createTopic(topicName) subscription=topic.subscribe(subscriptionMetadata) topic.detachSubscription(subName) </cfscript>
Update a subscription
Syntax
Pubsubclient.updateSubscription(subscriptionAttributesStruct)
Subscription.updatesubscription(subscriptionAttributesStruct)
All PubSub entity names are case-sensitive.
Parameters
Parameter |
Description |
Required |
topicName | The name of the topic. | Yes |
SubscriptionName |
Subscription ID |
Yes |
deliveryConfig |
Struct of the following:
|
Yes |
retryPolicy |
Struct of the following:
|
Yes |
deadLetterPolicy |
Struct of the following:
|
Yes |
labels |
Struct containing- author, year, env. |
Yes |
filter |
Filter attributes of the topic. |
Yes |
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "myTopic" subName= "mySubscription" subscriptionDetails = { subscriptionName : subName, topicName : topicName, AckDeadlineSeconds : "600s", enableExactlyOnceDelivery : false, enableMessageOrdering : false, retainAckedMessages : false, MessageRetentionDuration = "700s" filter = "attributes.color='green" } subscription= pubsubclient.subscribe(subscriptionDetails) label="listProjectSubscriptions") newAttributes = { subscriptionName : subName, enableExactlyOnceDelivery : true, AckDeadlineSeconds : "400s", enableMessageOrdering : true, retainAckedMessages : true, MessageRetentionDuration : "120s" filter = "attributes.color='red" } pubsubClient.updateSubscription(newAttributes) </cfscript>
Delete a subscription
Syntax
deleteSubscription(subscriptionName)
Parameters
Parameter |
Description |
Required |
subscriptionName |
Name of the subscription to be deleted from a topic. |
Yes |
For Example,
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "myTopic" subName="mySubscription" subscriptionAttributes = { "subscriptionName" : subName } topic = pubsubclient.createTopic(topicName) topic.subscribe(subscriptionAttributes) topic.deleteSubscription(subName) </cfscript>
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName="myTopic" topic= pubsubClient.createTopic(topicName) message = { data: "hello world!" } subName = "mysubscription" subscriptionAttributes= { "subscriptionName" : subName, "topicName" : topicName } topic.subscribe(subscriptionAttributes) cffuture=topicObj.publish(message) MessageID = Cffuture.get() writeOutput("Message with ID #MessageID# published") </cfscript>
Examples of message publishing
Publish a message to topic Asynchronously
messageMetadata = { data: "Hello World 12345!", attributes: { author: "Martin Kleppmann", age: 33, } } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id.
Publish a message to topic with Avro encoding
MyCFC cfcObj = new MyCFC() messageMetadata = { data: serializeAVRO(cfcObj, schema), attributes: { "attribute1" : "value1", "attribute2" : "value2" } } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id.
Publish a message to topic with Protobuf encoding
MyCFC cfcObj = new MyCFC() messageMetadata = { data: serializeProtoBuf(cfcObj, schema), attributes: { "attribute1" : "value1", "attribute2" : "value2" } } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id.
Publish a message to topic with Batch settings
<cfscript> gcpCred = { vendorName: "GCP", alias: "gcp_cred_alias" // This alias must be defined in the CF Administrator. } pubsubConfig = { alias: "PubSubOne", serviceName: "PUBSUB", publisherSettings: { batchingSettings: { enable: true, // default: false elementCountThreshold : 1, // default: 100 messages requestByteThreshold : 50, // default: 1000 bytes delayThreshold : "100m" // default: 1 milliseconds } } }; pubsub = getCloudService(gcpCred, pubsubConf) myTopic = pubsub.getTopic("myTopic") messageMetadata = { "data" : "Hello World!!" } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id. </cfscript>
Publish a message to a topic with Concurrency control
<cfscript> gcpCred = { vendorName:"GCP", alias: "gcp_cred_alias" // This alias must be defined in the CF Administrator. } pubsubConfig = { alias: "PubSubOne", serviceName: "PUBSUB", publisherSettings: { executorProvider: { threadCount: 4, threadNamePrefix: "cf-pubsub-thread" } } } pubsub = getCloudService(gcpCred, pubsubConf) topic = pubsub.getTopic("myTopic") messageMetadata = { "data" : "Hello" } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id. </cfscript>
Publish a message to topic with Custom attributes
<cfscript> gcpCred = { vendorName: "GCP", alias: "gcp_cred_alias" // This alias must be defined in the CF Administrator. } pubsubConf = { alias: "pubsub_conf_alias", serviceName: "PUBSUB" } pubsub = cloudService(gcpCred, pubsubConf) topic = pubsub.getTopic("myTopic") messageMetadata = { message: { data: "Hello World 12345!", attributes: { author : "Martin Fowler", year : 2014 } } } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id. </cfscript>
Publish a message to topic with Flow Control Settings
<cfscript> gcpCred = { vendorName:"GCP", alias: "gcp_cred_alias" // This alias must be defined in the CF Administrator. } pubsubConfig = { alias: "PubSubOne", serviceName: "PUBSUB", publisherSettings: { batchingSettings: { enable: true, elementCountThreshold: 1, // default: 100 messages requestByteThreshold: 50, // default: 1000 bytes delayThreshold: "100m", // default: 1 milliseconds flowControlSettings: { maxOutstandingRequestBytes : 10240, maxOutstandingElementCount : 100, limitExceededBehavior : "Block" // Supported values: Block, Ignore, ThrowException } } } } pubsub = getCloudService(gcpCred,pubsubConf) topic = pubsub.getTopic("myTopic") messageMetadata = { message: { data : "Hello World 12345!", attributes : { author : "Martin Fowler", year : 2014 } } } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id. </cfscript>
Publish a message to topic with Ordering Keys
<cfscript> gcpCred = { vendorName:"GCP", alias: "gcp_cred_alias" // This alias must be defined in the CF Administrator. } pubsubConfig = { alias: "PubSubOne", serviceName: "PUBSUB", publisherSettings: { endpoint: "asia-northeast1-pubsub.googleapis.com:443", enableMessageOrdering: "true" } } pubsub = getCloudService(gcpCred, pubsubConf) topic = pubsub.getTopic("myTopic") messageMetadata = { message: { data: "Hello World 12345!", orderingKey: "NamingFilter" } } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id. </cfscript>
Publish a message to topic with Retry settings
<cfscript> gcpCred = { vendorName:"GCP", alias: "gcp_cred_alias" // This alias must be defined in the CF Administrator. } pubsubConfig = { alias: "PubSubOne", serviceName: "PUBSUB", publisherSettings: { retrySettings: { initialRetryDelay. : "100m", // default: 100 ms initialRpcTimeout : "5s", // default: 5 seconds maxAttempts : 5, // default: 0 maxRetryDelay : "60s", // default : 60 seconds maxRpcTimeout : "10M", // default: 600 seconds maxDelayMultiplier : 2, // back off for repeated failures, default: 1.3 rpcTimeoutMultiplier : 1, // default: 1.0 totalTimeout : "10M" // default: 600 seconds } } } pubsub = getCloudService(gcpCred,pubsubConfing) topic = pubsub.getTopic("myTopic") messageMetadata = { "messages" : [{ data : "Hello World!" }] } publishedFuture = topic.publish(messageMetadata) WriteDump(publishedFuture.get()) // It should return the published message id. </cfscript>
Publish all outstanding messages
Syntax
publishAllOutstanding(topicName)
Parameters
Parameter | Description | Required |
topicName |
The name of the topic. | Yes |
Example
<cfscript> cred = { projectId : “<your project id>“, credentialJsonFilePath : “<path to your credentials json file>” } conf = { serviceName : “pubsub” } pubsubclient = getCloudService(cred, conf) response = pubsubclient.publishAllOutstanding(“myTopic”) WriteDump(response) </cfscript>
Subscription object methods
- String getId()
- String getName()
- String getTopicId()
- String getTopicName()
- int getAckDeadlineSeconds()
- Struct getLabels()
- String getMessageRetentionDuration()
- boolean isEnableExactlyOnceDelivery()
- boolean isEnableMessageOrdering()
- boolean isRetainAckedMessages()
- boolean isDetached()
- boolean containsLabels(String key)
- String getFilter()
- Struct getExpirationPolicy()
- Struct getRetryPolicy()
- Struct getDeadLetterPolicy()
- Struct getPushConfig()
- Struct getBigQueryConfig()
- void setAckDeadlineSeconds(long seconds)
- void setLabels(Struct labels)
- void addLabels(Struct labels)
- void setMessageRetentionDuration(String duration)
- void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery)
- void setRetainAckedMessages(boolean retainAckedMessages)
- Struct setDetached(boolean detached)
- void setExpirationPolicy(Struct expirationPolicy)
- void setRetryPolicy(Struct retryPolicy)
- void setDeadLetterPolicy(Struct deadLetterPolicy)
- void setDeliveryConfig(Struct deliveryConfig)
- void registerMessageHandler(Struct messageHandlerMetadata)
- Struct pullMessages(Struct pullMessageMetadata)
- Struct acknowledgeMessages(Struct acknowledgeMetadata)
- Struct modifyAckDeadline(Struct ackDeadlineMetadata)
- Struct seekMessages(Struct seekMessageMetadata)
- Struct getIAMPolicy(Struct policyStruct)
- Struct setIAMPolicy(Struct policyStruct)
- Struct addIAMPolicy(Struct policyStruct)
- Struct testIAMPermissions(Struct permissionsStruct)
- Struct delete()
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "myTopic" topic = pubsubClient.createTopic(topicName) subName="mysubscription" subscriptionMetadata = { "subscriptionName" : subName } // create the subscription topic.subscribe(subscriptionMetadata) // create the snapshot snapshotName="snshot-01" snapshotAttributes = { snapshotName: snapshotName, subscriptionName: subName, labels : { name : "wrench", mass : "13kg", createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"), emptykey : "" } } snapshotResponse=pubsubClient.createSnapshot(snapshotAttributes) </cfscript>
Get a snapshot
Syntax
getSnapshot(snapshotName)
Parameters
Parameter |
Description |
Required |
snapshotName |
Name of the snapshot whose details are to be retrieved. |
Yes |
For Example,
<cfscript> cred = { projectId : "my-gcp-project", credentialJsonFilePath : "path of creds json file" }; conf = { serviceName : "pubsub" }; pubsubclient=getCloudService(cred, conf) topicName = "topic-ss-02" // create the topic topic = pubsubClient.createTopic(topicName) subName="sub-name-ss-02" subscriptionMetadata = { "subscriptionName" : subName } // create the subscription topic.subscribe(subscriptionMetadata) // create the snapshot snapshotName="snshot-02" snapshotAttributes = { snapshotName: snapshotName, subscriptionName: subName, labels : { name : "wrench", mass : "13kg", createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"), emptykey : "" } } // create a snapshot pubsubClient.createSnapshot(snapshotAttributes) // get the snapshot getResponse=pubsubClient.getSnapshot(snapshotName) writeDump(getResponse) </cfscript>
Update a snapshot
Syntax
updateSnapshot(updatedSnapshotAttributes)
For Example,
<cfscript> cred = { projectId : "my-gcp-project", credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json" }; conf = { serviceName : "pubsub" }; pubsubclient=getCloudService(cred, conf) topicName="topic-update-ss01" // create the topic topic = pubsubClient.createTopic(topicName) subName="subupdate-ss01" subscriptionMetadata = { "subscriptionName" : subName } // create the subscription topic.subscribe(subscriptionMetadata) // create the snapshot snapshotName="snshot-ss-01" snapshotAttributes = { snapshotName: snapshotName, subscriptionName: subName, labels : { name : "wrench", mass : "13kg", createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"), emptykey : "" } } // create a snapshot pubsubClient.createSnapshot(snapshotAttributes) updatedSnaphotAttributes={ snapshotName: snapshotName, subscriptionName: subName, labels : { name : "shovel", mass : "5kg", createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"), emptykey : "" } } // update the snapshot pubsubClient.updateSnapshot(updatedSnaphotAttributes) writeOutput("Snapshot " & snapshotName & " updated succcessfully") </cfscript>
List snapshots
Syntax
listSnapshots()
Parameters
Parameter |
Description |
nextPageToken |
Retrieve the next page of results and perform the same request. |
For Example,
<cfscript> cred = { projectId : "my-gcp-project", credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) listResponse=pubsubclient.listSnapshots() writeDump(listResponse) </cfscript>
Delete a snapshot
Syntax
deleteSnapshot(snapshotName)
Parameters
Parameter |
Description |
snapshotName |
Name of the snapshot to be deleted. |
For Example,
<cfscript> cred = { projectId : "my-gcp-project", credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json" }; conf = { serviceName : "pubsub" }; pubsubclient=getCloudService(cred, conf) topicName = "topic-ss-delete" // create the topic topic = pubsubClient.createTopic(topicName) subName="sub-name-ss-delete" subscriptionMetadata = { "subscriptionName" : subName } // create the subscription topic.subscribe(subscriptionMetadata) // create the snapshot snapshotName="snshot-delete" snapshotAttributes = { snapshotName: snapshotName, subscriptionName: subName, labels : { name : "wrench", mass : "13kg", createdtime : dateTimeFormat(now(), "hh-nn_mm-dd-yyyy"), emptykey : "" } } // create a snapshot pubsubClient.createSnapshot(snapshotAttributes) // delete the snapshot pubsubClient.deleteSnapshot(snapshotName) writeOutput("Snapshot deleted successfully") </cfscript>
Seek Messages
Once you've acknowledged a message, Pub/Sub cannot recover it. But, occasionally, you could feel the need to replay the acknowledged messages, for instance, if you acknowledged something incorrectly. Then you may utilize the Seek feature to mark previously accepted communications as unacknowledged and cause Pub/Sub to redeliver those messages. By setting their status to acknowledged, you can also utilize seek to delete unacknowledged messages.
You can only seek messages either using snapshotName or timestamp. You cannot use both.
Syntax
seekMessages(requestParameters)
Parameters
Parameter |
Description |
Required |
snapshotName |
Name of the snapshot. |
Yes |
subscriptionName |
Name of the subscription. |
Yes |
timeStamp |
Timestamp of the snapshot. |
No |
For Example,
<cfscript> cred = { projectId : "adbe-gcp0318", credentialJsonFilePath : "C:\GCP-keys\adbe-gcp0318-ab38bee5f1e0.json" }; conf = { serviceName : "pubsub" }; pubSubClient=getCloudService(cred,conf) // set names topicName = "topic-ss-seek" subName = "sub_ss"; snapshotName="snapshot_ss" // create a topic topic = pubsubClient.createTopic(topicName) subscriptionMetadata = { "subscriptionName" : "#subName#", "topicName" : "#topicName#" } // subscribe to the topic subscription = topic.subscribe(subscriptionMetadata) // create a snapshot snapshot = pubsubClient.createSnapshot({ snapshotName: snapshotName, subscriptionName: subName }) writeOutput("seeking to snapshot.. replaying msgs.<hr>") snapshotID=snapshot.getID() seekAttributes = { subName, snapshotID } seekResponse = pubsubClient.seekMessages(seekAttributes) writeDump(seekAttributes) </cfscript>
Snapshot object methods
- String getId()
- String getName()
- String getTopic()
- OleDateTime getExpireTime()
- boolean hasExpireTime()
- boolean containsLabels(String key)
- Struct getLabels()
- void setLabels(Struct labels)
- void addLabels(Struct labels)
- Struct delete()
- Struct getIAMPolicy(Struct policyStruct)
- Struct setIAMPolicy(Struct policyStruct)
- Struct addIAMPolicy(Struct policyStruct)
- Struct testIAMPermissions(Struct permissionsStruct)
Security APIs
GCP PubSub uses Identity and Access Management to control access. You can enforce access control on both project and resource levels. For more information, see Access control.
Add IAM policy
Syntax
setIAMolicy(subscriptionPolicyMetadata)
Parameters
Parameter |
Description |
resourceName |
Name of the subscription. |
resourceType |
Valid values are:
|
bindings |
Specify the role and email of the members who can add an IAM policy. Valid values for role are:
|
For Example,
<cfscript> cred = { projectId : "my-gcp-project", credentialJsonFilePath : "C:\GCP-keys\my-gcp-project-ab38bee5f1e0.json" }; conf = { serviceName : "pubsub" }; pubsubclient=getCloudService(cred, conf) topicName="topic-add-iam" // create the topic topic = pubsubClient.createTopic(topicName) // create the subscription subName="sub-add-iam" subscriptionMetadata = { "subscriptionName" : subName } topic.subscribe(subscriptionMetadata) // add IAM Policy addIamPolicy = { resourceName : subscriptionName, resourceType : "Subscription", bindings : { role : "roles/viewer", members : ["user:user@example.com"] } } pubsubClient.addIAMPolicy(addIamPolicy) writeOutput("IAM policy added successfully") </cfscript>
Set IAM Policy
Syntax
setIAMPolicy(policyStruct)
Parameters
Parameter | Description | Required |
subName | Name of the subscription for which IAM policy needs to be set. |
Yes |
Example
<cfscript> this.gcpCred = { "vendorName":"GCP", "alias": "gcp_cred_alias", "projectid": "{ProjectID}" } this.pubsubConf = { "alias":"pubsub_conf_alias", "serviceName" : "PUBSUB", } pubsub = cloudService(this.gcpCred, this.pubsubConf) // Create new role > members binding and update the topic policy topicPolicyMetadata = { "resource" : "topicName", "policy" : "oldPolicy", "bindings" : { "role" : "roles/pubsub.editor", "members" : "allUsers", "conditions" : "" } } // set the IAM policy pubsub.setIAMPolicy(topicPolicyMetadata) // retrieve the IAP policy pubsub.getIAMPolicy("topicName") </cfscript>
GET IAM Policy
Syntax
getIAMPolicy(policyStruct)
Parameters
Parameter | Description | Required |
resourceName | The name of the subscription. | Yes |
resourceType | Topic or subscription. | Yes |
policyOptions | Struct of options | Yes |
Example
<cscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "topic-add-policy" subscriptionName = "sub-#topicName#" topic = pubsubClient.createTopic(topicName) subscriptionAttributes = { subscriptionName : "#subscriptionName#", topicName : "#topicName#" } subscription = topic.subscribe(subscriptionAttributes) policyMapV1 = { version = "1", resourceName : subscriptionName, resourceType : "Subscription", binding : [{ ROLE : "roles/editor", MEMBERS : ["user:pn@adb.com"] }, { ROLE : "roles/owner", MEMBERS : ["user:pn@adb.com"] }, { ROLE : "roles/viewer", MEMBERS : ["user:mj@adb.com","user:pn@adb.com"] }] } setIAMPolicy = pubsubClient.setIAMPolicy(policyMapV1) cfdump(var=#setIAMPolicy#, label="set policy") iAMPolicy = pubsubClient.getIAMPolicy({ resourceName : subscriptionName, resourceType : "Subscription", policyOptions : { requestedPolicyVersion : policyMapV1[VERSION] } }) cfdump(var=#iAMPolicy#, label="get policy") </cfscript>
Test IAM Permissions
Syntax
testIAMPermissions(parameterStruct)
Parameters
Parameter | Description | Required |
resourceName | The name of the subscription. | Yes |
resourceType | Topic or subscription. | Yes |
permissions | The set of permissions to check for the resource. |
Yes |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) topicName = "topic-add-policy" subscriptionName = "sub-#topicName#" topic = pubsubClient.createTopic(topicName) subscriptionAttributes = { subscriptionName : "#subscriptionName#", topicName : "#topicName#" } subscription = topic.subscribe(subscriptionAttributes) policyMapV1 = { version = "1", resourceName : subscriptionName, resourceType : "Subscription", binding : [{ ROLE : "roles/editor", MEMBERS : ["user:pn@example.com"] }, { ROLE : "roles/owner", MEMBERS : ["user:pn@example.com"] }, { ROLE : "roles/viewer", MEMBERS : ["user:mj@example.com","user:pn@example.com"] }] } setIAMPolicy = pubsubClient.setIAMPolicy(policyMapV1) cfdump(var=#setIAMPolicy#, label="set policy") iAMPolicy = pubsubClient.getIAMPolicy({ resourceName : subscriptionName, resourceType : "Subscription", policyOptions : { requestedPolicyVersion : policyMapV1[VERSION] } }) cfdump(var=#iAMPolicy#, label="get policy") addIamPolicy = { resourceName : "#subName#",, resourceType : "Subscription", bindings : { role : "roles/editor", members : ["user:mj@adb.com"] } } iAMPolicy = pubsubClient.addIAMPolicy(addIamPolicy); cfdump(var=#iAMPolicy#, label="add policy") testIAMPermissions = { resourceName : "#subName#", resourceType : "Subscription", permissions : ["pubsub.topics.attachSubscription", "pubsub.topics.publish", "pubsub.topics.update"] } permissions = pubsubClient.testIAMPermissions(testIAMPermissions); WriteDump(permissions); </cfscript>
Pull messages
Syntax
pullMessages(structMetadata)
Parameters
Parameter | Description | Required |
subscriptionName |
Name of the subscription. | Yes |
maxMessages |
The maximum amount of messages to pull. | Yes |
returnImmediately |
Returns te messages immediately even if the subscription doesn't contain enough messages to satisfy maxMessages. |
No |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient = getCloudService(cred, conf) topic = pubsubclient.getTopic("myTopic") subscription = topic.getSubscription("mySubscription”) pullMessageMetadata = { maxMessages: 100 } pullMsgResponse = subscription.pullMessages(pullMessageMetadata) for(message in pullMsgResponse.messages) { WriteOutput(message.ackId) WriteOutput(message.messageId) WriteOutput(message.data) WriteOutput(message.publishTime) </cfscript>
Acknowledge messages
Syntax
acknowledgeMessages(acknowledgeMetadata)
Parameters
Parameter | Description | Required |
subscriptionName |
The name of the subscription. | Yes |
ackIds |
Array of acknowledgement Ids. |
Yes |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient = getCloudService(cred, conf) topic = pubsubclient.getTopic("myTopic") subscription = topic.getSubscription(“mySubscription”) pullMessageMetadata = { maxMessages: 100 } pullMsgResponse = subscription.pullMessages(pullMessageMetadata) ackIds = arrayNew(1) for(message in pullMsgResponse.messages) { ackId = message.ackId ackIds.append(ackId) WriteOutput(ackId) WriteOutput(message.messageId) WriteOutput(message.data) WriteOutput(message.publishTime) } acknowledgeMessageMetadata = { ackIds: ackIds } ackMsgResponse = subscription.acknowledgeMessages(acknowledgeMessageMetadata) writeDump(ackMsgResponse) </cfscript>
Modify acknowledgement deadline
Syntax
modifyAckDeadline(ackDeadlineMetadata)
Parameters
Parameter | Required | Description |
subscriptionName |
The name of the subscription. | Yes |
ackIds |
Array of acknowledgement Ids. |
Yes |
ackDeadlineSeconds |
The time within which the acknowledgement expires. | Yes |
Example
<cfscript> cred = { projectId : "<your project id>" credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient = getCloudService(cred, conf) topic = pubsubclient.getTopic("myTopic") subscription = topic.getSubscription("mySubscription") pullMessageMetadata = { maxMessages: 100 } pullMsgResponse = subscription.pullMessages(pullMessageMetadata) ackIds = arrayNew(1) for(message in pullMsgResponse.messages) { ackId = message.ackId ackIds.append(ackId) WriteOutput(ackId) WriteOutput(message.messageId) WriteOutput(message.data) WriteOutput(message.publishTime) } modifyAckDeadlineMetadata = { ackIds: ackIds, ackDeadlineSeconds: 80 } WriteDump(subscription.modifyAckDeadline(modifyAckDeadlineMetadata)); acknowledgeMessageMetadata = { ackIds: ackIds } ackMsgResponse = subscription.acknowledgeMessages(acknowledgeMessageMetadata) writeDump(ackMsgResponse) </cfscript>
Create a Schema
Syntax
createSchema(schemaStruct)
Parameters
Parameter | Description | Required |
schemaName | The name of the schema for Avro and Protobuf. | Yes |
type | The valid values are: Avro or Protobuf. | Yes |
definition | The schema definition. You can pass the definition directly. | Yes |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) schemaName = "schemaOne" schemaMetadata = { schemaName: schemaName, type: "protocol_Buffer", Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}' } createResponse=pubsubClient.createSchema(schemaMetadata) writeDump(createResponse) </cfscript>
Get a Schema
Syntax
getSchema(schemaName)
Parameter
- schemaName: Name of the schema
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) schemaName = "schemaOne" schemaMetadata = { schemaName: schemaName, type: "protocol_Buffer", Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}' } createResponse=pubsubClient.createSchema(schemaMetadata) createResponse=pubsubClient.createSchema(schemaMetadata) writeDump(createResponse) schema = pubsubClient.getSchema(schemaName) writeOutput("schema ID: " & schema.getID() & "<br>") writeOutput("schema Name: " & schema.getName() & "<br>") writeOutput("schema type: " & schema.getType() & "<br>") writeOutput("schema Definition: " & schema.getDefinition() & "<br>") </cfscript>
List all schemas in the project
Syntax
listSchemas()
listSchemas(parameterStruct)
Parameters
Parameter | Description | Required |
pageSize | Number of schemas to be displayed. | Yes |
nextPageToken | Token used to access the next page of this result. |
Yes |
schemaView | Valid values are: Full or Basic | Yes |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) schemaName = "schemaOne" schemaMetadata = { schemaName: schemaName, type: "protocol_Buffer", Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}' } pubsubClient.createSchema(schemaMetadata) schema = pubsubClient.getSchema(schemaName) first2schemas = pubsubClient.listSchemas({ pageSize: 2, schemaView : "full" }) writeDump(first2schemas) </cfscript>
Delete a schema
Syntax
deleteSchema(schemaName)
Parameter
- schemaName: Name of the schema
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient=getCloudService(cred, conf) schemaName = "schemaOne" schemaMetadata = { schemaName: schemaName, type: "protocol_Buffer", Definition: 'syntax = "proto2"; message ProtocolBuffer {required string name = 1;}' } pubsubClient.createSchema(schemaMetadata) deleteResponse=pubsubClient.deleteSchema(schemaName) writeDump(deleteResponse) </cfscript>
Validate a schema
Syntax
validateSchema(schemaMetadata)
Parameters
Parameter | Description | Required |
schemaName |
The name of the schema. | Yes |
type |
AVRO or Protobuf | Yes |
definition |
The schema definition. | Yes |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient = getCloudService(cred, conf) schemaMetadata = { schemaName: "mySchema", type: “AVRO”, definition: ‘{ "type" : "record", "name" : "Avro", "fields" : [ { "name" : "StringField”, "type" : “string” }, { "name" : "IntField", "type" : "int" } ] }’ } schema = pubsubclient.validateSchema(schemaMetadata); WriteDump(schema); </cfscript>
Validate a message
Syntax
validateMessage(validateMetadata)
Parameters
Parameter | Description | Required |
schemaName |
The name of the schema. | Yes |
schemaEncoding |
JSON or Binary | Yes |
data |
The data to validate. | Yes |
Example
<cfscript> cred = { projectId : "<your project id>", credentialJsonFilePath : "<path to your credentials json file>" } conf = { serviceName : "pubsub" } pubsubclient = getCloudService(cred, conf) validateMessageMetadata = { schemaName: "mySchema", schemaEncoding: "JSON", data: ‘{ "name" : "charlie"}’ } schema = pubsubclient.validateMessage(validateMessageMetadata); WriteDump(schema); </cfscript>
Schema object methods
- createSchema(struct)
- deleteSchema(string)
- getSchema(string)
- getSchema(string, string)
- listSchemas()
- listSchemas(struct)
- validateSchema(struct)