subscribe.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. from random import getrandbits
  10. from textwrap import dedent
  11. from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
  12. from materialize.feature_benchmark.scenario import Scenario
  class SubscribeParallel(Scenario):
      """Feature benchmarks related to SUBSCRIBE.

      Opens ``self.n()`` connections, each holding a ``SUBSCRIBE s1`` cursor
      inside a SERIALIZABLE transaction, performs one write, and then fetches
      from every cursor. Subclasses decide what ``s1`` is
      (``create_subscribe_source``) and how the write is made (``insert``).
      """

      # So 100 concurrent SUBSCRIBEs by default, limited by database-issues#5376
      SCALE = 2
      FIXED_SCALE = True

      def benchmark(self) -> MeasurementSource:
          # Setup: the object to subscribe to, then one connection per
          # subscriber, each with an open SUBSCRIBE cursor in its own transaction.
          script = self.create_subscribe_source()
          script += "\n".join(
              dedent(
                  f"""
                  $ postgres-connect name=conn{conn_no} url=postgres://materialize:materialize@${{testdrive.materialize-sql-addr}}
                  $ postgres-execute connection=conn{conn_no}
                  # STRICT SERIALIZABLE is affected by database-issues#5407
                  START TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                  DECLARE c{conn_no} CURSOR FOR SUBSCRIBE s1
                  """
              )
              for conn_no in range(0, self.n())
          )
          script += self.insert()

          # The measured interval starts with the query tagged /* A */ ...
          script += dedent(
              """
              > SELECT COUNT(*) FROM s1;
                /* A */
              1
              """
          )
          # ... covers draining every subscriber's cursor ...
          script += "\n".join(
              dedent(
                  f"""
                  $ postgres-execute connection=conn{conn_no}
                  FETCH ALL FROM c{conn_no};
                  """
              )
              for conn_no in range(0, self.n())
          )
          # ... and ends with the query tagged /* B */.
          script += dedent(
              """
              > SELECT 1
                /* B */
              1
              """
          )
          return Td(script)

      def create_subscribe_source(self) -> str:
          """Return testdrive commands that (re)create the object ``s1``.

          Must be overridden by subclasses.
          """
          raise NotImplementedError

      def insert(self) -> str:
          """Return testdrive commands that write the row the subscribers observe.

          Must be overridden by subclasses.
          """
          raise NotImplementedError
  class SubscribeParallelTable(SubscribeParallel):
      """SUBSCRIBE to a plain, unindexed table of 1024-character text rows."""

      def create_subscribe_source(self) -> str:
          # Recreate the table so every run starts empty.
          return "\n".join(
              [
                  "",
                  "> DROP TABLE IF EXISTS s1;",
                  "> CREATE TABLE s1 (f1 TEXT);",
                  "",
              ]
          )

      def insert(self) -> str:
          payload = "REPEAT('x', 1024)"
          return f"> INSERT INTO s1 VALUES ({payload})\n"
  class SubscribeParallelTableWithIndex(SubscribeParallel):
      """SUBSCRIBE to an integer table that carries a default index."""

      def create_subscribe_source(self) -> str:
          # Recreate the table (and its default index) so every run starts empty.
          statements = (
              "DROP TABLE IF EXISTS s1;",
              "CREATE TABLE s1 (f1 INTEGER);",
              "CREATE DEFAULT INDEX ON s1;",
          )
          return "\n" + "".join(f"> {stmt}\n" for stmt in statements)

      def insert(self) -> str:
          value = 123
          return f"> INSERT INTO s1 VALUES ({value})\n"
  class SubscribeParallelKafka(SubscribeParallel):
      """SUBSCRIBE to an indexed table ``s1`` fed by a Kafka source.

      The source is ingested in a dedicated ``source_cluster``. Each call to
      ``create_subscribe_source()`` uses a brand-new topic.
      """

      def _topic_name(self) -> str:
          """Return this run's topic name as passed to testdrive's kafka-* commands."""
          return f"subscribe-kafka-{self._unique_topic_id}"

      def create_subscribe_source(self) -> str:
          """Return testdrive commands creating a fresh topic, source and table ``s1``.

          Also draws the random topic id that ``insert()`` reuses, so this
          method must be called before ``insert()``. ``benchmark()`` evaluates it
          first.
          """
          # As we are doing `kafka-ingest` in the middle of the benchmark() method
          # we must always use a unique topic to ensure isolation between the
          # individual measurements
          self._unique_topic_id = getrandbits(64)
          topic = self._topic_name()
          # The SQL side refers to the topic by its full name: the short name
          # used by `kafka-create-topic`, with a `testdrive-` prefix and a
          # `-${testdrive.seed}` suffix.
          full_topic = f"testdrive-{topic}-${{testdrive.seed}}"
          # Clean up the objects recreated below. The source is `s1_source`, and
          # dropping it with CASCADE also drops the dependent table `s1`. `s1` is
          # then dropped explicitly as well, like the other SubscribeParallel
          # variants do, in case it exists as an unrelated plain table.
          return dedent(
              f"""
              # Separate topic for each Mz instance
              $ kafka-create-topic topic={topic}

              > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

              > DROP CLUSTER IF EXISTS source_cluster CASCADE;
              > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

              > DROP SOURCE IF EXISTS s1_source CASCADE;
              > DROP TABLE IF EXISTS s1 CASCADE;

              > CREATE SOURCE s1_source
                IN CLUSTER source_cluster
                FROM KAFKA CONNECTION kafka_conn (TOPIC '{full_topic}');

              > CREATE TABLE s1 FROM SOURCE s1_source (REFERENCE "{full_topic}")
                FORMAT BYTES ENVELOPE NONE;

              > CREATE DEFAULT INDEX ON s1;
              """
          )

      def insert(self) -> str:
          """Return a `kafka-ingest` command writing one message to this run's topic.

          Relies on ``self._unique_topic_id``, which ``create_subscribe_source()``
          sets.
          """
          return dedent(
              f"""
              $ kafka-ingest format=bytes topic={self._topic_name()}
              123
              """
          )
  118. )