# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

#
# Test basic connection functionality

$ skip-consistency-checks reason="workflow uses SSH keys which we currently can't check"

$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_connection_validation_syntax = true

###
# Test core functionality by creating, introspecting and dropping a connection
###

$ kafka-create-topic topic=connection_test partitions=1

$ kafka-ingest format=bytes topic=connection_test
1,2
2,3
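
# The secret created here is referenced by the SASL and SSL options exercised below.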
> CREATE SECRET s AS '...';

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, ACCESS KEY ID = 'abc')
contains:KAFKA connections do not support ACCESS KEY ID values

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, ACCESS KEY ID = 'abc', PORT = 1)
contains:KAFKA connections do not support ACCESS KEY ID, PORT values

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL NOTHINGREAL)
contains: unknown security protocol: NOTHINGREAL

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SASL MECHANISMS 'PLAIN', SASL USERNAME 'materialize', SASL PASSWORD SECRET s, SSL KEY SECRET s)
contains: option SSL KEY not supported with this configuration

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, SASL USERNAME 'materialize')
contains: option SASL USERNAME not supported with this configuration

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, PROGRESS TOPIC REPLICATION FACTOR -4)
contains:PROGRESS TOPIC REPLICATION FACTOR must be greater than 0

> CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> SELECT name, type from mz_connections WHERE id LIKE 'u%'
name      type
------------------------------
testconn  kafka

> SHOW CONNECTIONS
testconn  kafka  ""

> SELECT
    brokers,
    sink_progress_topic = '_materialize-progress-' || mz_environment_id() || '-' || id
  FROM mz_kafka_connections
  JOIN mz_connections USING (id)
  WHERE name = 'testconn'
{${testdrive.kafka-addr}} true

> DROP CONNECTION testconn

> CREATE CONNECTION progress_override TO KAFKA (
    BROKER '${testdrive.kafka-addr}',
    PROGRESS TOPIC 'override_topic',
    PROGRESS TOPIC REPLICATION FACTOR 1,
    SECURITY PROTOCOL PLAINTEXT
  )

> SELECT
    brokers, sink_progress_topic
  FROM mz_kafka_connections
  JOIN mz_connections USING (id)
  WHERE name = 'progress_override'
{${testdrive.kafka-addr}} override_topic
###
# Test that connections work in creating a source
###

> CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER connection_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE connection_source (first, second)
  IN CLUSTER connection_source_cluster
  FROM KAFKA CONNECTION testconn (TOPIC 'testdrive-connection_test-${testdrive.seed}')
  FORMAT CSV WITH 2 COLUMNS

> SELECT * FROM connection_source
first second
------------
1     2
2     3

# Confirm we cannot drop the connection while a source depends upon it
! DROP CONNECTION testconn;
contains:depended upon by source "connection_source"

# Confirm the drop works if we add cascade
> DROP CONNECTION testconn CASCADE;

# Validate the cascading drop actually happened
! SELECT * FROM connection_source
contains:unknown catalog item 'connection_source'
###
# Test schema registry connection create and drop
###

# Set up a Kafka topic with a schema; the key schema
# must be a subset of the keys in the rows
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "envelope",
    "fields" : [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {
                "name": "id",
                "type": "long"
              },
              {
                "name": "creature",
                "type": "string"
              }]
          },
          "null"
        ]
      },
      {
        "name": "after",
        "type": ["row", "null"]
      }
    ]
  }
$ kafka-create-topic topic=csr_test partitions=1

$ kafka-ingest format=avro topic=csr_test key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}}

> CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

! CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}',
    SESSION TOKEN = 'abc'
  );
contains:CONFLUENT SCHEMA REGISTRY connections do not support SESSION TOKEN values

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

! CREATE SOURCE csr_source
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION csr_conn (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM
contains:is not a KAFKA CONNECTION

> CREATE CLUSTER csr_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE csr_source
  IN CLUSTER csr_source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * from csr_source
id creature
-----------
1  lizard

> CREATE CONNECTION broker_connection TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER two_connection_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE two_connection_source
  IN CLUSTER two_connection_source_cluster
  FROM KAFKA CONNECTION broker_connection (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * from two_connection_source
id creature
-----------
1  lizard

! DROP CONNECTION csr_conn
contains:depended upon by source "csr_source"

> DROP CONNECTION csr_conn CASCADE

! CREATE SOURCE should_fail
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION does_not_exist (TOPIC 'testdrive-error_topic-${testdrive.seed}')
  FORMAT TEXT
contains: unknown catalog item 'does_not_exist'

! CREATE SOURCE should_fail
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn
  (TOPIC 'testdrive-csr_test-${testdrive.seed}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION does_not_exist
  ENVELOPE DEBEZIUM
contains: unknown catalog item 'does_not_exist'
# Test protobuf CSR connection
# Duplicated from protobuf-import.td, since once a topic has been read we can
# only create the source again by forcing offsets, which is itself a different
# test case
$ set empty-schema
syntax = "proto3";

$ set importee-schema
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message Importee1 {
    bool b = 1;
}
message Importee2 {
    google.protobuf.Timestamp ts = 3;
}

$ set importer-schema
syntax = "proto3";
import "empty.proto";
import "importee.proto";
message Importer {
    Importee1 importee1 = 1;
    Importee2 importee2 = 2;
}

$ file-append path=empty.proto
\${empty-schema}

$ file-append path=importee.proto
\${importee-schema}

$ file-append path=importer.proto
\${importer-schema}

$ protobuf-compile-descriptors inputs=empty.proto,importee.proto,importer.proto output=import.pb set-var=import-schema

$ kafka-create-topic topic=import-csr partitions=1
# The Confluent toolchain publishes schemas even for well-known types, so we
# have to do the same.
# See: https://github.com/protocolbuffers/protobuf/blob/61e0395c89fe520ae7569aea6838313195e05ec5/src/google/protobuf/timestamp.proto
$ schema-registry-publish subject=google/protobuf/timestamp.proto schema-type=protobuf
syntax = "proto3";
package google.protobuf;
message Timestamp {
    int64 seconds = 1;
    int32 nanos = 2;
}

$ schema-registry-publish subject=empty.proto schema-type=protobuf
\${empty-schema}

$ schema-registry-publish subject=importee.proto schema-type=protobuf references=google/protobuf/timestamp.proto
\${importee-schema}

$ schema-registry-publish subject=testdrive-import-csr-${testdrive.seed}-value schema-type=protobuf references=empty.proto,importee.proto
\${importer-schema}

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION proto_csr TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  )

> CREATE CLUSTER import_connection_csr_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE import_connection_csr
  IN CLUSTER import_connection_csr_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-csr-${testdrive.seed}')
  FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION proto_csr

> SELECT importee1::text, importee2::text FROM import_connection_csr
importee1 importee2
--------------------------------
(f)       "(\"(1234,5678)\")"
# SSH

! CREATE CONNECTION ssh_conn TO SSH TUNNEL (
    HOST 'host',
    USER 'user',
    PORT 1,
    REGION = 'abc'
  );
contains:SSH TUNNEL connections do not support REGION values

> CREATE CONNECTION ssh_conn TO SSH TUNNEL (
    HOST 'host',
    USER 'user',
    PORT 1
  );

> SELECT name, public_key_1 LIKE 'ssh-ed25519%' public_key_1
  FROM mz_ssh_tunnel_connections
  JOIN mz_connections USING (id)
name     public_key_1
-----------
ssh_conn true
# Test invalid connection parameter combinations

## Kafka

! CREATE CONNECTION not_a_secret TO KAFKA (
    BROKER '',
    SASL PASSWORD = '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:invalid SASL PASSWORD: must provide a secret value

! CREATE CONNECTION not_a_secret TO KAFKA (
    BROKER '',
    SSL KEY = '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:invalid SSL KEY: must provide a secret value

! CREATE CONNECTION duplicate_option TO KAFKA (
    BROKER '',
    BROKER '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:BROKER specified more than once

! CREATE CONNECTION no_broker TO KAFKA (
    SECURITY PROTOCOL PLAINTEXT
  );
contains:must set one of BROKER, BROKERS, or AWS PRIVATELINK

! CREATE CONNECTION ssl_underspeced TO KAFKA (
    BROKER 'kafka:9092',
    BROKERS ['kafka:9092', 'kafka:9093'],
    SECURITY PROTOCOL PLAINTEXT
  );
contains:can only set one of BROKER, BROKERS, or AWS PRIVATELINK

! CREATE CONNECTION ssl_underspeced TO KAFKA (
    BROKER 'kafka:9092',
    SSL CERTIFICATE = ''
  );
contains:SSL KEY must be specified with SSL CERTIFICATE

! CREATE CONNECTION sasl_underspeced TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'PLAIN'
  );
contains:SASL USERNAME must be specified
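
# SASL mechanism names should be accepted case-insensitively, whether written
# as quoted strings or as bare identifiers. VALIDATE = FALSE skips validating
# the connection against the broker at creation time.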
> CREATE CONNECTION kafka_sasl_lowercase_string_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'plain',
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_spongebob_string_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'pLaIN',
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_uppercase_ident_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = PLAIN,
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_spongebob_ident_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = pLaIN,
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

! CREATE CONNECTION multiple_brokers TO KAFKA (
    BROKER 'kafka:9092, kafka:9093',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:cannot specify multiple Kafka broker addresses in one string

! CREATE CONNECTION multiple_brokers TO KAFKA (
    BROKERS ['kafka:9092, kafka:9093'],
    SECURITY PROTOCOL PLAINTEXT
  );
contains:cannot specify multiple Kafka broker addresses in one string
## CSR

! CREATE CONNECTION missing_url TO CONFLUENT SCHEMA REGISTRY (
    USERNAME 'foo'
  );
contains:must specify URL

! CREATE CONNECTION missing_cert TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://localhost',
    SSL KEY = SECRET s
  );
contains: requires both SSL KEY and SSL CERTIFICATE

! CREATE CONNECTION missing_key TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://localhost',
    SSL CERTIFICATE = ''
  );
contains: requires both SSL KEY and SSL CERTIFICATE

! CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}/FOO/BAR/BAZ'
  );
contains: URL must have an empty path
## SSH

! CREATE CONNECTION missing_user TO SSH TUNNEL (
    USER 'foo'
  );
contains: HOST option is required

## AWS PrivateLink

! CREATE CONNECTION conn1 TO KAFKA (BROKER '${testdrive.kafka-addr}' USING AWS PRIVATELINK foo (PORT 9093), SECURITY PROTOCOL PLAINTEXT);
contains: unknown catalog item 'foo'

! CREATE CONNECTION conn1 TO CONFLUENT SCHEMA REGISTRY (AWS PRIVATELINK foo, PORT 8080)
contains: unknown catalog item 'foo'

! CREATE CONNECTION pgconn TO POSTGRES (AWS PRIVATELINK foo, PORT 1234)
contains: unknown catalog item 'foo'

# Error in mzcompose: AWS PrivateLink connections are not supported
# Error in cloudtest/K8s: creating AWS PrivateLink Connection would violate max_aws_privatelink_connections limit
! CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
  )
contains: AWS PrivateLink