__init__.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """The implementation of the mzcompose system for Docker compositions.
  10. For an overview of what mzcompose is and why it exists, see the [user-facing
  11. documentation][user-docs].
  12. [user-docs]: https://github.com/MaterializeInc/materialize/blob/main/doc/developer/mzbuild.md
  13. """
  14. import os
  15. import random
  16. import subprocess
  17. import sys
  18. from collections.abc import Iterable
  19. from dataclasses import dataclass
  20. from typing import Any, Literal, TypeVar
  21. import psycopg
  22. from materialize import spawn, ui
  23. from materialize.mz_version import MzVersion
  24. from materialize.ui import UIError
# Generic type variable; no use of it is visible in this excerpt —
# presumably consumed by generic helpers elsewhere in the module.
T = TypeVar("T")
# Output helper built with ui.speaker and the "C> " prefix; it is called as
# say(message) by functions in this module.
say = ui.speaker("C> ")

# Default Confluent Platform version (presumably for the Kafka-related
# services of compositions — confirm at the use site).
DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.0"

# Default volume mounts, written in Docker's "<volume>:<container path>" form
# (presumably for Materialize containers, per the name).
DEFAULT_MZ_VOLUMES = [
    "mzdata:/mzdata",
    "mydata:/var/lib/mysql-files",
    "tmp:/share/tmp",
    "scratch:/scratch",
]

