State - Arrow Dataframe
States are a key concept in SDF. A state either holds a single value or is divided into multiple partitions based on a key. The value stored in each partition can be one of several types.
One of these types is arrow-row. For example, the following snippet defines a state named count-per-word that stores each partition's value as a row in an Arrow DataFrame.
states:
  count-per-word:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: arrow-row
        properties:
          count:
            type: u32
Here we define a state, count_per_word, to track the frequency of words. Each key has one row with a single column, count, that stores how many times the word has been seen.
For example, suppose we have the following words: apple, orange, banana, orange, grape, orange, banana.
These are mapped to an Arrow DataFrame as follows:
_key | count |
---|---|
apple | 1 |
orange | 3 |
banana | 2 |
grape | 1 |
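The mapping from a word list to one row per key can be modelled in plain Rust, outside SDF. The sketch below is only an illustration: `count_words` is a hypothetical helper, not part of the SDF API, and it assumes rows are kept in the order in which each key is first seen.

```rust
/// Fold a list of words into (key, count) rows, one row per distinct word.
/// Plain-Rust model of the keyed state; this is NOT the SDF state API.
fn count_words(words: &[&str]) -> Vec<(String, u32)> {
    let mut rows: Vec<(String, u32)> = Vec::new();
    for &word in words {
        // Each distinct word acts as its own partition key with a single row.
        match rows.iter_mut().find(|row| row.0 == word) {
            Some(row) => row.1 += 1,
            None => rows.push((word.to_string(), 1)),
        }
    }
    rows
}
```

Calling `count_words` with the seven words listed above yields four rows, one for each distinct word.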
To update the state, use the update-state operator as shown below:
update-state:
  run: |
    fn count_word(_word: String) -> Result<()> {
      // Fetch the row for the current key, bump its count, and write it back.
      let mut state = count_per_word();
      state.count += 1;
      state.update()?;
      Ok(())
    }
Note that the state value is accessed through the count_per_word state function, which is automatically injected by the SDF builder.
When this function is called from the update-state operator, it returns only the value of the partition state for the current key.
In this example, count_per_word returns a single row of the DataFrame. If the operator sees apple, that row is the first row of the DataFrame above.
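This partition scoping can be pictured with a small model outside SDF. In the sketch below, `Row`, `KeyedState`, and `with_partition` are hypothetical names invented for illustration, and the assumption that a missing row starts at zero is ours rather than something the SDF documentation states here.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one arrow-row of the state: a single `count` column.
#[derive(Debug, Default, Clone, PartialEq)]
struct Row {
    count: u32,
}

/// Hypothetical stand-in for the keyed state: one row per partition key.
#[derive(Debug, Default)]
struct KeyedState {
    rows: HashMap<String, Row>,
}

impl KeyedState {
    /// Run `f` against the row for `key` only.
    /// Assumption: a row that does not exist yet is created with count 0.
    /// The closure never sees rows that belong to other keys.
    fn with_partition<F: FnOnce(&mut Row)>(&mut self, key: &str, f: F) {
        let row = self.rows.entry(key.to_string()).or_default();
        f(row);
    }
}
```

In this model, `state.with_partition("apple", |row| row.count += 1)` plays the role of the operator handing the apple row to the user function.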
However, aggregate operators like flush can access the entire state and perform aggregation across all partitions. In this case, the count_per_word state function returns the entire DataFrame, not just individual rows. You can then perform DataFrame operations using the SQL API. The snippet below shows how to use SQL to get the 3 most frequent words.
flush:
  run: |
    fn aggregate_wordcount() -> Result<TopWords> {
      // In an aggregate operator the state function returns the whole DataFrame.
      let word_counts = count_per_word();
      let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
      let rows = top3.rows()?;
      let mut top_words = vec![];
      // References to the key column and the count column of the result.
      let key = top3.key()?;
      let count = top3.col("count")?;
      // Walk the result rows and copy each one into a WordCount.
      while rows.next() {
        let word = rows.str(&key)?;
        let count = rows.u32(&count)?;
        let word_count = WordCount { word, count };
        top_words.push(word_count);
      }
      Ok(top_words)
    }
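To make clear what the query computes, here is a rough plain-Rust equivalent of `order by count desc limit 3` over (key, count) rows. `top_n` is a hypothetical helper, not an SDF or Polars function. The sketch breaks ties by input order, which is a property of this code only; the SQL engine may order tied rows differently.

```rust
/// Return the `n` rows with the highest counts, highest first.
/// Plain-Rust model of the SQL query; this is NOT the SDF or Polars API.
fn top_n(rows: &[(String, u32)], n: usize) -> Vec<(String, u32)> {
    let mut sorted = rows.to_vec();
    // Stable sort, descending by count: rows with equal counts keep input order.
    sorted.sort_by(|a, b| b.1.cmp(&a.1));
    // Keep at most `n` rows, like `limit n`.
    sorted.truncate(n);
    sorted
}
```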
SQL API
For any state that is a DataFrame, you can use the SQL API to perform DataFrame operations. SDF uses Polars SQL to execute them.
The result of a SQL operation is always a DataFrame, so you can run multiple SQL operations in sequence to get the desired result.
The SQL is executed in the context of the DataFrame, and the DataFrame is referenced in the query by the name of the state (count_per_word here), as illustrated below:
let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;