Skip to main content
Version: 0.11.12 (stable)

Topic -> SQL Sink

This tutorial is part of the HTTP website to SQL database series:

This guide uses local Fluvio cluster. If you need to install it, please follow the instructions here!

We will be using Postgres database, You can download it and set up from PostgreSQL website for your OS. Alternatively use a cloud service like ElephantSQL.

Introduction

In previous tutorials, we have seen how to read data from external sources and write it to a Fluvio topic. In this tutorial, we will go through how to sink data from a Fluvio topic to external sink such as a database.

We will use sink type of connectors. All sink connectors consume data from luvio topic and write it to an external system. Particularly, we will use the SQL Sink Connector which can write to a PostgreSQL or SQLite database.

Since this is targeted to SQL database, configuration will be concern with mapping the JSON data to SQL columns. Sink Connector will perform these steps:

  • Read data from the topic
  • Transform the data to SQL insert statement.
  • Send the SQL insert statement to the database.

SQL transformation will be done using SmartModule which allow you to plug-in different transformation logic if needed.

We will be using topics from first tutorial [Streaming from HTTP Source] which stream data to cat-facts topic. Please run that tutorial first to set up the topic.

As in previous tutorials, we will use cdk to manage the connectors. Run following command to download the connector from the Hub.

$ cdk hub download infinyon/sql-sink@0.4.3

Then download SQL SmartModule from the Hub.

$ fluvio hub sm download infinyon/json-sql@0.2.1

Then you should see two smartmodules downloaded assuming you have already downloaded the jolt SmartModule from previous tutorial.

$ fluvio sm list
SMARTMODULE SIZE
infinyon/json-sql@0.2.1 559.6 KB
infinyon/jolt@0.4.1 589.3 KB

Sink Connector configuration

Copy and paste following config and save it as sql-cat-fact.yaml.

# sql.yaml
apiVersion: 0.1.0
meta:
name: simple-cat-facts-sql
type: sql-sink
version: 0.4.3
topic: cat-facts
sql:
url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
- uses: infinyon/json-sql@0.2.1
invoke: insert
with:
mapping:
table: "animalfacts"
map-columns:
"length":
json-key: "length"
value:
type: "int"
default: "0"
required: true
"raw_fact_json":
json-key: "$"
value:
type: "jsonb"
required: true

This configuration will read data from cat-facts topic and insert into animalfacts table in the database. The json-sql SmartModule will transform the JSON data into SQL insert statement.

Please change line containing url to your database connection string.

SQL Mapping

The SmartModule json-sql implements a domain specific language (DSL) to specify a transformation of input JSON to SQL insert statement. It uses model similar to Django Model where SQL tables are abstract into a model. The model is then used to generate SQL insert statement.

The mapping is designed for translation JSON into SQL. Each column of the table is mapped from a JSON expression.

For example, here is mapping for length column:

  "length":
json-key: "length"
value:
type: "int"
default: "0"
required: true

This mapping will take length field from JSON and insert into length column in the table. If length field is not found, it will use default value of 0.

Setup the Database

In order to run the connector, you need to create a table in your database. Run following SQL command in postgres CLI:

# create table animalfacts(length integer, raw_fact_json jsonb);

You can confirm table is created:

# select * from animalfacts;
length | raw_fact_json
--------+---------------
(0 rows)

Once you have the config file, you can create the connector using the cdk deploy start command.

$ cdk deploy start --ipkg infinyon-sql-sink-0.4.3.ipkg --config ./sql-cat-fact.yaml

You can use cdk deploy list to view the status of the connector.

$ cdk deploy list
NAME STATUS
simple-cat-facts-sql Running

Generate data and Check the result

Fluvio topic allow you to decouple the data source from the data sink. This means both source and sink can be run independently without affecting each other. You can run the source connector to generate data but it is not required for this demo.

Here, we will manually produce same data from previous tutorial to the cat-facts topic. This way we can control the data and see how it is sinked to the database. By default, sink connector will consume the data from the end of topic which means it will ignore exiting data in the topic.

Let's produce a single record to the topic.

$ fluvio produce cat-facts
{"fact":"A cat’s jaw can’t move sideways, so a cat can’t chew large chunks of food.","length":74}
Ok!

Then you can query the database to see the record.

# select * from animalfacts;
length | raw_fact_json
--------+------------------------------------------------------------------------------------------------------
74 | {"fact": "A cat’s jaw can’t move sideways, so a cat can’t chew large chunks of food.", "length": 74}
(1 row)

You can add more records to the topic and see how SQL connector is inserting the data into the database.

$ fluvio produce cat-facts
{"fact":"Unlike humans, cats are usually lefties. Studies indicate that their left paw is typically their dominant paw.","length":110}
Ok!
# select * from animalfacts;
length | raw_fact_json
--------+-------------------------------------------------------------------------------------------------------------------------------------------
74 | {"fact": "A cat’s jaw can’t move sideways, so a cat can’t chew large chunks of food.", "length": 74}
110 | {"fact": "Unlike humans, cats are usually lefties. Studies indicate that their left paw is typically their dominant paw.", "length": 110}
(2 rows)

Clean up

Same in previous tutorials, use cdk deploy shutdown to stop the connector.

Conclusion

This tutorial showed you how to sink data from a Fluvio topic to a SQL database. You can use the same concept to sink data to other databases or systems.

You can combine this tutorial with previous tutorials to create a complete data pipeline from source to sink. This just requires deploying multiple connectors.

With Fluvio's event driven architecture, source and sink can be run independently and doesn't effect each other. You can also chain together multiple sources and sinks to create complex data pipelines.

References