source-sink-clusters.td 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  10. ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
  11. # Clean up the `storage` cluster manually in case it already exists, since
  12. # testdrive does not automatically clean up clusters.
  13. > DROP CLUSTER IF EXISTS storage;
  14. # Create a table that is indexed from the `storage` cluster later in the test.
  15. > CREATE TABLE t (a int)
  16. # Create a cluster for sources and sinks, initially with an empty replica list.
  17. > CREATE CLUSTER storage REPLICAS ()
  18. # Querying a cluster with no replicas does not succeed: make `storage` the active cluster and expect an error.
  19. > SET cluster = storage
  20. ! SELECT generate_series(1, 1)
  21. contains:CLUSTER "storage" has no replicas available to service request
  22. # Creating a source in an empty, zero-replica cluster should work; setting a SIZE option on it afterwards is rejected.
  23. > CREATE SOURCE loadgen IN CLUSTER storage FROM LOAD GENERATOR COUNTER (UP TO 100)
  24. ! ALTER SOURCE loadgen SET (SIZE = '1')
  25. contains:Expected one of TIMESTAMP or RETAIN, found SIZE
  26. # Creating indexes and materialized views in a storage cluster is allowed, as is adding a replica afterwards.
  27. > CREATE INDEX t_idx IN CLUSTER storage ON t (a)
  28. > CREATE MATERIALIZED VIEW mv IN CLUSTER storage AS SELECT 1
  29. > CREATE CLUSTER REPLICA storage.r1 SIZE = '1'
  30. # Executing queries on a storage cluster is allowed; the query that failed above succeeds now that replica r1 exists.
  31. > SELECT generate_series(1, 1)
  32. 1
  33. # Creating sources on clusters containing compute objects (the index and materialized view above) is allowed.
  34. > CREATE SOURCE lg2 IN CLUSTER storage FROM LOAD GENERATOR COUNTER (UP TO 100)
  35. # Test that `DROP CLUSTER` only succeeds with `CASCADE` while objects depend on the cluster, then switch the session to `quickstart`.
  36. ! DROP CLUSTER storage
  37. contains:cannot drop cluster "storage" because other objects depend on it
  38. > DROP CLUSTER storage CASCADE
  39. > SET cluster = quickstart
  40. # Test that a cluster can contain multiple sources and sinks (two Kafka sources and two Kafka sinks), and verify that
  41. # the sources return the ingested rows and the sinks emit the expected records. Also checks that ALTER SINK rejects a SIZE option.
  42. > CREATE CLUSTER storage REPLICAS (r1 (SIZE '1'))
  43. $ kafka-create-topic topic=data1 partitions=1
  44. $ kafka-create-topic topic=data2 partitions=1
  45. $ kafka-ingest format=bytes topic=data1
  46. a
  47. b
  48. $ kafka-ingest format=bytes topic=data2
  49. aa
  50. bb
  51. > CREATE CONNECTION kafka
  52. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  53. > CREATE SOURCE data1
  54. IN CLUSTER storage
  55. FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-data1-${testdrive.seed}')
  56. FORMAT TEXT
  57. > CREATE SOURCE data2
  58. IN CLUSTER storage
  59. FROM KAFKA CONNECTION kafka (TOPIC 'testdrive-data2-${testdrive.seed}')
  60. FORMAT TEXT
  61. > SELECT * FROM data1
  62. a
  63. b
  64. > SELECT * FROM data2
  65. aa
  66. bb
  67. > CREATE MATERIALIZED VIEW view1 AS SELECT text FROM data1
  68. > CREATE MATERIALIZED VIEW view2 AS SELECT text || text AS text FROM data2
  69. > CREATE SINK sink1
  70. IN CLUSTER storage
  71. FROM view1
  72. INTO KAFKA CONNECTION kafka (TOPIC 'sink1-${testdrive.seed}')
  73. FORMAT JSON ENVELOPE DEBEZIUM
  74. ! ALTER SINK sink1 SET (SIZE = '1')
  75. contains:Expected one of PARTITION or SNAPSHOT or VERSION
  76. > CREATE SINK sink2
  77. IN CLUSTER storage
  78. FROM view2
  79. INTO KAFKA CONNECTION kafka (TOPIC 'sink2-${testdrive.seed}')
  80. FORMAT JSON ENVELOPE DEBEZIUM
  81. $ kafka-verify-data format=json sink=materialize.public.sink1 key=false sort-messages=true
  82. {"before": null, "after": {"text": "a"}}
  83. {"before": null, "after": {"text": "b"}}
  84. $ kafka-verify-data format=json sink=materialize.public.sink2 key=false sort-messages=true
  85. {"before": null, "after": {"text": "aaaa"}}
  86. {"before": null, "after": {"text": "bbbb"}}
  87. # Test that the replica can be sized up (dropped and recreated with SIZE '2') and the sources and sinks correctly
  88. # restart: rows ingested after the resize must show up in both sources and be emitted by both sinks.
  89. > DROP CLUSTER REPLICA storage.r1
  90. > CREATE CLUSTER REPLICA storage.r1 SIZE '2'
  91. $ kafka-ingest format=bytes topic=data1
  92. c
  93. $ kafka-ingest format=bytes topic=data2
  94. cc
  95. > SELECT * FROM data1
  96. a
  97. b
  98. c
  99. > SELECT * FROM data2
  100. aa
  101. bb
  102. cc
  103. $ kafka-verify-data format=json sink=materialize.public.sink1 key=false sort-messages=true
  104. {"before": null, "after": {"text": "c"}}
  105. $ kafka-verify-data format=json sink=materialize.public.sink2 key=false sort-messages=true
  106. {"before": null, "after": {"text": "cccc"}}
  107. # Test that the `size` and `cluster_id` fields are correctly populated in the
  108. # system catalog for sources and sinks whose id starts with 'u': `size` is null and the cluster is `storage`.
  109. > SELECT s.name, s.size, c.name
  110. FROM mz_sources s
  111. JOIN mz_clusters c ON c.id = s.cluster_id
  112. WHERE s.id LIKE 'u%'
  113. data1 <null> storage
  114. data2 <null> storage
  115. > SELECT s.name, s.size, c.name
  116. FROM mz_sinks s
  117. JOIN mz_clusters c ON c.id = s.cluster_id
  118. WHERE s.id LIKE 'u%'
  119. sink1 <null> storage
  120. sink2 <null> storage