CREATE SOURCE [ IF NOT EXISTS ] src_name
  [ ( col_name [, ...] ) ]
  [ IN CLUSTER cluster_name ]
  FROM KAFKA CONNECTION connection_name ( TOPIC topic [, connection_option [, ...] ] )
  [ KEY FORMAT format_spec VALUE FORMAT format_spec | FORMAT format_spec ]
  [ INCLUDE
      { { KEY | PARTITION | OFFSET | TIMESTAMP | HEADERS } [ AS name ]
      | HEADER key AS name [ BYTES ] } [, ...] ]
  [ ENVELOPE { NONE | DEBEZIUM | UPSERT [ ( VALUE DECODING ERRORS = INLINE [ AS name ] ) ] } ]
  [ EXPOSE PROGRESS AS progress_subsource_name ]
  [ with_options ]