
fluvio consume

The fluvio consume command is a way to read the contents of records in a Fluvio topic from a command-line environment.

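A topic can be split into multiple partitions. By default, the consume command reads from only one of them, partition 0. Use --partition to choose a different partition, or --all-partitions to read from all of them.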

Read messages from a topic/partition

Usage: fluvio consume [OPTIONS] <topic>

          Topic name

  -p, --partition <integer>
          Partition id

  -A, --all-partitions
          Consume records from all partitions

  -d, --disable-continuous
          Disable continuous processing of messages

          Disable the progress bar and wait spinner

  -k, --key-value
          Print records in "[key] value" format, with "[null]" for no key

  -F, --format <FORMAT>
          Provide a template string to print records with a custom format. See --help for details.
          Template strings may include the variables {{key}}, {{value}}, {{offset}}, {{partition}} and {{time}} which will have each record's contents substituted in their place. Note that timestamp is displayed using RFC3339, is always UTC and ignores system timezone.
          For example, the following template string:
          Offset {{offset}} has key {{key}} and value {{value}}
          Would produce a printout where records might look like this:
          Offset 0 has key A and value Apple

      --table-format <TABLE_FORMAT>
          Consume records using the formatting rules defined by TableFormat name

  -B, --beginning
          Consume records from the beginning of the log

  -H, --head <integer>
          Consume records starting <integer> from the beginning of the log

  -T, --tail <integer>
          Consume records starting <integer> from the end of the log

      --start <integer>
          The absolute offset of the first record to begin consuming from

      --end <integer>
          Consume records until end offset (inclusive)

  -b, --maxbytes <integer>
          Maximum number of bytes to be retrieved

      --isolation <ISOLATION>
          Isolation level that consumer must respect. Supported values: read_committed (ReadCommitted) - consume only committed records, read_uncommitted (ReadUncommitted) - consume all records accepted by leader

          Suppress items that have an unknown output type

  -O, --output <type>
          [possible values: dynamic, text, binary, json, raw, table, full-table]

      --smartmodule <SMARTMODULE>
          Name of the smartmodule

      --smartmodule-path <SMARTMODULE_PATH>
          Path to the smart module

      --aggregate-initial <AGGREGATE_INITIAL>
          (Optional) Value to use as an initial accumulator for aggregate SmartModules

  -e, --params <PARAMS>
          (Optional) Extra input parameters passed to the smartmodule. They should be passed using key=value format Eg. fluvio consume topic-name --smartmodule my_filter -e foo=bar -e key=value -e one=1

  -t, --transforms <TRANSFORMS>
          (Optional) Path to a file with transformation specification

      --transforms-line <TRANSFORMS_LINE>
          (Optional) Transformation specification as JSON formatted string. E.g. fluvio consume topic-name --transforms-line='{"uses":"infinyon/jolt@0.1.0","with":{"spec":"[{\"operation\":\"default\",\"spec\":{\"source\":\"test\"}}]"}}'

          Truncate the output to one line

  -c, --consumer <CONSUMER>
          Consumer id

  -h, --help
          Print help (see a summary with '-h')

The following fluvio consume examples build on the fluvio produce examples and assume that the records produced there are already in the topic.




Consume all records

When consuming, we need to specify a starting offset from which to begin reading. We can use the --beginning (-B) flag to read everything from the very beginning. Here we’ll also use the --disable-continuous (-d) flag to exit after all the records have been read:

$ fluvio consume my-topic -B -d
This is my first record ever
This is my second record ever
Alice In Wonderland
Bruce Wayne
Santa Claus

Notice that all the records are printed by value only: the records with keys have not had their keys printed! This is the default behavior of the consumer. To see how to print the keys of key/value records, see the next example!


Consume key/value records

If we want to see both the keys and values of the records in the topic, we can use the --key-value flag:

$ fluvio consume my-topic -dB --key-value
[null] This is my first record ever
[null] This is my second record ever
[alice] Alice In Wonderland
[batman] Bruce Wayne
[santa] Santa Claus

Records that were not given a key are printed with [null].
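
If a script needs the key and the value separately, the "[key] value" lines can be split with plain shell string operations. The following is a minimal sketch and not part of Fluvio: the helper name split_key_value is hypothetical, and it assumes that keys never contain the sequence "] ".

```shell
# Hypothetical helper: turn "[key] value" lines (as printed by --key-value)
# into tab-separated "key<TAB>value" lines.
# Assumption: keys never contain the sequence "] ".
split_key_value() {
  while IFS= read -r line; do
    key=${line%%"] "*}    # everything before the first "] "
    key=${key#"["}        # drop the leading "["
    value=${line#*"] "}   # everything after the first "] "
    printf '%s\t%s\n' "$key" "$value"
  done
}

# Demo on two lines copied from the output above.
printf '%s\n' \
  '[null] This is my first record ever' \
  '[alice] Alice In Wonderland' | split_key_value
```

Against a live cluster, the same helper could be placed at the end of a pipeline such as fluvio consume my-topic -dB --key-value | split_key_value.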


Consume using a SmartModule

Fluvio SmartModules are WASM modules that can edit the contents of a stream inline, before the records of that stream are delivered to a consumer. One way to use SmartModules is to supply the WASM module with the --smartmodule-path flag to the fluvio consume command.

The simplest SmartModule is the filter example, which filters records from the stream based on whether they contain the letter a or not. You can find the full example code in our GitHub repo and compile it to test out yourself.
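
To build intuition for what such a filter does before compiling any WASM, its effect can be approximated locally on text that has already been consumed. This sketch is only an analogy: it assumes the filter keeps the records that contain a lowercase a (rather than dropping them), and the function name keep_records_with_a is hypothetical. A real SmartModule is applied before records reach the consumer, not in your shell.

```shell
# Hypothetical local stand-in for the filter example.
# Assumption: the filter keeps records containing a lowercase "a".
keep_records_with_a() {
  grep 'a'
}

# Demo on the five record values used in the earlier examples.
printf '%s\n' \
  'This is my first record ever' \
  'This is my second record ever' \
  'Alice In Wonderland' \
  'Bruce Wayne' \
  'Santa Claus' | keep_records_with_a
```

Under that assumption, only the last three records pass through, because the first two contain no lowercase a.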

Once you have compiled your SmartModule Filter and have a .wasm file for it, you can apply it by sending the binary to the cluster when you start your CLI consumer:

$ fluvio consume my-topic -B --smartmodule-path="fluvio_wasm_filter.wasm"

Alternatively, to avoid sending the SmartModule binary to the cluster with each fluvio consume session, you can have the cluster store it for you:

$ fluvio smartmodule create --wasm-file="fluvio_wasm_filter.wasm" my_filter

Then you can apply the SmartModule by name:

$ fluvio consume my-topic -B --smartmodule="my_filter"

Consume from a topic with multiple partitions

By default, the Fluvio CLI Consumer reads records from a single partition. When running fluvio consume topic-name, the CLI will read records from partition 0. Let’s look at how we can read records from the different partitions in a topic by using the --partition (-p) flag.

Start out by creating a new topic with multiple partitions using fluvio topic create.

$ fluvio topic create consume-multi -p 3

Let’s create a text file with some records we would like to send. Each line of the text file will be treated as one record.

# Put the following records into a text file using your favorite editor
$ cat records.txt

Then, produce the test data to the topic.

$ fluvio produce "consume-multi" -f records.txt

After producing some data, let’s take a look at how the records got distributed among our partitions using fluvio partition list.

$ fluvio partition list
 consume-multi  0          5001    []        Online      3   3    0    []
 consume-multi  1          5001    []        Online      3   3    0    []
 consume-multi  2          5001    []        Online      3   3    0    []

We can see by the high watermark (HW) and log-end-offset (LEO) that 3 records were sent to each partition. Let’s look at how to consume from each partition.

To consume from a specific partition, use the --partition (-p) flag on fluvio consume.

$ fluvio consume "consume-multi" -B --partition 0

To consume from partition 1:

$ fluvio consume "consume-multi" -B --partition 1

And from partition 2:

$ fluvio consume "consume-multi" -B --partition 2
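
The three commands above differ only in the partition number, so they can be wrapped in a loop. This is a minimal sketch: the helper name consume_each_partition and the FLUVIO override variable are conventions of this example only, not Fluvio features. It adds the -d flag so that each consumer exits once its partition has been read.

```shell
# Hypothetical helper: consume every partition of a topic, one at a time.
# FLUVIO may be overridden (e.g. FLUVIO=echo) to preview the commands
# without a running cluster; the override exists only in this sketch.
consume_each_partition() {
  topic=$1
  partitions=$2
  p=0
  while [ "$p" -lt "$partitions" ]; do
    echo "--- partition $p ---"
    "${FLUVIO:-fluvio}" consume "$topic" -B -d --partition "$p"
    p=$((p + 1))
  done
}

# Preview the commands for the 3-partition topic created above.
FLUVIO=echo consume_each_partition consume-multi 3
```

Running consume_each_partition consume-multi 3 without the override invokes the real fluvio binary for partitions 0, 1 and 2 in turn.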

Consume from all partitions

At times, it is useful to see all records from all partitions from a single consumer. Using the example above:

$ fluvio partition list
 consume-multi  0          5001    []        Online      3   3    0    []
 consume-multi  1          5001    []        Online      3   3    0    []
 consume-multi  2          5001    []        Online      3   3    0    []

Each partition has 3 records. Now let’s consume from all partitions:

$ fluvio consume "consume-multi" -B -A

Print consumed records with custom formatting

Sometimes, the default Consumer printout might not work for your needs. As of Fluvio 0.9.6, you can use the --format option to describe how the Consumer should print your records!

The format string will replace placeholders such as {{key}}, {{value}}, {{partition}} (added in Fluvio 0.9.9), {{offset}} and {{time}} (added in Fluvio 0.9.25) with the actual contents of each record. One possible use for this is formatting each record as a CSV row:

$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}"
2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever
2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever
2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland
2022-05-04T15:52:28.875Z,0,3,batman,Bruce Wayne
2022-05-04T15:53:37.099Z,0,4,santa,Santa Claus
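
Rows in this shape are easy to post-process with standard tools. The sketch below counts records per key with awk. The helper name count_by_key is hypothetical, and the sketch assumes that neither keys nor values contain commas, since the template above does no quoting or escaping.

```shell
# Hypothetical helper: count records per key from
# "time,partition,offset,key,value" rows.
# Assumption: keys and values contain no commas.
count_by_key() {
  awk -F',' '{ counts[$4]++ } END { for (k in counts) print k, counts[k] }' | sort
}

# Demo on the rows shown above.
printf '%s\n' \
  '2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever' \
  '2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever' \
  '2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland' \
  '2022-05-04T15:52:28.875Z,0,3,batman,Bruce Wayne' \
  '2022-05-04T15:53:37.099Z,0,4,santa,Santa Claus' | count_by_key
```

For the five rows above, this reports a count of 2 for null and 1 each for alice, batman and santa. If your values may contain commas, a different separator in the --format template is a safer choice.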