# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Exercise the Fivetran Destination.
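#
# This script drives the destination's 'describe' and 'write_batch' actions against gzip'd CSV
# batch files staged in the 'fivetran' container, and checks the resulting tables with plain SQL.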

> SELECT 1;
1

> CREATE SCHEMA IF NOT EXISTS foo;
> CREATE TABLE foo.bar (a int, b text);

# To identify primary keys we leave a magic comment.
> COMMENT ON COLUMN foo.bar.a IS 'mz_is_primary_key';

$ fivetran-destination action=describe
{
  "schema_name": "foo",
  "table_name": "bar"
}
{
  "response": {
    "Table": {
      "name": "bar",
      "columns": [
        {
          "name": "a",
          "type": 3,
          "primary_key": true
        },
        {
          "name": "b",
          "type": 13,
          "primary_key": false
        }
      ]
    }
  }
}
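
# Note: the numeric 'type' values above are Fivetran SDK DataType codes; 3 and 13 appear to map to
# INT and STRING respectively, matching the 'int' and 'text' columns of 'foo.bar'.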

$ file-append container=fivetran path=a.csv compression=gzip
a,b
1000,hello
2000,hello
3000,hello

# Note: The columns in this file are in the opposite order from the table's, so the Fivetran
# Destination should re-map them.
$ file-append container=fivetran path=b.csv compression=gzip
b,a
world,100
world,200
world,300

$ fivetran-destination action=write_batch
{
  "schema_name": "foo",
  "table_name": "bar",
  "table": {
    "name": "bar",
    "columns": [
      {
        "name": "a",
        "type": 3,
        "primary_key": true
      },
      {
        "name": "b",
        "type": 13,
        "primary_key": false
      }
    ]
  },
  "keys": {},
  "replace_files": [
    "${testdrive.fivetran-destination-files-path}/a.csv",
    "${testdrive.fivetran-destination-files-path}/b.csv"
  ],
  "update_files": [],
  "delete_files": [],
  "file_params": {
    "compression": 2,
    "encryption": 0,
    "null_string": "null-123",
    "unmodified_string": "unmodified-123"
  }
}
{
  "response": {
    "Success": true
  }
}
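
# The 'file_params' above tell the destination how to read the staged files: 'compression: 2'
# appears to be the Fivetran SDK code for GZIP (matching the 'compression=gzip' used when the files
# were written) and 'encryption: 0' means the files are not encrypted.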

> SELECT a, b FROM foo.bar ORDER BY a DESC;
100 world
200 world
300 world
1000 hello
2000 hello
3000 hello

> CREATE TABLE foo.large (a int, b text, c text, d int);

# Set the max copy from size to 100MiB.
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET max_copy_from_size = 104857600;
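
# (104857600 bytes = 100 * 1024 * 1024, i.e. 100MiB.)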

# Repeating this line 1,300,000 times should get us close to 100MiB.
$ file-append container=fivetran path=c.csv compression=gzip header=a,b,c,d repeat=1300000
5000,"I am a large log line, <- can this comma mess us up?",foo_bar_baz,10

$ file-append container=fivetran path=d.csv compression=gzip header=a,b,c,d repeat=1300000
5000,"I am a large log line, <- can this comma mess us up?",foo_bar_baz,10
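
# Rough math: each data line is ~75 bytes, so each file decompresses to roughly 1,300,000 * 75
# bytes, or ~93MiB apiece and ~186MiB combined.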

# Note: Both 'c.csv' and 'd.csv' are individually under the 'max_copy_from_size', but together
# they exceed it. We want to make sure the write_batch still succeeds in this case.
$ fivetran-destination action=write_batch
{
  "schema_name": "foo",
  "table_name": "large",
  "table": {
    "name": "large",
    "columns": [
      {
        "name": "a",
        "type": 3,
        "primary_key": true
      },
      {
        "name": "b",
        "type": 13,
        "primary_key": false
      },
      {
        "name": "c",
        "type": 13,
        "primary_key": false
      },
      {
        "name": "d",
        "type": 3,
        "primary_key": false
      }
    ]
  },
  "keys": {},
  "replace_files": [
    "${testdrive.fivetran-destination-files-path}/c.csv",
    "${testdrive.fivetran-destination-files-path}/d.csv"
  ],
  "update_files": [],
  "delete_files": [],
  "file_params": {
    "compression": 2,
    "encryption": 0,
    "null_string": "null-123",
    "unmodified_string": "unmodified-123"
  }
}
{
  "response": {
    "Success": true
  }
}

> SELECT COUNT(*) FROM foo.large;
2600000

> SELECT * FROM foo.large LIMIT 1;
5000 "I am a large log line, <- can this comma mess us up?" foo_bar_baz 10

# Try copying a file that is over the limit.
$ file-append container=fivetran path=too_large.csv compression=gzip header=a,b,c,d repeat=2000000
5000,"I am a large log line, <- can this comma mess us up?",foo_bar_baz,10
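
# At ~75 bytes per line, 2,000,000 repetitions come to roughly 143MiB of CSV, comfortably over the
# 100MiB 'max_copy_from_size'.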

$ fivetran-destination action=write_batch
{
  "schema_name": "foo",
  "table_name": "large",
  "table": {
    "name": "large",
    "columns": [
      {
        "name": "a",
        "type": 3,
        "primary_key": true
      },
      {
        "name": "b",
        "type": 13,
        "primary_key": false
      },
      {
        "name": "c",
        "type": 13,
        "primary_key": false
      },
      {
        "name": "d",
        "type": 3,
        "primary_key": false
      }
    ]
  },
  "keys": {},
  "replace_files": ["${testdrive.fivetran-destination-files-path}/too_large.csv"],
  "update_files": [],
  "delete_files": [],
  "file_params": {
    "compression": 2,
    "encryption": 0,
    "null_string": "null-123",
    "unmodified_string": "unmodified-123"
  }
}
{
  "response": {
    "Warning": {"message": "error when calling Materialize: Error { kind: Db, cause: Some(DbError { severity: \"ERROR\", parsed_severity: None, code: SqlState(E53000), message: \"COPY FROM STDIN too large\", detail: None, hint: None, position: None, where_: None, schema: None, table: None, column: None, datatype: None, constraint: None, file: None, line: None, routine: None }) }: closing sink: replace_files: replace files: write_batch"}
  }
}
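
# The error above carries SQLSTATE 53000, PostgreSQL's 'insufficient_resources' code, raised here
# because the COPY FROM issued for the batch exceeded 'max_copy_from_size'; the destination reports
# it as a Warning rather than applying part of the batch.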

# No new data should have been inserted.
> SELECT COUNT(*) FROM foo.large;
2600000

# Cleanup.
> DROP SCHEMA foo CASCADE;

$ file-delete container=fivetran path=a.csv
$ file-delete container=fivetran path=b.csv
$ file-delete container=fivetran path=c.csv
$ file-delete container=fivetran path=d.csv
$ file-delete container=fivetran path=too_large.csv