definition.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. import random
  10. from collections.abc import Iterator
  11. from enum import Enum
  12. from materialize.data_ingest.data_type import RecordSize
  13. from materialize.data_ingest.field import Field
  14. from materialize.data_ingest.row import Operation, Row
  15. from materialize.data_ingest.rowlist import RowList
  # Single random generator shared by every definition in this module for
  # producing non-key field values and random delete keys. It is created
  # without an explicit seed here, so values differ from run to run unless
  # some caller reseeds it.
  rng: random.Random = random.Random()
  class Records(Enum):
      """How many records a workload operation should touch.

      Each member's value is the record count it stands for, except ``ALL``,
      whose value 0 is a marker rather than a count: ``Insert``/``Upsert``
      reject it, and ``Delete`` handles it with a dedicated code path.
      """

      ALL = 0  # Only applies to DELETE operations
      ONE = 1
      HUNDRED = 10**2
      SOME = 10**3
      MANY = 10**6
  class Keyspace(Enum):
      """Key-space selector passed to ``Upsert``.

      NOTE(review): ``Upsert`` in this module stores the selector but its
      ``generate`` always writes key value 0, so the member currently has no
      effect on the emitted rows -- confirm the intended semantics of
      ``LARGE`` / ``EXISTING`` with callers.
      """

      SINGLE_VALUE = 1
      LARGE = 2
      EXISTING = 3
  class Target(Enum):
      """Output target selector: Kafka, Postgres, or plain printing.

      Not referenced elsewhere in this module; presumably chooses where the
      generated rows are sent -- confirm against the callers that consume it.
      """

      KAFKA = 1
      POSTGRES = 2
      PRINT = 3
  class Definition:
      """Base class for a data-ingest step that produces rows.

      Concrete subclasses (inserts, upserts, deletes) override ``generate``.
      """

      def generate(self, fields: list[Field]) -> Iterator[RowList]:
          """Yield ``RowList`` objects built for ``fields``.

          The base implementation is abstract and always raises
          ``NotImplementedError``.
          """
          raise NotImplementedError()
  class Insert(Definition):
      """Emit INSERT rows whose key fields take sequential values 0, 1, 2, ...

      Non-key fields receive random values of the configured ``record_size``.
      ``current_key`` persists across calls to ``generate``, so once ``count``
      rows have been produced, later calls yield nothing.
      """

      def __init__(self, count: Records, record_size: RecordSize):
          self.count = count.value
          self.record_size = record_size
          # Next key value to insert; advanced each time a row is yielded.
          self.current_key = 0

      def _reject_all_count(self) -> None:
          """Raise ``ValueError`` when ``count`` is below 1 (``Records.ALL``)."""
          if self.count < 1:
              raise ValueError(
                  f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
              )

      def max_key(self) -> int:
          """Return ``count``; the keys inserted are ``0 .. count - 1``.

          Raises:
              ValueError: if ``count`` is below 1.
          """
          self._reject_all_count()
          return self.count

      def generate(self, fields: list[Field]) -> Iterator[RowList]:
          """Yield one single-row ``RowList`` per inserted key.

          Raises:
              ValueError: if ``count`` is below 1.
          """
          self._reject_all_count()
          for _ in range(self.count):
              key = self.current_key
              # A previous call may already have emitted every key.
              if key >= self.count:
                  break
              row_values = [
                  (
                      field.data_type.numeric_value(key)
                      if field.is_key
                      else field.data_type.random_value(rng, self.record_size)
                  )
                  for field in fields
              ]
              self.current_key = key + 1
              yield RowList(
                  [Row(fields=fields, values=row_values, operation=Operation.INSERT)]
              )
  class Upsert(Definition):
      """Emit ``count`` UPSERT rows that all use key value 0.

      Every row's key fields are built with ``numeric_value(0)``, and its
      non-key fields get fresh random values, so the rows presumably keep
      overwriting one record.
      """

      def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize):
          # NOTE(review): stored but never read by generate(); every upsert
          # targets key 0 regardless of the keyspace -- confirm this is intended.
          self.keyspace = keyspace
          self.count = count.value
          self.record_size = record_size

      def generate(self, fields: list[Field]) -> Iterator[RowList]:
          """Yield ``count`` single-row ``RowList`` objects.

          Raises:
              ValueError: if ``count`` is below 1 (``Records.ALL``).
          """
          if self.count < 1:
              raise ValueError(
                  f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
              )

          remaining = self.count
          while remaining > 0:
              remaining -= 1
              row_values = [
                  (
                      field.data_type.random_value(rng, self.record_size)
                      if not field.is_key
                      else field.data_type.numeric_value(0)
                  )
                  for field in fields
              ]
              yield RowList(
                  [Row(fields=fields, values=row_values, operation=Operation.UPSERT)]
              )
  class Delete(Definition):
      """Emit DELETE rows that carry only the key fields.

      ``number_of_records`` selects the mode:

      * ``Records.ALL`` -- delete the sequential keys ``0 .. num - 1`` (built
        with ``numeric_value``); ``num`` must be provided.
      * ``ONE`` / ``HUNDRED`` / ``SOME`` / ``MANY`` -- emit
        ``number_of_records.value`` deletes whose key values come from
        ``random_value``. NOTE(review): such random keys are not guaranteed to
        match existing rows; presumably intentional.
      """

      def __init__(
          self,
          number_of_records: Records,
          record_size: RecordSize,
          num: int | None = None,
      ):
          self.number_of_records = number_of_records
          self.record_size = record_size
          # Count of sequential keys to delete; only consulted for Records.ALL.
          self.num = num

      def generate(self, fields: list[Field]) -> Iterator[RowList]:
          """Yield one single-row ``RowList`` per deleted key.

          Raises:
              ValueError: if ``Records.ALL`` is requested without ``num``, or
                  ``number_of_records`` is not a supported ``Records`` member.
          """
          if self.number_of_records == Records.ALL:
              # Explicit check rather than ``assert`` so it still fires under
              # ``python -O`` instead of failing later inside ``range(None)``.
              if self.num is None:
                  raise ValueError(
                      f"Deleting {self.number_of_records} requires num to be set"
                  )
              for key in range(self.num):
                  values = [
                      field.data_type.numeric_value(key)
                      for field in fields
                      if field.is_key
                  ]
                  yield RowList([Row(fields, values, Operation.DELETE)])
          elif self.number_of_records in (
              Records.ONE,
              Records.HUNDRED,
              Records.SOME,
              Records.MANY,
          ):
              # Every counted member (including ONE and the formerly rejected
              # HUNDRED) is just "delete `.value` random keys".
              for _ in range(self.number_of_records.value):
                  values = [
                      field.data_type.random_value(rng, self.record_size)
                      for field in fields
                      if field.is_key
                  ]
                  yield RowList([Row(fields, values, Operation.DELETE)])
          else:
              raise ValueError(f"Unexpected number of records {self.number_of_records}")
  132. else:
  133. raise ValueError(f"Unexpected number of records {self.number_of_records}")