upsert_unordered_key.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check
  12. # A schema that allows null values
  13. SCHEMA = dedent(
  14. """
  15. # Must be a subset of the keys in the rows AND
  16. # in a different order than the value.
  17. $ set keyschema={
  18. "type": "record",
  19. "name": "Key",
  20. "fields": [
  21. {"name": "b", "type": "string"},
  22. {"name": "a", "type": "long"}
  23. ]
  24. }
  25. $ set schema={
  26. "type" : "record",
  27. "name" : "envelope",
  28. "fields" : [
  29. {
  30. "name": "before",
  31. "type": [
  32. {
  33. "name": "row",
  34. "type": "record",
  35. "fields": [
  36. {
  37. "name": "a",
  38. "type": "long"
  39. },
  40. {
  41. "name": "data",
  42. "type": "string"
  43. },
  44. {
  45. "name": "b",
  46. "type": "string"
  47. }]
  48. },
  49. "null"
  50. ]
  51. },
  52. {
  53. "name": "after",
  54. "type": ["row", "null"]
  55. }
  56. ]
  57. }
  58. """
  59. )
  60. class UpsertUnorderedKey(Check):
  61. """Upsert with keys in a different order than values."""
  62. def initialize(self) -> Testdrive:
  63. return Testdrive(
  64. SCHEMA
  65. + dedent(
  66. """
  67. $ kafka-create-topic topic=upsert-unordered-key
  68. $ kafka-ingest format=avro topic=upsert-unordered-key key-format=avro key-schema=${keyschema} schema=${schema}
  69. {"b": "bdata", "a": 1} {"before": {"row": {"a": 1, "data": "fish", "b": "bdata"}}, "after": {"row": {"a": 1, "data": "fish2", "b": "bdata"}}}
  70. > CREATE SOURCE upsert_unordered_key_src
  71. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-unordered-key-${testdrive.seed}')
  72. > CREATE TABLE upsert_unordered_key
  73. FROM SOURCE upsert_unordered_key_src (REFERENCE "testdrive-upsert-unordered-key-${testdrive.seed}")
  74. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  75. ENVELOPE DEBEZIUM
  76. """
  77. )
  78. )
  79. def manipulate(self) -> list[Testdrive]:
  80. return [
  81. Testdrive(SCHEMA + dedent(s))
  82. for s in [
  83. """
  84. $ kafka-ingest format=avro topic=upsert-unordered-key key-format=avro key-schema=${keyschema} schema=${schema}
  85. {"b": "bdata", "a": 1} {"before": {"row": {"a": 1, "data": "fish2", "b": "bdata"}}, "after": {"row": {"a": 1, "data": "fish3", "b": "bdata"}}}
  86. """,
  87. """
  88. $ kafka-ingest format=avro topic=upsert-unordered-key key-format=avro key-schema=${keyschema} schema=${schema}
  89. {"b": "bdata", "a": 1} {"before": {"row": {"a": 1, "data": "fish3", "b": "bdata"}}, "after": {"row": {"a": 1, "data": "fish4", "b": "bdata"}}}
  90. """,
  91. ]
  92. ]
  93. def validate(self) -> Testdrive:
  94. return Testdrive(
  95. dedent(
  96. """
  97. > SELECT * FROM upsert_unordered_key
  98. 1 fish4 bdata
  99. """
  100. )
  101. )