
State Example Arrow Row

This tutorial continues from the previous state example. It shows how to use arrow-rows to store more complex data types that are not available as primitive key-values. Read more about arrow rows. We build on the merge example to keep a saved balance.

Prerequisites

This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.

Dataflow

Overview

In this example, we first show how to create a state, how to update it as data enters from the source, and how to interface with it. The state and its update are defined in the mergeservice, and the interfacing is defined in the interface service.

Visual of defined dataflow

Mergeservice

1. Define the state

For this state, we track only the balance, stored as a float (f32).

states:
  tracker:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: arrow-row
        properties:
          balance:
            type: f32

Here, the key is a string, but the value is stored as an arrow-row, which can contain multiple properties (these act like columns).
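As a rough mental model only, a keyed-state with an arrow-row value behaves like a map from a string key to a record with one field per property. The sketch below is plain Rust, not code generated by SDF. The `WideRow` type, its extra `trades` column, and the `record_order` helper are hypothetical names, added to illustrate how several columns would live side by side in one row.

```rust
use std::collections::HashMap;

/// Hypothetical row with two columns. Only `balance` exists in the tutorial;
/// `trades` is an illustrative extra column.
#[derive(Debug, Default, Clone, PartialEq)]
struct WideRow {
    balance: f32,
    trades: u32,
}

/// Records one order under `key`: both columns of the row change together.
/// A key seen for the first time starts from a zeroed row in this model.
fn record_order(state: &mut HashMap<String, WideRow>, key: &str, order: f32) {
    let row = state.entry(key.to_string()).or_default();
    row.balance += order;
    row.trades += 1;
}
```

In this picture, adding a property to the arrow-row corresponds to adding a field to the struct, while the key type stays a plain string.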

2. Assign key

As in the previous example, we use a trivial key to store the balance: every record is assigned the key "cash".

partition:
  assign-key:
    run: |
      fn map_cash(order: f32) -> Result<String> {
        Ok("cash".to_string())
      }
  update-state:
    (...)

3. Updating State

To update the state in an arrow-row, we need to update the individual columns of the row manually and then call update().

partition:
  assign-key:
    (...)
  update-state:
    run: |
      fn add_count(order: f32) -> Result<()> {
        let mut tracker = tracker();
        tracker.balance += order;
        tracker.update()?;
        Ok(())
      }

States are terminal, so no other action runs after the state update.
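The modify-then-update pattern can be pictured with a small stand-in written in plain Rust. This is a sketch under an assumption: that changes made to the row become visible in the state only once update() is called, which is how the step above reads but has not been checked against the SDF implementation. `Row`, `RowHandle`, and `apply_order` are hypothetical names and are not part of the SDF API.

```rust
use std::collections::HashMap;

/// Hypothetical row with the single `balance` column from the tutorial.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct Row {
    balance: f32,
}

/// Hypothetical handle: a local copy of one row plus the key it came from.
struct RowHandle {
    key: String,
    row: Row,
}

impl RowHandle {
    /// Loads the row stored under `key`, or a zeroed row if none exists yet.
    fn load(store: &HashMap<String, Row>, key: &str) -> Self {
        let row = store.get(key).copied().unwrap_or_default();
        RowHandle { key: key.to_string(), row }
    }

    /// Writes the local copy back; until this runs, the store is unchanged.
    fn update(&self, store: &mut HashMap<String, Row>) {
        store.insert(self.key.clone(), self.row);
    }
}

/// Mirrors map_cash followed by add_count for a single record.
fn apply_order(store: &mut HashMap<String, Row>, order: f32) {
    let mut tracker = RowHandle::load(store, "cash");
    tracker.row.balance += order;
    tracker.update(store);
}
```

Calling `apply_order` once per incoming record accumulates every order under the single "cash" key, which matches the running balance the tutorial keeps.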

Interface

The second service serves as a way to read from the state.

interface:
  sources:
    - type: topic
      id: command
  states:
    tracker:
      from: mergeservice.tracker
  sinks:
    - type: topic
      id: message
      transforms:
        - operator: map
          run: |
            fn new_input(_input: String) -> Result<String> {
              let track = tracker();
              let trackrow = track.sql(&format!("select * from `tracker`"))?;
              let rows = trackrow.rows()?;
              if !rows.next() {
                return Ok("empty".to_string())
              }
              let balancecol = trackrow.col("balance")?;
              let balance = rows.f32(&balancecol)?;
              Ok(format!("{:#?}",balance))
            }

The service must first reference the state created by the mergeservice. The transform that interfaces with the state is defined inside the sink. For simplicity, any record sent to the command topic causes the service to write the current balance to the message topic. The transform function works in two steps:

1. We use a SQL statement to read from track, which is a LazyDf.
   let trackrow = track.sql(&format!("select * from `tracker`"))?;
2. Afterwards, we advance to the first row (returning "empty" if there is none) and read the balance column as an f32.
   let rows = trackrow.rows()?;
   if !rows.next() {
     return Ok("empty".to_string())
   }
   let balancecol = trackrow.col("balance")?;
   let balance = rows.f32(&balancecol)?;
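The read path above follows a cursor pattern: advance with next(), stop early if there is no row, then read a typed column from the current row. The sketch below imitates that control flow in plain Rust so it can be run outside SDF. The `Cursor` type and the `describe` function are hypothetical stand-ins and say nothing about how SDF implements rows().

```rust
/// Hypothetical cursor over a single f32 column; not SDF's own type.
struct Cursor {
    values: Vec<f32>,
    position: Option<usize>, // None until the first successful `next`
}

impl Cursor {
    fn new(values: Vec<f32>) -> Self {
        Cursor { values, position: None }
    }

    /// Advances to the next row; returns false when no row is left.
    fn next(&mut self) -> bool {
        let next_index = self.position.map_or(0, |index| index + 1);
        if next_index < self.values.len() {
            self.position = Some(next_index);
            true
        } else {
            false
        }
    }

    /// Reads the current row, or None if `next` has not succeeded yet.
    fn value(&self) -> Option<f32> {
        self.position.map(|index| self.values[index])
    }
}

/// Mirrors new_input: report "empty" or format the first balance.
fn describe(mut rows: Cursor) -> String {
    if !rows.next() {
        return "empty".to_string();
    }
    match rows.value() {
        Some(balance) => format!("{:#?}", balance),
        None => "empty".to_string(),
    }
}
```

The early return matters because the state holds no row until at least one buy or sell record has been processed.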

Running the Example

Full Code

Copy and paste the following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: arrow-example
  version: 0.1.0
  namespace: examples

config:
  converter: json

types:
  order:
    type: object
    properties:
      name:
        type: string
      amount:
        type: u32
      price:
        type: f32

topics:
  buy:
    schema:
      value:
        type: order
  sell:
    schema:
      value:
        type: order
  command:
    schema:
      value:
        type: string
        converter: raw
  message:
    schema:
      value:
        type: string

services:
  interface:
    sources:
      - type: topic
        id: command
    states:
      tracker:
        from: mergeservice.tracker
    sinks:
      - type: topic
        id: message
        transforms:
          - operator: map
            run: |
              fn new_input(_input: String) -> Result<String> {
                let track = tracker();
                let trackrow = track.sql(&format!("select * from `tracker`"))?;
                let rows = trackrow.rows()?;
                if !rows.next() {
                  return Ok("empty".to_string())
                }
                let balancecol = trackrow.col("balance")?;
                let balance = rows.f32(&balancecol)?;
                Ok(format!("{:#?}",balance))
              }
  mergeservice:
    sources:
      - type: topic
        id: buy
        transforms:
          - operator: map
            run: |
              fn buy_order(order: Order) -> Result<f32> {
                Ok(order.amount as f32 * order.price * -1.0)
              }
      - type: topic
        id: sell
        transforms:
          - operator: map
            run: |
              fn sell_order(order: Order) -> Result<f32> {
                Ok(order.amount as f32 * order.price)
              }
    states:
      tracker:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              balance:
                type: f32
    partition:
      assign-key:
        run: |
          fn map_cash(order: f32) -> Result<String> {
            Ok("cash".to_string())
          }
      update-state:
        run: |
          fn add_count(order: f32) -> Result<()> {
            let mut tracker = tracker();
            tracker.balance += order;
            tracker.update()?;
            Ok(())
          }

Running SDF

To run the example:

$ sdf run --ephemeral

Produce data

We will produce some data for the first service through the buy and sell topics.

$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell

Verify that the state exists by entering show state in the sdf prompt. The output should include the following state:

>> show state
Namespace Keys Type
(...)
mergeservice/tracker/state 1 u32
(...)

Running show state on that state displays the stored balance:

>> show state mergeservice/tracker/state
Key Window balance
cash * 921.6001
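The displayed balance follows from the two transforms: the buy record contributes -(20 × 173.33) and the sell record contributes 20 × 219.41. The sketch below repeats that arithmetic in plain Rust, outside SDF, assuming the balance starts at zero. The function names `buy_delta`, `sell_delta`, and `final_balance` are hypothetical.

```rust
/// Mirrors buy_order: a purchase lowers the balance.
fn buy_delta(amount: u32, price: f32) -> f32 {
    amount as f32 * price * -1.0
}

/// Mirrors sell_order: a sale raises the balance.
fn sell_delta(amount: u32, price: f32) -> f32 {
    amount as f32 * price
}

/// Applies the deltas in arrival order, starting from a zero balance.
fn final_balance(deltas: &[f32]) -> f32 {
    deltas
        .iter()
        .fold(0.0_f32, |balance, delta| balance + *delta)
}
```

In exact decimal arithmetic the result is 4388.20 - 3466.60 = 921.60. The extra trailing digit in 921.6001 comes from single-precision (f32) rounding.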

Consume data

Then let's send any string to command and consume the output from message:

$ echo 'Do stuff' | fluvio produce command
$ fluvio consume message -Bd
921.6001

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

We just implemented an example using arrow states. The following link contains another example with arrow states.

  1. Temperature Example