Skip to main content
Version: latest

State CLI SQL access

This tutorial is a continuation from the previous state example. This tutorial shows how to use SQL interface through CLI to access state data.

Prerequisites

This guide uses local Fluvio cluster. If you need to install it, please follow the instructions at here.

Dataflow

Overview

From previous examples we learned how to create and manipulate a state. Here we will do the same but also will use CLI interface to access it.

Collect-sensor-data

1. Define the state

For this state, we will track the latitude, longitude, sensor_status and more parameters from the sensors.

    states:
      vehicle-data:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              latitude:
                type: f64
              longitude:
                type: f64
              fuel_consumption:
                type: u32
              sensor_status:
                type: string
              engine_temperature:
                type: i32

Here, the key is a string but the value is stored as an arrow-row which can contain multiple properties(acts like columns).

2. Assign key

We will access the id from the sensor data to partition the state.

partition:
  assign-key:
    run: |
        fn key_by_id(data: VehicleDataType) -> Result<String> {
          Ok(data.vehicle_id)
        }
  update-state:
    (...)

3. Updating State

To update the state in an arrow-row, we need to update the individual row's columns manual and call an update().

partition:
  assign-key:
    (...)
  update-state:
    run: |
        fn update_temperature(data: VehicleDataType) -> Result<()> {
            let mut vd = vehicle_data();

            vd.latitude = data.latitude;
            vd.longitude = data.longitude;
            vd.fuel_consumption = data.fuel_consumption;
            vd.engine_temperature = data.engine_temperature;
            vd.sensor_status = data.sensor_status;
            vd.update()?;
            Ok(())
          }

States are terminal so no other action will be run.

In this example there is not other service consuming the state, we will use the SQL interface to access it from CLI.

Running the Example

Full Code

Copy and paste following config and save it as dataflow.yaml.

apiVersion: 0.5.0
meta:
  name: sql-example
  version: 0.1.0
  namespace: examples

config:
  converter: json

types:
  vehicle-data-type:
    type: object
    properties:
      vehicle_id:
        type: string
      latitude:
        type: f64
      longitude:
        type: f64
      sensor_status:
        type: string
      fuel_consumption:
        type: u32
      engine_rpm:
        type: u32
      engine_temperature:
        type: i32
      speed:
        type: float32

topics:
  vehicle-sensor:
    schema:
      value:
        type: vehicle-data-type
services:
  collect-sensor-data:
    sources:
      - type: topic
        id: vehicle-sensor
    states:
      vehicle-data:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              latitude:
                type: f64
              longitude:
                type: f64
              fuel_consumption:
                type: u32
              sensor_status:
                type: string
              engine_temperature:
                type: i32

    partition:
      assign-key:
        run: |
          fn key_by_id(data: VehicleDataType) -> Result<String> {
            Ok(data.vehicle_id)
          }

      update-state:
        run: |
          fn update_temperature(data: VehicleDataType) -> Result<()> {
            let mut vd = vehicle_data();

            vd.latitude = data.latitude;
            vd.longitude = data.longitude;
            vd.fuel_consumption = data.fuel_consumption;
            vd.engine_temperature = data.engine_temperature;
            vd.sensor_status = data.sensor_status;
            vd.update()?;
            Ok(())
          }

Running SDF

To run example:

$ sdf run

Produce data

We will produce some data to mimic sensors behavior.

$ echo '{ "timestamp": "2023-11-22T12:34:56Z", "vehicle_id": "V001", "latitude": 40.7128, "longitude": -74.0060, "speed": 60, "engine_temperature": 90, "engine_rpm": 2000, "fuel_consumption": 10, "sensor_status": "ok" }
' | fluvio produce vehicle-sensor
$ echo '{ "timestamp": "2023-11-22T12:35:01Z", "vehicle_id": "V002", "latitude": 34.0522, "longitude": -118.2437, "speed": 30, "engine_temperature": 85, "engine_rpm": 1500, "fuel_consumption": 8, "sensor_status": "failed"}
' | fluvio produce vehicle-sensor

Enter SQL Mode

In the SDF interactive shell use the sql command to enter the SQL Mode:

>> sql
SDF SQL version sdf-beta5
Type .help for help.
sql >> 

Run queries on the SQL mode

In the SQL mode we will be able to access the dataframe states of the dataflow.

We can list the tables available with:

sql >> show tables
shape: (1, 1)
┌──────────────┐
│ name         │
│ ---          │
│ str          │
╞══════════════╡
│ vehicle_data │
└──────────────┘

We can also perform normal sql queries:

select * from vehicle_data
shape: (2, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ ---  ┆ ---                ┆ ---              ┆ ---      ┆ ---       ┆ ---           │
│ str  ┆ i32                ┆ u32              ┆ f64      ┆ f64       ┆ str           │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V001 ┆ 90                 ┆ 10               ┆ 40.7128  ┆ -74.006   ┆ ok            │
│ V002 ┆ 85                 ┆ 8                ┆ 34.0522  ┆ -118.2437 ┆ failed        │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘

sql >> select * from vehicle_data where sensor_status = 'failed'
shape: (1, 6)
┌──────┬────────────────────┬──────────────────┬──────────┬───────────┬───────────────┐
│ _key ┆ engine_temperature ┆ fuel_consumption ┆ latitude ┆ longitude ┆ sensor_status │
│ ---  ┆ ---                ┆ ---              ┆ ---      ┆ ---       ┆ ---           │
│ str  ┆ i32                ┆ u32              ┆ f64      ┆ f64       ┆ str           │
╞══════╪════════════════════╪══════════════════╪══════════╪═══════════╪═══════════════╡
│ V002 ┆ 85                 ┆ 8                ┆ 34.0522  ┆ -118.2437 ┆ failed        │
└──────┴────────────────────┴──────────────────┴──────────┴───────────┴───────────────┘

Exit the SQL mode

Use .quit or .exit to exit the SQL mode.

sql >> .quit

Cleanup

Exit sdf terminal and clean-up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

We just implement example accessing arrow states through SQL interface. The following link contains another example using the data from multiple states to perform a JOIN Query

  1. Join Example