# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
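
# `unsafe_enable_unorchestrated_cluster_replicas` allows the CREATE CLUSTER
# below to address externally managed `clusterd` processes directly; the
# lowered statistics intervals should let the statistics queries at the end
# of this file observe fresh values without long waits.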
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
ALTER SYSTEM SET storage_statistics_collection_interval = 1000
ALTER SYSTEM SET storage_statistics_interval = 2000

# Create sources and verify they can ingest data while `environmentd` is online.

$ kafka-create-topic topic=remote1
$ kafka-create-topic topic=remote2

$ kafka-ingest format=bytes topic=remote1
one
$ kafka-ingest format=bytes topic=remote2
one

> CREATE CLUSTER storage_cluster REPLICAS (
    r1 (
      STORAGECTL ADDRESSES ['clusterd1:2100', 'clusterd2:2100'],
      STORAGE ADDRESSES ['clusterd1:2103', 'clusterd2:2103'],
      COMPUTECTL ADDRESSES ['clusterd1:2101', 'clusterd2:2101'],
      COMPUTE ADDRESSES ['clusterd1:2102', 'clusterd2:2102'],
      WORKERS 4
    )
  )
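
# The replica is unorchestrated: rather than having Materialize spawn
# `clusterd` itself, it connects to the `clusterd1` and `clusterd2` processes
# that the surrounding test setup is assumed to run, each with 4 workers.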

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE SOURCE remote1
  IN CLUSTER storage_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-remote1-${testdrive.seed}')

> CREATE TABLE remote1_tbl FROM SOURCE remote1 (REFERENCE "testdrive-remote1-${testdrive.seed}")
  FORMAT TEXT

> CREATE SOURCE remote2
  IN CLUSTER storage_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-remote2-${testdrive.seed}')

> CREATE TABLE remote2_tbl FROM SOURCE remote2 (REFERENCE "testdrive-remote2-${testdrive.seed}")
  FORMAT TEXT
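
# Also create a webhook source, so the statistics checks at the end cover a
# non-Kafka source type as well.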
> CREATE SOURCE webhook_text IN CLUSTER storage_cluster FROM WEBHOOK
  BODY FORMAT TEXT;

$ webhook-append database=materialize schema=public name=webhook_text
a
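
# Confirm that the snapshot of each topic has been ingested into its table.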
> SELECT * FROM remote1_tbl
one

> SELECT * FROM remote2_tbl
one

# The `CREATE TABLE ... FROM SOURCE` commands caused a recreation of the
# respective source dataflows, during which we might have lost the
# statistics about committed updates from the snapshot. Ingest some more
# data to ensure we see some `updates_committed`.
$ kafka-ingest format=bytes topic=remote1
two
$ kafka-ingest format=bytes topic=remote2
two

> SELECT s.name,
    SUM(u.updates_committed) > 0,
    SUM(u.messages_received) >= 2,
    SUM(u.offset_known),
    SUM(u.offset_committed)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('remote1', 'remote2')
  GROUP BY s.id, s.name
remote1 true true 2 2
remote2 true true 2 2
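
# Each topic holds two messages after the second ingest, so `offset_known`
# and `offset_committed` should both have advanced to 2 for each source.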

> SELECT s.name,
    SUM(u.updates_committed)
  FROM mz_sources s
  JOIN mz_internal.mz_source_statistics_raw u ON s.id = u.id
  WHERE s.name IN ('webhook_text')
  GROUP BY s.id, s.name
webhook_text 1
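
# Exactly one request was appended to the webhook source, so we expect a
# single committed update.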