upsert_wide.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. PAD_100K = "X" * (100 * 1024)
  14. PAD_500K = "Y" * (500 * 1024)
  15. PAD_1M = "Z" * (1000 * 1024)
  16. @externally_idempotent(False)
  17. class UpsertWideValue(Check):
  18. """Perform upsert over records with a very long/wide value."""
  19. def initialize(self) -> Testdrive:
  20. return Testdrive(
  21. dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  22. + dedent(
  23. f"""
  24. $ kafka-create-topic topic=upsert-wide-value
  25. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  26. {{"key1": "A"}} {{"f1": "{PAD_100K}"}}
  27. > CREATE SOURCE upsert_wide_value_src
  28. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-value-${{testdrive.seed}}')
  29. > CREATE TABLE upsert_wide_value FROM SOURCE upsert_wide_value_src (REFERENCE "testdrive-upsert-wide-value-${{testdrive.seed}}")
  30. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  31. ENVELOPE UPSERT
  32. > CREATE MATERIALIZED VIEW upsert_wide_value_view AS
  33. SELECT LEFT(f1, 1), RIGHT(f1, 1),
  34. LENGTH(f1)
  35. FROM upsert_wide_value
  36. """
  37. )
  38. )
  39. def manipulate(self) -> list[Testdrive]:
  40. return [
  41. Testdrive(dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD) + dedent(s))
  42. for s in [
  43. f"""
  44. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  45. {{"key1": "A"}} {{"f1": "{PAD_1M}"}}
  46. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  47. {{"key1": "B"}} {{"f1": "{PAD_500K}"}}
  48. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  49. {{"key1": "C"}} {{"f1": "{PAD_1M}"}}
  50. """,
  51. f"""
  52. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  53. {{"key1": "A"}} {{"f1": "{PAD_500K}"}}
  54. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  55. {{"key1": "B"}} {{"f1": "{PAD_1M}"}}
  56. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-value key-schema=${{keyschema}} schema=${{schema}}
  57. {{"key1": "C"}} {{"f1": "{PAD_100K}"}}
  58. """,
  59. ]
  60. ]
  61. def validate(self) -> Testdrive:
  62. return Testdrive(
  63. dedent(
  64. """
  65. > SELECT * FROM upsert_wide_value_view
  66. X X 102400
  67. Y Y 512000
  68. Z Z 1024000
  69. """
  70. )
  71. )
  72. @externally_idempotent(False)
  73. class UpsertWideKey(Check):
  74. """Perform upsert over records with a very long/wide key."""
  75. def initialize(self) -> Testdrive:
  76. return Testdrive(
  77. dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  78. + dedent(
  79. f"""
  80. $ kafka-create-topic topic=upsert-wide-key
  81. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
  82. {{"key1": "A{PAD_1M}"}} {{"f1": "A1"}}
  83. {{"key1": "B{PAD_1M}"}} {{"f1": "B1"}}
  84. {{"key1": "C{PAD_1M}"}} {{"f1": "C1"}}
  85. > CREATE SOURCE upsert_wide_key_src
  86. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-wide-key-${{testdrive.seed}}')
  87. > CREATE TABLE upsert_wide_key FROM SOURCE upsert_wide_key_src (REFERENCE "testdrive-upsert-wide-key-${{testdrive.seed}}")
  88. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  89. ENVELOPE UPSERT
  90. > CREATE MATERIALIZED VIEW upsert_wide_key_view AS
  91. SELECT LEFT(key1, 1), RIGHT(key1, 1),
  92. LENGTH(key1), f1
  93. FROM upsert_wide_key
  94. """
  95. )
  96. )
  97. def manipulate(self) -> list[Testdrive]:
  98. return [
  99. Testdrive(dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD) + dedent(s))
  100. for s in [
  101. f"""
  102. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
  103. {{"key1": "A{PAD_1M}"}} {{"f1": "A2"}}
  104. {{"key1": "D{PAD_1M}"}} {{"f1": "D1"}}
  105. # Delete B ...
  106. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
  107. {{"key1": "B{PAD_1M}"}}
  108. """,
  109. f"""
  110. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
  111. {{"key1": "A{PAD_1M}"}} {{"f1": "A3"}}
  112. {{"key1": "E{PAD_1M}"}} {{"f1": "E1"}}
  113. # Delete C ...
  114. $ kafka-ingest format=avro key-format=avro topic=upsert-wide-key key-schema=${{keyschema}} schema=${{schema}}
  115. {{"key1": "C{PAD_1M}"}}
  116. """,
  117. ]
  118. ]
  119. def validate(self) -> Testdrive:
  120. return Testdrive(
  121. dedent(
  122. """
  123. > SELECT * FROM upsert_wide_key_view
  124. A Z 1024001 A3
  125. D Z 1024001 D1
  126. E Z 1024001 E1
  127. """
  128. )
  129. )