Key-Value - Input

In this tutorial we use key-value records: entries that pair a key with a corresponding value. This example takes input from a key-value topic and writes the result to a non-key-value topic.

Prerequisites

This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.

Dataflow

Overview

In this example, we write a dataflow that takes input from a key-value topic and transforms it with a filter-map to select only the students in a certain class.

Visual of defined dataflow

Define the types

For this example, we will not be using primitive types. We have the following objects.

types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32

We have two object types here: student-info, which describes the value of each source record, and student-in-class, which describes the records written to the sink.

Topic List

The following is our list of topics.

topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class

Source

The student-list is the source topic. It maps a primitive string to a student-info. For our example, the primitive string will be the name of the student.

Sink

The profx-students is the sink topic. It contains the type student-in-class defined above.

Transform

The transform converts each (String, StudentInfo) entry into a StudentInClass. It is a filter-map that drops every student whose teacher field is not profX.

transforms:
  - operator: filter-map
    run: |
      fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
        if info.teacher != "profX" {
          return Ok(None);
        }
        let ret = StudentInClass {
          name: name.unwrap_or("".to_string()),
          grade: info.grade
        };
        Ok(Some(ret))
      }

In this transform function, the first parameter, name, carries the record's key and the second, info, carries its value. The key arrives as an Option<String>; if it is missing, the function falls back to an empty name.
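The filter-map logic can also be tried outside of SDF. The following is a minimal standalone sketch, not the code SDF generates: the StudentInfo and StudentInClass structs are hand-written stand-ins whose fields mirror the types section, and the String error type is a placeholder chosen only so the example compiles on its own.

```rust
// Hand-written stand-ins for the types declared in the dataflow's `types`
// section. The shapes SDF actually generates are assumed, not confirmed.
#[derive(Debug, Clone, PartialEq)]
pub struct StudentInfo {
    pub age: u32,
    pub teacher: String,
    pub grade: f32,
}

#[derive(Debug, Clone, PartialEq)]
pub struct StudentInClass {
    pub name: String,
    pub grade: f32,
}

// Same logic as the transform above. The error type is a placeholder:
// inside a dataflow, the `Result` in scope is provided by SDF.
pub fn student_filter(
    name: Option<String>,
    info: StudentInfo,
) -> std::result::Result<Option<StudentInClass>, String> {
    // Returning Ok(None) drops the record from the output.
    if info.teacher != "profX" {
        return Ok(None);
    }
    // Returning Ok(Some(..)) emits the mapped record.
    Ok(Some(StudentInClass {
        // A record without a key gets an empty name.
        name: name.unwrap_or_default(),
        grade: info.grade,
    }))
}
```

Calling student_filter with the key Jerry and a value whose teacher is profX yields a StudentInClass named Jerry, while any other teacher yields None and the record is filtered out.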

Running the Example

Full Code

Copy and paste the following config and save it as dataflow.yaml.

apiVersion: 0.5.0

meta:
  name: key-value-type
  version: 0.1.0
  namespace: example

config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End

types:
  student-info:
    type: object
    properties:
      age:
        type: u32
      teacher:
        type: string
      grade:
        type: f32
  student-in-class:
    type: object
    properties:
      name:
        type: string
      grade:
        type: f32

topics:
  student-list:
    schema:
      key:
        type: string
      value:
        type: student-info
  profx-students:
    schema:
      value:
        type: student-in-class

services:
  filter-profx:
    sources:
      - type: topic
        id: student-list

    transforms:
      - operator: filter-map
        run: |
          fn student_filter(name: Option<String>, info: StudentInfo) -> Result<Option<StudentInClass>> {
            if info.teacher != "profX" {
              return Ok(None);
            }
            let ret = StudentInClass {
              name: name.unwrap_or("".to_string()),
              grade: info.grade
            };
            Ok(Some(ret))
          }

    sinks:
      - type: topic
        id: profx-students

Running SDF

To run the example:

$ sdf run --ephemeral

Produce data

We will produce some data by first writing it into a file named student.txt.

Jerry>{"age":13,"teacher":"profX","grade":90.1} 
Tom>{"age":14,"teacher":"profY","grade":99.2}
Jane>{"age":-1,"teacher":"profZ","grade":99.11}
Terry>{"age":13,"teacher":"profX","grade":50}

We can then produce the data with:

$ fluvio produce student-list --key-separator ">" -f student.txt

If we consume with -k, which prints each record's key, we should see the following output:

$ fluvio consume student-list -Bdk
[Jerry] {"age":13,"teacher":"profX","grade":90.1}
[Tom] {"age":14,"teacher":"profY","grade":99.2}
[Jane] {"age":-1,"teacher":"profZ","grade":99.11}
[Terry] {"age":13,"teacher":"profX","grade":50}
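To make the role of the key separator concrete, here is a small sketch of how one line of student.txt could be split into a key and a value. It assumes the line is split at the first occurrence of the separator; the function name split_key_value is hypothetical and this is not the CLI's actual implementation.

```rust
/// Splits one input line into (key, value) at the first occurrence of `sep`.
/// Returns None when the line contains no separator.
/// Assumption: only the first separator counts, so later occurrences
/// remain part of the value.
pub fn split_key_value<'a>(line: &'a str, sep: &str) -> Option<(&'a str, &'a str)> {
    line.split_once(sep)
}
```

Under that assumption, the first line of student.txt splits into the key Jerry and the JSON value that follows the ">" character.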

Consume data

Consume the data from the sink topic:

$ fluvio consume profx-students -Bd
{"grade":90.1,"name":"Jerry"}
{"grade":50.0,"name":"Terry"}

Only students whose teacher is profX are sent to the sink.

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

This how-to focused on using key-values as inputs. The following page contains another example of key-value inputs.

  1. Key Value Input