123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- # Copyright Materialize, Inc. and contributors. All rights reserved.
- #
- # Use of this software is governed by the Business Source License
- # included in the LICENSE file at the root of this repository.
- #
- # As of the Change Date specified in that file, in accordance with
- # the Business Source License, use of this software will be governed
- # by the Apache License, Version 2.0.
- from random import getrandbits
- from textwrap import dedent
- from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
- from materialize.feature_benchmark.scenario import Scenario
- class SubscribeParallel(Scenario):
- """Feature benchmarks related to SUBSCRIBE"""
- SCALE = (
- 2 # So 100 concurrent SUBSCRIBEs by default, limited by database-issues#5376
- )
- FIXED_SCALE = True
- def benchmark(self) -> MeasurementSource:
- return Td(
- self.create_subscribe_source()
- + "\n".join(
- [
- dedent(
- f"""
- $ postgres-connect name=conn{i} url=postgres://materialize:materialize@${{testdrive.materialize-sql-addr}}
- $ postgres-execute connection=conn{i}
- # STRICT SERIALIZABLE is affected by database-issues#5407
- START TRANSACTION ISOLATION LEVEL SERIALIZABLE;
- DECLARE c{i} CURSOR FOR SUBSCRIBE s1
- """
- )
- for i in range(0, self.n())
- ]
- )
- + self.insert()
- # We measure from here ...
- + dedent(
- """
- > SELECT COUNT(*) FROM s1;
- /* A */
- 1
- """
- )
- + "\n".join(
- [
- dedent(
- f"""
- $ postgres-execute connection=conn{i}
- FETCH ALL FROM c{i};
- """
- )
- for i in range(0, self.n())
- ]
- )
- # ... to here
- + dedent(
- """
- > SELECT 1
- /* B */
- 1
- """
- )
- )
- def create_subscribe_source(self) -> str:
- raise NotImplementedError
- def insert(self) -> str:
- raise NotImplementedError
- class SubscribeParallelTable(SubscribeParallel):
- def create_subscribe_source(self) -> str:
- return dedent(
- """
- > DROP TABLE IF EXISTS s1;
- > CREATE TABLE s1 (f1 TEXT);
- """
- )
- def insert(self) -> str:
- return "> INSERT INTO s1 VALUES (REPEAT('x', 1024))\n"
- class SubscribeParallelTableWithIndex(SubscribeParallel):
- def create_subscribe_source(self) -> str:
- return dedent(
- """
- > DROP TABLE IF EXISTS s1;
- > CREATE TABLE s1 (f1 INTEGER);
- > CREATE DEFAULT INDEX ON s1;
- """
- )
- def insert(self) -> str:
- return "> INSERT INTO s1 VALUES (123)\n"
- class SubscribeParallelKafka(SubscribeParallel):
- def create_subscribe_source(self) -> str:
- # As we are doing `kafka-ingest` in the middle of the benchmark() method
- # we must always use a unique topic to ensure isolation between the individal
- # measurements
- self._unique_topic_id = getrandbits(64)
- return dedent(
- f"""
- # Separate topic for each Mz instance
- $ kafka-create-topic topic=subscribe-kafka-{self._unique_topic_id}
- > CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
- > DROP CLUSTER IF EXISTS source_cluster CASCADE;
- > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
- > DROP SOURCE IF EXISTS s1 CASCADE;
- > CREATE SOURCE s1_source
- IN CLUSTER source_cluster
- FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-subscribe-kafka-{self._unique_topic_id}-${{testdrive.seed}}');
- > CREATE TABLE s1 FROM SOURCE s1_source (REFERENCE "testdrive-subscribe-kafka-{self._unique_topic_id}-${{testdrive.seed}}")
- FORMAT BYTES ENVELOPE NONE;
- > CREATE DEFAULT INDEX ON s1;
- """
- )
- def insert(self) -> str:
- return dedent(
- f"""
- $ kafka-ingest format=bytes topic=subscribe-kafka-{self._unique_topic_id}
- 123
- """
- )
|