SmokeTest.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. // Copyright Materialize, Inc. and contributors. All rights reserved.
  2. //
  3. // Use of this software is governed by the Business Source License
  4. // included in the LICENSE file at the root of this repository.
  5. //
  6. // As of the Change Date specified in that file, in accordance with
  7. // the Business Source License, use of this software will be governed
  8. // by the Apache License, Version 2.0.
  9. using Npgsql;
  10. using NUnit.Framework;
  11. using System;
  12. using System.Threading;
  13. namespace csharp
  14. {
  15. public class Tests
  16. {
  17. private NpgsqlConnection OpenConnection() {
  18. var conn = new NpgsqlConnection("host=materialized;port=6875;database=materialize;username=materialize");
  19. conn.Open();
  20. return conn;
  21. }
  22. [Test]
  23. public void BasicQuery()
  24. {
  25. using var conn = OpenConnection();
  26. using var cmd = new NpgsqlCommand("SELECT 42::int8", conn);
  27. using var reader = cmd.ExecuteReader();
  28. while (reader.Read())
  29. {
  30. Assert.AreEqual(42, reader.GetValue(0));
  31. }
  32. }
  33. [Test]
  34. public void BasicSubscribe() {
  35. using var conn = OpenConnection();
  36. // Create a table with one row of data.
  37. new NpgsqlCommand("CREATE TABLE t (a int, b text)", conn).ExecuteNonQuery();
  38. new NpgsqlCommand("INSERT INTO t VALUES (1, 'a')", conn).ExecuteNonQuery();
  39. var txn = conn.BeginTransaction();
  40. new NpgsqlCommand("DECLARE c CURSOR FOR SUBSCRIBE t", conn, txn).ExecuteNonQuery();
  41. using (var cmd = new NpgsqlCommand("FETCH ALL c", conn, txn))
  42. using (var reader = cmd.ExecuteReader())
  43. {
  44. Assert.IsTrue(reader.Read());
  45. Assert.AreEqual(1, reader[1]); // diff
  46. Assert.AreEqual(1, reader[2]); // a
  47. Assert.AreEqual("a", reader[3]); // b
  48. Assert.IsFalse(reader.Read());
  49. }
  50. // Insert another row from another connection to simulate an update
  51. // arriving.
  52. using (var conn2 = OpenConnection()) {
  53. new NpgsqlCommand("INSERT INTO t VALUES (2, 'b')", conn2).ExecuteNonQuery();
  54. }
  55. using (var cmd = new NpgsqlCommand("FETCH ALL c", conn, txn))
  56. using (var reader = cmd.ExecuteReader())
  57. {
  58. Assert.IsTrue(reader.Read());
  59. Assert.AreEqual(1, reader[1]); // diff
  60. Assert.AreEqual(2, reader[2]); // a
  61. Assert.AreEqual("b", reader[3]); // b
  62. Assert.IsFalse(reader.Read());
  63. }
  64. txn.Commit();
  65. new NpgsqlCommand("DROP TABLE t", conn).ExecuteNonQuery();
  66. }
  67. [Test]
  68. public void CopySubscribe() {
  69. using var conn = OpenConnection();
  70. // Create a table with one row of data.
  71. new NpgsqlCommand("CREATE TABLE t (a int, b text)", conn).ExecuteNonQuery();
  72. new NpgsqlCommand("INSERT INTO t VALUES (1, 'a')", conn).ExecuteNonQuery();
  73. // Start a subscribe using the binary copy protocol.
  74. var reader = conn.BeginBinaryExport("COPY (SUBSCRIBE t) TO STDOUT (FORMAT BINARY)");
  75. // Validate the first row.
  76. Assert.AreEqual(4, reader.StartRow());
  77. reader.Read<decimal>(); // ignore timestamp column
  78. Assert.AreEqual(1, reader.Read<long>()); // diff column
  79. Assert.AreEqual(1, reader.Read<int>()); // a column
  80. Assert.AreEqual("a", reader.Read<string>()); // b column
  81. // Wait 2s so that the 1s NoticeResponse "test that the connection is still
  82. // alive" check triggers. This verifies Npgsql can successfully ignore the
  83. // NoticeResponse.
  84. Thread.Sleep(2000);
  85. // Insert another row from another connection to simulate an update
  86. // arriving.
  87. using (var conn2 = OpenConnection()) {
  88. new NpgsqlCommand("INSERT INTO t VALUES (2, 'b')", conn2).ExecuteNonQuery();
  89. }
  90. // Validate the new row.
  91. Assert.AreEqual(4, reader.StartRow());
  92. reader.Read<decimal>(); // ignore timestamp column
  93. Assert.AreEqual(1, reader.Read<long>()); // diff column
  94. Assert.AreEqual(2, reader.Read<int>()); // a column
  95. Assert.AreEqual("b", reader.Read<string>()); // b column
  96. // The subscribe won't end until we send a cancel request.
  97. reader.Cancel();
  98. // Ensure the COPY has ended after being canceled.
  99. Assert.Throws<OperationCanceledException>(delegate { reader.StartRow(); });
  100. new NpgsqlCommand("DROP TABLE t", conn).ExecuteNonQuery();
  101. }
  102. }
  103. }