
Operators

Operators are APIs that enable developers to customize their data applications. The operators have opinionated function signatures and flexible types. Some operators can be used independently, whereas others must be chained to accomplish the task.

Summary

The engine implements two kinds of operators: transforms operators, which are chained in an array inside a transforms section, and named operators, which follow a hierarchical convention.

The transforms operators are:

  • map
  • filter
  • filter-map
  • flat-map

The named operators are:

  • assign-timestamp
  • assign-key
  • update-state
  • flush

The engine also uses keyword operators to identify a multi-step operation:

  • partition
  • window

Let's take a look at each operator in detail.

Transforms Operators

The transforms section is an array that allows multiple operators to be chained to perform composite operations.

map

The map operator takes an input record, applies a transformation, and forwards the modified record to the next operator.

For example, given a user record, you can mask the user's social security number with the following map operation:

transforms:
  - operator: map
    run: |
      fn mask_ssn(user: User) -> Result<User> {
        let mut u = user.clone();
        u.ssn = u.ssn.replace(|c: char| c.is_digit(10), "*");
        Ok(u)
      }

Check out the map example on GitHub for a working demo.
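To try the masking logic outside the engine, the operator can be wrapped in a small standalone program. This is only a sketch: the `User` fields and the `OpResult` alias are assumptions that stand in for the application's own record type and for whatever `Result` type the engine supplies.

```rust
use std::error::Error;

// Assumption: stand-in for the single-parameter Result used in operator signatures.
type OpResult<T> = Result<T, Box<dyn Error>>;

// Hypothetical record type; a real application defines its own schema.
#[derive(Debug, Clone, PartialEq)]
struct User {
    name: String,
    ssn: String,
}

// Same idea as the map operator above: every digit of the SSN becomes '*'.
fn mask_ssn(user: User) -> OpResult<User> {
    let mut u = user;
    u.ssn = u.ssn.replace(|c: char| c.is_ascii_digit(), "*");
    Ok(u)
}

fn main() -> OpResult<()> {
    let user = User {
        name: "Ada".to_string(),
        ssn: "123-45-6789".to_string(),
    };
    let masked = mask_ssn(user)?;
    // Separators survive because only digits match the pattern.
    println!("{} -> {}", masked.name, masked.ssn);
    Ok(())
}
```

Because the function takes the record by value, the sketch mutates it directly instead of cloning it first.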

filter

The filter operator takes a record and returns a boolean that tells the engine whether to drop it or forward it to the next operator:

  • true - forwards the record to the next operator.
  • false - drops the record.

In most cases, one operator's output becomes the next operator's input. The filter operator works differently: it returns a boolean, and the engine uses that value to decide whether to send the original, unmodified record to the next operator.

The following example defines a filter that drops all sentences without a question mark:

transforms:
  - operator: filter
    run: |
      fn filter_questions(input: String) -> Result<bool> {
        Ok(input.contains("?"))
      }

Check out the filter example on GitHub for a working demo.
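The difference between the operator's return value and what travels downstream can be illustrated with a small model of the engine side. The `run_filter` loop and the `OpResult` alias below are hypothetical; they are not the engine's API, only a sketch of the behavior described above.

```rust
use std::error::Error;

// Assumption: stand-in for the single-parameter Result used in operator signatures.
type OpResult<T> = Result<T, Box<dyn Error>>;

fn filter_questions(input: String) -> OpResult<bool> {
    Ok(input.contains('?'))
}

// Hypothetical engine loop: the boolean only decides the record's fate.
// The record that moves on is the original one, not the operator's output.
fn run_filter(records: Vec<String>) -> OpResult<Vec<String>> {
    let mut forwarded = Vec::new();
    for record in records {
        if filter_questions(record.clone())? {
            forwarded.push(record);
        }
    }
    Ok(forwarded)
}

fn main() -> OpResult<()> {
    let input = vec!["How are you?".to_string(), "Fine.".to_string()];
    for record in run_filter(input)? {
        println!("forwarded: {record}");
    }
    Ok(())
}
```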

filter-map

The filter-map operator combines a filter and a map operation: it takes a record and returns either a modified record or none. The return value tells the engine whether to forward the record to the next operator or to drop it:

  • Some(Record) - forwards the record to the next operator.
  • None - drops the record.

The following example defines a filter-map that transforms sentences longer than ten characters to uppercase and drops all others:

transforms:
  - operator: filter-map
    run: |
      fn long_sentence_to_uppercase(input: String) -> Result<Option<String>> {
        if input.len() > 10 {
          Ok(Some(input.to_uppercase()))
        } else {
          Ok(None)
        }
      }

Check out the filter-map example on GitHub for a working demo.
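One detail worth knowing about the example above: in Rust, `String::len` returns the length in bytes, so text with non-ASCII letters can exceed ten bytes while holding ten characters or fewer. If the intent is to count characters, a variant along the following lines may be closer. It is a sketch, and the `OpResult` alias is an assumed stand-in for the engine's `Result` type.

```rust
use std::error::Error;

// Assumption: stand-in for the single-parameter Result used in operator signatures.
type OpResult<T> = Result<T, Box<dyn Error>>;

fn long_sentence_to_uppercase(input: String) -> OpResult<Option<String>> {
    // chars().count() counts Unicode scalar values; len() would count bytes.
    if input.chars().count() > 10 {
        Ok(Some(input.to_uppercase()))
    } else {
        Ok(None)
    }
}

fn main() -> OpResult<()> {
    // Ten characters but thirteen bytes in UTF-8 (three two-byte letters).
    let accented = "\u{f1}and\u{fa} est\u{e1}".to_string();
    for sentence in vec![accented, "hello, world!".to_string()] {
        match long_sentence_to_uppercase(sentence.clone())? {
            Some(upper) => println!("forward: {upper}"),
            None => println!("drop: {sentence}"),
        }
    }
    Ok(())
}
```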

flat-map

The flat-map operator takes an aggregate record and returns an array of records. The engine reads the array and sends individual records, one at a time, to the next operator.

The following example defines a flat-map that splits sentences into words:

transforms:
  - operator: flat-map
    run: |
      fn split_sentence(sentence: String) -> Result<Vec<String>> {
        Ok(sentence.split_whitespace().map(String::from).collect())
      }

Check out the flat-map example on GitHub for a working demo.
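To see how chained transforms operators compose, the following sketch models a flat-map, a filter, and a map running in sequence. The `run_chain` loop, the two helper operators `keep_long_words` and `lowercase_word`, and the `OpResult` alias are all hypothetical; the engine's actual scheduling is not shown in this document.

```rust
use std::error::Error;

// Assumption: stand-in for the single-parameter Result used in operator signatures.
type OpResult<T> = Result<T, Box<dyn Error>>;

// flat-map: one sentence in, a list of words out.
fn split_sentence(sentence: String) -> OpResult<Vec<String>> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

// filter (hypothetical): keep words longer than three bytes.
fn keep_long_words(word: String) -> OpResult<bool> {
    Ok(word.len() > 3)
}

// map (hypothetical): normalize the word to lowercase.
fn lowercase_word(word: String) -> OpResult<String> {
    Ok(word.to_lowercase())
}

// Hypothetical engine loop for the chain flat-map -> filter -> map.
fn run_chain(sentences: Vec<String>) -> OpResult<Vec<String>> {
    let mut out = Vec::new();
    for sentence in sentences {
        // flat-map output is fed to the next operator one record at a time.
        for word in split_sentence(sentence)? {
            // filter: the boolean decides; the original word moves on.
            if !keep_long_words(word.clone())? {
                continue;
            }
            // map: the returned value replaces the record.
            out.push(lowercase_word(word)?);
        }
    }
    Ok(out)
}

fn main() -> OpResult<()> {
    let words = run_chain(vec!["The Quick brown fox".to_string()])?;
    println!("{words:?}");
    Ok(())
}
```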

Named Operators

Named operators are pre-pended by a keyword that identifies the operator's function. The keywords are defined in the next section.

assign-timestamp

The assign-timestamp operator lets you choose a timestamp for window processing. Timestamps help the engine distribute records to windows. The timestamp may be taken from the record metadata or from the record value.

Assign timestamp from record metadata

The following example shows how to update the timestamp from the record metadata, passed down through the event_time field:

assign-timestamp:
  run: |
    fn assign_timestamp(_sentence: String, event_time: i64) -> Result<i64> {
      Ok(event_time)
    }

Check out the word-counter example on GitHub for a working demo.

Assign timestamp from record value

Assuming a user_event record has a timestamp field, the following example shows how to update the timestamp from the record value:

assign-timestamp:
  run: |
    fn assign_timestamp(user_event: UserEvent, _event_time: i64) -> Result<i64> {
      Ok(user_event.timestamp)
    }

The assign-timestamp operation is mandatory in window processing.
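As an illustration of how a timestamp can place a record in a window, the sketch below computes the start of the fixed-size window that contains a given timestamp. It assumes millisecond timestamps and windows aligned to time zero; neither is stated in this document, and `window_start` is a hypothetical helper, not an engine function.

```rust
// Hypothetical helper: start of the fixed-size window containing the timestamp.
// Assumes window_ms is positive. rem_euclid keeps the result correct for
// negative timestamps as well.
fn window_start(timestamp_ms: i64, window_ms: i64) -> i64 {
    timestamp_ms - timestamp_ms.rem_euclid(window_ms)
}

fn main() {
    let window_ms = 20_000; // a 20-second window, in milliseconds (assumed unit)
    for ts in vec![0, 19_999, 20_000, 45_000] {
        println!("{ts} -> window starting at {}", window_start(ts, window_ms));
    }
}
```

Under these assumptions, two records land in the same window exactly when `window_start` returns the same value for both.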

assign-key

The assign-key operation assigns a key to the record metadata. The engine uses the key to determine which records to group in the same partition.

The following example shows how the assign-key operator groups cars by their color:

assign-key:
  run: |
    fn key_by_color(car: Car) -> Result<String> {
      Ok(car.color)
    }

Check out the car-count example on GitHub for a working demo.
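The effect of the key on grouping can be modeled with a plain hash map. In the sketch below, `group_by_key` is a hypothetical stand-in for the engine's partitioning, the `maker` field is invented for illustration, and `OpResult` is an assumed alias for the engine's `Result` type.

```rust
use std::collections::HashMap;
use std::error::Error;

// Assumption: stand-in for the single-parameter Result used in operator signatures.
type OpResult<T> = Result<T, Box<dyn Error>>;

// Hypothetical record type; only `color` appears in the example above.
#[derive(Debug, Clone, PartialEq)]
struct Car {
    maker: String,
    color: String,
}

fn key_by_color(car: Car) -> OpResult<String> {
    Ok(car.color)
}

// Hypothetical model of partitioning: records with equal keys share a group.
fn group_by_key(cars: Vec<Car>) -> OpResult<HashMap<String, Vec<Car>>> {
    let mut groups: HashMap<String, Vec<Car>> = HashMap::new();
    for car in cars {
        let key = key_by_color(car.clone())?;
        groups.entry(key).or_default().push(car);
    }
    Ok(groups)
}

fn main() -> OpResult<()> {
    let cars = vec![
        Car { maker: "Volvo".to_string(), color: "red".to_string() },
        Car { maker: "Fiat".to_string(), color: "blue".to_string() },
        Car { maker: "Mini".to_string(), color: "red".to_string() },
    ];
    let groups = group_by_key(cars)?;
    // HashMap order is unspecified, so sort the keys before printing.
    let mut colors: Vec<&String> = groups.keys().collect();
    colors.sort();
    for color in colors {
        let makers: Vec<&str> = groups[color].iter().map(|c| c.maker.as_str()).collect();
        println!("{color}: {makers:?}");
    }
    Ok(())
}
```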

update-state

The update-state operator informs the engine that a state object is about to be updated and that this is the last step of the service.

The following example shows how the update-state operator updates the state object temperature() with the latest temperature value:

update-state:
  run: |
    fn update_temperature(event: Temp) -> Result<()> {
      let mut temp = temperature();
      temp.sensor = event.sensor;
      temp.temperature = event.temperature;
      temp.update()?;
      Ok(())
    }

Check out the update-state example on GitHub for a working demo.

flush

The flush operator is the final stage of a window processing service. The engine invokes flush after the watermark closes the window. The flush function is expected to return a value, which the engine passes to the sink.

Assuming a window operator that generates a state object that counts car colors, the following example shows how the flush operator returns a list of colors and their counts:

flush:
  run: |
    fn get_car_color_count() -> Result<CarColors> {
      let cc = count_by_color().clone();
      Ok(cc.into_iter().map(|(color, count)|
        Color {
          color,
          count
        }
      ).collect())
    }

Check out the car-count example on GitHub for a working demo.
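The timing of flush can be pictured with a small model of window closing. The sketch below assumes that a window covering [start, start + size) counts as closed once the watermark reaches its end, and that time is measured in milliseconds; the engine's exact watermark rules are not described in this document, and `close_windows` is a hypothetical helper.

```rust
use std::collections::BTreeMap;

// Hypothetical model: `open` maps a window's start time to its state
// (a plain record count here). Closed windows are removed and returned
// in order, which is the point where flush would run for each of them.
fn close_windows(
    open: &mut BTreeMap<i64, u32>,
    window_ms: i64,
    watermark_ms: i64,
) -> Vec<(i64, u32)> {
    let mut flushed = Vec::new();
    loop {
        // BTreeMap keys are sorted, so the first key is the oldest window.
        let start = match open.keys().next().copied() {
            Some(start) => start,
            None => break,
        };
        if start + window_ms > watermark_ms {
            break; // this window and all later ones are still open
        }
        if let Some(count) = open.remove(&start) {
            flushed.push((start, count));
        }
    }
    flushed
}

fn main() {
    let mut open = BTreeMap::new();
    open.insert(0, 3);
    open.insert(20_000, 5);
    open.insert(40_000, 1);
    let flushed = close_windows(&mut open, 20_000, 40_000);
    println!("flushed: {flushed:?}, still open: {}", open.len());
}
```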

Keyword Operators

Each keyword operator wraps a hierarchy of operations under its name. The keyword operators are: partition and window.

partition

The partition operation divides the stream into data sets that are processed in parallel. Each partition has its own state. The partition operation has the following sub-sections:

  • assign-key
  • transforms
  • update-state

The assign-key sub-section is mandatory. The transforms and update-state sub-sections are each optional, but at least one of the two must be present.

The following example shows how the partition operation groups the stream by word and updates the state object count_per_word() with the latest word count:

partition:
  assign-key:
    run: |
      fn assign_word_key(word: String) -> Result<String> {
        Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
      }

  update-state:
    run: |
      fn increment_word_count(word: String) -> Result<()> {
        count_per_word().increment(1);
        Ok(())
      }

Check out the word-counter example on GitHub for a working demo.
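The combined effect of assign-key and update-state can be approximated with a hash map that holds one counter per key. This is a sketch under stated assumptions: the local `count_per_word` map stands in for the engine-managed state object of the same name, `count_words` is a hypothetical driver loop, and `OpResult` is an assumed alias for the engine's `Result` type.

```rust
use std::collections::HashMap;
use std::error::Error;

// Assumption: stand-in for the single-parameter Result used in operator signatures.
type OpResult<T> = Result<T, Box<dyn Error>>;

// Same key logic as above: lowercase the word and strip punctuation.
fn assign_word_key(word: String) -> OpResult<String> {
    Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
}

// Hypothetical driver: each key owns its own counter, mirroring
// "each partition has its own state".
fn count_words(words: Vec<String>) -> OpResult<HashMap<String, u32>> {
    let mut count_per_word: HashMap<String, u32> = HashMap::new();
    for word in words {
        let key = assign_word_key(word)?;
        *count_per_word.entry(key).or_insert(0) += 1;
    }
    Ok(count_per_word)
}

fn main() -> OpResult<()> {
    let words = vec!["Hello,".to_string(), "hello".to_string(), "World!".to_string()];
    let counts = count_words(words)?;
    println!("hello = {}, world = {}", counts["hello"], counts["world"]);
    Ok(())
}
```

Normalizing the key is what makes "Hello," and "hello" update the same counter.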

window

The window operation turns streams of records into finite groups of records captured based on the window size. The window operation is a multi-step operation with the following sub-sections:

  • tumbling
  • assign-timestamp
  • partition
  • flush

While there are several types of windows, and the engine will eventually implement all of them, the first version supports only tumbling windows. Tumbling windows are fixed-size, contiguous, and non-overlapping.

Building on the example above, a tumbling window of 20 seconds that computes the top 3 words in a stream of sentences has the following hierarchy:

window:
  tumbling:
    duration: 20s

  assign-timestamp:
    run: |
      fn assign_event_timestamp(_value: String, event_time: i64) -> Result<i64> {
        Ok(event_time)
      }

  partition:
    assign-key:
      run: |
        fn assign_word_key(word: String) -> Result<String> {
          Ok(word.to_lowercase().chars().filter(|c| c.is_alphanumeric()).collect())
        }

    update-state:
      run: |
        fn increment_word_count(word: String) -> Result<()> {
          count_per_word().increment(1);
          Ok(())
        }

  flush:
    run: |
      fn compute_most_used_words() -> Result<TopWords> {
        let word_counts = count_per_word();
        let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
        let rows = top3.rows()?;
        let columns = top3.schema(["_key","count"])?;
        let mut top_words = vec![];
        match &columns[..] {
          [key,value] => {
            while rows.next() {
              let word = rows.str(&key)?;
              let count = rows.u32(&value)?;
              let word_count = WordCount { word, count };
              top_words.push(word_count);
            }
          },
          _ => return Err(anyhow::anyhow!("unexpected schema")),
        }
        Ok(top_words)
      }

The first section defines the window properties, followed by the timestamp assignment, the partitioning, and finally the flush.

Check out the word-counter example on GitHub for a working demo.
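For readers who want to check what the flush query is expected to produce, the ordering and limit can be reproduced in plain Rust. The sketch below is not the engine's state API: `top_words` is a hypothetical helper over an ordinary hash map, and the alphabetical tie-break is an added assumption, since the SQL above does not specify an order among equal counts.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct WordCount {
    word: String,
    count: u32,
}

// Plain-Rust equivalent of "order by count desc limit N".
fn top_words(counts: &HashMap<String, u32>, limit: usize) -> Vec<WordCount> {
    let mut rows: Vec<WordCount> = counts
        .iter()
        .map(|(word, count)| WordCount { word: word.clone(), count: *count })
        .collect();
    // Highest count first; ties broken by word so the output is deterministic.
    rows.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.word.cmp(&b.word)));
    rows.truncate(limit);
    rows
}

fn main() {
    let mut counts = HashMap::new();
    for (word, count) in vec![("the", 5), ("cat", 3), ("fox", 2), ("dog", 2), ("a", 1)] {
        counts.insert(word.to_string(), count);
    }
    for row in top_words(&counts, 3) {
        println!("{}: {}", row.word, row.count);
    }
}
```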
