# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# This mzcompose currently tests `UPSERT` sources with `DISK` configured.
# TODO(guswynn): move ALL upsert-related tests into this directory.

"""
Test Kafka Upsert sources using Testdrive.
"""

import os
from textwrap import dedent

from materialize import ci_util
from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
from materialize.mzcompose.services.clusterd import Clusterd
from materialize.mzcompose.services.kafka import Kafka
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.mz import Mz
from materialize.mzcompose.services.redpanda import Redpanda
from materialize.mzcompose.services.schema_registry import SchemaRegistry
from materialize.mzcompose.services.testdrive import Testdrive
from materialize.mzcompose.services.zookeeper import Zookeeper

materialized_environment_extra = ["MZ_PERSIST_COMPACTION_DISABLED=false"]
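# NOTE: workflow_default mutates this list in place when --compaction-disabled is
# passed, so the Materialized services defined below pick up the override.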

SERVICES = [
    Zookeeper(),
    Kafka(),
    SchemaRegistry(),
    Mz(app_password=""),
    Materialized(
        options=[
            "--orchestrator-process-scratch-directory=/scratch",
        ],
        additional_system_parameter_defaults={
            "disk_cluster_replicas_default": "true",
            "unsafe_enable_unorchestrated_cluster_replicas": "true",
            "storage_dataflow_delay_sources_past_rehydration": "true",
        },
        environment_extra=materialized_environment_extra,
        default_replication_factor=2,
    ),
    Testdrive(),
    Clusterd(name="clusterd1"),
    Redpanda(),
]


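# The default workflow fans out to every other workflow in this file via
# c.test_parts, skipping the ones that are only meant to be run manually
# (load-test and large-scale).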
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
    parser.add_argument(
        "--compaction-disabled",
        action="store_true",
        help="Run with MZ_PERSIST_COMPACTION_DISABLED",
    )
    args = parser.parse_args()

    if args.compaction_disabled:
        materialized_environment_extra[0] = "MZ_PERSIST_COMPACTION_DISABLED=true"

    def process(name: str) -> None:
        if name in ["default", "load-test", "large-scale"]:
            return
        with c.test_case(name):
            c.workflow(name)

    c.test_parts(list(c.workflows.keys()), process)


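# Runs plain Testdrive against the *.td files in this directory. Arguments after
# the workflow name are forwarded to the parser below, e.g. (assuming the usual
# mzcompose invocation) `./mzcompose run testdrive --replicas=2`.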
def workflow_testdrive(c: Composition, parser: WorkflowArgumentParser) -> None:
    """Run testdrive."""
    parser.add_argument(
        "--kafka-default-partitions",
        type=int,
        metavar="N",
        help="set the default number of kafka partitions per topic",
    )
    parser.add_argument(
        "--default-size",
        type=int,
        default=Materialized.Size.DEFAULT_SIZE,
        help="Use SIZE 'N-N' for replicas and SIZE 'N' for sources",
    )
    parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas")
    parser.add_argument(
        "files",
        nargs="*",
        default=["*.td"],
        help="run against the specified files",
    )
    args = parser.parse_args()

    dependencies = ["materialized", "zookeeper", "kafka", "schema-registry"]

    testdrive = Testdrive(
        forward_buildkite_shard=True,
        kafka_default_partitions=args.kafka_default_partitions,
        validate_catalog_store=True,
        volumes_extra=["mzdata:/mzdata"],
    )

    materialized = Materialized(
        default_size=args.default_size,
        options=[
            "--orchestrator-process-scratch-directory=/scratch",
        ],
        additional_system_parameter_defaults={
            "disk_cluster_replicas_default": "true",
        },
        environment_extra=materialized_environment_extra,
        default_replication_factor=2,
    )

    with c.override(testdrive, materialized):
        c.rm("testdrive")
        c.up(*dependencies)

        if args.replicas > 1:
            c.sql("DROP CLUSTER quickstart")
            # Make sure a replica named 'r1' always exists.
            replica_names = [
                "r1" if replica_id == 0 else f"replica{replica_id}"
                for replica_id in range(0, args.replicas)
            ]
            replica_string = ",".join(
                f"{replica_name} (SIZE '{materialized.default_replica_size}')"
                for replica_name in replica_names
            )
            c.sql(f"CREATE CLUSTER quickstart REPLICAS ({replica_string})")

        junit_report = ci_util.junit_report_filename(c.name)

        def process(file: str) -> None:
            c.run_testdrive_files(
                f"--junit-report={junit_report}",
                f"--var=replicas={args.replicas}",
                f"--var=default-replica-size={materialized.default_replica_size}",
                f"--var=default-storage-size={materialized.default_storage_size}",
                file,
            )
            # Uploading successful junit files wastes time; they contain no
            # useful information.
            os.remove(f"test/upsert/{junit_report}")

        c.test_parts(args.files, process)
        c.sanity_restart_mz()


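# Each variant below sets up upsert sources on a remote clusterd, restarts both
# materialized and clusterd1, and then checks that the sources come back healthy
# (rehydration/03-after-rehydration.td).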
def workflow_rehydration(c: Composition) -> None:
    """Test creating sources in a remote clusterd process."""

    dependencies = [
        "materialized",
        "zookeeper",
        "kafka",
        "schema-registry",
        "clusterd1",
    ]

    for style, mz, clusterd in [
        (
            "with DISK",
            Materialized(
                options=[
                    "--orchestrator-process-scratch-directory=/scratch",
                ],
                additional_system_parameter_defaults={
                    "storage_statistics_collection_interval": "1000",
                    "storage_statistics_interval": "2000",
                    "unsafe_enable_unorchestrated_cluster_replicas": "true",
                    "disk_cluster_replicas_default": "true",
                    "enable_disk_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": "1",
                    "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction": "0.01",
                    "storage_dataflow_max_inflight_bytes_disk_only": "false",
                    "storage_dataflow_delay_sources_past_rehydration": "true",
                    # Enable shrinking buffers.
                    "upsert_rocksdb_shrink_allocated_buffers_by_ratio": "4",
                    "storage_shrink_upsert_unused_buffers_by_ratio": "4",
                },
                environment_extra=materialized_environment_extra,
                default_replication_factor=2,
            ),
            Clusterd(
                name="clusterd1",
                options=[
                    "--announce-memory-limit=1048376000",  # ~1GiB
                ],
                workers=4,
            ),
        ),
        (
            "with DISK and RocksDB Merge Operator",
            Materialized(
                options=[
                    "--orchestrator-process-scratch-directory=/scratch",
                ],
                additional_system_parameter_defaults={
                    "storage_statistics_collection_interval": "1000",
                    "storage_statistics_interval": "2000",
                    "unsafe_enable_unorchestrated_cluster_replicas": "true",
                    "disk_cluster_replicas_default": "true",
                    "enable_disk_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": "1",
                    "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction": "0.01",
                    "storage_dataflow_max_inflight_bytes_disk_only": "false",
                    "storage_dataflow_delay_sources_past_rehydration": "true",
                    # Enable shrinking buffers.
                    "upsert_rocksdb_shrink_allocated_buffers_by_ratio": "4",
                    "storage_shrink_upsert_unused_buffers_by_ratio": "4",
                    # Enable the RocksDB merge operator.
                    "storage_rocksdb_use_merge_operator": "true",
                },
                environment_extra=materialized_environment_extra,
                default_replication_factor=2,
            ),
            Clusterd(
                name="clusterd1",
                options=[
                    "--announce-memory-limit=1048376000",  # ~1GiB
                ],
                workers=4,
            ),
        ),
        (
            "without DISK",
            Materialized(
                options=[
                    "--orchestrator-process-scratch-directory=/scratch",
                ],
                additional_system_parameter_defaults={
                    "storage_statistics_collection_interval": "1000",
                    "storage_statistics_interval": "2000",
                    "unsafe_enable_unorchestrated_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": "1",
                    "storage_dataflow_max_inflight_bytes_to_cluster_size_fraction": "0.01",
                    "storage_dataflow_max_inflight_bytes_disk_only": "false",
                    "storage_dataflow_delay_sources_past_rehydration": "true",
                },
                environment_extra=materialized_environment_extra,
                default_replication_factor=2,
            ),
            Clusterd(
                name="clusterd1",
                workers=4,
            ),
        ),
    ]:
        with c.override(
            mz,
            clusterd,
            Testdrive(no_reset=True, consistent_seed=True),
        ):
            c.rm("testdrive")
            print(f"Running rehydration workflow {style}")
            c.down(destroy_volumes=True)
            c.up(*dependencies)

            c.run_testdrive_files("rehydration/01-setup.td")
            c.run_testdrive_files("rehydration/02-source-setup.td")

            c.kill("materialized")
            c.kill("clusterd1")
            c.up("materialized")
            c.up("clusterd1")

            c.run_testdrive_files("rehydration/03-after-rehydration.td")
            c.run_testdrive_files("rehydration/04-reset.td")


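# The failpoint workflow exercises error handling around upsert state: starting
# clusterd with FAILPOINTS=<name>=return makes the named failpoint return an
# error, which should surface as a source error (failpoint/03-failpoint.td)
# rather than a crash, and restarting clusterd without the failpoint lets the
# source recover (failpoint/04-recover.td).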
def workflow_failpoint(c: Composition) -> None:
    """Test behaviour when operations on upsert state fail."""

    for failpoint, error_message in [
        (
            "fail_consolidate_chunk",
            "upsert: Failed to rehydrate state: Error consolidating values",
        ),
        (
            "fail_state_multi_put",
            "upsert: Failed to update records in state: Error putting values into state",
        ),
        (
            "fail_state_multi_get",
            "upsert: Failed to fetch records from state: Error getting values from state",
        ),
    ]:
        run_one_failpoint(c, failpoint, error_message)


def run_one_failpoint(c: Composition, failpoint: str, error_message: str) -> None:
    print(f">>> Running failpoint test for failpoint {failpoint}")

    dependencies = ["zookeeper", "kafka", "materialized"]
    c.kill("clusterd1")
    c.up(*dependencies)
    c.run_testdrive_files("failpoint/00-reset.td")

    with c.override(
        Testdrive(no_reset=True, consistent_seed=True),
        Clusterd(name="clusterd1", workers=4),
    ):
        c.rm("testdrive")
        c.run_testdrive_files("failpoint/01-setup.td")

        c.up("clusterd1")
        c.run_testdrive_files("failpoint/02-source.td")

        c.kill("clusterd1")

        with c.override(
            # Start clusterd with the failpoint enabled.
            Clusterd(
                name="clusterd1",
                environment_extra=[f"FAILPOINTS={failpoint}=return"],
                workers=4,
            ),
        ):
            c.up("clusterd1")
            c.run_testdrive_files(
                f"--var=error={error_message}", "failpoint/03-failpoint.td"
            )
            c.kill("clusterd1")

        # Run again without the failpoint set.
        c.up("clusterd1")
        c.run_testdrive_files("failpoint/04-recover.td")


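# Sets up upsert sources, restarts materialized, and verifies that the data is
# still readable after rehydration, both with and without DISK-backed replicas.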
def workflow_incident_49(c: Composition) -> None:
    """Regression test for incident 49."""

    c.down(destroy_volumes=True)

    dependencies = [
        "materialized",
        "zookeeper",
        "kafka",
        "schema-registry",
    ]

    for style, mz in [
        (
            "with DISK",
            Materialized(
                additional_system_parameter_defaults={
                    "disk_cluster_replicas_default": "true",
                    "storage_dataflow_delay_sources_past_rehydration": "true",
                },
                environment_extra=materialized_environment_extra,
                default_replication_factor=2,
            ),
        ),
        (
            "without DISK",
            Materialized(
                additional_system_parameter_defaults={
                    "disk_cluster_replicas_default": "false",
                    "storage_dataflow_delay_sources_past_rehydration": "true",
                },
                environment_extra=materialized_environment_extra,
                default_replication_factor=2,
            ),
        ),
    ]:
        with c.override(
            mz,
            Testdrive(no_reset=True, consistent_seed=True),
        ):
            c.rm("testdrive")
            print(f"Running incident-49 workflow {style}")
            c.down(destroy_volumes=True)
            c.up(*dependencies)

            c.run_testdrive_files("incident-49/01-setup.td")

            c.kill("materialized")
            c.up("materialized")

            c.run_testdrive_files("incident-49/02-after-rehydration.td")
            c.run_testdrive_files("incident-49/03-reset.td")


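# Relies on --orchestrator-process-scratch-directory=/scratch (set on the default
# Materialized service in SERVICES above) so that per-source RocksDB state lands
# under /scratch/cluster-<id>-replica-<id>-gen-0/storage/upsert/<source_id>.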
def workflow_rocksdb_cleanup(c: Composition) -> None:
    """Test RocksDB cleanup after dropping sources."""
    c.down(destroy_volumes=True)
    dependencies = [
        "materialized",
        "zookeeper",
        "kafka",
        "schema-registry",
    ]
    c.up(*dependencies)

    # Returns RocksDB's cluster-level and source-level paths for a given source
    # table name.
    def rocksdb_path(source_tbl_name: str) -> tuple[str, str]:
        (source_id, cluster_id, replica_id) = c.sql_query(
            f"""select t.id, s.cluster_id, c.id
                from
                    mz_sources s,
                    mz_tables t,
                    mz_cluster_replicas c
                where t.name = '{source_tbl_name}' AND t.source_id = s.id AND s.cluster_id = c.cluster_id"""
        )[0]
        prefix = "/scratch"
        cluster_prefix = f"cluster-{cluster_id}-replica-{replica_id}-gen-0"
        postfix = "storage/upsert"
        return (
            f"{prefix}/{cluster_prefix}/{postfix}",
            f"{prefix}/{cluster_prefix}/{postfix}/{source_id}",
        )

    # Returns the number of files, recursively, in a given directory.
    def num_files(dir: str) -> int:
        num_files = c.exec(
            "materialized", "bash", "-c", f"find {dir} -type f | wc -l", capture=True
        ).stdout.strip()
        return int(num_files)

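    # Each scenario is (testdrive file, drop statement, whether the whole cluster
    # gets dropped); the assertions below check that only the dropped object's
    # RocksDB directory disappears while the kept source's files survive.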
    scenarios = [
        ("drop-source.td", "DROP SOURCE dropped_upsert CASCADE", False),
        ("drop-cluster-cascade.td", "DROP CLUSTER c1 CASCADE", True),
        ("drop-source-in-cluster.td", "DROP SOURCE dropped_upsert CASCADE", False),
    ]

    for testdrive_file, drop_stmt, cluster_dropped in scenarios:
        with c.override(
            Testdrive(no_reset=True),
        ):
            c.rm("testdrive")
            c.up({"name": "testdrive", "persistent": True})
            c.exec("testdrive", f"rocksdb-cleanup/{testdrive_file}")

            (_, kept_source_path) = rocksdb_path("kept_upsert_tbl")
            (dropped_cluster_path, dropped_source_path) = rocksdb_path(
                "dropped_upsert_tbl"
            )

            assert num_files(kept_source_path) > 0
            assert num_files(dropped_source_path) > 0

            c.testdrive(f"> {drop_stmt}")

            assert num_files(kept_source_path) > 0
            if cluster_dropped:
                assert num_files(dropped_cluster_path) == 0
            else:
                assert num_files(dropped_source_path) == 0

            c.testdrive(
                dedent(
                    """
                    $ nop
                    # this is to reset testdrive
                    """
                )
            )


# This should not be run in CI and is not added to workflow_default above!
# It exists to compare rehydration metrics across different configs.
# It can be run locally with `./mzcompose run load-test`.
def workflow_load_test(c: Composition, parser: WorkflowArgumentParser) -> None:
    # The following variables can be tweaked to control how much data the kafka
    # topic is populated with and how large the upsert state gets.
    pad_len = 1024
    string_pad = "x" * pad_len  # 1KB
    repeat = 100 * 1024  # repeat * string_pad = 100MB upsert state size
    updates_count = 20  # repeat * updates_count = 2000MB total kafka topic size, with multiple updates for the same key
    backpressure_bytes = 50 * 1024 * 1024  # 50MB

    c.down(destroy_volumes=True)
    c.up("redpanda", "materialized", "clusterd1")

    # Initial hydration.
    with c.override(
        Testdrive(no_reset=True, consistent_seed=True, default_timeout=f"{5 * 60}s"),
        Materialized(
            options=[
                "--orchestrator-process-scratch-directory=/scratch",
            ],
            additional_system_parameter_defaults={
                "disk_cluster_replicas_default": "true",
                "enable_disk_cluster_replicas": "true",
                # Force backpressure to be enabled.
                "storage_dataflow_max_inflight_bytes": f"{backpressure_bytes}",
                "storage_dataflow_max_inflight_bytes_disk_only": "true",
            },
            environment_extra=materialized_environment_extra,
            default_replication_factor=2,
        ),
        Clusterd(
            name="clusterd1",
        ),
    ):
        c.rm("testdrive")
        c.up({"name": "testdrive", "persistent": True})
        c.exec("testdrive", "load-test/setup.td")

        c.testdrive(
            dedent(
                """
                $ kafka-ingest format=bytes topic=topic1 key-format=bytes key-terminator=:
                "AAA":"START MARKER"
                """
            )
        )

        for number in range(updates_count):
            c.exec(
                "testdrive",
                f"--var=value={number}{string_pad}",
                f"--var=repeat={repeat}",
                "load-test/insert.td",
            )

        c.testdrive(
            dedent(
                """
                $ kafka-ingest format=bytes topic=topic1 key-format=bytes key-terminator=:
                "ZZZ":"END MARKER"
                """
            )
        )

        c.testdrive(
            dedent(
                f"""
                > select * from v1;
                {repeat + 2}
                """
            )
        )

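        # Each scenario below restarts materialized and clusterd1 with a different
        # set of RocksDB-related flags and measures how long rehydrating the same
        # upsert state takes (via rehydration_latency in
        # mz_internal.mz_source_statistics_raw).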
        scenarios = [
            (
                "default",
                {
                    "disk_cluster_replicas_default": "true",
                    "enable_disk_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": f"{backpressure_bytes}",
                    "storage_dataflow_max_inflight_bytes_disk_only": "true",
                },
            ),
            (
                "default with RocksDB Merge Operator",
                {
                    "storage_rocksdb_use_merge_operator": "true",
                    "disk_cluster_replicas_default": "true",
                    "enable_disk_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": f"{backpressure_bytes}",
                    "storage_dataflow_max_inflight_bytes_disk_only": "true",
                },
            ),
            (
                "with write_buffer_manager no stall",
                {
                    "disk_cluster_replicas_default": "true",
                    "enable_disk_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": f"{backpressure_bytes}",
                    "storage_dataflow_max_inflight_bytes_disk_only": "true",
                    "upsert_rocksdb_write_buffer_manager_memory_bytes": f"{5 * 1024 * 1024}",
                    "upsert_rocksdb_write_buffer_manager_allow_stall": "false",
                },
            ),
            (
                "with write_buffer_manager stall enabled",
                {
                    "disk_cluster_replicas_default": "true",
                    "enable_disk_cluster_replicas": "true",
                    # Force backpressure to be enabled.
                    "storage_dataflow_max_inflight_bytes": f"{backpressure_bytes}",
                    "storage_dataflow_max_inflight_bytes_disk_only": "true",
                    "upsert_rocksdb_write_buffer_manager_memory_bytes": f"{5 * 1024 * 1024}",
                    "upsert_rocksdb_write_buffer_manager_allow_stall": "true",
                },
            ),
        ]

        last_latency = None
        for scenario_name, mz_configs in scenarios:
            with c.override(
                Materialized(
                    options=[
                        "--orchestrator-process-scratch-directory=/scratch",
                    ],
                    additional_system_parameter_defaults=mz_configs,
                    environment_extra=materialized_environment_extra,
                    default_replication_factor=2,
                ),
            ):
                c.kill("materialized", "clusterd1")
                print(f"Running rehydration for scenario {scenario_name}")
                c.up("materialized", "clusterd1")

                c.testdrive(
                    dedent(
                        f"""
                        > select sum(records_indexed)
                          from mz_internal.mz_source_statistics_raw st
                          join mz_sources s on s.id = st.id
                          where name = 's1';
                        {repeat + 2}

                        > select bool_and(rehydration_latency IS NOT NULL)
                          from mz_internal.mz_source_statistics_raw st
                          join mz_sources s on s.id = st.id
                          where name = 's1';
                        true
                        """
                    )
                )

                # Ensure we wait until the stat has been updated.
                rehydration_latency = last_latency
                while rehydration_latency == last_latency:
                    rehydration_latency = c.sql_query(
                        """select max(rehydration_latency)
                           from mz_internal.mz_source_statistics_raw st
                           join mz_sources s on s.id = st.id
                           where name = 's1';"""
                    )[0]
                last_latency = rehydration_latency

                print(f"Scenario {scenario_name} took {rehydration_latency} ms")


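# Like load-test, this workflow is skipped by workflow_default and is intended to
# be run manually (e.g. `./mzcompose run large-scale`).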
def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None:
    """
    Test a large-scale Kafka upsert source and make sure that we can ingest its
    data quickly and successfully.
    """
    dependencies = ["materialized", "zookeeper", "kafka", "schema-registry"]
    c.up(*dependencies)

    with c.override(
        Testdrive(no_reset=True, consistent_seed=True),
    ):
        c.rm("testdrive")
        c.up({"name": "testdrive", "persistent": True})
        c.testdrive(
            dedent(
                """
                $ kafka-create-topic topic=topic1

                > CREATE CONNECTION IF NOT EXISTS kafka_conn
                  FOR KAFKA BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT;
                """
            )
        )

        def make_inserts(c: Composition, start: int, batch_num: int):
            c.testdrive(
                args=["--no-reset"],
                input=dedent(
                    f"""
                    $ kafka-ingest format=bytes topic=topic1 key-format=bytes key-terminator=: repeat={batch_num}
                    "${{kafka-ingest.iteration}}":"{'x' * 1_000_000}"
                    """
                ),
            )

        num_rows = 100_000  # out of disk with 200_000 rows
        batch_size = 10_000
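        # Each batch re-ingests the same keys 0..batch_size-1 (kafka-ingest.iteration
        # starts from 0 for every ingest action), so the upsert state should settle at
        # batch_size keys of ~1MB each while the topic itself grows to num_rows records.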
        for i in range(0, num_rows, batch_size):
            batch_num = min(batch_size, num_rows - i)
            make_inserts(c, i, batch_num)

        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                > CREATE SOURCE s1
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-topic1-${{testdrive.seed}}');

                > CREATE TABLE s1_tbl FROM SOURCE s1 (REFERENCE "testdrive-topic1-${{testdrive.seed}}")
                  KEY FORMAT TEXT VALUE FORMAT TEXT
                  ENVELOPE UPSERT;

                > SELECT COUNT(*) FROM s1_tbl;
                {batch_size}
                """
            ),
        )

        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                $ kafka-ingest format=bytes topic=topic1 key-format=bytes key-terminator=:
                "{batch_size + 1}":"{'x' * 1_000_000}"
                """
            ),
        )

        c.testdrive(
            args=["--no-reset"],
            input=dedent(
                f"""
                > SELECT COUNT(*) FROM s1_tbl;
                {batch_size + 1}
                """
            ),
        )