ksqlDB
ksqlDB is a purpose-built database for stream processing applications, ingesting data from Apache Kafka .
Available on the Enterprise Premier plan . Contact us for details.
See how you can use ksqlDB and Cube Cloud to power real-time analytics in Power BI:
In this video, the SQL API is used to connect to Power BI. Currently, it’s recommended to use the DAX API.
Prerequisites
- Hostname for the ksqlDB server
- Username and password (or an API key) to connect to ksqlDB server
Confluent Cloud
If you are using Confluent Cloud , you need to generate an API key and use the API key name as your username and the API key secret as your password.
You can generate an API key by installing confluent-cli and running the
following commands in the command line:
brew install --cask confluent-cli
confluent login
confluent environment use <YOUR-ENVIRONMENT-ID>
confluent ksql cluster list
confluent api-key create --resource <YOUR-KSQL-CLUSTER-ID>Setup
Manual
Add the following to a .env file in your Cube project:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=username
CUBEJS_DB_PASS=passwordEnvironment Variables
| Environment Variable | Description | Possible Values | Required |
|---|---|---|---|
CUBEJS_DB_URL | The host URL for ksqlDB with port | A valid database host URL | ✅ |
CUBEJS_DB_USER | The username used to connect to the ksqlDB. API key for Confluent Cloud. | A valid database username | ✅ |
CUBEJS_DB_PASS | The password used to connect to the ksqlDB. API secret for Confluent Cloud. | A valid database password | ✅ |
CUBEJS_DB_KAFKA_HOST | Kafka broker host(s) for Kafka streams mode. Multiple brokers can be comma-separated. | A valid Kafka broker URL | ❌ |
CUBEJS_DB_KAFKA_USER | Username for Kafka broker authentication (SASL PLAIN) | A valid Kafka username | ❌ |
CUBEJS_DB_KAFKA_PASS | Password for Kafka broker authentication (SASL PLAIN) | A valid Kafka password | ❌ |
CUBEJS_DB_KAFKA_USE_SSL | If true, enables SASL_SSL for the Kafka connection | true, false | ❌ |
CUBEJS_CONCURRENCY | The number of concurrent queries to the data source | A valid number | ❌ |
Pre-Aggregations Support
ksqlDB supports only streaming pre-aggregations.
Kafka streams mode
By default, Cube connects to ksqlDB via its REST API. ksqlDB uses its REST API both for metadata (discovering tables and streams) and for streaming data into Cube Store during pre-aggregation builds.
In this default mode, Cube may create tables and streams in ksqlDB as part
of the pre-aggregation build process (e.g., CREATE TABLE ... AS SELECT
statements for non-read-only pre-aggregations).
When Kafka streams mode is enabled, Cube reads data directly from the underlying Kafka topics instead of going through the ksqlDB REST API for data streaming. ksqlDB is still used for metadata operations such as discovering tables, streams, and their schemas, but Cube Store subscribes to the backing Kafka topic directly.
In this mode, Cube does not create any tables or streams in ksqlDB. All pre-aggregations use the read-only refresh path: Cube discovers the existing ksqlDB objects and their backing Kafka topics, then streams data directly from Kafka into Cube Store.
When to use Kafka streams mode
Kafka streams mode is useful when:
- You want to prevent Cube from creating any objects in ksqlDB
- You need higher throughput for data ingestion by reading Kafka directly
- Your ksqlDB environment has restricted permissions that don’t allow creating tables or streams
- You prefer Cube Store to consume from Kafka topics without an intermediary
Enabling Kafka streams mode
Set the CUBEJS_DB_KAFKA_HOST environment variable to the address of your
Kafka broker(s). This activates Kafka streams mode automatically:
CUBEJS_DB_TYPE=ksql
CUBEJS_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DB_USER=ksql_username
CUBEJS_DB_PASS=ksql_password
CUBEJS_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DB_KAFKA_USER=kafka_api_key
CUBEJS_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DB_KAFKA_USE_SSL=trueMultiple Kafka brokers can be specified as a comma-separated list:
CUBEJS_DB_KAFKA_HOST=broker1:9092,broker2:9092,broker3:9092When using Confluent Cloud ,
the Kafka credentials are separate from the ksqlDB credentials. Generate
an API key for the Kafka cluster (not the ksqlDB cluster) and use it as
CUBEJS_DB_KAFKA_USER and CUBEJS_DB_KAFKA_PASS.
How it works
With Kafka streams mode enabled:
- Cube uses the ksqlDB REST API to discover available tables and streams
and to retrieve their schemas via
DESCRIBE. - For each table or stream, Cube resolves the backing Kafka topic name from the ksqlDB metadata.
- Instead of streaming data through ksqlDB, Cube Store connects directly to the Kafka broker(s) and consumes from the resolved topic.
- Pre-aggregation builds use the read-only refresh strategy. Cube does
not issue any
CREATE TABLEorCREATE STREAMstatements to ksqlDB.
Data modeling
ksqlDB is typically used as an additional data source alongside a primary
data warehouse. To use Kafka streams mode, configure ksqlDB as a named
data source using decorated environment variables
and point your cubes to it with the
data_source property.
First, declare the data sources and configure the ksqlDB connection with Kafka credentials:
CUBEJS_DATASOURCES=default,ksql
CUBEJS_DB_TYPE=postgres
CUBEJS_DB_HOST=my.postgres.host
CUBEJS_DB_NAME=my_database
CUBEJS_DB_USER=postgres_user
CUBEJS_DB_PASS=postgres_password
CUBEJS_DS_KSQL_DB_TYPE=ksql
CUBEJS_DS_KSQL_DB_URL=https://xxxxxx-xxxxx.us-west4.gcp.confluent.cloud:443
CUBEJS_DS_KSQL_DB_USER=ksql_api_key
CUBEJS_DS_KSQL_DB_PASS=ksql_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_HOST=pkc-xxxxx.us-west4.gcp.confluent.cloud:9092
CUBEJS_DS_KSQL_DB_KAFKA_USER=kafka_api_key
CUBEJS_DS_KSQL_DB_KAFKA_PASS=kafka_api_secret
CUBEJS_DS_KSQL_DB_KAFKA_USE_SSL=trueThen, create cubes that reference your data. A common pattern is to combine a batch cube (reading historical data from your warehouse) with a streaming cube (reading real-time data from ksqlDB via Kafka) using a lambda pre-aggregation.
The batch cube queries the warehouse and builds daily partitions
incrementally. The streaming cube points at an existing ksqlDB stream
with data_source: ksql and uses a read-only streaming pre-aggregation
that consumes from the backing Kafka topic directly. The lambda
pre-aggregation in the batch cube merges both, serving historical data
from the warehouse rollup and real-time data from the streaming rollup:
cubes:
- name: order_events
data_source: default
sql: >
SELECT
order_id,
user_id,
status,
amount,
created_at
FROM ecommerce.order_events
WHERE {FILTER_PARAMS.order_events.created_at.filter(
(from, to) =>
`created_at >= ${from} AND created_at < ${to}`
)}
measures:
- name: count
type: count
- name: total_amount
sql: amount
type: sum
- name: failed_count
sql: "CASE WHEN status = 'failed' THEN 1 ELSE 0 END"
type: sum
dimensions:
- name: order_id
sql: order_id
type: string
primary_key: true
- name: user_id
sql: user_id
type: string
- name: status
sql: status
type: string
- name: created_at
sql: created_at
type: time
pre_aggregations:
- name: lambda
type: rollup_lambda
rollups:
- order_events.batch
- order_events_stream.stream
- name: batch
type: rollup
measures:
- CUBE.count
- CUBE.total_amount
- CUBE.failed_count
dimensions:
- CUBE.order_id
- CUBE.user_id
- CUBE.status
time_dimension: CUBE.created_at
granularity: second
partition_granularity: day
build_range_start:
sql: SELECT NOW() - INTERVAL '90 days'
build_range_end:
sql: SELECT NOW()
refresh_key:
every: 8 hour
update_window: 1 day
incremental: true
indexes:
- name: user_status
columns:
- CUBE.user_id
- CUBE.status
- name: order_events_stream
data_source: ksql
sql: "SELECT * FROM ORDER_EVENTS_STREAM"
measures:
- name: count
type: count
- name: total_amount
sql: AMOUNT
type: sum
- name: failed_count
sql: "CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END"
type: sum
dimensions:
- name: order_id
sql: ORDER_ID
type: string
primary_key: true
- name: user_id
sql: USER_ID
type: string
- name: status
sql: STATUS
type: string
- name: created_at
sql: CREATED_AT
type: time
pre_aggregations:
- name: stream
type: rollup
read_only: true
measures:
- CUBE.count
- CUBE.total_amount
- CUBE.failed_count
dimensions:
- CUBE.order_id
- CUBE.user_id
- CUBE.status
unique_key_columns:
- order_id
time_dimension: CUBE.created_at
granularity: second
partition_granularity: day
build_range_start:
sql: "SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))"
build_range_end:
sql: "SELECT DATE_ADD(NOW(), INTERVAL '15 minute')"
refresh_key:
every: 1 minute
update_window: 1 hour
incremental: true
indexes:
- name: user_status
columns:
- CUBE.user_id
- CUBE.status
stream_offset: latestcube("order_events", {
data_source: "default",
sql: `
SELECT
order_id,
user_id,
status,
amount,
created_at
FROM ecommerce.order_events
WHERE ${FILTER_PARAMS.order_events.created_at.filter(
(from, to) => `created_at >= ${from} AND created_at < ${to}`
)}
`,
measures: {
count: {
type: `count`,
},
total_amount: {
sql: `amount`,
type: `sum`,
},
failed_count: {
sql: `CASE WHEN status = 'failed' THEN 1 ELSE 0 END`,
type: `sum`,
},
},
dimensions: {
order_id: {
sql: `order_id`,
type: `string`,
primary_key: true,
},
user_id: {
sql: `user_id`,
type: `string`,
},
status: {
sql: `status`,
type: `string`,
},
created_at: {
sql: `created_at`,
type: `time`,
},
},
pre_aggregations: {
lambda: {
type: `rollup_lambda`,
rollups: [
order_events.batch,
order_events_stream.stream,
],
},
batch: {
type: `rollup`,
measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
time_dimension: CUBE.created_at,
granularity: `second`,
partition_granularity: `day`,
build_range_start: {
sql: `SELECT NOW() - INTERVAL '90 days'`,
},
build_range_end: {
sql: `SELECT NOW()`,
},
refresh_key: {
every: `8 hour`,
update_window: `1 day`,
incremental: true,
},
indexes: {
user_status: {
columns: [CUBE.user_id, CUBE.status],
},
},
},
},
});
cube("order_events_stream", {
data_source: "ksql",
sql: `SELECT * FROM ORDER_EVENTS_STREAM`,
measures: {
count: {
type: `count`,
},
total_amount: {
sql: `AMOUNT`,
type: `sum`,
},
failed_count: {
sql: `CASE WHEN STATUS = 'failed' THEN 1 ELSE 0 END`,
type: `sum`,
},
},
dimensions: {
order_id: {
sql: `ORDER_ID`,
type: `string`,
primary_key: true,
},
user_id: {
sql: `USER_ID`,
type: `string`,
},
status: {
sql: `STATUS`,
type: `string`,
},
created_at: {
sql: `CREATED_AT`,
type: `time`,
},
},
pre_aggregations: {
stream: {
type: `rollup`,
read_only: true,
measures: [CUBE.count, CUBE.total_amount, CUBE.failed_count],
dimensions: [CUBE.order_id, CUBE.user_id, CUBE.status],
unique_key_columns: [`order_id`],
time_dimension: CUBE.created_at,
granularity: `second`,
partition_granularity: `day`,
build_range_start: {
sql: `SELECT date_trunc('day', DATE_SUB(NOW(), INTERVAL '5 hour'))`,
},
build_range_end: {
sql: `SELECT DATE_ADD(NOW(), INTERVAL '15 minute')`,
},
refresh_key: {
every: `1 minute`,
update_window: `1 hour`,
incremental: true,
},
indexes: {
user_status: {
columns: [CUBE.user_id, CUBE.status],
},
},
stream_offset: `latest`,
},
},
});Key properties for the streaming pre-aggregation:
read_only: true— Cube will not create any objects in ksqlDB. The data is consumed directly from the backing Kafka topic.stream_offset— controls where Cube Store starts consuming from in the Kafka topic. Set to"latest"to only consume new messages arriving after the pre-aggregation is created. Set to"earliest"to replay the topic from the beginning. Defaults to"latest"if not specified. On subsequent refreshes, Cube Store automatically resumes from the last processed offset regardless of this setting.unique_key_columns— columns that uniquely identify a record, used for deduplication (see below).
Primary key and ungrouped queries
For the streaming pre-aggregation to work in read-only mode, the
generated SQL must not contain a GROUP BY clause — Cube Store’s stream
post-processing engine does not support aggregation.
Cube automatically omits the GROUP BY clause when the dimensions
included in the pre-aggregation contain a primary key. In that case, the
generated query becomes a simple SELECT ... FROM ... without grouping,
and measures are passed through as raw expressions rather than
aggregated. This is what makes the pre-aggregation eligible for the
read-only streaming path.
You must include all primary key columns of the cube in the
streaming pre-aggregation’s dimensions list. If any primary key
dimension is missing, the query may not be recognized as ungrouped
and will fail to use the streaming path.
The sql_table or sql value should reference an existing ksqlDB stream
or table. Cube discovers its schema automatically. With Kafka streams
mode enabled, the streaming pre-aggregation reads the backing Kafka topic
directly — no objects are created in ksqlDB.
Unique key columns and deduplication
When unique_key_columns is set, Cube Store appends an internal
sequence column (__seq) to the table, populated from the Kafka
partition offset. The unique key columns together with __seq form the
sort key for all indexes on this table.
Deduplication is not applied at ingestion time — all incoming records are
appended as they arrive. Instead, Cube Store deduplicates during
reads and compaction: rows are sorted by the unique key columns
and then by __seq, and only the last row per unique key (the one
with the highest sequence number) is kept. This means that if the same
key appears multiple times in the stream, the most recent version is
always the one returned by queries.
For Kafka messages, unique key column values can come from either the
message payload (the JSON value) or the message key. If a column
listed in unique_key_columns is missing from the payload, Cube Store
falls back to the Kafka message key: for a single unique key column, the
raw key value is used; for composite keys, the key is expected to be a
JSON object with matching field names.
Stream format
Cube Store expects Kafka messages to have a JSON object as their value payload, with field names matching the column names defined in the cube. For example, given the streaming cube above, each Kafka message value should look like:
{
"ORDER_ID": "ord_12345",
"USER_ID": "usr_789",
"STATUS": "completed",
"AMOUNT": 49.99,
"CREATED_AT": "2025-01-15T10:30:00.000"
}Field names are case-sensitive and must match the column names used in
the sql property of each dimension and measure definition. Missing
fields default to null.
The message key is optional. When present and the value starts with {,
it is parsed as a JSON object and used as a fallback source for unique
key column values (see above).
Timestamp handling
For dimensions with type: time, Cube Store accepts timestamp values in
two formats:
- String — parsed using ISO 8601 / RFC 3339 formats. Supported
patterns include:
2025-01-15T10:30:00.000Z2025-01-15T10:30:00Z2025-01-15 10:30:00.000 UTC2025-01-15T10:30:002025-01-15 10:30:002025-01-15
- Number — interpreted as epoch milliseconds (not seconds, not
microseconds). For example,
1736939400000represents2025-01-15T10:30:00.000Z.
If your Kafka topic produces timestamps as strings in a non-standard
format, you can use PARSE_TIMESTAMP in the cube’s sql property to
convert them. In that case, define the source column as type: string
in a source_table and use the select_statement to transform it:
sql: `SELECT PARSE_TIMESTAMP(TIMESTAMP_STR,
'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') AS created_at,
ORDER_ID, USER_ID, STATUS, AMOUNT
FROM ORDER_EVENTS_STREAM`,Time dimension truncation (controlled by the granularity property of
the pre-aggregation) is handled automatically. Cube generates the
appropriate PARSE_TIMESTAMP(FORMAT_TIMESTAMP(CONVERT_TZ(...)))
expression chain to truncate timestamps to the configured granularity
(e.g., day, hour, minute). Cube Store evaluates these expressions
natively on each micro-batch during ingestion. Standard SQL functions
like date_trunc are also available in the select_statement.
Filtering on the stream
When the streaming cube defines a sql property with a SELECT
statement (rather than sql_table), Cube Store applies the projection
and any WHERE filters from that statement directly on each micro-batch
of incoming Kafka messages. This filtering happens inside Cube Store
using its query engine — it does not require ksqlDB to process the
filter. Only rows that pass the filter are ingested into the
pre-aggregation table.
This allows you to define a streaming cube that only ingests a subset of the data from the underlying Kafka topic without creating any server-side filter objects in ksqlDB.
Supported SQL syntax
The SELECT statement must follow a strict shape. Cube Store only
accepts plans that resolve to Projection > Filter > TableScan (where
the filter is optional). Any other query plan shape is rejected.
Supported:
SELECTwith column references (e.g.,SELECT col1, col2 FROM topic)SELECT *wildcard- Column aliases (
SELECT col1 AS my_alias) WHEREclause with comparison operators (=,!=,<,>,<=,>=)- Boolean logic in
WHERE(AND,OR,NOT) IS NULLandIS NOT NULLINlists (col IN (1, 2, 3))BETWEENexpressionsCASE ... WHEN ... THEN ... ELSE ... ENDexpressionsCAST(expr AS type)type conversionsEXTRACT(field FROM expr)for date/time partsSUBSTRING(expr FROM start FOR length)- Scalar functions (e.g.,
COALESCE,CONCAT, arithmetic) CONVERT_TZfor timezone conversion (internally rewritten for compatibility)PARSE_TIMESTAMPandFORMAT_TIMESTAMPfor timestamp parsing and formatting using ksql-style format strings (e.g.,yyyy-MM-dd'T'HH:mm:ss.SSS)- Nested expressions with parentheses
date_truncfor timestamp truncation
Not supported:
JOINclauses — only a singleFROMtable is allowed- Subqueries in
SELECTorWHERE GROUP BY,HAVING, or aggregate functions (SUM,COUNT,AVG, etc.)ORDER BY(rows are consumed in stream order)LIMITandOFFSETUNION,INTERSECT,EXCEPT- Window functions (
OVER,PARTITION BY) - Multiple
FROMor multipleWHEREclauses - Common Table Expressions (
WITH ... AS)
All column expressions in the SELECT list that are not simple column
references must have explicit aliases. Unique key columns may reference
the source column through a scalar function (e.g.,
CAST(id AS VARCHAR) AS id), but not through arbitrary expressions.
Was this page useful?