mzcompose.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Test that setting feature flags works
  11. """
  12. import argparse
  13. from textwrap import dedent, indent
  14. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  15. from materialize.mzcompose.services.materialized import Materialized
  16. from materialize.mzcompose.services.redpanda import Redpanda
  17. from materialize.mzcompose.services.testdrive import Testdrive
  18. from materialize.util import all_subclasses
  19. SERVICES = [
  20. Redpanda(),
  21. Materialized(unsafe_mode=False),
  22. Testdrive(no_reset=True, seed=1),
  23. ]
  24. MZ_SYSTEM_CONNECTION_URL = (
  25. "postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}"
  26. )
  27. USER_CONNECTION_URL = (
  28. "postgres://materialize:materialize@${testdrive.materialize-sql-addr}"
  29. )
  30. def header(test_name: str, drop_schema: bool) -> str:
  31. """Generate a TD header for a SQL feature test scenario."""
  32. header = dedent(
  33. f"""
  34. # Feature test for SQL feature test: {test_name}
  35. #####################################{'#' * len(test_name)}
  36. """
  37. )
  38. # Re-create schema (optional).
  39. if drop_schema:
  40. header += dedent(
  41. f"""
  42. $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
  43. DROP SCHEMA IF EXISTS public CASCADE;
  44. CREATE SCHEMA public /* {test_name} */;
  45. GRANT ALL PRIVILEGES ON SCHEMA public TO materialize;
  46. """
  47. )
  48. # Create connections.
  49. header += dedent(
  50. f"""
  51. $ postgres-connect name=user url={USER_CONNECTION_URL}
  52. $ postgres-connect name=mz_system url={MZ_SYSTEM_CONNECTION_URL}
  53. """
  54. )
  55. return header.strip()
  56. def statement_error(statement: str, error_msg: str) -> str:
  57. """Generate a TD command that asserts that `statement` fails with `error_msg`."""
  58. return "\n".join(
  59. [
  60. indent(statement.strip(), prefix=" ").replace(" ", "! ", 1),
  61. f"contains:{error_msg}",
  62. ]
  63. )
  64. def statement_ok(statement: str) -> str:
  65. """Generate a TD command that executes `statement`."""
  66. return indent(statement.strip(), prefix=" ").replace(" ", "> ", 1)
  67. def query_ok(query: str) -> str:
  68. """Generate a TD command that asserts that a query does not fail."""
  69. return "\n".join(
  70. [
  71. "$ postgres-execute connection=user",
  72. query.strip(),
  73. ]
  74. )
  75. def alter_system_set(name: str, value: str) -> str:
  76. """Generate a TD command that sets a system parameter."""
  77. return dedent(
  78. f"""
  79. $ postgres-execute connection=mz_system
  80. ALTER SYSTEM SET {name} = '{value}';
  81. """
  82. ).strip()
  83. def alter_system_reset(name: str) -> str:
  84. """Generate a TD command that resets a system parameter."""
  85. return dedent(
  86. f"""
  87. $ postgres-execute connection=mz_system
  88. ALTER SYSTEM RESET {name};
  89. """
  90. ).strip()
  91. def alter_system_reset_all() -> str:
  92. """Generate a TD command that reset all system parameters."""
  93. return dedent(
  94. """
  95. $ postgres-execute connection=mz_system
  96. ALTER SYSTEM RESET ALL;
  97. """
  98. ).strip()
  99. class FeatureTestScenario:
  100. """
  101. A base class for all feature test scenarios.
  102. Each scenario is a `FeatureTestScenario` defined in this file. All
  103. subclasses are included in the `default` mzcompose workflow by default.
  104. """
  105. @classmethod
  106. def phase1(cls) -> str:
  107. return "\n\n".join(
  108. [
  109. # Include the header.
  110. header(f"{cls.__name__} (phase 1)", drop_schema=True),
  111. cls.initialize(),
  112. # Ensure the feature is off, regardless of CI config.
  113. alter_system_set(cls.feature_name(), "off"),
  114. # We cannot create item #1 when the feature is turned off (default).
  115. statement_error(cls.create_item(ordinal=1), cls.feature_error()),
  116. # Turn the feature on.
  117. alter_system_set(cls.feature_name(), "on"),
  118. # We can create item #1 when the feature is turned on.
  119. statement_ok(cls.create_item(ordinal=1)),
  120. # We can query item #1 when the feature is turned on.
  121. query_ok(cls.query_item(ordinal=1)),
  122. # Turn the feature off.
  123. alter_system_set(cls.feature_name(), "off"),
  124. # We cannot create item #2 when the feature is turned off.
  125. statement_error(cls.create_item(ordinal=2), cls.feature_error()),
  126. ]
  127. )
  128. @classmethod
  129. def phase2(cls) -> str:
  130. return "\n\n".join(
  131. [
  132. # Include the header.
  133. header(f"{cls.__name__} (phase 2)", drop_schema=False),
  134. cls.initialize(),
  135. # We can query item #1 when the feature is turned on. Ensures
  136. # that catalog rehydration ignores SQL-level feature flags.
  137. query_ok(cls.query_item(ordinal=1)),
  138. # We can drop item #1.
  139. statement_ok(cls.drop_item(ordinal=1)),
  140. # We cannot create item #2 when the feature is turned off.
  141. # Ensures that the feature flag is respected for new items.
  142. statement_error(cls.create_item(ordinal=2), cls.feature_error()),
  143. ]
  144. )
  145. @classmethod
  146. def phase3(cls) -> str:
  147. return "\n\n".join(
  148. [
  149. # Include the header.
  150. header(f"{cls.__name__} (phase 3)", drop_schema=False),
  151. # Because we have restarted, we need to ensure that we're getting
  152. # the parameter's default value, which will be "on".
  153. alter_system_reset(cls.feature_name()),
  154. cls.initialize(),
  155. # The feature is immediately turned on because it's a default parameter.
  156. statement_ok(cls.create_item(ordinal=1)),
  157. query_ok(cls.query_item(ordinal=1)),
  158. # We can drop item #1.
  159. statement_ok(cls.drop_item(ordinal=1)),
  160. ]
  161. )
  162. @classmethod
  163. def reset_all(cls) -> str:
  164. return "\n\n".join(
  165. [
  166. cls.initialize(),
  167. # The feature is immediately turned on because it's a default parameter.
  168. statement_ok(cls.create_item(ordinal=1)),
  169. query_ok(cls.query_item(ordinal=1)),
  170. # We can drop item #1.
  171. statement_ok(cls.drop_item(ordinal=1)),
  172. ]
  173. )
  174. @classmethod
  175. def feature_name(cls) -> str:
  176. """The name of the feature flag under test."""
  177. raise NotImplementedError
  178. @classmethod
  179. def feature_error(cls) -> str:
  180. """The error expected when the feature is disabled."""
  181. raise NotImplementedError
  182. @classmethod
  183. def initialize(cls) -> str:
  184. """Any SQL statements that must be executed before the statement under test."""
  185. return ""
  186. @classmethod
  187. def create_item(cls, ordinal: int) -> str:
  188. """A SQL statement that creates an item that depends on the feature."""
  189. raise NotImplementedError
  190. @classmethod
  191. def drop_item(cls, ordinal: int) -> str:
  192. """A SQL statement that drops an item that depends on the feature."""
  193. raise NotImplementedError
  194. @classmethod
  195. def query_item(cls, ordinal: int) -> str:
  196. """A SQL query referencing an item that depends on the feature."""
  197. raise NotImplementedError
  198. def run_test(c: Composition, args: argparse.Namespace) -> None:
  199. c.up("redpanda", "materialized", {"name": "testdrive", "persistent": True})
  200. scenarios = (
  201. [globals()[args.scenario]]
  202. if args.scenario
  203. else all_subclasses(FeatureTestScenario)
  204. )
  205. # To add a new scenario create a new FeatureTestScenario subclass
  206. for scenario in scenarios:
  207. print(f"--- Running scenario {scenario.__name__} phase 1")
  208. c.testdrive(scenario.phase1())
  209. c.stop("materialized")
  210. c.up("materialized")
  211. print(f"--- Running scenario {scenario.__name__} phase 2")
  212. c.testdrive(scenario.phase2())
  213. materialized = Materialized(
  214. unsafe_mode=False,
  215. additional_system_parameter_defaults={
  216. scenario.feature_name(): "on",
  217. },
  218. )
  219. with c.override(materialized):
  220. c.stop("materialized")
  221. c.up("materialized")
  222. print(f"--- Running scenario {scenario.__name__} phase 3")
  223. c.testdrive(scenario.phase3())
  224. # Dedicated test for ALTER SYSTEM RESET ALL
  225. print("--- Running ALTER SYSTEM RESET ALL")
  226. tmp = [header("(phase reset-all)", drop_schema=False)]
  227. for scenario in scenarios:
  228. # Turn all features off.
  229. tmp.append(alter_system_set(scenario.feature_name(), "off"))
  230. # Run ALTER SYSTEM RESET ALL
  231. tmp.append(alter_system_reset_all())
  232. for scenario in scenarios:
  233. # Write each scenarios reset all data
  234. tmp.append(scenario.reset_all())
  235. # Create MZ config with all features set on by default
  236. materialized = Materialized(
  237. unsafe_mode=False,
  238. additional_system_parameter_defaults={
  239. scenario.feature_name(): "on" for scenario in scenarios
  240. },
  241. )
  242. with c.override(materialized):
  243. c.stop("materialized")
  244. c.up("materialized")
  245. c.testdrive("\n\n".join(tmp))
  246. def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
  247. parser.add_argument(
  248. "--scenario", metavar="SCENARIO", type=str, help="Scenario to run."
  249. )
  250. run_test(c, parser.parse_args())