Skip to main content
Version: latest

Key-Value - Output

Continuing from the last tutorial about inputs using key-values, we will use store entries as key-values. We can view this as the reverse of the previous tutorial.

Prerequisites

This guide uses local Fluvio cluster. If you need to install it, please follow the instructions at here.

Dataflow

Overview

For this example, we will write a dataflow that tracks books in a library system. It will take a book from the source and return the book as a value combined with a barcode as the key. We will write a short hashing function that takes the title and hashes it into a barcode that will serve as the key for our sink.

Visual of defined dataflow

Define the types

Like the previous example, we will need to define our types. This time, its a little shorter.

types:
book:
type: object
properties:
name:
type: string
year:
type: u32
author:
type: string

Topic List

The following is our list of topics.

topics:
new-book:
schema:
value:
type: book
books-in-system:
schema:
key:
type: string
value:
type: book
Source

The new-book is the source topic.

Sink

The books-in-system is the sink topic. It contains a string key and a book object.

Transform

We will apply a transform to take the Book entries into the tuple (String,Book).

transforms:
- operator: map
run: |
fn newbook(book:Book) -> Result<(Option<String>,Book)>{
let mut p:u32 = 53;
let m:u32 = 1000000009;
let mut hash:u32 = 0;
for c in book.name.chars() {
hash = (hash + (c as u32)*p) % m;
p = (p*53) % m;
}

Ok((Some(hash.to_string()),book))
}

The following just calculates the hash value and returns it as the barcode.

Running the Example

Full Code

Copy and paste following config and save it as dataflow.yaml.

apiVersion: 0.5.0

meta:
name: key-value-output
version: 0.1.0
namespace: example

config:
converter: json
consumer:
default_starting_offset:
value: 0
position: End

types:
book:
type: object
properties:
name:
type: string
year:
type: u32
author:
type: string

topics:
new-book:
schema:
value:
type: book
books-in-system:
schema:
key:
type: string
value:
type: book

services:
addbook:
sources:
- type: topic
id: new-book

transforms:
- operator: map
run: |
fn newbook(book:Book) -> Result<(Option<String>,Book)>{
let mut p:u32 = 53;
let m:u32 = 1000000009;
let mut hash:u32 = 0;
for c in book.name.chars() {
hash = (hash + (c as u32)*p) % m;
p = (p*53) % m;
}

Ok((Some(hash.to_string()),book))
}

sinks:
- type: topic
id: books-in-system

Running SDF

To run example:

$ sdf run --ephemeral

Produce data

We will produce some data by first writing it into a file name book.txt.

{ "author": "Brian W. Kernighan and Dennis M. Ritchie", "name": "C Programming Language", "year": 1988 }
{ "author": "Anany Levitin", "name": "Introduction to the Design and Analysis of Algorithms", "year": 2001 }
{ "author": "Steve Klabnik and Carol Nichols", "name": "The Rust Programming Language", "year": 2018 }
{ "author": "Steve Klabnik and Carol Nichols", "name": "Rust Cookbook", "year": 2020 }

We can produce data via

$ fluvio produce new-book -f book.txt
$ fluvio consume new-book -Bdk

Consume data

To consume the data

$ fluvio consume books-in-system -Bdk
[439139953] {"author":"Brian W. Kernighan and Dennis M. Ritchie","name":"C Programming Language","year":1988}
[747501183] {"author":"Anany Levitin","name":"Introduction to the Design and Analysis of Algorithms","year":2001}
[305905177] {"author":"Steve Klabnik and Carol Nichols","name":"The Rust Programming Language","year":2018}
[605191240] {"author":"Steve Klabnik and Carol Nichols","name":"Rust Cookbook","year":2020}

We can see that the key is the hash value followed by the data of the book.

Cleanup

Exit sdf terminal and clean-up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

This how-to focused on using key-values as out. The following pages contains another example of key-value as inputs.

  1. Key Value Output