source-timestamps.td 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Use of this software is governed by the Business Source License
  4. # included in the LICENSE file at the root of this repository.
  5. #
  6. # As of the Change Date specified in that file, in accordance with
  7. # the Business Source License, use of this software will be governed
  8. # by the Apache License, Version 2.0.
  9. # Tests to verify that source timestamps are rounded to the timestamp interval.
  10. > CREATE CLUSTER test SIZE '1'
  11. $ kafka-create-topic topic=test
  12. > CREATE CONNECTION kafka_conn
  13. TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
  14. > CREATE SOURCE kafka_src
  15. IN CLUSTER test
  16. FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-test-${testdrive.seed}')
  17. $ postgres-execute connection=postgres://postgres:postgres@postgres
  18. DROP PUBLICATION IF EXISTS mz_source
  19. CREATE TABLE t (x int);
  20. ALTER TABLE t REPLICA IDENTITY FULL;
  21. CREATE PUBLICATION mz_source FOR ALL TABLES
  22. > CREATE SECRET pg_pass AS 'postgres'
  23. > CREATE CONNECTION pg_conn TO POSTGRES (
  24. HOST postgres,
  25. DATABASE postgres,
  26. USER postgres,
  27. PASSWORD SECRET pg_pass
  28. )
  29. > CREATE SOURCE pg_src
  30. IN CLUSTER test
  31. FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source')
  32. $ mysql-connect name=mysql url=mysql://root@mysql password=p@ssw0rd
  33. $ mysql-execute name=mysql
  34. DROP DATABASE IF EXISTS public;
  35. CREATE DATABASE public;
  36. USE public;
  37. CREATE TABLE t (x int)
  38. > CREATE SECRET mysql_pass AS 'p@ssw0rd';
  39. > CREATE CONNECTION mysql_conn TO MYSQL (
  40. HOST mysql,
  41. USER root,
  42. PASSWORD SECRET mysql_pass
  43. )
  44. > CREATE SOURCE mysql_src
  45. IN CLUSTER test
  46. FROM MYSQL CONNECTION mysql_conn;
  47. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  48. ALTER SYSTEM SET kafka_default_metadata_fetch_interval = '1s'
  49. ALTER SYSTEM SET pg_offset_known_interval = '1s'
  50. ALTER SYSTEM SET mysql_offset_known_interval = '1s'
  51. > SELECT name, write_frontier::text::uint8 % 1000 = 1
  52. FROM mz_internal.mz_frontiers
  53. JOIN mz_sources ON object_id = id
  54. WHERE id LIKE 'u%'
  55. kafka_src true
  56. kafka_src_progress true
  57. pg_src true
  58. pg_src_progress true
  59. mysql_src true
  60. mysql_src_progress true
  61. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  62. ALTER SYSTEM SET kafka_default_metadata_fetch_interval = 1234
  63. ALTER SYSTEM SET pg_offset_known_interval = 1234
  64. ALTER SYSTEM SET mysql_offset_known_interval = 1234
  65. > SELECT name, write_frontier::text::uint8 % 1234 = 1
  66. FROM mz_internal.mz_frontiers
  67. JOIN mz_sources ON object_id = id
  68. WHERE id LIKE 'u%'
  69. kafka_src true
  70. kafka_src_progress true
  71. pg_src true
  72. pg_src_progress true
  73. mysql_src true
  74. mysql_src_progress true
  75. $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
  76. ALTER SYSTEM SET kafka_default_metadata_fetch_interval = 500
  77. ALTER SYSTEM SET pg_offset_known_interval = 500
  78. ALTER SYSTEM SET mysql_offset_known_interval = 500
  79. > SELECT name, write_frontier::text::uint8 % 500 = 1
  80. FROM mz_internal.mz_frontiers
  81. JOIN mz_sources ON object_id = id
  82. WHERE id LIKE 'u%'
  83. kafka_src true
  84. kafka_src_progress true
  85. pg_src true
  86. pg_src_progress true
  87. mysql_src true
  88. mysql_src_progress true