webhook.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from textwrap import dedent
  10. from materialize.checks.actions import Testdrive
  11. from materialize.checks.checks import Check, disabled
  12. from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
  13. from materialize.checks.executors import Executor
  14. from materialize.mz_version import MzVersion
  15. def schemas() -> str:
  16. return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
  17. class Webhook(Check):
  18. def initialize(self) -> Testdrive:
  19. return Testdrive(
  20. schemas()
  21. + dedent(
  22. """
  23. >[version>=14700] CREATE CLUSTER webhook_cluster REPLICATION FACTOR 2, SIZE '1'
  24. >[version<14700] CREATE CLUSTER webhook_cluster REPLICATION FACTOR 1, SIZE '1'
  25. > CREATE SOURCE webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
  26. > CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;
  27. > CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES;
  28. $ webhook-append database=materialize schema=public name=webhook_text
  29. fooä
  30. $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=platform-checks1
  31. {
  32. "hello": "wörld"
  33. }
  34. $ webhook-append database=materialize schema=public name=webhook_bytes
  35. \u0001
  36. """
  37. )
  38. )
  39. def manipulate(self) -> list[Testdrive]:
  40. return [
  41. Testdrive(schemas() + dedent(s))
  42. for s in [
  43. """
  44. $ webhook-append database=materialize schema=public name=webhook_text
  45. bar❤️
  46. $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=
  47. {
  48. "still": 123,
  49. "foo": []
  50. }
  51. $ webhook-append database=materialize schema=public name=webhook_bytes
  52. \u0000\u0000\u0000\u0000
  53. """,
  54. """
  55. $ webhook-append database=materialize schema=public name=webhook_text
  56. baz123
  57. $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=null
  58. [{"good": "bye"}, 42, null]
  59. $ webhook-append database=materialize schema=public name=webhook_bytes
  60. \u0001\u0002\u0003\u0004
  61. """,
  62. ]
  63. ]
  64. def validate(self) -> Testdrive:
  65. return Testdrive(
  66. dedent(
  67. """
  68. > SHOW COLUMNS FROM webhook_text
  69. body false text ""
  70. > SHOW COLUMNS FROM webhook_json
  71. body false jsonb ""
  72. headers false map ""
  73. > SHOW COLUMNS FROM webhook_bytes
  74. body false bytea ""
  75. > SELECT * FROM webhook_text
  76. fooä
  77. bar❤️
  78. baz123
  79. > SELECT body FROM webhook_json WHERE headers -> 'app' = 'platform-checks1'
  80. "{\\"hello\\":\\"wörld\\"}"
  81. > SELECT body FROM webhook_json WHERE headers -> 'app' = ''
  82. "{\\"foo\\":[],\\"still\\":123}"
  83. > SELECT body FROM webhook_json WHERE headers -> 'app' = 'null'
  84. "[{\\"good\\":\\"bye\\"},42,null]"
  85. > SELECT * FROM webhook_bytes
  86. \\\\x00\\x00\\x00\\x00
  87. \\\\x01
  88. \\\\x01\\x02\\x03\\x04
  89. >[version>=14000] SHOW CREATE SOURCE webhook_text
  90. materialize.public.webhook_text "CREATE SOURCE materialize.public.webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;"
  91. >[version>=14000] SHOW CREATE SOURCE webhook_json
  92. materialize.public.webhook_json "CREATE SOURCE materialize.public.webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;"
  93. >[version>=14000] SHOW CREATE SOURCE webhook_bytes
  94. materialize.public.webhook_bytes "CREATE SOURCE materialize.public.webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES;"
  95. >[version<14000] SHOW CREATE SOURCE webhook_text
  96. materialize.public.webhook_text "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_text\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT TEXT"
  97. >[version<14000] SHOW CREATE SOURCE webhook_json
  98. materialize.public.webhook_json "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_json\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS"
  99. >[version<14000] SHOW CREATE SOURCE webhook_bytes
  100. materialize.public.webhook_bytes "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_bytes\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT BYTES"
  101. """
  102. )
  103. )
  104. @disabled(
  105. "Reenable when database-issues#9184 is fixed and there is a way to set the cluster"
  106. )
  107. class WebhookTable(Check):
  108. def _can_run(self, e: Executor) -> bool:
  109. return self.base_version >= MzVersion.parse_mz("v0.130.0-dev")
  110. def initialize(self) -> Testdrive:
  111. return Testdrive(
  112. schemas()
  113. + dedent(
  114. """
  115. > CREATE CLUSTER webhook_table_cluster REPLICATION FACTOR 2, SIZE '1'
  116. > SET cluster = webhook_table_cluster
  117. > CREATE TABLE webhook_table_text FROM WEBHOOK BODY FORMAT TEXT;
  118. > SET cluster = quickstart
  119. $ webhook-append database=materialize schema=public name=webhook_table_text
  120. hello_world
  121. """
  122. )
  123. )
  124. def manipulate(self) -> list[Testdrive]:
  125. return [
  126. Testdrive(schemas() + dedent(s))
  127. for s in [
  128. """
  129. $ webhook-append database=materialize schema=public name=webhook_table_text
  130. anotha_one!
  131. """,
  132. """
  133. $ webhook-append database=materialize schema=public name=webhook_table_text
  134. threeeeeee
  135. """,
  136. ]
  137. ]
  138. def validate(self) -> Testdrive:
  139. return Testdrive(
  140. dedent(
  141. r"""
  142. > SELECT * FROM webhook_table_text
  143. hello_world
  144. anotha_one!
  145. threeeeeee
  146. >[version>=14000] SHOW CREATE TABLE webhook_table_text
  147. materialize.public.webhook_table_text "CREATE TABLE materialize.public.webhook_table_text FROM WEBHOOK BODY FORMAT TEXT;"
  148. >[version<14000] SHOW CREATE TABLE webhook_table_text
  149. materialize.public.webhook_table_text "CREATE TABLE \"materialize\".\"public\".\"webhook_table_text\" FROM WEBHOOK BODY FORMAT TEXT"
  150. """
  151. )
  152. )