# Parameters which disable systems that periodically/unpredictably impact
# performance; presumably layered on top of the regular system parameters
# for benchmark runs.
ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS = {
    "enable_statement_lifecycle_logging": "false",
    # "0" presumably turns off forced catalog compaction — confirm.
    "persist_catalog_force_compaction_fuel": "0",
    "statement_logging_default_sample_rate": "0",
    "statement_logging_max_sample_rate": "0",
    # The 128 MB default increases memory usage a lot for only a small
    # performance gain in benchmarks; see for example the FastPathLimit
    # scenario: 55% more memory, 5% faster.
    "persist_blob_cache_mem_limit_bytes": "1048576",
    # Scaling with threads would increase the memory usage of many tests,
    # making small memory regressions harder to detect.
    "persist_blob_cache_scale_with_threads": "false",
    # The peek response stash kicks in when results get larger and increases
    # query latency, which in turn makes benchmarks less predictable.
    "enable_compute_peek_response_stash": "false",
}
  52. def get_minimal_system_parameters(
  53. version: MzVersion,
  54. zero_downtime: bool = False,
  55. ) -> dict[str, str]:
  56. """Settings we need in order to have tests run at all, but otherwise stay
  57. with the defaults: not changing performance or increasing coverage."""
  58. return {
  59. # -----
  60. # Unsafe functions
  61. "unsafe_enable_unsafe_functions": "true",
  62. # -----
  63. # Others (ordered by name)
  64. "allow_real_time_recency": "true",
  65. "constraint_based_timestamp_selection": "verify",
  66. "enable_compute_peek_response_stash": "true",
  67. "enable_0dt_deployment": "true" if zero_downtime else "false",
  68. "enable_0dt_deployment_panic_after_timeout": "true",
  69. "enable_0dt_deployment_sources": (
  70. "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
  71. ),
  72. "enable_alter_swap": "true",
  73. "enable_columnar_lgalloc": "false",
  74. "enable_columnation_lgalloc": "false",
  75. "enable_compute_active_dataflow_cancelation": "true",
  76. "enable_compute_correction_v2": "true",
  77. "enable_compute_logical_backpressure": "true",
  78. "enable_connection_validation_syntax": "true",
  79. "enable_continual_task_create": "true",
  80. "enable_continual_task_retain": "true",
  81. "enable_continual_task_transform": "true",
  82. "enable_copy_to_expr": "true",
  83. "enable_create_table_from_source": "true",
  84. "enable_disk_cluster_replicas": "true",
  85. "enable_eager_delta_joins": "true",
  86. "enable_envelope_debezium_in_subscribe": "true",
  87. "enable_expressions_in_limit_syntax": "true",
  88. "enable_introspection_subscribes": "true",
  89. "enable_kafka_sink_partition_by": "true",
  90. "enable_lgalloc": "false",
  91. "enable_logical_compaction_window": "true",
  92. "enable_multi_worker_storage_persist_sink": "true",
  93. "enable_multi_replica_sources": "true",
  94. "enable_rbac_checks": "true",
  95. "enable_reduce_mfp_fusion": "true",
  96. "enable_refresh_every_mvs": "true",
  97. "enable_cluster_schedule_refresh": "true",
  98. "enable_statement_lifecycle_logging": "true",
  99. "enable_compute_temporal_bucketing": "true",
  100. "enable_variadic_left_join_lowering": "true",
  101. "enable_worker_core_affinity": "true",
  102. "grpc_client_http2_keep_alive_timeout": "5s",
  103. "ore_overflowing_behavior": "panic",
  104. "persist_stats_audit_percent": "100",
  105. "unsafe_enable_table_keys": "true",
  106. "with_0dt_deployment_max_wait": "1800s",
  107. # End of list (ordered by name)
  108. }
  109. @dataclass
  110. class VariableSystemParameter:
  111. key: str
  112. default: str
  113. values: list[str]
  114. # TODO: The linter should check this too
  115. def get_variable_system_parameters(
  116. version: MzVersion,
  117. zero_downtime: bool,
  118. force_source_table_syntax: bool,
  119. ) -> list[VariableSystemParameter]:
  120. return [
  121. # -----
  122. # To reduce CRDB load as we are struggling with it in CI (values based on load test environment):
  123. VariableSystemParameter(
  124. "persist_next_listen_batch_retryer_clamp",
  125. "16s",
  126. ["100ms", "1s", "10s", "100s"],
  127. ),
  128. VariableSystemParameter(
  129. "persist_next_listen_batch_retryer_initial_backoff",
  130. "100ms",
  131. ["10ms", "100ms", "1s", "10s"],
  132. ),
  133. VariableSystemParameter(
  134. "persist_next_listen_batch_retryer_fixed_sleep",
  135. "1200ms",
  136. ["100ms", "1s", "10s"],
  137. ),
  138. # -----
  139. # Persist internals changes, advance coverage
  140. VariableSystemParameter(
  141. "persist_enable_arrow_lgalloc_noncc_sizes", "true", ["true", "false"]
  142. ),
  143. VariableSystemParameter(
  144. "persist_enable_s3_lgalloc_noncc_sizes", "true", ["true", "false"]
  145. ),
  146. # -----
  147. # Others (ordered by name),
  148. VariableSystemParameter("cluster_always_use_disk", "true", ["true", "false"]),
  149. VariableSystemParameter(
  150. "compute_dataflow_max_inflight_bytes",
  151. "134217728",
  152. ["1048576", "4194304", "16777216", "67108864"],
  153. ), # 128 MiB
  154. VariableSystemParameter("compute_hydration_concurrency", "2", ["1", "2", "4"]),
  155. VariableSystemParameter(
  156. "compute_replica_expiration_offset", "3d", ["3d", "10d"]
  157. ),
  158. VariableSystemParameter(
  159. "compute_apply_column_demands", "true", ["true", "false"]
  160. ),
  161. VariableSystemParameter(
  162. "compute_peek_response_stash_threshold_bytes",
  163. # 1 MiB, an in-between value
  164. "1048576",
  165. # force-enabled, the in-between, and the production value
  166. ["0", "1048576", "314572800", "67108864"],
  167. ),
  168. VariableSystemParameter(
  169. "disk_cluster_replicas_default", "true", ["true", "false"]
  170. ),
  171. VariableSystemParameter(
  172. "kafka_default_metadata_fetch_interval",
  173. "1s",
  174. ["100ms", "1s"],
  175. ),
  176. VariableSystemParameter("mysql_offset_known_interval", "1s", ["100ms", "1s"]),
  177. VariableSystemParameter(
  178. "force_source_table_syntax",
  179. "true" if force_source_table_syntax else "false",
  180. ["true", "false"] if force_source_table_syntax else ["false"],
  181. ),
  182. VariableSystemParameter(
  183. "persist_batch_columnar_format",
  184. "structured" if version > MzVersion.parse_mz("v0.135.0-dev") else "both_v2",
  185. ["row", "both_v2", "both", "structured"],
  186. ),
  187. VariableSystemParameter(
  188. "persist_batch_delete_enabled", "true", ["true", "false"]
  189. ),
  190. VariableSystemParameter(
  191. "persist_batch_structured_order", "true", ["true", "false"]
  192. ),
  193. VariableSystemParameter(
  194. "persist_batch_builder_structured", "true", ["true", "false"]
  195. ),
  196. VariableSystemParameter(
  197. "persist_batch_structured_key_lower_len",
  198. "256",
  199. ["0", "1", "512", "1000"],
  200. ),
  201. VariableSystemParameter(
  202. "persist_batch_max_run_len", "4", ["2", "3", "4", "16"]
  203. ),
  204. VariableSystemParameter(
  205. "persist_catalog_force_compaction_fuel",
  206. "1024",
  207. ["256", "1024", "4096"],
  208. ),
  209. VariableSystemParameter(
  210. "persist_catalog_force_compaction_wait",
  211. "1s",
  212. ["100ms", "1s", "10s"],
  213. ),
  214. VariableSystemParameter(
  215. "persist_encoding_enable_dictionary", "true", ["true", "false"]
  216. ),
  217. VariableSystemParameter(
  218. "persist_fast_path_limit",
  219. "1000",
  220. ["100", "1000", "10000"],
  221. ),
  222. VariableSystemParameter("persist_fast_path_order", "true", ["true", "false"]),
  223. VariableSystemParameter(
  224. "persist_gc_use_active_gc",
  225. ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
  226. (
  227. ["true", "false"]
  228. if version > MzVersion.parse_mz("v0.127.0-dev")
  229. else ["false"]
  230. ),
  231. ),
  232. VariableSystemParameter(
  233. "persist_gc_min_versions",
  234. "16",
  235. ["16", "256", "1024"],
  236. ),
  237. VariableSystemParameter(
  238. "persist_gc_max_versions",
  239. "128000",
  240. ["256", "128000"],
  241. ),
  242. VariableSystemParameter(
  243. "persist_inline_writes_single_max_bytes",
  244. "4096",
  245. ["256", "1024", "4096", "16384"],
  246. ),
  247. VariableSystemParameter(
  248. "persist_inline_writes_total_max_bytes",
  249. "1048576",
  250. ["65536", "262144", "1048576", "4194304"],
  251. ),
  252. VariableSystemParameter(
  253. "persist_pubsub_client_enabled", "true", ["true", "false"]
  254. ),
  255. VariableSystemParameter(
  256. "persist_pubsub_push_diff_enabled", "true", ["true", "false"]
  257. ),
  258. VariableSystemParameter(
  259. "persist_record_compactions", "true", ["true", "false"]
  260. ),
  261. VariableSystemParameter(
  262. "persist_record_schema_id",
  263. ("true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"),
  264. (
  265. ["true", "false"]
  266. if version > MzVersion.parse_mz("v0.127.0-dev")
  267. else ["false"]
  268. ),
  269. ),
  270. VariableSystemParameter(
  271. "persist_rollup_use_active_rollup",
  272. ("true" if version > MzVersion.parse_mz("v0.143.0-dev") else "false"),
  273. (
  274. ["true", "false"]
  275. if version > MzVersion.parse_mz("v0.127.0-dev")
  276. else ["false"]
  277. ),
  278. ),
  279. # 16 MiB - large enough to avoid a big perf hit, small enough to get more coverage...
  280. VariableSystemParameter(
  281. "persist_blob_target_size",
  282. "16777216",
  283. ["4096", "1048576", "16777216", "134217728"],
  284. ),
  285. # 5 times the default part size - 4 is the bare minimum.
  286. VariableSystemParameter(
  287. "persist_compaction_memory_bound_bytes",
  288. "83886080",
  289. ["67108864", "134217728", "536870912", "1073741824"],
  290. ),
  291. VariableSystemParameter(
  292. "persist_use_critical_since_catalog", "true", ["true", "false"]
  293. ),
  294. VariableSystemParameter(
  295. "persist_use_critical_since_snapshot",
  296. "false" if zero_downtime else "true",
  297. ["false"] if zero_downtime else ["true", "false"],
  298. ),
  299. VariableSystemParameter(
  300. "persist_use_critical_since_source",
  301. "false" if zero_downtime else "true",
  302. ["false"] if zero_downtime else ["true", "false"],
  303. ),
  304. VariableSystemParameter(
  305. "persist_part_decode_format", "arrow", ["arrow", "row_with_validate"]
  306. ),
  307. VariableSystemParameter(
  308. "persist_blob_cache_scale_with_threads", "true", ["true", "false"]
  309. ),
  310. VariableSystemParameter(
  311. "persist_validate_part_bounds_on_read", "true", ["true", "false"]
  312. ),
  313. VariableSystemParameter(
  314. "persist_validate_part_bounds_on_write", "true", ["true", "false"]
  315. ),
  316. VariableSystemParameter("pg_offset_known_interval", "1s", ["100ms", "1s"]),
  317. VariableSystemParameter(
  318. "statement_logging_default_sample_rate", "0.01", ["0", "0.01"]
  319. ),
  320. VariableSystemParameter(
  321. "statement_logging_max_sample_rate", "0.01", ["0", "0.01"]
  322. ),
  323. VariableSystemParameter("storage_reclock_to_latest", "true", ["true", "false"]),
  324. VariableSystemParameter(
  325. "storage_source_decode_fuel",
  326. "100000",
  327. ["10000", "100000", "1000000"],
  328. ),
  329. VariableSystemParameter(
  330. "storage_statistics_collection_interval",
  331. "1000",
  332. ["100", "1000", "10000"],
  333. ),
  334. VariableSystemParameter(
  335. "storage_statistics_interval", "2000", ["100", "1000", "10000"]
  336. ),
  337. VariableSystemParameter(
  338. "storage_use_continual_feedback_upsert", "true", ["true", "false"]
  339. ),
  340. # End of list (ordered by name)
  341. ]
  342. def get_default_system_parameters(
  343. version: MzVersion | None = None,
  344. zero_downtime: bool = False,
  345. force_source_table_syntax: bool = False,
  346. ) -> dict[str, str]:
  347. """For upgrade tests we only want parameters set when all environmentd /
  348. clusterd processes have reached a specific version (or higher)
  349. """
  350. if not version:
  351. version = MzVersion.parse_cargo()
  352. params = get_minimal_system_parameters(version, zero_downtime)
  353. system_param_setting = os.getenv("CI_SYSTEM_PARAMETERS", "")
  354. variable_params = get_variable_system_parameters(
  355. version, zero_downtime, force_source_table_syntax
  356. )
  357. if system_param_setting == "":
  358. for param in variable_params:
  359. params[param.key] = param.default
  360. elif system_param_setting == "random":
  361. seed = os.getenv("CI_SYSTEM_PARAMETERS_SEED", os.getenv("BUILDKITE_JOB_ID", 1))
  362. rng = random.Random(seed)
  363. for param in variable_params:
  364. params[param.key] = rng.choice(param.values)
  365. print(
  366. f"System parameters with seed CI_SYSTEM_PARAMETERS_SEED={seed}: {params}",
  367. file=sys.stderr,
  368. )
  369. elif system_param_setting == "minimal":
  370. pass
  371. else:
  372. raise ValueError(
  373. f"Unknown value for CI_SYSTEM_PARAMETERS: {system_param_setting}"
  374. )
  375. return params
