CREATE SOURCE [IF NOT EXISTS] src_name
  [ ( col_name [, ...] ) ]
  [IN CLUSTER cluster_name]
FROM KAFKA CONNECTION connection_name (
  TOPIC topic
  [, connection_option]
)
{ KEY FORMAT format_spec VALUE FORMAT format_spec | FORMAT format_spec }
[INCLUDE
  { { KEY | PARTITION | OFFSET | TIMESTAMP | HEADERS } [AS name]
  | HEADER key AS name [BYTES]
  } [, ...]
]
[ENVELOPE { NONE | DEBEZIUM | UPSERT [ ( VALUE DECODING ERRORS = INLINE [AS name] ) ] }]
[EXPOSE PROGRESS AS progress_subsource_name]
[with_options]