# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Avro key schema for the upsert topics: a single long `id` column.
$ set keyschema={
    "type": "record",
    "name": "Key",
    "fields": [
        {"name": "id", "type": "long"}
    ]
  }

# Debezium-style envelope value schema: `before`/`after` record pair, each
# nullable. `after` refers to the named record type `row` declared in `before`.
$ set schema={
    "type" : "record",
    "name" : "envelope",
    "fields" : [
      {
        "name": "before",
        "type": [
          {
            "name": "row",
            "type": "record",
            "fields": [
              {
                "name": "id",
                "type": "long"
              },
              {
                "name": "creature",
                "type": "string"
              }
            ]
          },
          "null"
        ]
      },
      {
        "name": "after",
        "type": ["row", "null"]
      }
    ]
  }

# TODO - we should verify here that the topic is not being re-read from the beginning,
# but I don't know of any way to do that in Testdrive today.

$ kafka-ingest format=avro topic=dbzupsert-broken-key key-format=avro key-schema=${keyschema} schema=${schema}
{"id": 1} {"before": null, "after": {"row": {"id": 1, "creature": "Tyrannosaurus rex"}}}
{"id": 1} {"before": null, "after": {"row": {"id": 1, "creature": "dragon"}}}

$ kafka-ingest format=avro topic=dbzupsert-broken-value key-format=avro key-schema=${keyschema} schema=${schema}
{"id": 1} {"before": null, "after": {"row": {"id": 1, "creature": "Tyrannosaurus rex"}}}
{"id": 1} {"before": null, "after": {"row": {"id": 1, "creature": "dragon"}}}

# Verify that the errors from before are still there
! SELECT * FROM upsert_broken_key_tbl
contains: Key decode

! SELECT * FROM upsert_broken_value_tbl
contains: Value error

# Retract the bad key
$ kafka-ingest format=bytes topic=dbzupsert-broken-key key-format=bytes omit-value=true
broken-key:

> SELECT * FROM upsert_broken_key_tbl
id creature
-----------
1 dragon

# There is still an error in the other source, due to the bad value.
! SELECT * FROM upsert_broken_value_tbl
contains: Value error

# Update the bad value
$ kafka-ingest format=avro topic=dbzupsert-broken-value key-format=avro key-schema=${keyschema} schema=${schema}
{"id": 2} {"before": null, "after": {"row": {"id": 2, "creature": "cow"}}}

> SELECT * FROM upsert_broken_value_tbl
id creature
-----------
1 dragon
2 cow

# Verify that we still can't query the third source, because of the NULL-key error.
# TODO database-issues#8598
# ! select * from upsert_nullkey_tbl
# contains: record with NULL key in UPSERT source

# Ingest a NULL value for our null key, to retract it.
$ kafka-ingest format=bytes topic=upsert-nullkey key-format=bytes key-terminator=:
:

# Now we should be able to query the source.
> select * from upsert_nullkey_tbl
key text
-------------------
bird1 goose