# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
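
# `set-arg-default` supplies a fallback for the `single-replica-cluster`
# argument, so the test runs against the `quickstart` cluster unless the
# caller overrides it.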
$ set-arg-default single-replica-cluster=quickstart

# Create sources and verify they can ingest data while `environmentd` is online.
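
# A single-partition topic keeps the progress output below to one concrete
# partition range.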
$ kafka-create-topic topic=data partitions=1

$ kafka-ingest format=bytes topic=data
one
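
# The testdrive harness provides an unauthenticated broker, so the
# connection uses the PLAINTEXT security protocol.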
> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
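
# Creating the source also creates a progress subsource, named
# `data_progress` by default, that tracks how far ingestion has advanced.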
> CREATE SOURCE data
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  FORMAT TEXT;

> SELECT * FROM data
one
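
# The progress subsource appears in `mz_internal.mz_source_statuses` like any
# other source and should report as running once ingestion is healthy.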
> SELECT status FROM mz_internal.mz_source_statuses WHERE name = 'data_progress';
running

# Ensure that we can select from the automatically generated remap collection
> SELECT partition::text, "offset" FROM data_progress
[0,0] 1
(0,) 0
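
# Roughly: `[0,0] 1` means the next offset to ingest for partition 0 is 1
# (offset 0 has been read), and `(0,) 0` covers all partitions greater than
# 0, none of which exist yet.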

# Ensure we report the write frontier of the progress subsource
$ set-regex match=(\s{12}0|\d{13,20}|u\d{1,5}|\(\d+-\d\d-\d\d\s\d\d:\d\d:\d\d\.\d\d\d\)|true|false) replacement=<>
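
# The regex scrubs values that differ between runs (padded zeros, raw
# millisecond timestamps, collection ids such as u123, wall-clock times, and
# booleans), replacing each with `<>` so the output below is deterministic.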

> EXPLAIN TIMESTAMP FOR SELECT * FROM data_progress
"                query timestamp: <> <>\n          oracle read timestamp: <> <>\nlargest not in advance of upper: <> <>\n                          upper:[<> <>]\n                          since:[<> <>]\n        can respond immediately: <>\n                       timeline: Some(EpochMilliseconds)\n              session wall time: <> <>\n\nsource materialize.public.data_progress (<>, storage):\n                  read frontier:[<> <>]\n                 write frontier:[<> <>]\n\nbinding constraints:\nlower:\n  (IsolationLevel(StrictSerializable)): [<> <>]\n"
> CREATE SOURCE d
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
  FORMAT TEXT
  EXPOSE PROGRESS AS exposed_progress_data;

> SELECT partition::text, "offset" FROM exposed_progress_data
[0,0] 1
(0,) 0
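
# Both sources read the same topic, so the explicitly named progress
# subsource reports the same partition/offset rows as `data_progress` above.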