
Split Filter Map Operator

The Split Filter applies the filter operator on each sink to control which records reach which sink. It is useful when you want to sort records into multiple sinks. This example builds on the example for the filter operator: strings from the topic sentences are routed to two sink topics, question and not-question. The following visual shows how the service connects a single source to two sinks.

Visual of defined dataflow

Prerequisites

This guide uses a local Fluvio cluster. If you need to install one, please follow the instructions here.

Transformation

The general syntax is the same as the filter operator's transform, except that the transform is embedded in each sink definition.

sinks:
  - type: topic
    id: (...)
    transforms:
      - operator: filter
        run: |
          (... filter function ...)
  (... more topics ...)

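In our case, each filter function takes a string as input and returns a boolean value. The filter on the question sink returns true if the input string contains a question mark and false otherwise. The filter on the not-question sink does the opposite. A record is written to a sink only when that sink's filter returns true.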

sinks:
  - type: topic
    id: question
    transforms:
      - operator: filter
        run: |
          fn filter_question(sentence: String) -> Result<bool> {
            Ok(sentence.contains("?"))
          }
  - type: topic
    id: not-question
    transforms:
      - operator: filter
        run: |
          fn filter_not_question(sentence: String) -> Result<bool> {
            Ok(!sentence.contains("?"))
          }
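
The two filter functions can also be checked outside of SDF. The following is a minimal standalone sketch, not part of the dataflow: the Result alias is a stand-in (an assumption) for the Result type that SDF provides inside run blocks, and accepted_by_exactly_one is a hypothetical helper added only to show that the two filters are complementary.

```rust
// Stand-in for the Result type available inside SDF `run` blocks.
// This alias is an assumption so the sketch compiles on its own.
type Result<T> = std::result::Result<T, String>;

// Accepts only sentences that contain a question mark.
fn filter_question(sentence: String) -> Result<bool> {
    Ok(sentence.contains("?"))
}

// Accepts only sentences that do not contain a question mark.
fn filter_not_question(sentence: String) -> Result<bool> {
    Ok(!sentence.contains("?"))
}

// Hypothetical helper: true when exactly one of the two filters
// accepts the sentence, i.e. the sentence lands in exactly one sink.
fn accepted_by_exactly_one(sentence: &str) -> bool {
    let question = filter_question(sentence.to_string()).unwrap_or(false);
    let not_question = filter_not_question(sentence.to_string()).unwrap_or(false);
    question != not_question
}
```

Because one predicate is the negation of the other, every sentence is accepted by exactly one of them. Filters that are not complementary could let a record reach both sinks or neither.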

Running the Example

Copy and paste the following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: split-filter-example
  version: 0.1.0
  namespace: examples

config:
  converter: raw

topics:
  sentences:
    schema:
      value:
        type: string
  question:
    schema:
      value:
        type: string
  not-question:
    schema:
      value:
        type: string


services:
  filter-service:
    sources:
      - type: topic
        id: sentences
    sinks:
      - type: topic
        id: question
        transforms:
          - operator: filter
            run: |
              fn filter_question(sentence: String) -> Result<bool> {
                Ok(sentence.contains("?"))
              }
      - type: topic
        id: not-question
        transforms:
          - operator: filter
            run: |
              fn filter_not_question(sentence: String) -> Result<bool> {
                Ok(!sentence.contains("?"))
              }

To run the example:

$ sdf run --ephemeral

Produce sentences to the sentences topic:

$ echo "Hello world" | fluvio produce sentences
$ echo "Are you there?" | fluvio produce sentences

In another terminal, consume the question topic to retrieve the result:

$ fluvio consume question -Bd
Are you there?

Consume the other topic, not-question:

$ fluvio consume not-question -Bd
Hello world

Instead of a single topic containing only questions, every sentence now reaches one of the two sinks: questions go to question, and all other sentences go to not-question.
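
The routing seen above can be modeled in plain Rust. This is only an illustrative sketch under the assumption that each sink evaluates its own filter independently for every record. The Sink struct, fan_out, and example_sinks are hypothetical names and are not part of SDF or Fluvio.

```rust
// Hypothetical model of a sink: a topic name plus its filter predicate.
struct Sink {
    topic: &'static str,
    keep: fn(&str) -> bool,
}

fn is_question(sentence: &str) -> bool {
    sentence.contains('?')
}

fn is_not_question(sentence: &str) -> bool {
    !sentence.contains('?')
}

// The two sinks from this example, in the order they appear in dataflow.yaml.
fn example_sinks() -> Vec<Sink> {
    vec![
        Sink { topic: "question", keep: is_question },
        Sink { topic: "not-question", keep: is_not_question },
    ]
}

// Offers every record to every sink and returns a (topic, record) pair
// for each record that a sink's filter accepted.
fn fan_out(records: &[&str], sinks: &[Sink]) -> Vec<(String, String)> {
    let mut delivered = Vec::new();
    for &record in records {
        for sink in sinks {
            if (sink.keep)(record) {
                delivered.push((sink.topic.to_string(), record.to_string()));
            }
        }
    }
    delivered
}
```

Calling fan_out with "Hello world" and "Are you there?" against example_sinks() pairs the first sentence with not-question and the second with question, matching the consumer output shown above.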

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

In this example, we covered how to split traffic across multiple sinks with the filter operator. While quite similar to the filter transform, applying it per sink lets a single service route records to several topics, which may make the dataflow simpler to write.