KSQL has similar semantics to SQL:

Terminate KSQL statements with a semicolon (;).

Use a backslash (\) to indicate continuation of a multi-line statement on the next line.

Escape single quote characters (') inside string literals by using two successive single quotes (''). For example, to escape 'T', write ''T''.

When using KSQL, the following terminology is used.
A stream is an unbounded sequence of structured data ("facts"). For example, we could have a stream of financial transactions such as "Alice sent $100 to Bob, then Charlie sent $50 to Bob". Facts in a stream are immutable, which means new facts can be inserted into a stream, but existing facts can never be updated or deleted. Streams can be created from an Apache Kafka® topic or derived from an existing stream. A stream's underlying data is durably stored (persisted) within a Kafka topic on the Kafka brokers.
A table is a view of a stream, or another table, and represents a collection of evolving facts. For example, we could have a table that contains the latest financial information such as "Bob's current account balance is $150". It is the equivalent of a traditional database table but enriched by streaming semantics such as windowing. Facts in a table are mutable, which means new facts can be inserted into the table, and existing facts can be updated or deleted. Tables can be created from a Kafka topic or derived from existing streams and tables. In both cases, a table's underlying data is durably stored (persisted) within a Kafka topic on the Kafka brokers.
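As an illustration of a table derived from a stream, an aggregate such as a running account balance can be declared with a persistent query. The stream, topic, and column names below are hypothetical, not part of any predefined schema:

```sql
-- Hypothetical example: maintain each recipient's balance from a stream of payments.
-- Every new payment fact updates the corresponding row in the table.
CREATE TABLE account_balances AS
  SELECT recipient, SUM(amount) AS balance
  FROM payments
  GROUP BY recipient;
```

Here the stream of immutable payment facts drives updates to the mutable rows of the table, which always reflects the latest balance per key.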
In KSQL 5.0 and higher, you can read nested data, in Avro and JSON formats, by using the STRUCT type in CREATE STREAM and CREATE TABLE statements.
Use the following syntax to declare nested data:
STRUCT<FieldName FieldType, ...>
The STRUCT type requires you to specify a list of fields. For each field, you specify the field name and field type. The field type can be any of the supported KSQL types, including the complex types MAP, ARRAY, and STRUCT.
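For example, a single declaration can nest these complex types; the field names here are illustrative only:

```sql
STRUCT<name VARCHAR,
       tags ARRAY<VARCHAR>,
       attributes MAP<VARCHAR, VARCHAR>,
       location STRUCT<lat DOUBLE, lon DOUBLE>>
```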
Note
Properties is not a valid field name.
Here's an example CREATE STREAM statement that uses a STRUCT to encapsulate a street address and a postal code:

CREATE STREAM orders (
    orderId BIGINT,
    address STRUCT<street VARCHAR, zip INTEGER>)
  WITH (...);
Access the fields in a STRUCT by using the dereference operator (->):

SELECT address->street, address->zip FROM orders;
For more info, see Operators.
Note
You can't create new nested STRUCT data as the result of a query, but you can copy existing STRUCT fields as-is.
The following list shows valid time units for the SIZE, ADVANCE BY, SESSION, and WITHIN clauses:

DAY, DAYS
HOUR, HOURS
MINUTE, MINUTES
SECOND, SECONDS
MILLISECOND, MILLISECONDS
For more information, see Windows in KSQL Queries.
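As a sketch of how these time units appear in a windowed query (the stream and column names below are hypothetical), a SIZE clause might read:

```sql
-- Hypothetical example: count pageviews per user in 30-second tumbling windows.
SELECT user_id, COUNT(*)
  FROM pageviews
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY user_id;
```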
Time-based operations, like windowing, process records according to the timestamp in ROWTIME. By default, the implicit ROWTIME column is the timestamp of a message in a Kafka topic. Timestamps have an accuracy of one millisecond.

Use the TIMESTAMP property to override ROWTIME with the contents of the specified column. Define the format of a record's timestamp by using the TIMESTAMP_FORMAT property.

If you use the TIMESTAMP property but don't set TIMESTAMP_FORMAT, KSQL assumes that the timestamp field is a bigint. If you set TIMESTAMP_FORMAT, the TIMESTAMP field must be of type varchar and have a format that the DateTimeFormatter Java class can parse.
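As a sketch of the default case, assuming a hypothetical pageviews topic whose viewtime column holds epoch milliseconds, a stream can adopt that column as its record timestamp without setting TIMESTAMP_FORMAT:

```sql
-- Hypothetical topic and column names; viewtime holds epoch milliseconds
-- as a BIGINT, so no TIMESTAMP_FORMAT is needed.
CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, page_id VARCHAR)
  WITH (kafka_topic='pageviews',
        value_format='JSON',
        timestamp='viewtime');
```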
If your timestamp format has embedded single quotes, you can escape them by using two successive single quotes (''). For example, to escape 'T', write ''T''. The following examples show how to escape the ' character in KSQL statements.
-- Example timestamp format: yyyy-MM-dd'T'HH:mm:ssX
CREATE STREAM TEST (ID bigint, event_timestamp VARCHAR) \
  WITH (kafka_topic='test_topic', \
        value_format='JSON', \
        timestamp='event_timestamp', \
        timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');

-- Example timestamp format: yyyy.MM.dd G 'at' HH:mm:ss z
CREATE STREAM TEST (ID bigint, event_timestamp VARCHAR) \
  WITH (kafka_topic='test_topic', \
        value_format='JSON', \
        timestamp='event_timestamp', \
        timestamp_format='yyyy.MM.dd G ''at'' HH:mm:ss z');

-- Example timestamp format: hh 'o'clock' a, zzzz
CREATE STREAM TEST (ID bigint, event_timestamp VARCHAR) \
  WITH (kafka_topic='test_topic', \
        value_format='JSON', \
        timestamp='event_timestamp', \
        timestamp_format='hh ''o''clock'' a, zzzz');
For more information on timestamp formats, see DateTimeFormatter.
The KSQL CLI commands can be run after starting the KSQL CLI. You can view the KSQL CLI help by running <path-to-confluent>/bin/ksql --help.

Tip: You can search and browse your command history in the KSQL CLI with Ctrl-R. After pressing Ctrl-R, start typing the command or any part of the command to show an auto-complete of past commands.
NAME
        ksql - KSQL CLI

SYNOPSIS
        ksql [ --config-file <configFile> ] [ {-h | --help} ]
                [ --output <outputFormat> ]
                [ --query-row-limit <streamedQueryRowLimit> ]
                [ --query-timeout <streamedQueryTimeoutMs> ] [--] <server>

OPTIONS
        --config-file <configFile>
            A file specifying configs for Ksql and its underlying Kafka Streams
            instance(s). Refer to KSQL documentation for a list of available
            configs.

        -h, --help
            Display help information

        --output <outputFormat>
            The output format to use (either 'JSON' or 'TABULAR'; can be
            changed during REPL as well; defaults to TABULAR)

        --query-row-limit <streamedQueryRowLimit>
            An optional maximum number of rows to read from streamed queries

            This options value must fall in the following range: value >= 1

        --query-timeout <streamedQueryTimeoutMs>
            An optional time limit (in milliseconds) for streamed queries

            This options value must fall in the following range: value >= 1

        --
            This option can be used to separate command-line options from the
            list of arguments (useful when arguments might be mistaken for
            command-line options)

        <server>
            The address of the Ksql server to connect to (ex:
            http://confluent.io:9098)