# upsert_shrink_grow.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. def schemas() -> str:
  14. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
class ShrinkGrow:
    """Base for upsert checks that re-ingest every key with values of a
    different length across three rounds.

    Subclasses supply ``name()`` (short identifier woven into topic, source,
    table, and view names) and ``pads()`` (the three padding strings, one per
    ingestion round).
    """

    def initialize(self) -> Testdrive:
        # Round 0: create the topic, ingest 10k keyed records whose values are
        # padded with pads[0], then create the source/table/view over it.
        name = self.name()
        pads = self.pads()
        return Testdrive(
            schemas()
            + dedent(
                f"""
                $ kafka-create-topic topic=upsert-update-{name}
                $ kafka-ingest format=avro key-format=avro topic=upsert-update-{name} key-schema=${{keyschema}} schema=${{schema}} repeat=10000
                {{"key1": "${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}{pads[0]}A"}}
                > CREATE SOURCE upsert_update_{name}_src
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-update-{name}-${{testdrive.seed}}')
                > CREATE TABLE upsert_update_{name} FROM SOURCE upsert_update_{name}_src (REFERENCE "testdrive-upsert-update-{name}-${{testdrive.seed}}")
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT
                > CREATE MATERIALIZED VIEW upsert_update_{name}_view AS
                  SELECT LEFT(f1, 1), RIGHT(f1, 1),
                  COUNT(*) AS c1, COUNT(DISTINCT key1) AS c2, COUNT(DISTINCT f1) AS c3,
                  MIN(LENGTH(f1)) AS l1, MAX(LENGTH(f1)) AS l2
                  FROM upsert_update_{name}
                  GROUP BY LEFT(f1, 1), RIGHT(f1, 1);
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        # Rounds 1 and 2: upsert the same 10k keys again, first with
        # pads[1]-sized values ("B..."), then pads[2]-sized values ("C...").
        # Each key's value therefore changes length twice.
        name = self.name()
        pads = self.pads()
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                f"""
                $ kafka-ingest format=avro key-format=avro topic=upsert-update-{name} key-schema=${{keyschema}} schema=${{schema}} repeat=10000
                {{"key1": "${{kafka-ingest.iteration}}"}} {{"f1": "B${{kafka-ingest.iteration}}{pads[1]}B"}}
                """,
                f"""
                $ kafka-ingest format=avro key-format=avro topic=upsert-update-{name} key-schema=${{keyschema}} schema=${{schema}} repeat=10000
                {{"key1": "${{kafka-ingest.iteration}}"}} {{"f1": "C${{kafka-ingest.iteration}}{pads[2]}C"}}
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        # Only the final round should be visible (upsert: last value per key
        # wins), so every row starts and ends with "C". With f1 =
        # "C" + iteration + pad + "C" and iteration being 1-4 digits
        # (repeat=10000), LENGTH(f1) ranges from pad+3 to pad+6.
        name = self.name()
        last_pad_length = len(self.pads()[-1])
        return Testdrive(
            dedent(
                f"""
                > SELECT * FROM upsert_update_{name}_view;
                C C 10000 10000 10000 {last_pad_length+3} {last_pad_length+6}
                """
            )
        )

    def name(self) -> str:
        # Short identifier used in topic/object names; provided by subclasses.
        raise NotImplementedError

    def pads(self) -> list[str]:
        # The three per-round padding strings; provided by subclasses.
        raise NotImplementedError
  71. class UpsertUpdateShrink(ShrinkGrow, Check):
  72. """Upserts where the data length shrinks"""
  73. def name(self) -> str:
  74. return "shrink"
  75. def pads(self) -> list[str]:
  76. return ["x" * 1024, "x" * 512, "x" * 256]
  77. class UpsertUpdateGrow(ShrinkGrow, Check):
  78. """Upserts where the data lenth grows"""
  79. def name(self) -> str:
  80. return "grow"
  81. def pads(self) -> list[str]:
  82. return ["x" * 256, "x" * 512, "x" * 1024]