# Getting Started with Stateful Dataflows
This guide will get you started with Stateful Dataflows (SDF), a utility that helps developers build, troubleshoot, and run full-featured event-driven dataflows.
## Example Dataflow
As an example, we'll create a dataflow that splits sentences into words and counts the number of characters in each word. The dataflow reads from the `sentence` topic and writes to the `words` topic. This example is available for download on GitHub.
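For example, once the dataflow is running, a single record on the `sentence` topic fans out into one record per word on the `words` topic, each annotated with its character count:

```
sentence topic:  "Hello World"
words topic:     "Hello(5)"
                 "World(5)"
```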
## Prerequisites
Building a Stateful Dataflow requires the following software:

- Rust 1.80 or later - Install Rust
- The `wasm32-wasip1` Rust target, typically installed with: `rustup target add wasm32-wasip1`
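If you're not sure whether the target is already present, `rustup` can list the installed targets (standard `rustup` usage, shown here as a convenience):

```bash
$ rustup target list --installed | grep wasm32-wasip1
wasm32-wasip1
```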
## Install Fluvio & Start a Cluster
SDF requires a Fluvio Cluster to consume, produce, and stream records between services.
Download and install the CLI.
```bash
$ curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
```
This command will download the Fluvio Version Manager (`fvm`), the Fluvio CLI (`fluvio`), and config files into `$HOME/.fluvio`, with the executables in `$HOME/.fluvio/bin`. To complete the installation, you must add the executables to your shell `$PATH`.
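How you do that depends on your shell; for bash or zsh, appending a line like the following to your shell startup file (e.g. `~/.bashrc` or `~/.zshrc`) is one common approach:

```bash
export PATH="$HOME/.fluvio/bin:$PATH"
```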
Start a local cluster:

```bash
$ fluvio cluster start
```
If you prefer to run your cluster in InfinyOn Cloud, follow the instructions here.
Run the following command to check the CLI and cluster platform versions:

```bash
$ fluvio version
```
Your Fluvio cluster is ready for use.
## Install and Setup SDF
SDF is in beta and requires the following release:

```bash
$ fvm install sdf-beta5
```
You can validate the prerequisites with:

```bash
$ sdf setup
All pre-requisites are installed!
```

Your SDF environment is ready to go.
## Use Composition to build a Stateful Dataflow
Composition has two main components, packages and dataflows. You can build and test a package independently, then import it into a dataflow. For additional information, check out the Composition section.
### 1. Building a Package
A package is a collection of services, functions, and states that can be defined and tested independently, then imported into a dataflow.
Let's build one:
#### 1.1 Create a Package File
Open the terminal and create a fresh project directory, say `split-sentence`, where we'll add the dataflow later on. Inside the project directory, create a `packages` directory and a `sentence` subdirectory for the package itself:

```bash
$ mkdir -p split-sentence/packages/sentence
$ cd split-sentence/packages/sentence
```
Inside the `sentence` directory, create a file called `sdf-package.yaml` and add the following content:
```yaml
apiVersion: 0.5.0

meta:
  name: sentence-pkg
  version: 0.1.0
  namespace: example

functions:
  sentence-to-words:
    operator: flat-map
    inputs:
      - name: sentence
        type: string
    output:
      type: string

  augment-count:
    operator: map
    inputs:
      - name: word
        type: string
    output:
      type: string

dev:
  converter: raw
```
The package file instructs the generator to build two functions, `sentence-to-words` and `augment-count`, that we intend to implement. In addition, it tells the `sdf test` runtime that our inputs should be ingested as `raw` for testing our routines.
#### 1.2 Generate the Package Project
The SDF generate command parses the `sdf-package.yaml` file and builds the project:

```bash
$ sdf generate
```
The generator created several directories and files that we'll edit next.
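The exact layout may vary between SDF releases, but judging from the file paths used in the next step, the generated project looks roughly like this (illustrative; your output may include additional files such as the crate manifest):

```
sentence/
├── sdf-package.yaml
└── rust/
    └── sentence-pkg/
        └── src/
            ├── sentence_to_words.rs
            └── augment_count.rs
```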
#### 1.3 Add the Custom Code
First, let's update the first function, `sentence-to-words`. Open `rust/sentence-pkg/src/sentence_to_words.rs` and update the function body with the following code:

```rust
fn sentence_to_words(sentence: String) -> Result<Vec<String>> {
    // Split on whitespace and emit each word as its own record
    Ok(sentence.split_whitespace().map(String::from).collect())
}
```
Next, update `augment_count`. Open `rust/sentence-pkg/src/augment_count.rs` and replace the function body:

```rust
fn augment_count(word: String) -> Result<String> {
    // Append the character count in parentheses, e.g. "Hello" -> "Hello(5)"
    Ok(format!("{}({})", word, word.chars().count()))
}
```
Let's add some tests as well:
```rust
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_augment_count() {
        let input = "Hello".to_string();
        let output = Component::augment_count(input);
        assert_eq!(output.unwrap(), "Hello(5)");
    }
}
```
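You could add a similar test for `sentence_to_words` in `sentence_to_words.rs`. A minimal sketch, assuming the generated wrapper exposes the function as `Component::sentence_to_words`, mirroring `augment_count` above:

```rust
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_sentence_to_words() {
        let input = "Hello World".to_string();
        // Assumes the same generated Component wrapper as augment_count
        let output = Component::sentence_to_words(input);
        assert_eq!(output.unwrap(), vec!["Hello", "World"]);
    }
}
```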
We've implemented both functions; it's time to compile and test our work.
#### 1.4 Build and Test the Package
To build the package, run:
```bash
$ sdf build
```
SDF has a built-in interactive test shell. Let's bring it up:

```bash
$ sdf test
```
In the test shell, you can view the functions available for testing:
```
>> show functions
sentence-to-words
augment-count
```
Let's test `sentence-to-words` first:
```
>> test function sentence-to-words --value "Hello World"
Hello
World
```
Next, test `augment-count`:
```
>> test function augment-count --value "Hello"
Hello(5)
```
You may also test the Rust code via Cargo:

```bash
$ cd rust/sentence-pkg
$ cargo test
```
The tests passed, and the package is now ready to use in the dataflow file.
### 2. Build the Stateful Dataflow
We are building a dataflow that reads sentences from the `sentence` topic and publishes the results to the `words` topic. Let's get started.
#### 2.1 Create a Dataflow File
Navigate to the base project directory:

```bash
$ cd ../../
```
Create a file called `dataflow.yaml` and copy/paste the following content:
```yaml
apiVersion: 0.5.0

meta:
  name: split-sentence
  version: 0.1.0
  namespace: example

imports:
  - pkg: example/sentence-pkg@0.1.0
    path: ./packages/sentence
    functions:
      - name: sentence-to-words
      - name: augment-count

topics:
  sentence:
    schema:
      value:
        type: string
        converter: raw
  words:
    schema:
      value:
        type: string
        converter: raw

services:
  sentence-words:
    sources:
      - type: topic
        id: sentence

    transforms:
      - operator: flat-map
        uses: sentence-to-words
      - operator: map
        uses: augment-count

    sinks:
      - type: topic
        id: words

dev:
  imports:
    - pkg: example/sentence-pkg@0.1.0
      path: ./packages/sentence
```
This example focuses on composition, and the area of interest is the `imports` section:

- `pkg` - the name of the package we are importing (a composition of the `meta` fields of the package).
- `path` - the relative directory where this package can be found.
- `functions` - the names of the functions we want to import.

The imported functions are then referenced by name in the `transforms` section.
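To make the pipeline semantics concrete, here is a plain-Rust sketch (not SDF runtime code) of what the `flat-map` followed by `map` transforms do to a single record:

```rust
fn main() {
    // One record arrives from the `sentence` topic.
    let sentence = "Hello World".to_string();

    // flat-map (sentence-to-words): one input record fans out into many.
    let words = sentence.split_whitespace().map(String::from);

    // map (augment-count): each word maps to exactly one output record.
    let augmented: Vec<String> = words
        .map(|w| format!("{}({})", w, w.chars().count()))
        .collect();

    // Two records are published to the `words` topic.
    assert_eq!(augmented, vec!["Hello(5)", "World(5)"]);
}
```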
#### 2.2 Run the Dataflow
Let's run the project:
```bash
$ sdf run --ui
Please visit http://127.0.0.1:8000 to view your workflow visualization
>>
```
Note:

- The `run` command performs multiple operations:
  - imports and links all packages
  - compiles inline code (if needed)
  - looks up the topics in the cluster and automatically creates them if they don't exist
- The `--ui` flag generates a visual representation of the dataflow at http://127.0.0.1:8000.
- When you close the `run` interactive editor, the dataflow stops processing records.
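While the dataflow is running, you can exercise it end to end from a second terminal. A minimal example using standard Fluvio CLI commands (assuming the local cluster set up above):

```bash
# Send a sentence into the dataflow
$ echo "Hello World" | fluvio produce sentence

# Read the augmented words from the beginning of the output topic
$ fluvio consume words -B -d
Hello(5)
World(5)
```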