# If you are adding a new config flag in Materialize, decide where it belongs:
# - get_variable_system_parameters, if its value can be varied in tests;
# - get_minimal_system_parameters, if it must be set for tests to succeed at
#   all;
# - UNINTERESTING_SYSTEM_PARAMETERS (below), only if neither of the above
#   applies.
# Entries here are parameters that tests intentionally leave unset.
UNINTERESTING_SYSTEM_PARAMETERS = [
    "enable_mz_join_core",
    "enable_compute_mv_append_smearing",
    "linear_join_yielding",
    "enable_lgalloc_eager_reclamation",
    "lgalloc_background_interval",
    "lgalloc_file_growth_dampener",
    "lgalloc_local_buffer_bytes",
    "lgalloc_slow_clear_bytes",
    "lgalloc_limiter_interval",
    "lgalloc_limiter_usage_factor",
    "lgalloc_limiter_usage_bias",
    "lgalloc_limiter_burst_factor",
    "memory_limiter_interval",
    "memory_limiter_usage_factor",
    "memory_limiter_usage_bias",
    "memory_limiter_burst_factor",
    "compute_server_maintenance_interval",
    "compute_dataflow_max_inflight_bytes_cc",
    "compute_flat_map_fuel",
    "compute_temporal_bucketing_summary",
    "consolidating_vec_growth_dampener",
    "copy_to_s3_parquet_row_group_file_ratio",
    "copy_to_s3_arrow_builder_buffer_ratio",
    "copy_to_s3_multipart_part_size_bytes",
    "enable_compute_replica_expiration",
    "enable_compute_render_fueled_as_specific_collection",
    "compute_logical_backpressure_max_retained_capabilities",
    "compute_logical_backpressure_inflight_slack",
    "persist_fetch_semaphore_cost_adjustment",
    "persist_fetch_semaphore_permit_adjustment",
    "persist_optimize_ignored_data_fetch",
    "persist_pubsub_same_process_delegate_enabled",
    "persist_pubsub_connect_attempt_timeout",
    "persist_pubsub_request_timeout",
    "persist_pubsub_connect_max_backoff",
    "persist_pubsub_client_sender_channel_size",
    "persist_pubsub_client_receiver_channel_size",
    "persist_pubsub_server_connection_channel_size",
    "persist_pubsub_state_cache_shard_ref_channel_size",
    "persist_pubsub_reconnect_backoff",
    "persist_encoding_compression_format",
    "persist_batch_max_runs",
    "persist_write_combine_inline_writes",
    "persist_reader_lease_duration",
    "persist_consensus_connection_pool_max_size",
    "persist_consensus_connection_pool_max_wait",
    "persist_consensus_connection_pool_ttl",
    "persist_consensus_connection_pool_ttl_stagger",
    "crdb_connect_timeout",
    "crdb_tcp_user_timeout",
    "persist_use_critical_since_txn",
    "use_global_txn_cache_source",
    "persist_batch_builder_max_outstanding_parts",
    "persist_compaction_heuristic_min_inputs",
    "persist_compaction_heuristic_min_parts",
    "persist_compaction_heuristic_min_updates",
    "persist_gc_blob_delete_concurrency_limit",
    "persist_state_versions_recent_live_diffs_limit",
    "persist_usage_state_fetch_concurrency_limit",
    "persist_blob_operation_timeout",
    "persist_blob_operation_attempt_timeout",
    "persist_blob_connect_timeout",
    "persist_blob_read_timeout",
    "persist_stats_collection_enabled",
    "persist_stats_filter_enabled",
    "persist_stats_budget_bytes",
    "persist_stats_untrimmable_columns_equals",
    "persist_stats_untrimmable_columns_prefix",
    "persist_stats_untrimmable_columns_suffix",
    "persist_expression_cache_force_compaction_fuel",
    "persist_expression_cache_force_compaction_wait",
    "persist_blob_cache_mem_limit_bytes",
    "persist_blob_cache_scale_factor_bytes",
    "persist_claim_unclaimed_compactions",
    "persist_claim_compaction_percent",
    "persist_claim_compaction_min_version",
    "persist_next_listen_batch_retryer_multiplier",
    "persist_rollup_threshold",
    "persist_rollup_fallback_threshold_ms",
    "persist_gc_fallback_threshold_ms",
    "persist_compaction_minimum_timeout",
    "persist_compaction_use_most_recent_schema",
    "persist_compaction_check_process_flag",
    "balancerd_sigterm_connection_wait",
    "balancerd_sigterm_listen_wait",
    "balancerd_inject_proxy_protocol_header_http",
    "balancerd_log_filter",
    "balancerd_opentelemetry_filter",
    "balancerd_log_filter_defaults",
    "balancerd_opentelemetry_filter_defaults",
    "balancerd_sentry_filters",
    "persist_enable_s3_lgalloc_cc_sizes",
    "persist_enable_arrow_lgalloc_cc_sizes",
    "controller_past_generation_replica_cleanup_retry_interval",
    "wallclock_lag_recording_interval",
    "enable_wallclock_lag_histogram_collection",
    "wallclock_lag_histogram_period_interval",
    "enable_timely_zero_copy",
    "enable_timely_zero_copy_lgalloc",
    "timely_zero_copy_limit",
    "arrangement_exert_proportionality",
    "txn_wal_apply_ensure_schema_match",
    "persist_txns_data_shard_retryer_initial_backoff",
    "persist_txns_data_shard_retryer_multiplier",
    "persist_txns_data_shard_retryer_clamp",
    "storage_cluster_shutdown_grace_period",
    "storage_dataflow_delay_sources_past_rehydration",
    "storage_dataflow_suspendable_sources",
    "storage_downgrade_since_during_finalization",
    "replica_metrics_history_retention_interval",
    "wallclock_lag_history_retention_interval",
    "wallclock_global_lag_histogram_retention_interval",
    "kafka_client_id_enrichment_rules",
    "kafka_poll_max_wait",
    "kafka_default_aws_privatelink_endpoint_identification_algorithm",
    "kafka_buffered_event_resize_threshold_elements",
    "mysql_replication_heartbeat_interval",
    "postgres_fetch_slot_resume_lsn_interval",
    "pg_schema_validation_interval",
    "storage_enforce_external_addresses",
    "storage_upsert_prevent_snapshot_buffering",
    "storage_rocksdb_use_merge_operator",
    "storage_upsert_max_snapshot_batch_buffering",
    "storage_rocksdb_cleanup_tries",
    "storage_suspend_and_restart_delay",
    "storage_sink_snapshot_frontier",
    "storage_server_maintenance_interval",
    "storage_sink_progress_search",
    "storage_sink_ensure_topic_config",
    "sql_server_max_lsn_wait",
    "sql_server_snapshot_progress_report_interval",
    "sql_server_cdc_poll_interval",
    "sql_server_cdc_cleanup_change_table",
    "sql_server_cdc_cleanup_change_table_max_deletes",
    "sql_server_offset_known_interval",
    "allow_user_sessions",
    "with_0dt_deployment_ddl_check_interval",
    "enable_0dt_caught_up_check",
    "with_0dt_caught_up_check_allowed_lag",
    "with_0dt_caught_up_check_cutoff",
    "plan_insights_notice_fast_path_clusters_optimize_duration",
    "enable_continual_task_builtins",
    "enable_expression_cache",
    "enable_password_auth",
    "mz_metrics_lgalloc_map_refresh_interval",
    "mz_metrics_lgalloc_refresh_interval",
    "mz_metrics_rusage_refresh_interval",
    "compute_peek_response_stash_batch_max_runs",
    "compute_peek_response_stash_read_batch_size_bytes",
    "compute_peek_response_stash_read_memory_budget_bytes",
    "compute_peek_stash_num_batches",
    "compute_peek_stash_batch_size",
    "persist_enable_incremental_compaction",
    "storage_statistics_retention_duration",
]
  537. DEFAULT_CRDB_ENVIRONMENT = [
  538. "COCKROACH_ENGINE_MAX_SYNC_DURATION_DEFAULT=120s",
  539. "COCKROACH_LOG_MAX_SYNC_DURATION=120s",
  540. ]
  541. # TODO(benesch): change to `docker-mzcompose` once v0.39 ships.
  542. DEFAULT_CLOUD_PROVIDER = "mzcompose"
  543. DEFAULT_CLOUD_REGION = "us-east-1"
  544. DEFAULT_ORG_ID = "00000000-0000-0000-0000-000000000000"
  545. DEFAULT_ORDINAL = "0"
  546. DEFAULT_MZ_ENVIRONMENT_ID = f"{DEFAULT_CLOUD_PROVIDER}-{DEFAULT_CLOUD_REGION}-{DEFAULT_ORG_ID}-{DEFAULT_ORDINAL}"
  547. # TODO(benesch): replace with Docker health checks.
  548. def _check_tcp(
  549. cmd: list[str], host: str, port: int, timeout_secs: int, kind: str = ""
  550. ) -> list[str]:
  551. cmd.extend(
  552. [
  553. "timeout",
  554. str(timeout_secs),
  555. "bash",
  556. "-c",
  557. f"until [ cat < /dev/null > /dev/tcp/{host}/{port} ] ; do sleep 0.1 ; done",
  558. ]
  559. )
  560. try:
  561. spawn.capture(cmd, stderr=subprocess.STDOUT)
  562. except subprocess.CalledProcessError as e:
  563. ui.log_in_automation(
  564. "wait-for-tcp ({}{}:{}): error running {}: {}, stdout:\n{}\nstderr:\n{}".format(
  565. kind, host, port, ui.shell_quote(cmd), e, e.stdout, e.stderr
  566. )
  567. )
  568. raise
  569. return cmd
  570. # TODO(benesch): replace with Docker health checks.
  571. def _wait_for_pg(
  572. timeout_secs: int,
  573. query: str,
  574. dbname: str,
  575. port: int,
  576. host: str,
  577. user: str,
  578. password: str | None,
  579. expected: Iterable[Any] | Literal["any"],
  580. print_result: bool = False,
  581. sslmode: str = "disable",
  582. ) -> None:
  583. """Wait for a pg-compatible database (includes materialized)"""
  584. obfuscated_password = password[0:1] if password is not None else ""
  585. args = f"dbname={dbname} host={host} port={port} user={user} password='{obfuscated_password}...'"
  586. ui.progress(f"waiting for {args} to handle {query!r}", "C")
  587. error = None
  588. for remaining in ui.timeout_loop(timeout_secs, tick=0.5):
  589. try:
  590. conn = psycopg.connect(
  591. dbname=dbname,
  592. host=host,
  593. port=port,
  594. user=user,
  595. password=password,
  596. connect_timeout=1,
  597. sslmode=sslmode,
  598. )
  599. # The default (autocommit = false) wraps everything in a transaction.
  600. conn.autocommit = True
  601. with conn.cursor() as cur:
  602. cur.execute(query.encode())
  603. if expected == "any" and cur.rowcount == -1:
  604. ui.progress(" success!", finish=True)
  605. return
  606. result = list(cur.fetchall())
  607. if expected == "any" or result == expected:
  608. if print_result:
  609. say(f"query result: {result}")
  610. else:
  611. ui.progress(" success!", finish=True)
  612. return
  613. else:
  614. say(
  615. f"host={host} port={port} did not return rows matching {expected} got: {result}"
  616. )
  617. except Exception as e:
  618. ui.progress(f"{e if print_result else ''} {int(remaining)}")
  619. error = e
  620. ui.progress(finish=True)
  621. raise UIError(f"never got correct result for {args}: {error}")
  622. def bootstrap_cluster_replica_size() -> str:
  623. return "bootstrap"
  624. def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
  625. def replica_size(
  626. workers: int,
  627. scale: int,
  628. disabled: bool = False,
  629. is_cc: bool = True,
  630. memory_limit: str | None = None,
  631. ) -> dict[str, Any]:
  632. return {
  633. "cpu_exclusive": False,
  634. "cpu_limit": None,
  635. "credits_per_hour": f"{workers * scale}",
  636. "disabled": disabled,
  637. "disk_limit": None,
  638. "is_cc": is_cc,
  639. "memory_limit": memory_limit or "4Gi",
  640. "scale": scale,
  641. "workers": workers,
  642. # "selectors": {},
  643. }
  644. replica_sizes = {
  645. bootstrap_cluster_replica_size(): replica_size(1, 1),
  646. "2-4": replica_size(4, 2),
  647. "free": replica_size(0, 0, disabled=True),
  648. "1cc": replica_size(1, 1),
  649. "1C": replica_size(1, 1),
  650. "1-no-disk": replica_size(1, 1, is_cc=False),
  651. "2-no-disk": replica_size(2, 1, is_cc=False),
  652. }
  653. for i in range(0, 6):
  654. workers = 1 << i
  655. replica_sizes[f"{workers}"] = replica_size(workers, 1)
  656. for mem in [4, 8, 16, 32]:
  657. replica_sizes[f"{workers}-{mem}G"] = replica_size(
  658. workers, 1, memory_limit=f"{mem} GiB"
  659. )
  660. replica_sizes[f"{workers}-1"] = replica_size(1, workers)
  661. replica_sizes[f"{workers}-{workers}"] = replica_size(workers, workers)
  662. replica_sizes[f"mem-{workers}"] = replica_size(
  663. workers, 1, memory_limit=f"{workers} GiB"
  664. )
  665. return replica_sizes