mzcompose.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. """
  10. Functional tests for the COPY TO S3 command against a local minio service
  11. instead of a real AWS S3.
  12. """
  13. import csv
  14. import json
  15. import random
  16. import string
  17. from io import BytesIO, StringIO
  18. import pyarrow.parquet #
  19. from minio import Minio
  20. from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
  21. from materialize.mzcompose.services.materialized import Materialized
  22. from materialize.mzcompose.services.minio import Mc
  23. from materialize.mzcompose.services.minio import Minio as MinioService
  24. from materialize.mzcompose.services.mz import Mz
  25. from materialize.mzcompose.services.testdrive import Testdrive
# Services used by all workflows in this composition:
# - MinioService: local S3 stand-in, pre-creating the "copytos3" bucket dir
#   and exposing the API (9000) and console (9001) on the host.
# - Mz / Materialized: the system under test, with trace-level logging for
#   the s3_oneshot_sink operator so COPY TO failures are debuggable.
# - Testdrive: runs the .td test scripts.
# - Mc: minio admin client, used by workflow_auth for IAM setup.
SERVICES = [
    MinioService(
        additional_directories=["copytos3"],
        ports=["9000:9000", "9001:9001"],
        allow_host_ports=True,
    ),
    Mz(app_password=""),
    Materialized(
        additional_system_parameter_defaults={
            "log_filter": "mz_storage_operators::s3_oneshot_sink=trace,debug,info,warn"
        },
    ),
    Testdrive(),
    Mc(),
]
  41. def workflow_default(c: Composition) -> None:
  42. """
  43. Run all workflows (including nightly and CI workflows)
  44. """
  45. def process(name: str) -> None:
  46. if name == "default":
  47. return
  48. with c.test_case(name):
  49. c.workflow(name)
  50. c.test_parts(list(c.workflows.keys()), process)
  51. def workflow_nightly(c: Composition, parser: WorkflowArgumentParser) -> None:
  52. """
  53. Run only during the nightly
  54. """
  55. parser.add_argument(
  56. "--default-size",
  57. type=int,
  58. default=1, # Reduced memory
  59. help="Use SIZE 'N-N' for replicas and SIZE 'N' for sources",
  60. )
  61. args = parser.parse_args()
  62. dependencies = ["minio", "materialized"]
  63. testdrive = Testdrive(
  64. forward_buildkite_shard=True,
  65. aws_region=None,
  66. validate_catalog_store=True,
  67. default_timeout="1800s",
  68. volumes_extra=["mzdata:/mzdata"],
  69. no_reset=True,
  70. )
  71. materialized = Materialized(
  72. default_size=args.default_size,
  73. additional_system_parameter_defaults={
  74. "log_filter": "mz_storage_operators::s3_oneshot_sink=trace,debug,info,warn"
  75. },
  76. )
  77. with c.override(testdrive, materialized):
  78. c.up(*dependencies)
  79. c.sql(
  80. "ALTER SYSTEM SET max_clusters = 50;",
  81. port=6877,
  82. user="mz_system",
  83. )
  84. s3 = Minio(
  85. f"127.0.0.1:{c.default_port('minio')}",
  86. "minioadmin",
  87. "minioadmin",
  88. region="minio",
  89. secure=False,
  90. )
  91. c.run_testdrive_files("nightly/setup.td")
  92. for size in [
  93. "tpch10mb",
  94. "tpch1gb",
  95. # "tpch10gb", # SELECT(*) in Mz is way too slow
  96. ]:
  97. c.run_testdrive_files(f"nightly/{size}.td")
  98. for table in [
  99. "customer",
  100. "lineitem",
  101. "nation",
  102. "orders",
  103. "part",
  104. "partsupp",
  105. "region",
  106. "supplier",
  107. ]:
  108. expected = sorted(
  109. [
  110. [str(c) for c in row]
  111. for row in c.sql_query(f"SELECT * FROM {size}.{table}")
  112. ]
  113. )
  114. actual_csv = []
  115. for obj in s3.list_objects(
  116. "copytos3", f"test/{size}/csv/{table}/", recursive=True
  117. ):
  118. assert obj.object_name is not None
  119. response = s3.get_object("copytos3", obj.object_name)
  120. f = StringIO(response.data.decode("utf-8"))
  121. response.close()
  122. response.release_conn()
  123. del response
  124. actual_csv.extend([row for row in csv.reader(f)])
  125. del f
  126. actual_csv.sort()
  127. assert (
  128. actual_csv == expected
  129. ), f"Table {table}\nActual:\n{actual_csv[0:3]}\n...\n\nExpected:\n{expected[0:3]}\n..."
  130. del actual_csv
  131. actual_parquet = []
  132. for obj in s3.list_objects(
  133. "copytos3", f"test/{size}/parquet/{table}/", recursive=True
  134. ):
  135. assert obj.object_name is not None
  136. response = s3.get_object("copytos3", obj.object_name)
  137. f = BytesIO(response.data)
  138. response.close()
  139. response.release_conn()
  140. del response
  141. actual_parquet.extend(
  142. [
  143. [str(c) for c in row.values()]
  144. for row in pyarrow.parquet.read_table(f).to_pylist()
  145. ]
  146. )
  147. del f
  148. actual_parquet.sort()
  149. assert (
  150. actual_parquet == expected
  151. ), f"Table {table}\nActual:\n{actual_parquet[0:3]}\n...\n\nExpected:\n{expected[0:3]}\n..."
  152. del actual_parquet
  153. def workflow_ci(c: Composition, _parser: WorkflowArgumentParser) -> None:
  154. """
  155. Workflows to run during CI
  156. """
  157. for name in ["auth", "http"]:
  158. with c.test_case(name):
  159. c.workflow(name)
  160. def workflow_auth(c: Composition) -> None:
  161. c.up({"name": "mc", "persistent": True})
  162. c.up("materialized", "minio")
  163. # Set up IAM credentials for 3 users with different permissions levels to S3:
  164. # User 'readwritedelete': PutObject, ListBucket, DeleteObject
  165. # User 'nodelete': PutObject, ListBucket
  166. # User 'read': GetObject, ListBucket
  167. def make_random_key(n: int):
  168. return "".join(
  169. random.SystemRandom().choice(string.ascii_uppercase + string.digits)
  170. for _ in range(n)
  171. )
  172. def make_user(username: str, actions: list[str]):
  173. return (
  174. username,
  175. make_random_key(10),
  176. {
  177. "Version": "2012-10-17",
  178. "Statement": [
  179. {
  180. "Effect": "Allow",
  181. "Action": actions,
  182. "Resource": ["arn:aws:s3:::*"],
  183. }
  184. ],
  185. },
  186. )
  187. users = [
  188. make_user(username, actions)
  189. for username, actions in [
  190. ("readwritedelete", ["s3:PutObject", "s3:ListBucket", "s3:DeleteObject"]),
  191. ("nodelete", ["s3:PutObject", "s3:ListBucket"]),
  192. ("read", ["s3:GetObject", "s3:ListBucket"]),
  193. ]
  194. ]
  195. minio_alias = "s3test"
  196. c.exec(
  197. "mc",
  198. "mc",
  199. "alias",
  200. "set",
  201. minio_alias,
  202. "http://minio:9000/",
  203. "minioadmin",
  204. "minioadmin",
  205. )
  206. # create a user, policy, and policy attachment for each user
  207. testdrive_args = []
  208. for user in users:
  209. c.exec(
  210. "mc",
  211. "mc",
  212. "admin",
  213. "user",
  214. "add",
  215. minio_alias,
  216. user[0],
  217. user[1],
  218. )
  219. c.exec("mc", "cp", "/dev/stdin", f"/tmp/{user[0]}", stdin=json.dumps(user[2]))
  220. c.exec(
  221. "mc",
  222. "mc",
  223. "admin",
  224. "policy",
  225. "create",
  226. minio_alias,
  227. user[0],
  228. f"/tmp/{user[0]}",
  229. )
  230. c.exec(
  231. "mc",
  232. "mc",
  233. "admin",
  234. "policy",
  235. "attach",
  236. minio_alias,
  237. user[0],
  238. "--user",
  239. user[0],
  240. )
  241. testdrive_args.append(f"--var=s3-user-{user[0]}-secret-key={user[1]}")
  242. c.run_testdrive_files(
  243. *testdrive_args,
  244. "s3-auth-checks.td",
  245. )
  246. def workflow_http(c: Composition) -> None:
  247. """Test http endpoint allows COPY TO S3 but not COPY TO STDOUT."""
  248. c.up("materialized", "minio")
  249. with c.override(Testdrive(no_reset=True)):
  250. c.run_testdrive_files("http/setup.td")
  251. result = c.exec(
  252. "materialized",
  253. "curl",
  254. "http://localhost:6878/api/sql",
  255. "-k",
  256. "-s",
  257. "--header",
  258. "Content-Type: application/json",
  259. "--data",
  260. "{\"query\": \"COPY (SELECT 1) TO 's3://copytos3/test/http/' WITH (AWS CONNECTION = aws_conn, FORMAT = 'csv');\"}",
  261. capture=True,
  262. )
  263. assert result.returncode == 0
  264. assert (
  265. json.loads(result.stdout)["results"][0]["error"]["message"]
  266. == 'permission denied for CONNECTION "materialize.public.aws_conn"'
  267. and json.loads(result.stdout)["results"][0]["error"]["detail"]
  268. == "The 'anonymous_http_user' role needs USAGE privileges on CONNECTION \"materialize.public.aws_conn\""
  269. )
  270. c.run_testdrive_files("http/grant.td")
  271. result = c.exec(
  272. "materialized",
  273. "curl",
  274. "http://localhost:6878/api/sql",
  275. "-k",
  276. "-s",
  277. "--header",
  278. "Content-Type: application/json",
  279. "--data",
  280. "{\"query\": \"COPY (SELECT 1) TO 's3://copytos3/test/http/' WITH (AWS CONNECTION = aws_conn, FORMAT = 'csv');\"}",
  281. capture=True,
  282. )
  283. assert result.returncode == 0
  284. assert json.loads(result.stdout)["results"][0]["ok"] == "COPY 1"
  285. result = c.exec(
  286. "materialized",
  287. "curl",
  288. "http://localhost:6878/api/sql",
  289. "-k",
  290. "-s",
  291. "--header",
  292. "Content-Type: application/json",
  293. "--data",
  294. '{"query": "COPY (SELECT 1) TO STDOUT"}',
  295. capture=True,
  296. )
  297. assert result.returncode == 0
  298. assert "unsupported via this API" in result.stdout