Version: 0.11.11

HTTP Connector

The HTTP sink connector reads records from a data stream and generates HTTP requests from them.

It supports the HTTP/1.0, HTTP/1.1, and HTTP/2.0 protocols.

Configuration

The HTTP Sink is configured using a YAML file:

# sample-config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
  secrets:
    - name: HTTP_TOKEN
http:
  endpoint: "http://127.0.0.1/post"
  headers:
    - "Authorization: token ${{ secrets.HTTP_TOKEN }}"
    - "Cache-Control: no-cache"
Option                Default                 Type           Description
method                POST                    String         POST, PUT
endpoint              -                       String         HTTP URL endpoint
headers               -                       Array<String>  Request header(s) "Key:Value" pairs
user-agent            fluvio/http-sink 0.2.2  String         Request user-agent
http_request_timeout  1s                      String         HTTP Request Timeout
http_connect_timeout  15s                     String         HTTP Connect Timeout

By default, requests are sent with Content-Type: text/html unless another value is provided in the headers configuration.
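The default-header rule can be illustrated with a short Python sketch. This is not the connector's code; `build_headers` is a hypothetical helper that only mirrors the behavior described here: it parses "Key: Value" strings and adds Content-Type: text/html when no Content-Type is supplied.

```python
DEFAULT_CONTENT_TYPE = "text/html"  # default stated in this document


def build_headers(header_lines):
    """Turn "Key: Value" strings into a dict, applying the default Content-Type.

    Hypothetical helper for illustration; not part of the connector.
    """
    headers = {}
    for line in header_lines:
        # Split on the first colon only, so values may contain colons.
        key, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"expected 'Key: Value', got {line!r}")
        headers[key.strip()] = value.strip()
    # HTTP header names are case-insensitive, so compare in lowercase.
    if not any(name.lower() == "content-type" for name in headers):
        headers["Content-Type"] = DEFAULT_CONTENT_TYPE
    return headers


print(build_headers(["Cache-Control: no-cache"]))
print(build_headers(["Content-Type: application/json"]))
```

The first call gains the default Content-Type; the second keeps the value it was given.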

Usage

Log in to your Fluvio Cloud account via the Fluvio CLI:

fluvio cloud login --use-oauth2
Note: all versions are marked with x.y.z. To find the latest version, run:

  • fluvio hub connector list
  • fluvio hub smartmodule list

Then configure the HTTP request to be sent using a YAML file that follows the connector configuration schema. The following configuration sends an HTTP POST request to http://httpbin.org/post.

# config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: httpbin
  type: http-sink
  topic: httpbin-send-post
http:
  endpoint: http://httpbin.org/post
  interval: 3s

Finally, create your connector by running:

fluvio cloud connector create --config ./config.yaml

You can see active connectors by running the following command:

fluvio cloud connector list

Check the connector logs by running:

fluvio cloud connector logs httpbin
INFO connect:connect_with_config:connect: fluvio_socket::versioned: connect to socket add=fluvio-sc-public:9003
INFO dispatcher_loop{self=MultiplexDisp(10)}: fluvio_socket::multiplexing: multiplexer terminated
2023-05-02T20:59:50.192104Z  INFO stream_with_config:inner_stream_batches_with_config:request_stream{offset=Offset { inner: FromEnd(0) }}:create_serial_socket:create_serial_socket_from_leader{leader_id=0}:connect_to_leader{leader=0}:connect: fluvio_socket::versioned: connect to socket add=fluvio-spu-main-0.acct-584fd564-1d4a-4308-9061-09acea387bea.svc.cluster.local:9005
INFO fluvio_connector_common::monitoring: using metric path: /fluvio_metrics/connector.sock
INFO fluvio_connector_common::monitoring: monitoring started

Produce Records to send as HTTP POST Requests

You can produce records using fluvio produce <TOPIC>. The values produced are sent as HTTP body payloads by the HTTP Sink Connector.

Running the following command attaches stdin to the topic stream. Any data written to stdin is sent as a record through the httpbin-send-post topic. Because the HTTP Sink Connector consumes that topic, each record is also sent as an HTTP POST request to http://httpbin.org/post, based on our configuration.

fluvio produce httpbin-send-post

Then send data:

> {"hello": "world"}
Ok!
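
To make the record-to-request mapping concrete, the following Python sketch builds (without sending) the kind of request described above: the record value becomes the request body. It is an illustration only, not the connector's implementation. `record_to_request` is a hypothetical helper, and the Content-Type value follows the default stated in this document.

```python
import urllib.request


def record_to_request(record: bytes, endpoint: str, method: str = "POST"):
    """Build an HTTP request whose body is the raw record value.

    Hypothetical helper for illustration; the request is not sent.
    """
    req = urllib.request.Request(endpoint, data=record, method=method)
    # Assumption: no headers configured, so the documented default applies.
    req.add_header("Content-Type", "text/html")
    return req


req = record_to_request(b'{"hello": "world"}', "http://httpbin.org/post")
print(req.get_method(), req.full_url)
print(req.data.decode())
```

Passing the request to `urllib.request.urlopen(req)` would actually send it; that step is left out so the sketch runs without network access.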

Teardown

To stop your connector, use fluvio cloud connector delete <NAME>:

fluvio cloud connector delete httpbin

Here, httpbin is the connector instance name from the configuration file shown above.

Transformations

The Fluvio HTTP Sink Connector supports transformations: records can be modified before they are sent to the endpoint.

The previous example can be extended to add extra transformations to outgoing records:

# sample-config.yaml
apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
  secrets:
    - name: AUTHORIZATION_TOKEN
http:
  endpoint: "http://127.0.0.1/post"
  headers:
    - "Authorization: token ${{ secrets.AUTHORIZATION_TOKEN }}"
    - "Content-Type: application/json"
transforms:
  - uses: infinyon/jolt@x.y.z
    with:
      spec:
        - operation: shift
          spec:
            "result": "text"

In this case, an additional transformation is performed before records are sent to the HTTP endpoint: a JSON field called result is renamed to text.
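
The effect of this shift spec on a record can be sketched in Python. This is a simplified stand-in for the infinyon/jolt SmartModule, not its implementation: the hypothetical `shift` function handles only flat, top-level keys and assumes Jolt-style shift semantics, where only fields named in the spec are copied to the output.

```python
import json


def shift(record: dict, spec: dict) -> dict:
    """Copy each field named in spec to its new key (flat keys only).

    Hypothetical illustration of a Jolt-style shift; fields that the
    spec does not mention are not copied to the output.
    """
    out = {}
    for src, dst in spec.items():
        if src in record:
            out[dst] = record[src]
    return out


record = json.loads('{"result": "ok", "id": 7}')
print(json.dumps(shift(record, {"result": "text"})))  # {"text": "ok"}
```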

Read more about JSON to JSON transformations.

Offset Management

The Fluvio Consumer Offset feature allows a connector to store its offset in the Fluvio cluster and resume from it on restart. To activate it, provide the consumer name (the id field under consumer) and set strategy: auto. See the example below:

apiVersion: 0.1.0
meta:
  version: x.y.z
  name: my-http-sink
  type: http-sink
  topic: http-sink-topic
  consumer:
    id: my-http-sink
    offset:
      strategy: auto
      start: beginning
      flush-period:
        secs: 10
        nanos: 0
http:
  endpoint: "http://127.0.0.1/post"

After the connector has processed some records, you can check the last stored offset value with:

$ fluvio consumer list
  CONSUMER      TOPIC            PARTITION  OFFSET  LAST SEEN
  my-http-sink  http-sink-topic  0          0       3s

Read more about Fluvio Offset.

Contributing

Follow the CONTRIBUTING.md file to set up your environment and contribute to this project.