source-sink-clusters.td 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  # Connect as mz_system on the internal SQL address and turn on the two system
  # flags set below.
  # NOTE(review): enable_create_table_from_source is presumably what allows the
  # CREATE TABLE ... FROM SOURCE statements later in this file -- confirm.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
  11. ALTER SYSTEM SET enable_create_table_from_source = true
  12. # Clean up cluster manually, since testdrive does not automatically clean up
  13. # clusters.
  14. > DROP CLUSTER IF EXISTS storage;
  15. # Create a table for use throughout the test.
  16. > CREATE TABLE t (a int)
  17. # Create a cluster for sources and sinks.
  # The empty REPLICAS () list means the cluster starts out with no replicas.
  18. > CREATE CLUSTER storage REPLICAS ()
  19. # Querying a cluster with no replicas does not succeed.
  20. > SET cluster = storage
  21. ! SELECT generate_series(1, 1)
  22. contains:CLUSTER "storage" has no replicas available to service request
  23. # Creating a source in an empty, zero-replica cluster should work.
  24. > CREATE SOURCE loadgen IN CLUSTER storage FROM LOAD GENERATOR COUNTER (UP TO 100)
  # Setting SIZE on the source is expected to fail; the expected error text
  # reports SIZE where TIMESTAMP or RETAIN was expected.
  25. ! ALTER SOURCE loadgen SET (SIZE = '1')
  26. contains:Expected one of TIMESTAMP or RETAIN, found SIZE
  27. # Creating indexes and materialized views in a storage cluster is allowed.
  # The index and materialized view are created while the storage cluster still
  # has no replicas; the first replica is only added afterwards.
  28. > CREATE INDEX t_idx IN CLUSTER storage ON t (a)
  29. > CREATE MATERIALIZED VIEW mv IN CLUSTER storage AS SELECT 1
  30. > CREATE CLUSTER REPLICA storage.r1 SIZE = '1'
  31. # Executing queries on a storage cluster is allowed.
  # This is the same query that was expected to fail before the replica existed.
  32. > SELECT generate_series(1, 1)
  33. 1
  34. # Creating sources on clusters containing compute objects is allowed
  35. > CREATE SOURCE lg2 IN CLUSTER storage FROM LOAD GENERATOR COUNTER (UP TO 100)
  36. # Test that `DROP CLUSTER` only succeeds with `CASCADE`.
  37. ! DROP CLUSTER storage
  38. contains:cannot drop cluster "storage" because other objects depend on it
  39. > DROP CLUSTER storage CASCADE
  # The session was pointed at the storage cluster earlier; switch it to the
  # quickstart cluster now that storage has been dropped.
  40. > SET cluster = quickstart
  41. # Test that a cluster can contain multiple sources and sinks, and verify that
  42. # the sources and sinks produce the correct output.
  # Recreate the storage cluster, this time with a single replica r1 of size '1'.
  43. > CREATE CLUSTER storage REPLICAS (r1 (SIZE '1'))
  # Seed two single-partition Kafka topics with two records each.
  44. $ kafka-create-topic topic=data1 partitions=1
  45. $ kafka-create-topic topic=data2 partitions=1
  46. $ kafka-ingest format=bytes topic=data1
  47. a
  48. b
  49. $ kafka-ingest format=bytes topic=data2
  50. aa
  51. bb
  # One plaintext Kafka connection is shared by both sources and both sinks.
  52. > CREATE CONNECTION kafka
  53. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  # Each topic gets a source in the storage cluster plus a text-format table
  # created from that source.
  54. > CREATE SOURCE data1
  55. IN CLUSTER storage
  56. FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-data1-${testdrive.seed}')
  57. > CREATE TABLE data1_tbl FROM SOURCE data1 (REFERENCE "testdrive-data1-${testdrive.seed}")
  58. FORMAT TEXT
  59. > CREATE SOURCE data2
  60. IN CLUSTER storage
  61. FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-data2-${testdrive.seed}')
  62. > CREATE TABLE data2_tbl FROM SOURCE data2 (REFERENCE "testdrive-data2-${testdrive.seed}")
  63. FORMAT TEXT
  # Both tables should expose exactly the records ingested above.
  64. > SELECT * FROM data1_tbl
  65. a
  66. b
  67. > SELECT * FROM data2_tbl
  68. aa
  69. bb
  # The views are created without an IN CLUSTER clause; the sinks that read from
  # them are placed in the storage cluster explicitly.
  70. > CREATE MATERIALIZED VIEW view1 AS SELECT text FROM data1_tbl
  71. > CREATE MATERIALIZED VIEW view2 AS SELECT text || text AS text FROM data2_tbl
  72. > CREATE SINK sink1
  73. IN CLUSTER storage
  74. FROM view1
  75. INTO KAFKA CONNECTION kafka (TOPIC 'sink1-${testdrive.seed}')
  76. FORMAT JSON ENVELOPE DEBEZIUM
  # Setting SIZE on the sink is expected to fail; the expected error text lists
  # PARTITION, SNAPSHOT and VERSION as the alternatives.
  77. ! ALTER SINK sink1 SET (SIZE = '1')
  78. contains:Expected one of PARTITION or SNAPSHOT or VERSION
  79. > CREATE SINK sink2
  80. IN CLUSTER storage
  81. FROM view2
  82. INTO KAFKA CONNECTION kafka (TOPIC 'sink2-${testdrive.seed}')
  83. FORMAT JSON ENVELOPE DEBEZIUM
  # sink1 mirrors the data1 records as-is; view2 concatenates each data2 value
  # with itself, so sink2 is expected to emit "aaaa" and "bbbb".
  84. $ kafka-verify-data format=json sink=materialize.public.sink1 key=false sort-messages=true
  85. {"before": null, "after": {"text": "a"}}
  86. {"before": null, "after": {"text": "b"}}
  87. $ kafka-verify-data format=json sink=materialize.public.sink2 key=false sort-messages=true
  88. {"before": null, "after": {"text": "aaaa"}}
  89. {"before": null, "after": {"text": "bbbb"}}
  90. # Test that the replica can be sized up and the sources and sinks correctly
  91. # restart.
  # The resize is done by dropping replica r1 and recreating it under the same
  # name with SIZE '2' instead of '1'.
  92. > DROP CLUSTER REPLICA storage.r1
  93. > CREATE CLUSTER REPLICA storage.r1 SIZE '2'
  # Ingest one more record per topic after the replica swap.
  94. $ kafka-ingest format=bytes topic=data1
  95. c
  96. $ kafka-ingest format=bytes topic=data2
  97. cc
  # The tables should contain the earlier records plus the new one.
  98. > SELECT * FROM data1_tbl
  99. a
  100. b
  101. c
  102. > SELECT * FROM data2_tbl
  103. aa
  104. bb
  105. cc
  # The expected sink output here lists only the record ingested after the
  # replica swap ("c", and "cccc" for the self-concatenated view).
  106. $ kafka-verify-data format=json sink=materialize.public.sink1 key=false sort-messages=true
  107. {"before": null, "after": {"text": "c"}}
  108. $ kafka-verify-data format=json sink=materialize.public.sink2 key=false sort-messages=true
  109. {"before": null, "after": {"text": "cccc"}}
  110. # Test that the `size` and `cluster_id` fields are correctly populated in the
  111. # system catalog for sources and sinks.
  # Each catalog row is joined to mz_clusters on cluster_id to get the cluster
  # name. The expected rows report a <null> size and the "storage" cluster.
  # NOTE(review): the `LIKE 'u%'` filter presumably restricts the result to
  # user-created objects -- confirm against the catalog id scheme.
  112. > SELECT s.name, s.size, c.name
  113. FROM mz_sources s
  114. JOIN mz_clusters c ON c.id = s.cluster_id
  115. WHERE s.id LIKE 'u%'
  116. data1 <null> storage
  117. data2 <null> storage
  # Same check for the two sinks.
  118. > SELECT s.name, s.size, c.name
  119. FROM mz_sinks s
  120. JOIN mz_clusters c ON c.id = s.cluster_id
  121. WHERE s.id LIKE 'u%'
  122. sink1 <null> storage
  123. sink2 <null> storage