Skip to main content
Version: latest

Merge

Services in fluvio can be defined to have multiple sinks and sources. In this example, we will implement a service that takes in multiple sources via a merge. The example will simulate buying and selling stocks. There is a topic buy and another topic sell that aggreate to create a log of buy and sell orders. The visual shows the dataflow we will implement.

Visual of defined dataflow

Prerequisites

This guide uses local Fluvio cluster. If you need to install it, please follow the instructions at here.

Transformation

We can add a transform operator to our source list.

source:
- type: topic
id: (...)
transforms:
- operator: map
run: |
(... filter function ...)
(... more topics ...)

In our case, we have two topics buy and sell that read json objects that include a name, amount, and price. We will map the json object into a string that gets sent into the topic message which will log the orders.

sources:
- type: topic
id: buy
transforms:
- operator: map
run: |
fn buy_order(order: Order) -> Result<String> {
Ok(format!("+ Buy Order for {}x{} at {}",order.name,order.amount,order.price))
}
- type: topic
id: sell
transforms:
- operator: map
run: |
fn sell_order(order: Order) -> Result<String> {
Ok(format!("- Sell Order for {}x{} at {}",order.name,order.amount,order.price))
}

Running the Example

Copy and paste following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
name: merge-example
version: 0.1.0
namespace: examples

config:
converter: json

types:
order:
type: object
properties:
name:
type: string
amount:
type: u32
price:
type: f32

topics:
buy:
schema:
value:
type: order
sell:
schema:
value:
type: order
message:
schema:
value:
type: string

services:
mergeservice:
sources:
- type: topic
id: buy
transforms:
- operator: map
run: |
fn buy_order(order: Order) -> Result<String> {
Ok(format!("+ Buy Order for {}x{} at {}",order.name,order.amount,order.price))
}
- type: topic
id: sell
transforms:
- operator: map
run: |
fn sell_order(order: Order) -> Result<String> {
Ok(format!("- Sell Order for {}x{} at {}",order.name,order.amount,order.price))
}
sinks:
- type: topic
id: message

To run example:

$ sdf run --ephemeral

Produce json objects to each of the topics:

$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell

Consume topic message to retrieve the result:

$ fluvio consume message -Bd
"+ Buy Order for AMZNx20 at 173.33"
"- Sell Order for TSLAx20 at 219.41"

Both the buy order and sell order has been mapped into a string to be logged.

Cleanup

Exit sdf terminal and clean-up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

In this example, we covered how to use merge to allow services to consume multiple sources.