# concurrency.py
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from materialize.feature_benchmark.action import Action, TdAction
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
from materialize.feature_benchmark.scenario import Scenario
from materialize.feature_benchmark.scenario_version import ScenarioVersion
  13. class Concurrency(Scenario):
  14. """Feature benchmarks related to testing concurrency aspects of the system"""
  15. class ParallelIngestion(Concurrency):
  16. """Measure the time it takes to ingest multiple sources concurrently."""
  17. SOURCES = 10
  18. FIXED_SCALE = True # Disk slowness in CRDB leading to CRDB going down
  19. def version(self) -> ScenarioVersion:
  20. return ScenarioVersion.create(1, 1, 0)
  21. def shared(self) -> Action:
  22. return TdAction(
  23. self.schema()
  24. + self.keyschema()
  25. + f"""
  26. $ kafka-create-topic topic=kafka-parallel-ingestion partitions=4
  27. $ kafka-ingest format=avro topic=kafka-parallel-ingestion key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
  28. {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
  29. """
  30. )
  31. def benchmark(self) -> MeasurementSource:
  32. sources = range(1, ParallelIngestion.SOURCES + 1)
  33. drop_sources = "\n".join(
  34. [
  35. f"""
  36. > DROP SOURCE IF EXISTS s{s} CASCADE
  37. > DROP CLUSTER IF EXISTS s{s}_cluster
  38. """
  39. for s in sources
  40. ]
  41. )
  42. create_sources = "\n".join(
  43. [
  44. f"""
  45. > CREATE CONNECTION IF NOT EXISTS csr_conn
  46. FOR CONFLUENT SCHEMA REGISTRY
  47. URL '${{testdrive.schema-registry-url}}';
  48. > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
  49. > CREATE CLUSTER s{s}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
  50. > CREATE SOURCE s{s}
  51. IN CLUSTER s{s}_cluster
  52. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-parallel-ingestion-${{testdrive.seed}}')
  53. > CREATE TABLE s{s}_tbl FROM SOURCE s{s} (REFERENCE "testdrive-kafka-parallel-ingestion-${{testdrive.seed}}")
  54. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  55. """
  56. for s in sources
  57. ]
  58. )
  59. create_indexes = "\n".join(
  60. [
  61. f"""
  62. > CREATE DEFAULT INDEX ON s{s}_tbl
  63. """
  64. for s in sources
  65. ]
  66. )
  67. selects = "\n".join(
  68. [
  69. f"""
  70. > SELECT * FROM s{s}_tbl WHERE f2 = {self.n()-1}
  71. {self.n()-1}
  72. """
  73. for s in sources
  74. ]
  75. )
  76. return Td(
  77. self.schema()
  78. + f"""
  79. {drop_sources}
  80. {create_sources}
  81. > SELECT 1
  82. /* A */
  83. 1
  84. {create_indexes}
  85. {selects}
  86. > SELECT 1
  87. /* B */
  88. 1
  89. """
  90. )
  91. class ParallelDataflows(Concurrency):
  92. """Measure the time it takes to compute multiple parallel dataflows."""
  93. SCALE = 6
  94. VIEWS = 25
  95. def benchmark(self) -> MeasurementSource:
  96. views = range(1, ParallelDataflows.VIEWS + 1)
  97. create_views = "\n".join(
  98. [
  99. f"""
  100. > CREATE MATERIALIZED VIEW v{v} AS
  101. SELECT COUNT(DISTINCT generate_series) + {v} - {v} AS f1
  102. FROM generate_series(1,{self.n()})
  103. """
  104. for v in views
  105. ]
  106. )
  107. selects = "\n".join(
  108. [
  109. f"""
  110. > SELECT * FROM v{v}
  111. {self.n()}
  112. """
  113. for v in views
  114. ]
  115. )
  116. return Td(
  117. f"""
  118. $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
  119. DROP SCHEMA public CASCADE;
  120. > CREATE SCHEMA public;
  121. > SELECT 1
  122. /* A */
  123. 1
  124. {create_views}
  125. {selects}
  126. > SELECT 1
  127. /* B */
  128. 1
  129. """
  130. )