SDF provides a schema validation feature to ensure that the data flowing through the dataflow is in the correct format.
Schema
The first step is to define the data schema. The schema is defined in the types section of the dataflow. Types can be defined inline or in a package that can be shared across multiple dataflows.
For example, the following is a simple object type representing a person.
types:
  person:
    type: object
    properties:
      name:
        type: string
      age:
        type: u8
To enforce the schema, all you have to do is specify it in the topic section. For example, the following is a topic definition with a schema:
topics:
  persons:
    schema:
      value:
        type: person
The schema can be enforced for both the key and the value part of the record.
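For example, a topic could enforce a schema on both parts of the record. The sketch below assumes the key entry mirrors the value entry shown above:

topics:
  persons:
    schema:
      key:
        type: string   # assumed: the key schema uses the same layout as the value schema
      value:
        type: person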
Once defined, the schema is used to validate the data coming from the source. The enforcement is specific to the serialization format. Currently, SDF supports the JSON serialization format, but it can be extended to other formats in the future.
The serialization format is defined in the configuration section:
config:
  converter: json
This will use JSON for all topics, but you can override the converter per topic.
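For example, a single topic (the hypothetical raw-notes below) could override the global converter. Both the raw converter name and its placement alongside type in the topic's value schema are assumptions; check the topic and converter reference for the exact keys:

topics:
  raw-notes:
    schema:
      value:
        type: string
        converter: raw   # assumed per-topic override; name and placement may differ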
Given the schema above, the following JSON object will pass the schema validation:
{
  "name": "joe",
  "age": 30
}
However, the following JSON object will fail the schema validation:
{
  "name": "joe",
  "age": "30"
}
The schema validation error will be reported in the operator log. The error message will indicate the field that failed the validation. The failed record will be skipped and the dataflow will continue to process the next record.
For example, with the bad user data above, the error message will be:
$ sdf log -f
Error deserializing input value ExpectedUnsigned at character 0
The number of failed records is also reflected in the internal metrics, which can be accessed with the sdf show state <operator>/metrics command.
>> show state
Namespace                                  Keys  Type
check-adult/user-topic/topic.offset        1     offset
check-adult/age-check/metrics              1     table
request-processing/request/metrics         1     table
request-processing/request/topic.offset    1     offset
check-adult/user-topic/metrics             1     table

>> show state check-adult/user-topic/metrics
Key    Window  succeeded  failed  last_error_offset
stats  *       4          2       5

>>
The SDF type system supports the following concepts in the schema (a combined sketch follows the list):
- primitive types such as string, integer, float, boolean.
- enum types
- composite objects with nested properties
- array or list of objects
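As an illustration, the hypothetical household type below nests a primitive and a list of the person objects defined earlier inside a composite type. The list/items spelling is an assumption; check the type reference for the exact keywords:

types:
  household:
    type: object
    properties:
      address:
        type: string
      members:
        type: list        # assumed array/list syntax; exact keyword may differ
        items:
          type: person    # reuses the person type defined earlier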
Versioning
An inline schema's version is inherited from the dataflow version. If you want to version the schema independently, define it in a package and version the package. The schema package can then be published to the Hub and imported into the dataflow.
Versions follow semver syntax. For example, the following is a versioned schema package:
apiVersion: 0.5.0
meta:
  name: person-age-validation
  version: 0.1.0
  namespace: examples
The apiVersion is the package syntax version, and the version in the meta section is the schema version.
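Once published, the package can be referenced from the dataflow so that its types become available. The stanza below is a sketch; the pkg reference format and the types list are assumptions, so consult the package documentation for the exact fields:

imports:
  - pkg: examples/person-age-validation@0.1.0   # assumed <namespace>/<name>@<version> form
    types:
      - name: person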
Dataflow
The full dataflow is defined as follows:
apiVersion: 0.5.0
meta:
  name: person-age-validation
  version: 0.1.0
  namespace: examples

config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End

types:
  user:
    type: object
    properties:
      name:
        type: string
      age:
        type: u8

topics:
  user-topic:
    name: user
    schema:
      value:
        type: user
  message-topic:
    name: message
    schema:
      value:
        type: string

services:
  check-adult:
    sources:
      - type: topic
        id: user-topic
    transforms:
      - operator: map
        run: |
          fn age_check(user: User) -> Result<String> {
            if user.age < 18 {
              Ok("minor".to_string())
            } else {
              Ok("adult".to_string())
            }
          }
    sinks:
      - type: topic
        id: message-topic
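With the dataflow running (sdf run), a record can be pushed through it from another terminal. The session below is a hypothetical example; it assumes the Fluvio topics user and message (the name fields above) already exist:

$ # in a separate terminal, with the dataflow started via `sdf run`
$ echo '{"name": "joe", "age": 30}' | fluvio produce user
$ fluvio consume message -B
adult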