State CLI SQL access
This tutorial continues from the previous state example. It shows how to use the SQL interface in the CLI to access state data.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install one, please follow the instructions here.
Dataflow
Overview
In previous examples, we learned how to create and manipulate a state. Here we do the same, and we also use the CLI to access it.
Collect-sensor-data
1. Define the state
For this state, we will track the latitude, longitude, sensor_status, and more parameters from the sensors.
states:
  vehicle-data:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: arrow-row
        properties:
          latitude:
            type: f64
          longitude:
            type: f64
          fuel_consumption:
            type: u32
          sensor_status:
            type: string
          engine_temperature:
            type: i32
Here, the key is a string, but the value is stored as an arrow-row, which can contain multiple properties (these act like columns).
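One way to picture this layout, as a rough mental model rather than a description of how SDF stores state internally, is a map from each key to a row with one field per property. The names VehicleRow, KeyedState, and sample_state below are hypothetical and exist only for this sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one arrow-row: one field per declared property.
#[derive(Debug, Clone, Default, PartialEq)]
struct VehicleRow {
    latitude: f64,
    longitude: f64,
    fuel_consumption: u32,
    sensor_status: String,
    engine_temperature: i32,
}

/// Hypothetical stand-in for the keyed state: string key -> row.
type KeyedState = HashMap<String, VehicleRow>;

/// Build a tiny state holding a single row, keyed by vehicle id.
fn sample_state() -> KeyedState {
    let mut state = KeyedState::new();
    state.insert(
        "V001".to_string(),
        VehicleRow {
            latitude: 40.7128,
            longitude: -74.0060,
            fuel_consumption: 10,
            sensor_status: "ok".to_string(),
            engine_temperature: 90,
        },
    );
    state
}
```

Each key owns exactly one row, which is why the SQL view of this state later shows one line per vehicle.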
2. Assign key
We will use the vehicle_id from the sensor data as the key that partitions the state.
partition:
  assign-key:
    run: |
      fn key_by_id(data: VehicleDataType) -> Result<String> {
        Ok(data.vehicle_id)
      }
  update-state:
    (...)
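To see what key assignment does, the following minimal sketch runs the same key_by_id logic outside SDF and groups a few records by the key they receive. The VehicleDataType struct here is a trimmed, hypothetical stand-in for the type SDF generates, the plain Result<String, String> is an assumption made so the code compiles on its own, and group_by_key is a helper invented for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the record type that SDF generates from
/// `vehicle-data-type`; only the fields used in this sketch are included.
#[derive(Debug, Clone)]
struct VehicleDataType {
    vehicle_id: String,
    engine_temperature: i32,
}

/// Same logic as the dataflow's `key_by_id`, but with a plain `Result`
/// (an assumption for this sketch; inside SDF the `Result` type is provided).
fn key_by_id(data: VehicleDataType) -> Result<String, String> {
    Ok(data.vehicle_id)
}

/// Hypothetical helper: collect the temperatures seen under each assigned key,
/// to show which records end up in the same partition.
fn group_by_key(records: Vec<VehicleDataType>) -> Result<BTreeMap<String, Vec<i32>>, String> {
    let mut groups = BTreeMap::new();
    for record in records {
        // Copy the value we want to keep before the record is consumed.
        let temperature = record.engine_temperature;
        let key = key_by_id(record)?;
        groups.entry(key).or_insert_with(Vec::new).push(temperature);
    }
    Ok(groups)
}
```

Records that share a vehicle_id land under the same key, so they all update the same row of the state.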
3. Updating State
To update the state in an arrow-row, we need to set the row's columns manually and then call update().
partition:
  assign-key:
    (...)
  update-state:
    run: |
      fn update_temperature(data: VehicleDataType) -> Result<()> {
        let mut vd = vehicle_data();
        vd.latitude = data.latitude;
        vd.longitude = data.longitude;
        vd.fuel_consumption = data.fuel_consumption;
        vd.engine_temperature = data.engine_temperature;
        vd.sensor_status = data.sensor_status;
        vd.update()?;
        Ok(())
      }
States are terminal, so no other action will run after the update.
In this example, no other service consumes the state; we will use the SQL interface to access it from the CLI.
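The effect of the update function above can be approximated outside SDF with a plain map. This is only a sketch under stated assumptions: VehicleDataType and VehicleRow are hypothetical stand-ins for the generated types, apply_update is an invented helper, and the map's entry lookup is assumed to play roughly the role that vehicle_data() plays for the current key.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the record type SDF generates from `vehicle-data-type`.
#[derive(Debug, Clone)]
struct VehicleDataType {
    vehicle_id: String,
    latitude: f64,
    longitude: f64,
    fuel_consumption: u32,
    sensor_status: String,
    engine_temperature: i32,
}

/// Hypothetical stand-in for one arrow-row of the `vehicle-data` state.
#[derive(Debug, Clone, Default, PartialEq)]
struct VehicleRow {
    latitude: f64,
    longitude: f64,
    fuel_consumption: u32,
    sensor_status: String,
    engine_temperature: i32,
}

/// Copy every tracked column from the record into the row for its key.
fn apply_update(state: &mut HashMap<String, VehicleRow>, data: VehicleDataType) {
    // Fetch the row for this key, creating an empty one the first time it is seen.
    let row = state.entry(data.vehicle_id).or_default();
    row.latitude = data.latitude;
    row.longitude = data.longitude;
    row.fuel_consumption = data.fuel_consumption;
    row.engine_temperature = data.engine_temperature;
    row.sensor_status = data.sensor_status;
}
```

Because every column is overwritten, the row for a key always reflects the most recent record received for that vehicle.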
Running the Example
Full Code
Copy and paste the following config and save it as dataflow.yaml.
apiVersion: 0.5.0
meta:
  name: sql-example
  version: 0.1.0
  namespace: examples
config:
  converter: json
types:
  vehicle-data-type:
    type: object
    properties:
      vehicle_id:
        type: string
      latitude:
        type: f64
      longitude:
        type: f64
      sensor_status:
        type: string
      fuel_consumption:
        type: u32
      engine_rpm:
        type: u32
      engine_temperature:
        type: i32
      speed:
        type: float32
topics:
  vehicle-sensor:
    schema:
      value:
        type: vehicle-data-type
services:
  collect-sensor-data:
    sources:
      - type: topic
        id: vehicle-sensor
    states:
      vehicle-data:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              latitude:
                type: f64
              longitude:
                type: f64
              fuel_consumption:
                type: u32
              sensor_status:
                type: string
              engine_temperature:
                type: i32
    partition:
      assign-key:
        run: |
          fn key_by_id(data: VehicleDataType) -> Result<String> {
            Ok(data.vehicle_id)
          }
      update-state:
        run: |
          fn update_temperature(data: VehicleDataType) -> Result<()> {
            let mut vd = vehicle_data();
            vd.latitude = data.latitude;
            vd.longitude = data.longitude;
            vd.fuel_consumption = data.fuel_consumption;
            vd.engine_temperature = data.engine_temperature;
            vd.sensor_status = data.sensor_status;
            vd.update()?;
            Ok(())
          }
Running SDF
To run the example:
$ sdf run
Produce data
We will produce some data to mimic sensor behavior.
$ echo '{ "timestamp": "2023-11-22T12:34:56Z", "vehicle_id": "V001", "latitude": 40.7128, "longitude": -74.0060, "speed": 60, "engine_temperature": 90, "engine_rpm": 2000, "fuel_consumption": 10, "sensor_status": "ok" }' | fluvio produce vehicle-sensor
$ echo '{ "timestamp": "2023-11-22T12:35:01Z", "vehicle_id": "V002", "latitude": 34.0522, "longitude": -118.2437, "speed": 30, "engine_temperature": 85, "engine_rpm": 1500, "fuel_consumption": 8, "sensor_status": "failed"}' | fluvio produce vehicle-sensor
Enter SQL Mode
In the SDF interactive shell, use the sql command to enter SQL mode:
>> sql
SDF SQL version sdf-beta5
Type .help for help.
sql >>
Run queries in SQL mode
In SQL mode, we can access the dataframe states of the dataflow.
We can list the tables available with:
sql >> show tables
shape: (1, 1)
┌──────────────┐
│ name         │
│ ---          │
│ str          │
╞══════════════╡
│ vehicle_data │
└──────────────┘
We can also run regular SQL queries:
sql >> select * from vehicle_data
shape: (2, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ ---  ┆ ---                ┆ ---              ┆ ---      ┆ ---       ┆ ---           │
│ str  ┆ i32                ┆ u32              ┆ f64      ┆ f64       ┆ str           │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V001 ┆ 90                 ┆ 10               ┆ 40.7128  ┆ -74.006   ┆ ok            │
│ V002 ┆ 85                 ┆ 8                ┆ 34.0522  ┆ -118.2437 ┆ failed        │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
sql >> select * from vehicle_data where sensor_status = 'failed'
shape: (1, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ ---  ┆ ---                ┆ ---              ┆ ---      ┆ ---       ┆ ---           │
│ str  ┆ i32                ┆ u32              ┆ f64      ┆ f64       ┆ str           │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V002 ┆ 85                 ┆ 8                ┆ 34.0522  ┆ -118.2437 ┆ failed        │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘
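For readers who think in code rather than SQL, the filter in the last query can be sketched in plain Rust over an in-memory model of the state. This is an illustration only and says nothing about how SDF executes queries; VehicleRow is a trimmed, hypothetical stand-in for a row, and rows_with_status is a helper invented for this sketch.

```rust
use std::collections::BTreeMap;

/// Hypothetical, trimmed stand-in for one row of `vehicle_data`.
#[derive(Debug, Clone, PartialEq)]
struct VehicleRow {
    sensor_status: String,
    engine_temperature: i32,
}

/// Rough equivalent of
/// `select * from vehicle_data where sensor_status = '<status>'`:
/// returns each matching row together with its `_key`, ordered by key.
fn rows_with_status(
    state: &BTreeMap<String, VehicleRow>,
    status: &str,
) -> Vec<(String, VehicleRow)> {
    state
        .iter()
        .filter(|(_, row)| row.sensor_status == status)
        .map(|(key, row)| (key.clone(), row.clone()))
        .collect()
}
```

The key plays the part of the _key column, and each struct field plays the part of one of the remaining columns.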
Exit the SQL mode
Use .quit or .exit to exit SQL mode.
sql >> .quit
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
We have just implemented an example that accesses arrow states through the SQL interface. The following link contains another example that uses data from multiple states to perform a JOIN query.