github-8809.td 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Regression test for https://github.com/MaterializeInc/database-issues/issues/8809.
  10. $ set-arg-default single-replica-cluster=quickstart
  11. $ set schema=[
  12. {
  13. "type": "array",
  14. "items": {
  15. "type": "record",
  16. "name": "update",
  17. "namespace": "com.materialize.cdc",
  18. "fields": [
  19. {
  20. "name": "data",
  21. "type": {
  22. "type": "record",
  23. "name": "data",
  24. "fields": [
  25. {"name": "a", "type": "long"},
  26. {"name": "b", "type": "long"}
  27. ]
  28. }
  29. },
  30. {
  31. "name": "time",
  32. "type": "long"
  33. },
  34. {
  35. "name": "diff",
  36. "type": "long"
  37. }
  38. ]
  39. }
  40. },
  41. {
  42. "type": "record",
  43. "name": "progress",
  44. "namespace": "com.materialize.cdc",
  45. "fields": [
  46. {
  47. "name": "lower",
  48. "type": {
  49. "type": "array",
  50. "items": "long"
  51. }
  52. },
  53. {
  54. "name": "upper",
  55. "type": {
  56. "type": "array",
  57. "items": "long"
  58. }
  59. },
  60. {
  61. "name": "counts",
  62. "type": {
  63. "type": "array",
  64. "items": {
  65. "type": "record",
  66. "name": "counts",
  67. "fields": [
  68. {
  69. "name": "time",
  70. "type": "long"
  71. },
  72. {
  73. "name": "count",
  74. "type": "long"
  75. }
  76. ]
  77. }
  78. }
  79. }
  80. ]
  81. }
  82. ]
  83. $ kafka-create-topic topic=topic1
  84. $ kafka-ingest format=avro topic=topic1 schema=${schema}
  85. {"array":[{"data":{"a":1,"b":2},"time":1,"diff":1}]}
  86. {"com.materialize.cdc.progress":{"lower":[0],"upper":[10],"counts":[{"time":1,"count":1}]}}
  87. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  88. ALTER SYSTEM SET enable_envelope_materialize = true
  89. > CREATE CONNECTION kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
  90. > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}');
  91. > CREATE SOURCE source1
  92. IN CLUSTER ${arg.single-replica-cluster}
  93. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${testdrive.seed}')
  94. $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="5s"
  95. > CREATE TABLE source1_tbl FROM SOURCE source1 (REFERENCE "testdrive-topic1-${testdrive.seed}")
  96. FORMAT AVRO USING SCHEMA '${schema}' ENVELOPE MATERIALIZE
  97. > SELECT write_frontier, read_frontier < write_frontier
  98. FROM mz_internal.mz_frontiers
  99. JOIN mz_tables ON (id = object_id)
  100. WHERE name = 'source1_tbl'
  101. 10 true
  102. > SELECT * FROM source1_tbl
  103. 1 2