Apache Pulsar standalone usage and basic topics

4 May, 2021


In my last Pulsar post I did a side-by-side comparison of Apache Kafka and Apache Pulsar. Let’s continue by looking at Pulsar a little closer, as there are some really interesting things when it comes to topics and the options available.

Starting a Standalone Pulsar Cluster

As with Kafka, Pulsar lets you operate a standalone cluster so you can get a grasp of the basics. For this blog I’m going to assume that you have installed the Pulsar binaries. While you can operate Pulsar in a Docker container or via Kubernetes, I will not be covering those in this post.

In the bin directory of the Pulsar distribution is the pulsar command. This lets you start either a standalone cluster or the ZooKeeper, BookKeeper and Pulsar broker components separately. I’m going to start a standalone cluster:

$ bin/pulsar standalone

After a few minutes you will see that the cluster is up and running.

11:02:06.902 [worker-scheduler-0] INFO  org.apache.pulsar.functions.worker.SchedulerManager - Schedule summary - execution time: 0.042227224 sec | total unassigned: 0 | stats: {"Added": 0, "Updated": 0, "removed": 0}
{
  "c-standalone-fw-localhost-8080" : {
    "originalNumAssignments" : 0,
    "finalNumAssignments" : 0,
    "instancesAdded" : 0,
    "instancesRemoved" : 0,
    "instancesUpdated" : 0,
    "alive" : true
  }
}

Pulsar Consumers

In the comparison blog post I noted that where Kafka pulls messages from the brokers, Pulsar pushes messages out to consumers. Pulsar uses subscriptions to route messages from the brokers to any number of consumers that are subscribed. The read position of the log is handled by the brokers.
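To make the push model concrete, here is a minimal sketch in Python. It is a toy model only, not Pulsar's implementation: the ToyBroker class and its methods are hypothetical names invented for illustration. It shows the two ideas from the paragraph above: the broker keeps the read position (a cursor) for each subscription, and it pushes new messages to subscribed consumers rather than waiting to be polled.

```python
from typing import Callable, Dict, List


class ToyBroker:
    """Illustrative model only: one topic, one cursor per subscription."""

    def __init__(self) -> None:
        self.log: List[str] = []                 # messages in publish order
        self.cursors: Dict[str, int] = {}        # subscription -> next index to deliver
        self.consumers: Dict[str, Callable[[str], None]] = {}

    def subscribe(self, subscription: str, on_message: Callable[[str], None]) -> None:
        # A new subscription starts at the end of the log in this model;
        # an existing one keeps the cursor the broker already holds for it.
        self.cursors.setdefault(subscription, len(self.log))
        self.consumers[subscription] = on_message
        self._dispatch(subscription)

    def publish(self, message: str) -> None:
        self.log.append(message)
        # Push: the broker drives delivery to every subscribed consumer.
        for subscription in self.consumers:
            self._dispatch(subscription)

    def _dispatch(self, subscription: str) -> None:
        on_message = self.consumers[subscription]
        while self.cursors[subscription] < len(self.log):
            on_message(self.log[self.cursors[subscription]])
            self.cursors[subscription] += 1


received: List[str] = []
broker = ToyBroker()
broker.subscribe("st-subscription-1", received.append)
broker.publish("This is a test message")
```

The consumer here is just a callback. It never asks the broker for an offset, which is the key difference from the Kafka pull model.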

Figure: Kafka pull versus Pulsar push.

I’m going to connect a basic consumer to the standalone cluster. In the bin directory there is a Pulsar client application that we can use without having to code anything, very similar to the Kafka console-producer and console-consumer applications.

$ bin/pulsar-client consume sales-trigger -s "st-subscription-1"

Let’s break this command down a little bit.

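Here, consume is the client mode, sales-trigger is the topic to read from, and -s "st-subscription-1" names the subscription the consumer attaches to.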

Once executed you will see in the consumer application output that it has subscribed to the topic and is awaiting a message.

12:04:50.621 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [sales-trigger][st-subscription-1] Subscribing to topic on cnx [id: 0x049a4567, L:/127.0.0.1:63912 - R:localhost/127.0.0.1:6650], consumerId 0
12:04:50.664 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [sales-trigger][st-subscription-1] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0

You may have noticed that the topic hasn’t been created yet, but the consumer is up and running and waiting all the same. Now let’s create the producer and send a message.

Pulsar Producers

Opening another terminal window, I’m going to run the Pulsar client as a producer this time and send a single message.

$ bin/pulsar-client produce sales-trigger --messages "This is a test message"

When executed the producer will connect to the cluster and send the message. The output shows that the message was sent.

13:50:56.342 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced 

If you are running your own consumer and producer, take a look at the consumer now and see what’s happened: it has received the message from the broker and then cleanly exited.

----- got message -----
key:[null], properties:[], content:This is a test message
13:50:56.378 [main] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
13:50:56.404 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [sales-trigger] [st-subscription-1] Closed consumer
13:50:56.409 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x049a4567, L:/127.0.0.1:63912 ! R:localhost/127.0.0.1:6650] Disconnected
13:50:56.422 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully consumed
$

If you are used to Kafka you would expect your consumer client to wait for more messages from the broker. With Pulsar, however, this is not the default behaviour of the client application.

Ideally the client consumer should keep running, awaiting more messages from the brokers. There is an additional flag in the client that can be set.

$ bin/pulsar-client consume sales-trigger -s "st-subscription-1" -n 0

The -n flag sets the number of messages to accept before the consumer disconnects from the cluster and closes. The default is 1 message; if it is set to 0 there is no limit and the consumer will receive any messages the brokers push to it.
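The stopping rule behind -n can be sketched in a few lines of Python. This is a model of the behaviour described above, not the client's code; the consume function and its num_messages parameter are hypothetical names. One difference to keep in mind: a real consumer started with -n 0 keeps waiting for new messages, whereas this model simply stops when its input runs out.

```python
from typing import Iterable, List


def consume(messages: Iterable[str], num_messages: int = 1) -> List[str]:
    """Collect messages until the limit is reached; a limit of 0 means no limit."""
    received: List[str] = []
    for message in messages:
        received.append(message)
        # Only enforce the limit when it is a positive number.
        if num_messages > 0 and len(received) >= num_messages:
            break
    return received


incoming = ["first", "second", "third"]
default_run = consume(incoming)          # behaves like the default of 1
unlimited_run = consume(incoming, 0)     # behaves like -n 0
```

With the default limit the first message ends the run, which is why the consumer in the earlier example exited after a single message.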

As with the consumer, the producer can send multiple messages in one execution:

$ bin/pulsar-client produce sales-trigger --messages "This is a test message" -n 100

With the -n flag in the produce mode, the client will send one hundred messages to the broker.

15:01:03.339 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 100 messages successfully produced

The active consumer will receive the messages and await more. 

----- got message -----
key:[null], properties:[], content:This is a test message
----- got message -----
key:[null], properties:[], content:This is a test message
----- got message -----
key:[null], properties:[], content:This is a test message
----- got message -----
key:[null], properties:[], content:This is a test message
----- got message -----
key:[null], properties:[], content:This is a test message

Keys and Properties

You may have noticed in the consumer output that along with the content of the message are two other sections, a key and properties.

Each message can have a key, which is optional but highly advised. Properties are key/value pairs, and you can set multiple properties by separating them with commas. Supposing I want an action property holding some form of command, with the key being the current Unix timestamp, the client command would look like the following:

$ bin/pulsar-client produce sales-trigger --messages "This is a test message" -n 100 -p action=create -k `date +%s`

As the consumer is still running, awaiting new messages, you will see the output with the key and properties.

----- got message -----
key:[1611328125], properties:[action=create], content:This is a test message
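To illustrate the comma-separated key=value format used for properties, here is a small Python sketch that turns such a string into a dictionary. It is an illustration of the format only, not the parser the Pulsar client uses; parse_properties is a hypothetical helper, and the region=eu pair is made up for the example.

```python
from typing import Dict


def parse_properties(raw: str) -> Dict[str, str]:
    """Split a string such as 'action=create,region=eu' into a dict."""
    properties: Dict[str, str] = {}
    for pair in raw.split(","):
        if not pair:
            continue  # tolerate an empty string or a trailing comma
        name, separator, value = pair.partition("=")
        if not separator or not name:
            raise ValueError(f"expected key=value, got {pair!r}")
        properties[name] = value
    return properties


single = parse_properties("action=create")
multiple = parse_properties("action=create,region=eu")
```

The key is separate from the properties: a message carries at most one key, while it can carry any number of property pairs.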

Persistent and Non-Persistent Messages

There are a few differences between Kafka and Pulsar when it comes to persistence of messages. By default Pulsar will assume a topic is persistent and will save messages to the BookKeeper instances (called Bookies).

Kafka applies a retention period to messages regardless of whether a consumer has read them; the default is seven days (168 hours). Pulsar instead keeps messages persisted until they have been consumed. Once every subscription on the topic has read a message and acknowledged it back to the broker, the message is removed from storage.
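The acknowledgement rule can be modelled with a short Python sketch. This is a simplified illustration, not how BookKeeper or the broker stores data; the ToyTopic class and its methods are hypothetical names. Each stored message remembers which subscriptions have not yet acknowledged it, and it is deleted when that set becomes empty.

```python
from typing import Dict, Iterable, Set


class ToyTopic:
    """Illustrative model: a message is kept until every subscription acks it."""

    def __init__(self, subscriptions: Iterable[str]) -> None:
        self.subscriptions: Set[str] = set(subscriptions)
        self.pending: Dict[int, Set[str]] = {}   # message id -> subscriptions yet to ack
        self.contents: Dict[int, str] = {}       # message id -> payload
        self._next_id = 0

    def publish(self, content: str) -> int:
        message_id = self._next_id
        self._next_id += 1
        self.contents[message_id] = content
        self.pending[message_id] = set(self.subscriptions)
        return message_id

    def ack(self, subscription: str, message_id: int) -> None:
        waiting = self.pending.get(message_id)
        if waiting is None:
            return  # already removed, nothing to do
        waiting.discard(subscription)
        if not waiting:
            # Every subscription has acknowledged: drop the message.
            del self.pending[message_id]
            del self.contents[message_id]


topic = ToyTopic(["st-subscription-1", "st-subscription-2"])
message_id = topic.publish("This is a test message")
topic.ack("st-subscription-1", message_id)
still_stored = message_id in topic.contents
topic.ack("st-subscription-2", message_id)
removed = message_id not in topic.contents
```

In this model a subscription that never acknowledges keeps the message stored indefinitely, which is the situation a TTL is meant to guard against.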

Pulsar can be configured, and should be in production environments, to have a time-to-live (TTL) for messages held in persistent storage.

In Memory Topics

If you want a topic’s messages held in memory rather than written to disk, non-persistent topics are available.

Creating non-persistent topics can be done from the client but requires the full topic name, including the tenant and namespace. The full name takes the form persistent://tenant/namespace/topic or non-persistent://tenant/namespace/topic.

When interacting with clients we now have to use the full name. So running the consumer again, but with a non-persistent topic, would look like this:

$ bin/pulsar-client consume non-persistent://public/default/sales-trigger2 -s "st-subscription-2" -n 0

If it were persistent (storage going to the Bookies):

$ bin/pulsar-client consume persistent://public/default/sales-trigger2 -s "st-subscription-2" -n 0

With non-persistent topics, if the broker fails then any messages stored in memory or in transit to a subscribed consumer are lost.
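The naming scheme can be sketched as a small Python helper. This is a hypothetical function written for illustration, not part of any Pulsar library. It assumes the public tenant and default namespace used throughout this post, and it treats a short name as persistent unless told otherwise, matching the default described earlier.

```python
def full_topic_name(
    topic: str,
    tenant: str = "public",
    namespace: str = "default",
    persistent: bool = True,
) -> str:
    """Expand a short topic name into scheme://tenant/namespace/topic."""
    if "://" in topic:
        return topic  # already a full name, leave it untouched
    scheme = "persistent" if persistent else "non-persistent"
    return f"{scheme}://{tenant}/{namespace}/{topic}"


persistent_name = full_topic_name("sales-trigger")
in_memory_name = full_topic_name("sales-trigger2", persistent=False)
```

The two results match the names that the topic listing in the next section reports for the topics used in this post.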

Listing Topics

The Pulsar admin client handles all aspects of the cluster from the command line. This includes brokers, bookies, topics and TTL configurations, plus specific configurations for named subscriptions if required.

For now, let’s just list the topics I’ve been working with in this post:

$ bin/pulsar-admin topics list public/default
"non-persistent://public/default/sales-trigger2"
"persistent://public/default/sales-trigger"

Summary

This post should give you a basic starting point for how the Pulsar client and the standalone cluster work. Consumers and producers give us the backbone of a streaming application, with added features such as the choice between persistent and non-persistent (in memory) topics.

All this has been done from the command line. In a future post I’ll look at putting a basic Producer and Consumer application together in code.

If you would like to know more about how to implement modern data, streaming and cloud technologies into your business, we at Digitalis do it all: from cloud migration to fully managed services, we can help you modernize your operations, data, streaming and applications. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.

Jason Bell

DevOps Engineer and Developer

With over 30 years of experience in software, customer loyalty data and big data, Jason now focuses his energy on Kafka and Hadoop. He is also the author of Machine Learning: Hands on for Developers and Technical Professionals. Jason is considered a stalwart in the Kafka community. Jason is a regular speaker on Kafka technologies, AI and customer and client predictions with data.
