upsert_enrich.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check
  12. PAD_128B = "X" * 128
  13. # 1000 and not 1024 to keep entire Kafka message under 1M
  14. PAD_1K = "Y" * 1000
  15. # A schema that allows null values
  16. SCHEMA = dedent(
  17. """
  18. $ set keyschema={
  19. "type": "record",
  20. "name": "Key",
  21. "fields": [
  22. {"name": "key1", "type": "string"}
  23. ]
  24. }
  25. $ set schema={
  26. "type" : "record",
  27. "name" : "test",
  28. "fields" : [
  29. {"name":"f1", "type": ["null", "string"], "default": null }
  30. ]
  31. }
  32. """
  33. )
  34. class UpsertEnrichValue(Check):
  35. """Progressively enrich records that started their lives as null values.
  36. Progressively empoverish records that started as very large values.
  37. """
  38. def initialize(self) -> Testdrive:
  39. return Testdrive(
  40. SCHEMA
  41. + dedent(
  42. f"""
  43. $ kafka-create-topic topic=upsert-enrich-value
  44. # 'A...' records start as NULLs
  45. $ kafka-ingest format=avro key-format=avro topic=upsert-enrich-value key-schema=${{keyschema}} schema=${{schema}} repeat=1000
  46. {{"key1": "A${{kafka-ingest.iteration}}"}} {{"f1": null}}
  47. # 'B...' records start as 1Ks
  48. $ kafka-ingest format=avro key-format=avro topic=upsert-enrich-value key-schema=${{keyschema}} schema=${{schema}} repeat=1000
  49. {{"key1": "B${{kafka-ingest.iteration}}"}} {{"f1": {{"string":"{PAD_1K}"}}}}
  50. > CREATE SOURCE upsert_enrich_value_src
  51. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-enrich-value-${{testdrive.seed}}')
  52. > CREATE TABLE upsert_enrich_value FROM SOURCE upsert_enrich_value_src (REFERENCE "testdrive-upsert-enrich-value-${{testdrive.seed}}")
  53. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  54. ENVELOPE UPSERT
  55. > CREATE MATERIALIZED VIEW upsert_enrich_value_view AS
  56. SELECT LEFT(key1, 1) AS key_left, LEFT(f1, 1) AS value_left, RIGHT(f1, 1),
  57. LENGTH(f1), COUNT(*), SUM(CASE WHEN f1 IS NULL THEN 1 ELSE 0 END) AS nulls, COUNT(f1) AS not_nulls
  58. FROM upsert_enrich_value
  59. GROUP BY LEFT(key1, 1), f1
  60. """
  61. )
  62. )
  63. def manipulate(self) -> list[Testdrive]:
  64. return [
  65. Testdrive(SCHEMA + dedent(s))
  66. for s in [
  67. f"""
  68. $ kafka-ingest format=avro key-format=avro topic=upsert-enrich-value key-schema=${{keyschema}} schema=${{schema}} repeat=1000
  69. {{"key1": "A${{kafka-ingest.iteration}}"}} {{"f1": {{"string":"{PAD_128B}"}}}}
  70. $ kafka-ingest format=avro key-format=avro topic=upsert-enrich-value key-schema=${{keyschema}} schema=${{schema}} repeat=1000
  71. {{"key1": "B${{kafka-ingest.iteration}}"}} {{"f1": {{"string":"{PAD_128B}"}}}}
  72. """,
  73. f"""
  74. # 'A...' records will now be enriched to 1Ks
  75. $ kafka-ingest format=avro key-format=avro topic=upsert-enrich-value key-schema=${{keyschema}} schema=${{schema}} repeat=1000
  76. {{"key1": "A${{kafka-ingest.iteration}}"}} {{"f1": {{"string":"{PAD_1K}"}}}}
  77. # 'B...' records will now be enpoverished to NULLs
  78. $ kafka-ingest format=avro key-format=avro topic=upsert-enrich-value key-schema=${{keyschema}} schema=${{schema}} repeat=1000
  79. {{"key1": "B${{kafka-ingest.iteration}}"}} {{"f1": null}}
  80. """,
  81. ]
  82. ]
  83. def validate(self) -> Testdrive:
  84. return Testdrive(
  85. dedent(
  86. """
  87. > SELECT * FROM upsert_enrich_value_view
  88. A Y Y 1000 1000 0 1000
  89. B <null> <null> <null> 1000 1000 0
  90. """
  91. )
  92. )