# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

$ set-arg-default default-storage-size=1
$ set-arg-default single-replica-cluster=quickstart

#
# Test basic connection functionality

$ skip-consistency-checks reason="workflow uses SSH keys which we currently can't check"
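
# enable_connection_validation_syntax enables the WITH (VALIDATE = ...) option
# used by some of the CREATE CONNECTION statements later in this file.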
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_connection_validation_syntax = true

###
# Test core functionality by creating, introspecting and dropping a connection
###

$ kafka-create-topic topic=connection_test partitions=1

$ kafka-ingest format=bytes topic=connection_test
1,2
2,3

> CREATE SECRET s AS '...';
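
# The secret is referenced by the SASL PASSWORD and SSL KEY options in the
# statements below.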

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, ACCESS KEY ID = 'abc')
contains:KAFKA connections do not support ACCESS KEY ID values

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, ACCESS KEY ID = 'abc', PORT = 1)
contains:KAFKA connections do not support ACCESS KEY ID, PORT values

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL NOTHINGREAL)
contains: unknown security protocol: NOTHINGREAL

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SASL MECHANISMS 'PLAIN', SASL USERNAME 'materialize', SASL PASSWORD SECRET s, SSL KEY SECRET s)
contains: option SSL KEY not supported with this configuration

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, SASL USERNAME 'materialize')
contains: option SASL USERNAME not supported with this configuration

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, PROGRESS TOPIC REPLICATION FACTOR -4)
contains:PROGRESS TOPIC REPLICATION FACTOR must be greater than 0

> CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> SELECT name, type from mz_connections WHERE id LIKE 'u%'
name type
------------------------------
testconn kafka

> SHOW CONNECTIONS
testconn kafka ""

> SHOW CREATE CONNECTION testconn
name create_sql
---------------------------------
materialize.public.testconn "CREATE CONNECTION materialize.public.testconn TO KAFKA (BROKER = '${testdrive.kafka-addr}', SECURITY PROTOCOL = plaintext);"

> SELECT
    brokers,
    sink_progress_topic = '_materialize-progress-' || mz_environment_id() || '-' || id
  FROM mz_kafka_connections
  JOIN mz_connections USING (id)
  WHERE name = 'testconn'
{${testdrive.kafka-addr}} true

> DROP CONNECTION testconn
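
# The progress topic name and replication factor can be overridden when the
# connection is created.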
> CREATE CONNECTION progress_override TO KAFKA (
    BROKER '${testdrive.kafka-addr}',
    PROGRESS TOPIC 'override_topic',
    PROGRESS TOPIC REPLICATION FACTOR 1,
    SECURITY PROTOCOL PLAINTEXT
  )

> SELECT
    brokers, sink_progress_topic
  FROM mz_kafka_connections
  JOIN mz_connections USING (id)
  WHERE name = 'progress_override'
{${testdrive.kafka-addr}} override_topic

###
# Test that connections work in creating a source
###

> CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER connection_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE connection_source
  IN CLUSTER connection_source_cluster
  FROM KAFKA CONNECTION testconn (TOPIC 'testdrive-connection_test-${testdrive.seed}')

> CREATE TABLE connection_source_tbl (first, second)
  FROM SOURCE connection_source (REFERENCE "testdrive-connection_test-${testdrive.seed}")
  FORMAT CSV WITH 2 COLUMNS

> SELECT * FROM connection_source_tbl
first second
------------
1 2
2 3

# Confirm we cannot drop the connection while a source depends upon it
! DROP CONNECTION testconn;
contains:depended upon by source "connection_source"

# Confirm the drop works if we add cascade
> DROP CONNECTION testconn CASCADE;

# Validate the cascading drop actually happened
! SELECT * FROM connection_source_tbl
contains:unknown catalog item 'connection_source_tbl'

! SELECT * FROM connection_source
contains:unknown catalog item 'connection_source'

###
# Test schema registry connection create and drop
###

# Set up a Kafka topic with a schema.
# The key schema must be a subset of the keys in the rows.
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

$ set schema={
    "type" : "record",
    "name" : "envelope",
    "fields" : [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {
                "name": "id",
                "type": "long"
              },
              {
                "name": "creature",
                "type": "string"
              }
            ]
          },
          "null"
        ]
      },
      {
        "name": "after",
        "type": ["row", "null"]
      }
    ]
  }

$ kafka-create-topic topic=csr_test partitions=1

$ kafka-ingest format=avro topic=csr_test key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1
{"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}}
{"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}}

> CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  );

! CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}',
    SESSION TOKEN = 'abc'
  );
contains:CONFLUENT SCHEMA REGISTRY connections do not support SESSION TOKEN values

> CREATE CONNECTION kafka_conn
  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);

! CREATE SOURCE csr_source
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION csr_conn (TOPIC 'testdrive-csr_test-${testdrive.seed}')
contains:is not a KAFKA CONNECTION

> CREATE CLUSTER csr_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE csr_source
  IN CLUSTER csr_source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-csr_test-${testdrive.seed}')

> CREATE TABLE csr_source_tbl FROM SOURCE csr_source (REFERENCE "testdrive-csr_test-${testdrive.seed}")
  FORMAT AVRO
  USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * from csr_source_tbl
id creature
-----------
1 lizard

> CREATE CONNECTION broker_connection TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)

> CREATE CLUSTER two_connection_source_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE two_connection_source
  IN CLUSTER two_connection_source_cluster
  FROM KAFKA CONNECTION broker_connection (TOPIC 'testdrive-csr_test-${testdrive.seed}')

> CREATE TABLE two_connection_source_tbl FROM SOURCE two_connection_source (REFERENCE "testdrive-csr_test-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

> SELECT * from two_connection_source_tbl
id creature
-----------
1 lizard

! DROP CONNECTION csr_conn
contains:depended upon by table "csr_source_tbl"

> DROP CONNECTION csr_conn CASCADE

! CREATE SOURCE should_fail
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION does_not_exist (TOPIC 'testdrive-error_topic-${testdrive.seed}')
contains: unknown catalog item 'does_not_exist'

> CREATE SOURCE source
  IN CLUSTER ${arg.single-replica-cluster}
  FROM KAFKA CONNECTION kafka_conn
  (TOPIC 'testdrive-csr_test-${testdrive.seed}')

! CREATE TABLE should_fail_tbl FROM SOURCE source (REFERENCE "testdrive-csr_test-${testdrive.seed}")
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION does_not_exist
  ENVELOPE DEBEZIUM
contains: unknown catalog item 'does_not_exist'

# Test protobuf CSR connection.
# Duplicated from protobuf-import.td: once a topic has been read, the source
# can only be created again by forcing offsets, which is itself a different
# test case.

$ set empty-schema
syntax = "proto3";

$ set importee-schema
syntax = "proto3";
import "google/protobuf/timestamp.proto";
message Importee1 {
  bool b = 1;
}
message Importee2 {
  google.protobuf.Timestamp ts = 3;
}

$ set importer-schema
syntax = "proto3";
import "empty.proto";
import "importee.proto";
message Importer {
  Importee1 importee1 = 1;
  Importee2 importee2 = 2;
}
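
# Write the .proto files to disk and compile them into a descriptor set so the
# Importer messages can be ingested below.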
$ file-append path=empty.proto
\${empty-schema}

$ file-append path=importee.proto
\${importee-schema}

$ file-append path=importer.proto
\${importer-schema}

$ protobuf-compile-descriptors inputs=empty.proto,importee.proto,importer.proto output=import.pb set-var=import-schema

$ kafka-create-topic topic=import-csr partitions=1

# The Confluent toolchain publishes schemas even for well-known types, so we
# have to do the same.
# See: https://github.com/protocolbuffers/protobuf/blob/61e0395c89fe520ae7569aea6838313195e05ec5/src/google/protobuf/timestamp.proto
$ schema-registry-publish subject=google/protobuf/timestamp.proto schema-type=protobuf
syntax = "proto3";
package google.protobuf;
message Timestamp {
  int64 seconds = 1;
  int32 nanos = 2;
}

$ schema-registry-publish subject=empty.proto schema-type=protobuf
\${empty-schema}

$ schema-registry-publish subject=importee.proto schema-type=protobuf references=google/protobuf/timestamp.proto
\${importee-schema}

$ schema-registry-publish subject=testdrive-import-csr-${testdrive.seed}-value schema-type=protobuf references=empty.proto,importee.proto
\${importer-schema}

$ kafka-ingest topic=import-csr format=protobuf descriptor-file=import.pb message=Importer confluent-wire-format=true
{"importee1": {"b": false}, "importee2": {"ts": "1970-01-01T00:20:34.000005678Z"}}

> CREATE CONNECTION proto_csr TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}'
  )

> CREATE CLUSTER import_connection_csr_cluster SIZE '${arg.default-storage-size}';

> CREATE SOURCE import_connection_csr
  IN CLUSTER import_connection_csr_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-import-csr-${testdrive.seed}')

> CREATE TABLE import_connection_csr_tbl FROM SOURCE import_connection_csr (REFERENCE "testdrive-import-csr-${testdrive.seed}")
  FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION proto_csr

> SELECT importee1::text, importee2::text FROM import_connection_csr_tbl
importee1 importee2
--------------------------------
(f) "(\"(1234,5678)\")"

# SSH
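# The public key of an SSH TUNNEL connection is exposed in
# mz_ssh_tunnel_connections; the query below checks that it looks like an
# ed25519 key.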

! CREATE CONNECTION ssh_conn TO SSH TUNNEL (
    HOST 'host',
    USER 'user',
    PORT 1,
    REGION = 'abc'
  );
contains:SSH TUNNEL connections do not support REGION values

> CREATE CONNECTION ssh_conn TO SSH TUNNEL (
    HOST 'host',
    USER 'user',
    PORT 1
  );

> SELECT name, public_key_1 LIKE 'ssh-ed25519%' public_key_1
  FROM mz_ssh_tunnel_connections
  JOIN mz_connections USING (id)
name public_key_1
-----------
ssh_conn true

# Test invalid connection parameter combinations

## Kafka

! CREATE CONNECTION not_a_secret TO KAFKA (
    BROKER '',
    SASL PASSWORD = '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:invalid SASL PASSWORD: must provide a secret value

! CREATE CONNECTION not_a_secret TO KAFKA (
    BROKER '',
    SSL KEY = '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:invalid SSL KEY: must provide a secret value

! CREATE CONNECTION duplicate_option TO KAFKA (
    BROKER '',
    BROKER '',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:BROKER specified more than once

! CREATE CONNECTION no_broker TO KAFKA (
    SECURITY PROTOCOL PLAINTEXT
  );
contains:must set one of BROKER, BROKERS, or AWS PRIVATELINK

! CREATE CONNECTION ssl_underspeced TO KAFKA (
    BROKER 'kafka:9092',
    BROKERS ['kafka:9092', 'kafka:9093'],
    SECURITY PROTOCOL PLAINTEXT
  );
contains:can only set one of BROKER, BROKERS, or AWS PRIVATELINK

! CREATE CONNECTION ssl_underspeced TO KAFKA (
    BROKER 'kafka:9092',
    SSL CERTIFICATE = ''
  );
contains:SSL KEY must be specified with SSL CERTIFICATE

! CREATE CONNECTION sasl_underspeced TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'PLAIN'
  );
contains:SASL USERNAME must be specified
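
# SASL MECHANISMS accepts both string and bare identifier syntax, in any
# letter case; WITH (VALIDATE = FALSE) skips connection validation at
# creation time.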
> CREATE CONNECTION kafka_sasl_lowercase_string_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'plain',
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_spongebob_string_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = 'pLaIN',
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_uppercase_ident_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = PLAIN,
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

> CREATE CONNECTION kafka_sasl_spongebob_ident_mechanism TO KAFKA (
    BROKER 'kafka:9092',
    SASL MECHANISMS = pLaIN,
    SASL USERNAME = 'materialize',
    SASL PASSWORD = SECRET s
  ) WITH (VALIDATE = FALSE);

! CREATE CONNECTION multiple_brokers TO KAFKA (
    BROKER 'kafka:9092, kafka:9093',
    SECURITY PROTOCOL PLAINTEXT
  );
contains:cannot specify multiple Kafka broker addresses in one string

! CREATE CONNECTION multiple_brokers TO KAFKA (
    BROKERS ['kafka:9092, kafka:9093'],
    SECURITY PROTOCOL PLAINTEXT
  );
contains:cannot specify multiple Kafka broker addresses in one string

## CSR

! CREATE CONNECTION missing_url TO CONFLUENT SCHEMA REGISTRY (
    USERNAME 'foo'
  );
contains:must specify URL

! CREATE CONNECTION missing_cert TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://localhost',
    SSL KEY = SECRET s
  );
contains: requires both SSL KEY and SSL CERTIFICATE

! CREATE CONNECTION missing_key TO CONFLUENT SCHEMA REGISTRY (
    URL 'http://localhost',
    SSL CERTIFICATE = ''
  );
contains: requires both SSL KEY and SSL CERTIFICATE

! CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${testdrive.schema-registry-url}/FOO/BAR/BAZ'
  );
contains: URL must have an empty path

## SSH

! CREATE CONNECTION missing_user TO SSH TUNNEL (
    USER 'foo'
  );
contains: HOST option is required

## AWS PrivateLink

! CREATE CONNECTION conn1 TO KAFKA (BROKER '${testdrive.kafka-addr}' USING AWS PRIVATELINK foo (PORT 9093), SECURITY PROTOCOL PLAINTEXT);
contains: unknown catalog item 'foo'

! CREATE CONNECTION conn1 TO CONFLUENT SCHEMA REGISTRY (AWS PRIVATELINK foo, PORT 8080)
contains: unknown catalog item 'foo'

! CREATE CONNECTION pgconn TO POSTGRES (AWS PRIVATELINK foo, PORT 1234)
contains: unknown catalog item 'foo'

# Error in mzcompose: AWS PrivateLink connections are not supported
# Error in cloudtest/K8s: creating an AWS PrivateLink connection would violate the max_aws_privatelink_connections limit
! CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
  )
contains: AWS PrivateLink