Skip to main content
Version: 0.13.0

Python SDK Examples

To see the full docs, visit our pdoc page.

Examples

Producer

Create the topic used to produce and consume records:

$ fluvio topic create python-data

Create a file called python-produce.py:

#!/usr/bin/env python
from datetime import datetime
from fluvio import Fluvio

TOPIC_NAME = "python-data"
PARTITION = 0

if __name__ == "__main__":
# Connect to cluster
fluvio = Fluvio.connect()

# Produce 10 records to topic
producer = fluvio.topic_producer(TOPIC_NAME)
for x in range(10):
producer.send_string("{}: timestamp: {}".format(x, datetime.now()))

# Flush the last entry
producer.flush()

Let's run the file:

$ python python-produce.py

Consumer

Create a file called python-consume.py:

#!/usr/bin/env python
from fluvio import Fluvio, Offset

TOPIC_NAME = "python-data"
PARTITION = 0

if __name__ == "__main__":
# Connect to cluster
fluvio = Fluvio.connect()

# Consume last 10 records from topic
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
for idx, record in enumerate( consumer.stream(Offset.from_end(10)) ):
print("{}".format(record.value_string()))

if idx >= 9:
break

Let's run the file:

$ python python-consume.py

Example with a SmartModule

This is a 3 part example:

Python Script that uses a SmartModule

Create a file called smartmodule-consumer.py:

#!/usr/bin/env python
import os
from datetime import datetime
from fluvio import Fluvio, Offset, ConsumerConfig

TOPIC_NAME = "hello-python-smartmodule"
PARTITION = 0

# This is an example of a basic Fluvio workflow in Python
#
# 1. Create a topic to store data in via CLI
# 2. Establish a connection to the Fluvio cluster
# 3. Create a producer and send some bytes
# 4. Create a consumer, and stream the data back
if __name__ == "__main__":
# Currently the Python client does not support creating topics
# Using the Fluvio CLI
os.popen("fluvio topic create {}".format(TOPIC_NAME))

# Connect to cluster
fluvio = Fluvio.connect()

# Produce to topic
producer = fluvio.topic_producer(TOPIC_NAME)
producer.send_string("Hello World! - Time is: {}".format(datetime.now()))

# Consume from topic
# We're just going to get the last record
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)

# Create a ConsumerConfig using your "uppercase" smartmodule
config = ConsumerConfig()
config.smartmodule(name="uppercase")

for record in consumer.stream_with_config(Offset.from_end(0), config):
print("{}".format(record.value_string()))
break

Test Python Script

Read to test the script:

$ python smartmodule-consumer.py
HELLO WORLD! - TIME IS: 2024-08-25 21:04:51.172045