Python SDK Examples
The Fluvio Python module provides an extension for working with the Fluvio streaming platform.
This module builds on top of the Fluvio Client Rust Crate and provides a pythonic access to the API.
Creating a topic with default settings is as simple as:
fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")
Or just create a topic with custom settings:
import fluvio
fluvio_admin = FluvioAdmin.connect()
topic_spec = (
TopicSpec.create()
.with_retention_time("1h")
.with_segment_size("10M")
.build()
)
fluvio_admin.create_topic("a_topic", topic_spec)
Producing data to a topic in a Fluvio cluster is as simple as:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
producer = fluvio.topic_producer(topic)
for i in range(10):
producer.send_string("Hello %s " % i)
Consuming is also simple:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)
num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
Also you can consume using offset management:
import fluvio
fluvio = Fluvio.connect()
topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)
num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
stream.offset_commit()
stream.offset_flush()
For more examples see the integration tests in the fluvio-python repository.