# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default single-replica-cluster=quickstart

# Create sources and verify they can ingest data while `environmentd` is online.

$ kafka-create-topic topic=data partitions=1

$ kafka-ingest format=bytes topic=data
one

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE data
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}');

> CREATE TABLE data_tbl FROM SOURCE data (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT TEXT;

> SELECT * from data_tbl
one

> SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'data_progress';
running

# Ensure that we can select from automatically generated remap collection
> SELECT partition::text, "offset" FROM data_progress
[0,0] 1
(0,) 0

# Ensure we report the write frontier of the progress subsource
$ set-regex match=(\s{12}0|\d{13,20}|u\d{1,5}|\(\d+-\d\d-\d\d\s\d\d:\d\d:\d\d\.\d\d\d\)|true|false) replacement=<>

> EXPLAIN TIMESTAMP FOR SELECT * FROM data_progress
" query timestamp: <> <>\n oracle read timestamp: <> <>\nlargest not in advance of upper: <> <>\n upper:[<> <>]\n since:[<> <>]\n can respond immediately: <>\n timeline: Some(EpochMilliseconds)\n session wall time: <> <>\n\nsource materialize.public.data_progress (<>, storage):\n read frontier:[<> <>]\n write frontier:[<> <>]\n\nbinding constraints:\nlower:\n (IsolationLevel(StrictSerializable)): [<> <>]\n"

# A progress subsource can also be named explicitly at source creation time.
> CREATE SOURCE d
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  EXPOSE PROGRESS AS exposed_progress_data;

> CREATE TABLE d_tbl FROM SOURCE d (REFERENCE "testdrive-data-${testdrive.seed}")
  FORMAT TEXT;

> SELECT partition::text, "offset" FROM exposed_progress_data
[0,0] 1
(0,) 0