# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, externally_idempotent
  12. @externally_idempotent(False)
  13. class JsonSource(Check):
  14. """Test CREATE SOURCE ... FORMAT JSON"""
  15. def initialize(self) -> Testdrive:
  16. return Testdrive(
  17. dedent(
  18. """
  19. $ kafka-create-topic topic=format-json partitions=1
  20. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
  21. "object":{"a":"b","c":"d"}
  22. > CREATE CLUSTER single_replica_cluster SIZE '1';
  23. > CREATE SOURCE format_jsonA_src
  24. IN CLUSTER single_replica_cluster
  25. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}')
  26. > CREATE TABLE format_jsonA FROM SOURCE format_jsonA_src (REFERENCE "testdrive-format-json-${testdrive.seed}")
  27. KEY FORMAT JSON
  28. VALUE FORMAT JSON
  29. ENVELOPE UPSERT
  30. > CREATE SOURCE format_jsonB_src
  31. IN CLUSTER single_replica_cluster
  32. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}')
  33. > CREATE TABLE format_jsonB FROM SOURCE format_jsonB_src (REFERENCE "testdrive-format-json-${testdrive.seed}")
  34. KEY FORMAT JSON
  35. VALUE FORMAT JSON
  36. ENVELOPE UPSERT
  37. """
  38. )
  39. )
  40. def manipulate(self) -> list[Testdrive]:
  41. return [
  42. Testdrive(dedent(s))
  43. for s in [
  44. """
  45. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
  46. "float":1.23
  47. "str":"hello"
  48. """,
  49. """
  50. $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
  51. "array":[1,2,3]
  52. "int":1
  53. """,
  54. ]
  55. ]
  56. def validate(self) -> Testdrive:
  57. return Testdrive(
  58. dedent(
  59. """
  60. > SELECT * FROM format_jsonA ORDER BY key
  61. "\\"array\\"" [1,2,3]
  62. "\\"float\\"" 1.23
  63. "\\"int\\"" 1
  64. "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}"
  65. "\\"str\\"" "\\"hello\\""
  66. > SELECT * FROM format_jsonB ORDER BY key
  67. "\\"array\\"" [1,2,3]
  68. "\\"float\\"" 1.23
  69. "\\"int\\"" 1
  70. "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}"
  71. "\\"str\\"" "\\"hello\\""
  72. """
  73. + r"""
  74. >[version>=14000] SHOW CREATE SOURCE format_jsonb_src;
  75. materialize.public.format_jsonb_src "CREATE SOURCE materialize.public.format_jsonb_src\nIN CLUSTER single_replica_cluster\nFROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC = 'testdrive-format-json-${testdrive.seed}')\nEXPOSE PROGRESS AS materialize.public.format_jsonb_src_progress;"
  76. >[version<14000] SHOW CREATE SOURCE format_jsonb_src;
  77. materialize.public.format_jsonb_src "CREATE SOURCE \"materialize\".\"public\".\"format_jsonb_src\" IN CLUSTER \"single_replica_cluster\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = 'testdrive-format-json-1') EXPOSE PROGRESS AS \"materialize\".\"public\".\"format_jsonb_src_progress\""
  78. """
  79. )
  80. )