smoketest.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
  9. import unittest
  10. import psycopg
  11. import psycopg2 # type: ignore
  12. import sqlalchemy # type: ignore
# Connection URL shared by every test in this module: user `materialize`,
# host `materialized`, port 6875, database `materialize`.
MATERIALIZED_URL = "postgresql://materialize@materialized:6875/materialize"
  14. class SmokeTest(unittest.TestCase):
  15. def test_connection_options(self) -> None:
  16. with psycopg.connect(MATERIALIZED_URL + "?options=--cluster%3Db") as conn:
  17. with conn.cursor() as cur:
  18. cur.execute("SHOW cluster")
  19. row = cur.fetchone()
  20. self.assertEqual(row, ("b",))
  21. def test_custom_types(self) -> None:
  22. with psycopg.connect(MATERIALIZED_URL, autocommit=True) as conn:
  23. # Text encoding of lists and maps is supported...
  24. with conn.cursor() as cur:
  25. cur.execute("SELECT LIST[1, 2, 3]")
  26. row = cur.fetchone()
  27. self.assertEqual(row, ("{1,2,3}",))
  28. cur.execute("SELECT '{a => 1, b => 2}'::map[text => int]")
  29. row = cur.fetchone()
  30. self.assertEqual(row, ("{a=>1,b=>2}",))
  31. # ...but binary encoding is not.
  32. with conn.cursor(binary=True) as cur:
  33. with self.assertRaisesRegex(
  34. psycopg.errors.ProtocolViolation,
  35. "binary encoding of list types is not implemented",
  36. ):
  37. cur.execute("SELECT LIST[1, 2, 3]")
  38. with self.assertRaisesRegex(
  39. psycopg.errors.ProtocolViolation,
  40. "binary encoding of map types is not implemented",
  41. ):
  42. cur.execute("SELECT '{a => 1, b => 2}'::map[text => int]")
  43. def test_arrays(self) -> None:
  44. with psycopg.connect(MATERIALIZED_URL, autocommit=True) as conn:
  45. # Text roundtripping of a one-dimensional integer array is supported.
  46. with conn.cursor() as cur:
  47. cur.execute("SELECT %t", ([1, 2, 3],))
  48. row = cur.fetchone()
  49. self.assertEqual(row, ([1, 2, 3],))
  50. # Text roundtripping of a two-dimensional integer array is
  51. # not supported.
  52. with conn.cursor() as cur:
  53. cur.execute("SELECT %t", ([[1], [2], [3]],))
  54. row = cur.fetchone()
  55. self.assertEqual(row, ([[1], [2], [3]],))
  56. # Binary roundtripping is not.
  57. with conn.cursor(binary=True) as cur:
  58. with self.assertRaisesRegex(
  59. psycopg.errors.InvalidParameterValue,
  60. "input of array types is not implemented",
  61. ):
  62. cur.execute("SELECT %b", ([1, 2, 3],))
  63. row = cur.fetchone()
  64. self.assertEqual(row, ([1, 2, 3],))
  65. def test_sqlalchemy(self) -> None:
  66. engine = sqlalchemy.engine.create_engine(MATERIALIZED_URL)
  67. results = [[c1, c2] for c1, c2 in engine.execute("VALUES (1, 2), (3, 4)")]
  68. self.assertEqual(results, [[1, 2], [3, 4]])
  69. def test_psycopg2_subscribe(self) -> None:
  70. """Test SUBSCRIBE with psycopg2 via server cursors."""
  71. conn = psycopg2.connect(MATERIALIZED_URL)
  72. conn.set_session(autocommit=True)
  73. with conn.cursor() as cur:
  74. # Create a table with one row of data.
  75. cur.execute("CREATE TABLE psycopg2_subscribe (a int, b text)")
  76. cur.execute("INSERT INTO psycopg2_subscribe VALUES (1, 'a')")
  77. conn.set_session(autocommit=False)
  78. # Start SUBSCRIBE using the binary copy protocol.
  79. cur.execute("DECLARE cur CURSOR FOR SUBSCRIBE psycopg2_subscribe")
  80. cur.execute("FETCH ALL cur")
  81. # Validate the first row, but ignore the timestamp column.
  82. row = cur.fetchone()
  83. if row is not None:
  84. (ts, diff, a, b) = row
  85. self.assertEqual(diff, 1)
  86. self.assertEqual(a, 1)
  87. self.assertEqual(b, "a")
  88. else:
  89. self.fail("row is None")
  90. self.assertEqual(cur.fetchone(), None)
  91. # Insert another row from another connection to simulate an
  92. # update arriving.
  93. with psycopg2.connect(MATERIALIZED_URL) as conn2:
  94. conn2.set_session(autocommit=True)
  95. with conn2.cursor() as cur2:
  96. cur2.execute("INSERT INTO psycopg2_subscribe VALUES (2, 'b')")
  97. # Validate the new row, again ignoring the timestamp column.
  98. cur.execute("FETCH ALL cur")
  99. row = cur.fetchone()
  100. assert row is not None
  101. (ts, diff, a, b) = row
  102. self.assertEqual(diff, 1)
  103. self.assertEqual(a, 2)
  104. self.assertEqual(b, "b")
  105. self.assertEqual(cur.fetchone(), None)
  106. def test_psycopg3_subscribe_copy(self) -> None:
  107. """Test SUBSCRIBE with psycopg3 via its new binary COPY decoding support."""
  108. with psycopg.connect(MATERIALIZED_URL) as conn:
  109. conn.autocommit = True
  110. with conn.cursor() as cur:
  111. # Create a table with one row of data.
  112. cur.execute("CREATE TABLE psycopg3_subscribe_copy (a int, b text)")
  113. cur.execute("INSERT INTO psycopg3_subscribe_copy VALUES (1, 'a')")
  114. conn.autocommit = False
  115. # Start a subscribe using the binary copy protocol.
  116. with cur.copy(
  117. "COPY (SUBSCRIBE psycopg3_subscribe_copy) TO STDOUT (FORMAT BINARY)"
  118. ) as copy:
  119. copy.set_types(
  120. [
  121. psycopg.adapters.types["numeric"].oid, # timestamp
  122. psycopg.adapters.types["int8"].oid, # diff
  123. psycopg.adapters.types["int4"].oid, # a column
  124. psycopg.adapters.types["text"].oid, # b column
  125. ]
  126. )
  127. # Validate the first row, but ignore the timestamp column.
  128. row = copy.read_row()
  129. assert row is not None
  130. (ts, diff, a, b) = row
  131. self.assertEqual(diff, 1)
  132. self.assertEqual(a, 1)
  133. self.assertEqual(b, "a")
  134. # Insert another row from another connection to simulate an
  135. # update arriving.
  136. with psycopg.connect(MATERIALIZED_URL) as conn2:
  137. conn2.autocommit = True
  138. with conn2.cursor() as cur2:
  139. cur2.execute(
  140. "INSERT INTO psycopg3_subscribe_copy VALUES (2, 'b')"
  141. )
  142. # Validate the new row, again ignoring the timestamp column.
  143. row = copy.read_row()
  144. assert row is not None
  145. (ts, diff, a, b) = row
  146. self.assertEqual(diff, 1)
  147. self.assertEqual(a, 2)
  148. self.assertEqual(b, "b")
  149. # The subscribe won't end until we send a cancel request.
  150. conn.cancel()
  151. with self.assertRaises(psycopg.errors.QueryCanceled):
  152. copy.read_row()
  153. def test_psycopg3_subscribe_stream(self) -> None:
  154. """Test subscribe with psycopg3 via its new streaming query support."""
  155. with psycopg.connect(MATERIALIZED_URL) as conn:
  156. conn.autocommit = True
  157. with conn.cursor() as cur:
  158. # Create a table with one row of data.
  159. cur.execute("CREATE TABLE psycopg3_subscribe_stream (a int, b text)")
  160. cur.execute("INSERT INTO psycopg3_subscribe_stream VALUES (1, 'a')")
  161. conn.autocommit = False
  162. # Start a subscribe using the streaming query API.
  163. stream = cur.stream("SUBSCRIBE psycopg3_subscribe_stream")
  164. # Validate the first row, but ignore the timestamp column.
  165. (ts, diff, a, b) = next(stream)
  166. self.assertEqual(diff, 1)
  167. self.assertEqual(a, 1)
  168. self.assertEqual(b, "a")
  169. # Insert another row from another connection to simulate an
  170. # update arriving.
  171. with psycopg.connect(MATERIALIZED_URL) as conn2:
  172. conn2.autocommit = True
  173. with conn2.cursor() as cur2:
  174. cur2.execute(
  175. "INSERT INTO psycopg3_subscribe_stream VALUES (2, 'b')"
  176. )
  177. # Validate the new row, again ignoring the timestamp column.
  178. (ts, diff, a, b) = next(stream)
  179. self.assertEqual(diff, 1)
  180. self.assertEqual(a, 2)
  181. self.assertEqual(b, "b")
  182. # The subscribe won't end until we send a cancel request.
  183. conn.cancel()
  184. with self.assertRaises(psycopg.errors.QueryCanceled):
  185. next(stream)
  186. def test_psycopg3_subscribe_terminate_connection(self) -> None:
  187. """Test terminating a bare subscribe with psycopg3.
  188. This test ensures that Materialize notices a TCP connection close when a
  189. bare SUBSCRIBE statement (i.e., one not wrapped in a COPY statement) is
  190. producing no rows.
  191. """
  192. # Create two connections: one to create a subscription and one to
  193. # query metadata about the subscription.
  194. with psycopg.connect(MATERIALIZED_URL) as metadata_conn:
  195. with psycopg.connect(MATERIALIZED_URL) as subscribe_conn:
  196. try:
  197. metadata_session_id = metadata_conn.pgconn.backend_pid
  198. subscribe_session_id = subscribe_conn.pgconn.backend_pid
  199. # Subscribe to the list of active subscriptions in
  200. # Materialize.
  201. metadata = metadata_conn.cursor().stream(
  202. "SUBSCRIBE (SELECT s.connection_id FROM mz_internal.mz_subscriptions b JOIN mz_internal.mz_sessions s ON s.id = b.session_id)"
  203. )
  204. # Ensure we see our own subscription in `mz_subscriptions`.
  205. (_ts, diff, pid) = next(metadata)
  206. self.assertEqual(int(pid), metadata_session_id)
  207. self.assertEqual(diff, 1)
  208. # Create a dummy subscribe that we know will only ever
  209. # produce a single row, but, as far as Materialize can tell,
  210. # has the potential to produce future updates. This ensures
  211. # the SUBSCRIBE operation will be blocked inside of
  212. # Materialize waiting for more rows.
  213. #
  214. # IMPORTANT: this must use a bare `SUBSCRIBE` statement,
  215. # rather than a `SUBSCRIBE` inside of a `COPY` operation, to
  216. # test the code path that previously had the bug.
  217. stream = subscribe_conn.cursor().stream(
  218. "SUBSCRIBE (SELECT * FROM mz_tables LIMIT 1)"
  219. )
  220. next(stream)
  221. # Ensure we see the dummy subscription added to
  222. # `mz_subscriptions`.
  223. (_ts, diff, pid) = next(metadata)
  224. self.assertEqual(int(pid), subscribe_session_id)
  225. self.assertEqual(diff, 1)
  226. # Kill the dummy subscription by forcibly closing the
  227. # connection.
  228. subscribe_conn.close()
  229. # Ensure we see the dummy subscription removed from
  230. # `mz_subscriptions`.
  231. (_ts, diff, pid) = next(metadata)
  232. self.assertEqual(int(pid), subscribe_session_id)
  233. self.assertEqual(diff, -1)
  234. finally:
  235. # Ensure the connections are always closed, even if an
  236. # assertion fails partway through the test, as otherwise the
  237. # `with` context manager will hang forever waiting for the
  238. # subscribes to gracefully terminate, which they never will.
  239. subscribe_conn.close()
  240. metadata_conn.close()