Split Filter-Map Operator

Like the Split Filter operator, the Split Filter-Map operator extends a traffic splitter with the filter-map transformation. The operator can selectively filter which entries reach which sinks while also applying a mapping function. The following example extends the filter-map example. We will implement a dataflow that detects whether an input is a valid addition or subtraction statement, computes the result, and sends it to the matching sink.

Visual of defined dataflow

Prerequisites

This guide uses a local Fluvio cluster. If you need to install it, please follow the installation instructions.

Transformation

The filter-map transformation is placed in the sink section.

sinks:
  - type: topic
    id: (...)
    transforms:
      - operator: filter-map
        run: |
          (... filter map function ...)
  (... more topics ...)

The function should take the input and return a Result<Option<...>>: Ok(Some(...)) carrying the mapped value if the input should be kept, or Ok(None) if the input should be filtered out. The following are our transformations. The code is shortened for brevity; the full example is below. Compared to the filter-map example, the dosubtraction sink only swaps in a different regex and calculation.

sinks:
  - type: topic
    id: doaddition
    transforms:
      - operator: filter-map
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_addition(input: String) -> Result<Option<String>> {
            (...)
          }
  - type: topic
    id: dosubtraction
    transforms:
      - operator: filter-map
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_substraction(input: String) -> Result<Option<String>> {
            let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
            if let Some(num) = re.captures(&input) {
              (...)
              return Ok(Some(format!("{}{}", input, a - b)));
            } else {
              return Ok(None);
            }
          }
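
The return contract can also be tried outside SDF. The following is a minimal standalone sketch, not SDF code: it assumes a plain String error type in place of whatever error type the SDF Result alias uses, replaces the regex crate with a hand-written parser over ASCII digits so it builds with the standard library alone, and uses the hypothetical name add_statement. Unlike the inline function, it reports arithmetic overflow as an error instead of panicking.

```rust
// Standalone sketch of the filter-map contract:
//   Ok(Some(value)) keeps the record and emits the mapped value,
//   Ok(None)        drops the record.
// The String error type is an assumption made for illustration.
fn add_statement(input: String) -> Result<Option<String>, String> {
    // Expect the shape "<digits>+<digits>="; anything else is filtered out.
    let body = match input.strip_suffix('=') {
        Some(body) => body,
        None => return Ok(None),
    };
    let (lhs, rhs) = match body.split_once('+') {
        Some(parts) => parts,
        None => return Ok(None),
    };
    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    if !all_digits(lhs) || !all_digits(rhs) {
        return Ok(None);
    }
    // Operands that do not fit in i32, or a sum that overflows, become errors.
    let a: i32 = lhs.parse().map_err(|e| format!("bad left operand: {}", e))?;
    let b: i32 = rhs.parse().map_err(|e| format!("bad right operand: {}", e))?;
    let sum = a.checked_add(b).ok_or_else(|| "sum overflows i32".to_string())?;
    Ok(Some(format!("{}{}", input, sum)))
}

fn main() {
    // A valid statement is kept and mapped; free text is dropped.
    println!("{:?}", add_statement("9999+1=".to_string()));
    println!("{:?}", add_statement("Hello world".to_string()));
}
```

Running it prints Ok(Some("9999+1=10000")) for the valid statement and Ok(None) for the free-text input.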

Running the Example

Copy and paste the following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: split-filter-map-example
  version: 0.1.0
  namespace: examples

config:
  converter: raw

topics:
  sentences:
    schema:
      value:
        type: string
  doaddition:
    schema:
      value:
        type: string
  dosubtraction:
    schema:
      value:
        type: string

services:
  filter-map-service:
    sources:
      - type: topic
        id: sentences

    sinks:
      - type: topic
        id: doaddition
        transforms:
          - operator: filter-map
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_addition(input: String) -> Result<Option<String>> {
                let re = regex::Regex::new(r"^(\d+)\+(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                  let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                  let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                  return Ok(Some(format!("{}{}", input, a + b)));
                } else {
                  return Ok(None);
                }
              }
      - type: topic
        id: dosubtraction
        transforms:
          - operator: filter-map
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_substraction(input: String) -> Result<Option<String>> {
                let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                  let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                  let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                  return Ok(Some(format!("{}{}", input, a - b)));
                } else {
                  return Ok(None);
                }
              }

To run the example:

$ sdf run --ephemeral

Produce sentences to the sentences topic:

$ echo "Hello world" | fluvio produce sentences
$ echo "9999+1=" | fluvio produce sentences
$ echo "9999-1=" | fluvio produce sentences

Consume the doaddition topic in another terminal to retrieve the result:

$ fluvio consume doaddition -Bd
9999+1=10000

Consume the other topic, dosubtraction:

$ fluvio consume dosubtraction -Bd
9999-1=9998

We can see that the first entry, Hello world, is discarded, while the other two are sent to the right topic with the respective calculation applied.
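
The routing seen in this run can be reproduced with a small standalone sketch that models the split as "offer every source record to each sink, and let each sink's filter-map decide". It is an illustration under stated assumptions rather than how SDF is implemented: eval and route are hypothetical helpers, the regex is replaced by a hand-written parser over ASCII digits, and overflow is treated as "no match" to keep the sketch short.

```rust
// Sketch of the split: each source record is offered to both sinks, and each
// sink independently keeps (Some) or drops (None) it.
// `eval` and `route` are hypothetical helpers, not SDF APIs.

/// Matches "<digits><op><digits>=" and appends the computed value.
/// Returns None for any other input (or on i32 overflow).
fn eval(input: &str, op: char) -> Option<String> {
    let (lhs, rhs) = input.strip_suffix('=')?.split_once(op)?;
    let is_num = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    if !is_num(lhs) || !is_num(rhs) {
        return None;
    }
    let a = lhs.parse::<i32>().ok()?;
    let b = rhs.parse::<i32>().ok()?;
    let value = if op == '+' { a.checked_add(b)? } else { a.checked_sub(b)? };
    Some(format!("{}{}", input, value))
}

/// Returns the records that land in (doaddition, dosubtraction).
fn route(records: &[&str]) -> (Vec<String>, Vec<String>) {
    let add = records.iter().filter_map(|r| eval(*r, '+')).collect();
    let sub = records.iter().filter_map(|r| eval(*r, '-')).collect();
    (add, sub)
}

fn main() {
    // The same three records produced to the sentences topic in this guide.
    let (add, sub) = route(&["Hello world", "9999+1=", "9999-1="]);
    println!("doaddition: {:?}", add);
    println!("dosubtraction: {:?}", sub);
}
```

With the three records from this guide, the sketch places 9999+1=10000 in the first list and 9999-1=9998 in the second, and Hello world appears in neither.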

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

In this example, we covered how to split traffic with the filter-map operator.