rename_source.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. @externally_idempotent(False)
  13. class RenameSource(Check):
  14. def _source_schema(self) -> str:
  15. return dedent(
  16. """
  17. $ set rename-source-schema={
  18. "type" : "record",
  19. "name" : "test",
  20. "fields" : [
  21. {"name":"f1", "type":"string"}
  22. ]
  23. }
  24. """
  25. )
  26. def initialize(self) -> Testdrive:
  27. return Testdrive(
  28. self._source_schema()
  29. + dedent(
  30. """
  31. $ kafka-create-topic topic=rename-source
  32. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  33. {"f1": "A"}
  34. > CREATE SOURCE rename_source1_src
  35. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-rename-source-${testdrive.seed}')
  36. > CREATE TABLE rename_source1_tbl FROM SOURCE rename_source1_src (REFERENCE "testdrive-rename-source-${testdrive.seed}")
  37. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  38. ENVELOPE NONE
  39. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  40. {"f1": "B"}
  41. > CREATE MATERIALIZED VIEW rename_source_view AS SELECT DISTINCT f1 FROM rename_source1_tbl;
  42. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  43. {"f1": "C"}
  44. """
  45. )
  46. )
  47. def manipulate(self) -> list[Testdrive]:
  48. return [
  49. Testdrive(self._source_schema() + dedent(s))
  50. for s in [
  51. """
  52. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  53. {"f1": "D"}
  54. > ALTER SOURCE rename_source1_src RENAME to rename_source2_src;
  55. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  56. {"f1": "E"}
  57. """,
  58. """
  59. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  60. {"f1": "F"}
  61. > ALTER SOURCE rename_source2_src RENAME to rename_source3_src;
  62. $ kafka-ingest format=avro topic=rename-source schema=${rename-source-schema}
  63. {"f1": "G"}
  64. """,
  65. ]
  66. ]
  67. def validate(self) -> Testdrive:
  68. return Testdrive(
  69. dedent(
  70. """
  71. > SELECT * FROM rename_source1_tbl;
  72. A
  73. B
  74. C
  75. D
  76. E
  77. F
  78. G
  79. > SELECT * FROM rename_source_view;
  80. A
  81. B
  82. C
  83. D
  84. E
  85. F
  86. G
  87. """
  88. )
  89. )