
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# This is a test that was used to ensure that a specific ordering of updates
# in `upsert` continued to be processed properly in PR materialize#24663. It's
# short, so it's copied here.

$ set-arg-default default-storage-size=1

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET storage_statistics_collection_interval = 1000
ALTER SYSTEM SET storage_statistics_interval = 2000
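
# Lowering these intervals should let the `mz_source_statistics_raw` queries
# below converge quickly instead of waiting on the default statistics
# refresh cadence.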

# Must be a subset of the keys in the rows.
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "type": "record",
    "name": "envelope",
    "fields": [
      {"name": "op", "type": "string"},
      {
        "name": "after",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {"name": "id", "type": "long"},
              {"name": "creature", "type": "string"}
            ]
          },
          "null"
        ]
      },
      {
        "name": "source",
        "type": {
          "type": "record",
          "name": "Source",
          "namespace": "io.debezium.connector.mysql",
          "fields": [
            {"name": "file", "type": "string"},
            {"name": "pos", "type": "long"},
            {"name": "row", "type": "int"},
            {
              "name": "snapshot",
              "type": [
                {"type": "boolean", "connect.default": false},
                "null"
              ],
              "default": false
            }
          ],
          "connect.name": "io.debezium.connector.mysql.Source"
        }
      }
    ]
  }
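
# This mimics a Debezium envelope that lacks a `before` field (hence the
# `dbz-no-before` topic name below): only `after` and `source` are present,
# so deletions arrive purely as `"after": null` tombstones.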

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

$ kafka-create-topic topic=dbz-no-before partitions=1

# Note: we ignore the `op` field, so it can be "u" or "c".
$ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"after": {"row": {"id": 1, "creature": "mudskipper"}}, "op": "c", "source": {"file": "binlog1", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
{"id": 1} {"after": {"row": {"id": 1, "creature": "salamander"}}, "op": "c", "source": {"file": "binlog2", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
{"id": 1} {"after": null, "op": "c", "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}

> CREATE CLUSTER dbz_no_before_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE dbz_no_before
  IN CLUSTER dbz_no_before_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-no-before-${testdrive.seed}')

> CREATE TABLE dbz_no_before_tbl FROM SOURCE dbz_no_before (REFERENCE "testdrive-dbz-no-before-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT count(*) FROM dbz_no_before_tbl
0

# WIP: For now, the feedback upsert implementation does not count
# tombstones in `bytes_indexed`.
> SELECT
    bool_and(u.snapshot_committed),
    SUM(u.bytes_indexed) > 0,
    SUM(u.records_indexed)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('dbz_no_before_tbl')
  GROUP BY t.name
  ORDER BY t.name
true false 0
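
# Re-ingest the first record; the key should come back as a live row.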
$ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"after": {"row": {"id": 1, "creature": "mudskipper"}}, "op": "c", "source": {"file": "binlog1", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}

> SELECT * FROM dbz_no_before_tbl
id creature
-------------
1  mudskipper
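
# With one live row, the statistics should now report nonzero indexed
# bytes and a single indexed record.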
> SELECT
    bool_and(u.snapshot_committed),
    SUM(u.bytes_indexed) > 0,
    SUM(u.records_indexed)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('dbz_no_before_tbl')
  GROUP BY t.name
  ORDER BY t.name
true true 1

$ kafka-ingest format=avro topic=dbz-no-before key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"after": null, "op": "c", "source": {"file": "binlog3", "pos": 1, "row": 1, "snapshot": {"boolean": false}}}
> SELECT count(*) FROM dbz_no_before_tbl
0

# WIP: For now, the feedback upsert implementation does not count
# tombstones in `bytes_indexed`.
> SELECT
    bool_and(u.snapshot_committed),
    SUM(u.bytes_indexed) > 0,
    SUM(u.records_indexed)
  FROM mz_tables t
  JOIN mz_internal.mz_source_statistics_raw u ON t.id = u.id
  WHERE t.name IN ('dbz_no_before_tbl')
  GROUP BY t.name
  ORDER BY t.name
true false 0