upsert_many_updates.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. INCREMENTS = 100000
  14. class UpsertManyUpdates(Check):
  15. """Update the same row over and over"""
  16. def initialize(self) -> Testdrive:
  17. return Testdrive(
  18. dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  19. + dedent(
  20. """
  21. $ kafka-create-topic topic=upsert-many-updates
  22. $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
  23. {"key1": "A"} {"f1": "0"}
  24. > CREATE SOURCE upsert_many_updates_src
  25. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-many-updates-${testdrive.seed}')
  26. > CREATE TABLE upsert_many_updates FROM SOURCE upsert_many_updates_src (REFERENCE "testdrive-upsert-many-updates-${testdrive.seed}")
  27. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  28. ENVELOPE UPSERT
  29. > CREATE MATERIALIZED VIEW upsert_many_updates_view AS
  30. SELECT f1 FROM upsert_many_updates
  31. """
  32. )
  33. )
  34. def manipulate(self) -> list[Testdrive]:
  35. # Construct inputs for $kafka-ingest where every update is a separate Kafka message to be ingested
  36. increment1 = "\n".join(
  37. [f"""{{"key1": "A"}} {{"f1": "{i+1}"}}""" for i in range(INCREMENTS)]
  38. )
  39. increment2 = "\n".join(
  40. [
  41. f"""{{"key1": "A"}} {{"f1": "{INCREMENTS+i+1}"}}"""
  42. for i in range(INCREMENTS)
  43. ]
  44. )
  45. return [
  46. Testdrive(
  47. dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  48. + dedent(
  49. """
  50. $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
  51. """
  52. )
  53. + increment1
  54. ),
  55. Testdrive(
  56. dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  57. + dedent(
  58. """
  59. $ kafka-ingest format=avro key-format=avro topic=upsert-many-updates key-schema=${keyschema} schema=${schema}
  60. """
  61. )
  62. + increment2
  63. ),
  64. ]
  65. def validate(self) -> Testdrive:
  66. return Testdrive(
  67. dedent(
  68. f"""
  69. > SELECT * FROM upsert_many_updates
  70. A {INCREMENTS*2}
  71. """
  72. )
  73. )