Skip to main content
Version: latest

NodeJS SDK Examples

  • This client uses node-bindgen to wrap the Rust client.
  • It supports most administrator features.
  • The blocking calls to Fluvio return promises allowing for async on blocking Fluvio calls.
  • The PartitionConsumer.createStream call returns an asyncIterator to allow iterating over the stream in a for-loop.

To see the full docs, visit our typedoc page.

Example Workflow

Follow the installation instructions to run this example.

/**
* This is an example of a basic Fluvio workflow in Typescript
*
* 1. Establish a connection to the Fluvio cluster
* 2. Create a topic to store data in
* 3. Create a producer and send some bytes
* 4. Create a consumer, and stream the data back
*/
import Fluvio, { Offset, Record } from "@fluvio/client";

const TOPIC_NAME = "hello-node";
const PARTITION = 0;

async function createTopic() {
try {
// Connect to the Fluvio cluster
console.log("Connecting client to fluvio");
await fluvio.connect();

// Create admin client;
const admin = await fluvio.admin();

// Create topic
console.log("Creating topic");
await admin.createTopic(TOPIC_NAME);
} catch (ex) {
console.log("Topic already exists", ex);
}
}

const produce = async () => {
// Connect to the Fluvio cluster
console.log("Connecting client to fluvio");
await fluvio.connect();

// Create a topic producer;
const producer = await fluvio.topicProducer(TOPIC_NAME);
await producer.send("example-key", "Hello World! - Time is " + Date());
};

const consume = async () => {
try {
// Connect to the fluvio cluster referenced in the cli profile.
await fluvio.connect();

// Create partition consumer
const consumer = await fluvio.partitionConsumer(TOPIC_NAME, PARTITION);

console.log("read from the end");
await consumer.stream(Offset.FromEnd(), async (record: Record) => {
// handle record;
console.log(`Key=${record.keyString()}, Value=${record.valueString()}`);
process.exit(0);
});
} catch (ex) {
console.log("error", ex);
}
};

// Create Fluvio Client Instance
const fluvio = new Fluvio();
createTopic();
produce();
consume();

Run

$ npx ts-node example.ts

Expected Output

Connecting client to fluvio
Connecting client to fluvio
Creating topic
read from the end
Key=example-key, Value=Hello World! - Time is (...)

The above code tries to create a topic, produces an entry for the topic, and consumes the said entry. The process.exit(0) consumes only one record before ending.