# kafka-source.td
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test creating Kafka sources over an SSH tunnel, with and without a
# Confluent Schema Registry (CSR) connection.
  10. $ kafka-create-topic topic=thetopic
  11. $ kafka-ingest topic=thetopic format=bytes
  12. one
  13. # Create a dedicated cluster for the sources, so we can easily restart the
  14. # sources by dropping and recreating the cluster's replica.
  15. #
  16. # We also use a large number of worker threads to protect against past behavior
  17. # in which we opened one SSH tunnel per worker thread per broker, which
  18. # tripped the default `MaxStartups 10` sshd configuration.
  19. > DROP CLUSTER IF EXISTS sc;
  20. > CREATE CLUSTER sc REPLICAS (r1 (SIZE '32'))
  21. # Test the various types of tunnels
  22. > CREATE CONNECTION kafka_conn_using
  23. TO KAFKA (BROKER '${testdrive.kafka-addr}' USING SSH TUNNEL thancred, SECURITY PROTOCOL PLAINTEXT);
  24. > CREATE CONNECTION kafka_conn_dynamic
  25. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT, SSH TUNNEL thancred);
  26. > CREATE SOURCE fixed_text IN CLUSTER sc
  27. FROM KAFKA CONNECTION kafka_conn_using (TOPIC 'testdrive-thetopic-${testdrive.seed}')
  28. > CREATE TABLE fixed_text_tbl FROM SOURCE fixed_text (REFERENCE "testdrive-thetopic-${testdrive.seed}")
  29. FORMAT TEXT
  30. ENVELOPE NONE
  31. > CREATE SOURCE dynamic_text IN CLUSTER sc
  32. FROM KAFKA CONNECTION kafka_conn_dynamic (TOPIC 'testdrive-thetopic-${testdrive.seed}')
  33. > CREATE TABLE dynamic_text_tbl FROM SOURCE dynamic_text (REFERENCE "testdrive-thetopic-${testdrive.seed}")
  34. FORMAT TEXT
  35. ENVELOPE NONE
  36. > SELECT * FROM fixed_text_tbl
  37. text
  38. ----
  39. one
  40. $ kafka-ingest topic=thetopic format=bytes
  41. two
  42. # Ensure both types of tunnels work
  43. > SELECT * FROM fixed_text_tbl
  44. text
  45. ----
  46. one
  47. two
  48. > SELECT * FROM dynamic_text_tbl
  49. text
  50. ----
  51. one
  52. two
  53. > CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
  54. URL '${testdrive.schema-registry-url}',
  55. SSH TUNNEL thancred
  56. );
  57. $ set schema={
  58. "type" : "record",
  59. "name" : "test",
  60. "fields" : [
  61. {"name":"f1", "type":"string"},
  62. {"name":"f2", "type":"long"}
  63. ]
  64. }
  65. $ kafka-create-topic topic=avroavro
  66. $ kafka-ingest format=avro topic=avroavro schema=${schema}
  67. {"f1": "fish", "f2": 1000}
  68. > CREATE SOURCE fixed_plus_csr
  69. IN CLUSTER sc
  70. FROM KAFKA CONNECTION kafka_conn_using (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  71. > CREATE TABLE fixed_plus_csr_tbl FROM SOURCE fixed_plus_csr (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  72. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  73. ENVELOPE NONE
  74. > SELECT * FROM fixed_plus_csr_tbl
  75. f1 f2
  76. ----------
  77. fish 1000
  78. # Test csr sources for dynamic connections as well.
  79. > CREATE SOURCE dynamic_plus_csr
  80. IN CLUSTER sc
  81. FROM KAFKA CONNECTION kafka_conn_dynamic (TOPIC 'testdrive-avroavro-${testdrive.seed}')
  82. > CREATE TABLE dynamic_plus_csr_tbl FROM SOURCE dynamic_plus_csr (REFERENCE "testdrive-avroavro-${testdrive.seed}")
  83. FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  84. ENVELOPE NONE
  85. > SELECT * FROM dynamic_plus_csr_tbl
  86. f1 f2
  87. ----------
  88. fish 1000
  89. # ensure they all were marked as running correctly
  90. > SELECT status FROM mz_internal.mz_source_statuses st
  91. JOIN mz_sources s ON st.id = s.id
  92. WHERE s.name in ('fixed_text', 'dynamic_text', 'fixed_plus_csr', 'dynamic_plus_csr')
  93. running
  94. running
  95. running
  96. running