The Platform V2 project aims to increase use-case isolation. The exact definition of use-case isolation in the context of Platform V2 is still being refined, but it can roughly be thought of as preventing the actions of one user from negatively impacting the actions of another user. Put another way: to what extent can a user pretend that they are the only user of Materialize, without noticing performance impacts from other users? Part of the use-case isolation work is to build a scalable serving layer. The scalable serving layer needs access to the catalog state and may even want to be able to react to changes in the catalog state.
This design doc describes an interface for getting data out of a durable catalog as well as how to implement this interface with both the stash and persist. Persist is a desirable implementation because it exposes a Listen API which lays the groundwork for responding to catalog changes.
The durable catalog interface can currently be found in the `mz_adapter::catalog::storage` module. At a high level it stores the catalog's durable objects, including system configuration values (e.g., `max_tables`). We will codify the access and mutation of these objects behind a trait, for which we will create the following implementations: a stash-backed implementation, a persist-backed implementation, and a shadow implementation that wraps a primary and a secondary implementation.
A rough prototype of this design can be found here (it does not include a persist implementation): https://github.com/MaterializeInc/materialize/pull/21071
We will add the following object-safe trait in the catalog to describe interacting with the durable catalog state.

NB: `async` modifiers and trait bounds (such as `Debug`) are left off for readability.
```rust
pub trait DurableCatalogState {
    // Initialization

    /// Reports if the catalog state has been initialized.
    fn is_initialized(&self) -> Result<bool, Error>;

    /// Optionally initializes the catalog, if it has not
    /// been initialized, and performs any needed migrations.
    fn open(&mut self) -> Result<(), Error>;

    /// Checks to see if opening the catalog would be
    /// successful, without making any durable changes.
    ///
    /// Will return an error in the following scenarios:
    /// - Catalog not initialized.
    /// - Catalog migrations fail.
    fn check_open(&self) -> Result<(), Error>;

    /// Opens the catalog in read only mode. All mutating methods
    /// will return an error.
    ///
    /// If the catalog is uninitialized or requires migrations, then
    /// it will fail to open in read only mode.
    fn open_read_only(&mut self) -> Result<(), Error>;

    /// Returns the epoch of the current durable catalog state. The epoch acts as
    /// a fencing token to prevent split brain issues across two
    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
    /// catalog, it will increment the epoch by one (or initialize it to some
    /// value if there's no existing epoch) and store the value in memory. It's
    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
    /// for their epoch.
    ///
    /// None is returned if the catalog hasn't been opened yet.
    ///
    /// NB: We may remove this in later iterations of Pv2.
    fn epoch(&mut self) -> Option<NonZeroI64>;

    // Read

    /*
     * Each object type will have an accessor method of the form
     * get_<object-type>s(&self) -> Vec<ObjectType>;
     */

    /// Get all clusters.
    fn get_clusters(&mut self) -> Result<Vec<Cluster>, Error>;

    /*
     * Currently, there isn't much need for methods of the form
     * get_<object-type>_by_id(&self, id: I) -> ObjectType;
     * As we separate the catalog state out from the rest of the
     * Coordinator, we will most likely need to add these methods,
     * which should be fairly straightforward.
     */

    // ...

    // Write

    /// Creates a new durable catalog state transaction.
    fn transaction(&mut self) -> Transaction;

    /// Commits a durable catalog state transaction.
    fn commit_transaction(&mut self, tx: Transaction) -> Result<(), Error>;

    /*
     * The majority of writes will go through a Transaction.
     * However, as an optimization, it may be useful to have more
     * targeted methods. We can add those on a case-by-case basis.
     */

    /// Persists mapping from system objects to global IDs and fingerprints.
    fn set_system_object_mapping(
        &mut self,
        mappings: Vec<SystemObjectMapping>,
    ) -> Result<(), Error>;

    // ...

    // Misc

    /// Confirms that this catalog is connected as the current leader.
    ///
    /// NB: We may remove this in later iterations of Pv2.
    fn confirm_leadership(&mut self) -> Result<(), Error>;

    /// Dumps the entire catalog contents in human readable JSON.
    fn dump(&self) -> Result<String, Error>;
}
```
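For orientation, here is a minimal sketch of how a caller might drive the trait, with error handling simplified; `Error` is the type referenced in the trait above:

```rust
/// Hypothetical sketch of a startup sequence driving the trait above.
fn boot(catalog: &mut dyn DurableCatalogState) -> Result<(), Error> {
    // `open` initializes the catalog on first boot and runs any
    // needed migrations on upgrades, so callers need not branch on
    // `is_initialized` themselves.
    catalog.open()?;

    // Once open, reads are served by the accessor methods.
    let clusters = catalog.get_clusters()?;
    println!("booted with {} clusters", clusters.len());
    Ok(())
}
```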
We will also add the following methods to initialize the durable catalog state structs. They do not open the catalog, but they may create connections over the network.
```rust
fn init_stash_catalog_state(
    now: NowFn,
    stash_factory: StashFactory,
    stash_url: String,
    tls: MakeTlsConnector,
) -> Box<dyn DurableCatalogState> {
    // ...
}

fn init_persist_catalog_state(
    now: NowFn,
    shard_id: ShardId,
    persist_client: PersistClient,
) -> Box<dyn DurableCatalogState> {
    // ...
}

fn init_shadow_catalog_state(
    primary_state: Box<dyn DurableCatalogState>,
    secondary_state: Box<dyn DurableCatalogState>,
) -> Box<dyn DurableCatalogState> {
    // ...
}
```
These methods are needed to solve a bootstrapping problem. Some catalog state, such as deploy generation, is needed before attempting to open the catalog. So these methods will return unopened catalog states capable of returning only the state we need for bootstrapping.
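As an aside, the shadow state could plausibly delegate each operation to both inner states and compare the results. The following is a sketch of that idea, not the actual implementation:

```rust
/// Hypothetical shadow implementation that runs every operation
/// against both inner states and checks that they agree.
struct ShadowCatalogState {
    primary: Box<dyn DurableCatalogState>,
    secondary: Box<dyn DurableCatalogState>,
}

impl DurableCatalogState for ShadowCatalogState {
    fn is_initialized(&self) -> Result<bool, Error> {
        let primary = self.primary.is_initialized()?;
        let secondary = self.secondary.is_initialized()?;
        // Any divergence indicates a bug in one of the implementations.
        assert_eq!(primary, secondary, "shadow catalog divergence");
        Ok(primary)
    }

    // ... the remaining methods follow the same delegate-and-compare
    // pattern.
}
```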
All current usages of `mz_adapter::catalog::storage::Connection` will be updated to use `Box<dyn DurableCatalogState>`. The `environmentd` binary will initialize the `Box<dyn DurableCatalogState>` using the methods above, depending on command line arguments.
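For example, the dispatch could look roughly like the following sketch, where `CatalogBackend` and the `Args` fields are hypothetical command line plumbing:

```rust
/// Hypothetical flag selecting which catalog implementation to use.
enum CatalogBackend {
    Stash,
    Persist,
}

/// Hypothetical bag of parsed command line arguments.
struct Args {
    catalog_backend: CatalogBackend,
    now: NowFn,
    stash_factory: StashFactory,
    stash_url: String,
    tls: MakeTlsConnector,
    shard_id: ShardId,
    persist_client: PersistClient,
}

/// Sketch of environmentd constructing the trait object at startup.
fn catalog_from_args(args: Args) -> Box<dyn DurableCatalogState> {
    match args.catalog_backend {
        CatalogBackend::Stash => init_stash_catalog_state(
            args.now,
            args.stash_factory,
            args.stash_url,
            args.tls,
        ),
        CatalogBackend::Persist => init_persist_catalog_state(
            args.now,
            args.shard_id,
            args.persist_client,
        ),
    }
}
```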
A stash-based implementation will be fairly trivial: this interface is already implemented using the stash, so we will just need to rename some methods.
This implementation is based on Aljoscha's skunkworks project. The entire state will be stored in a single persist shard as raw bytes, with a tag to differentiate between object types. The existing protobuf infrastructure will be used to serialize and deserialize objects. The implementing struct will maintain a persist write handle, an upper, and an in-memory cache of all the objects. Certain append-only object types, like audit logs and storage usage, will not be cached in memory since they are only written to and not read. NB: The storage usage events are read once at start time and never read again.
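A minimal sketch of what the implementing struct might hold, based on the description above (all field and type choices here are assumptions, not actual code):

```rust
/// Hypothetical shape of the persist-backed implementation.
struct PersistCatalogState {
    /// Write handle to the single catalog shard. Each row is a
    /// protobuf-encoded object plus a tag identifying its type.
    write_handle: WriteHandle<(u64, Vec<u8>), (), Timestamp, i64>,
    /// The upper we expect the shard to have; used for
    /// compare-and-append style writes and leadership checks.
    upper: Timestamp,
    /// In-memory cache of all objects, except append-only types
    /// like audit logs and storage usage, keyed by (tag, raw key).
    cache: BTreeMap<(u64, Vec<u8>), Vec<u8>>,
}
```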
We will use an environment's org ID to deterministically generate the persist shard ID. The org ID and persist shard IDs are both v4 UUIDs, so we could technically use the org ID as the persist shard ID. However, we'll likely want to modify the org ID, to something like `hash(org_id) + "catalog"`, for nicer looking observability in dashboards and such.
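For illustration, a deterministic derivation could look like the following sketch. The use of a name-based (v5) UUID and the round-trip through persist's `s<uuid>` string rendering are assumptions about convenient mechanisms, not settled design:

```rust
use std::str::FromStr;

use mz_persist_client::ShardId;
use uuid::Uuid;

/// Hypothetical derivation of the catalog shard ID from the org ID.
fn catalog_shard_id(org_id: Uuid) -> ShardId {
    // Name-based (v5) UUIDs are deterministic: hashing the org ID
    // plus a purpose string always yields the same shard ID.
    let seed = format!("{org_id}catalog");
    let uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, seed.as_bytes());
    // Persist shard IDs render as `s<uuid>`, so parse that form back
    // (assuming `ShardId: FromStr` accepts it).
    ShardId::from_str(&format!("s{uuid}")).expect("valid shard ID")
}
```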
On top of this, the trait methods will be implemented roughly as follows:

- `init_persist_catalog_state`: Acquires persist handles for the catalog shard and fetches the current upper; the catalog itself is not opened.
- `is_initialized`: Checks if the upper is equal to the minimum timestamp; an upper equal to the minimum means nothing has ever been written, i.e., the catalog is uninitialized.
- `open`: Writes the initial catalog contents if the catalog is uninitialized, runs any needed migrations, and populates the in-memory cache.
- `check_open`: Performs the same checks as `open` without making any durable changes, returning `Ok` if `open` would succeed.
- `open_read_only`: Populates the in-memory cache without writing anything; fails if the catalog is uninitialized or requires migrations.
- `boot_ts`: Return the boot timestamp from memory.
- `epoch`: This will be stored the same as any other catalog object, so it can be treated as a normal read.

All reads (except for the deploy generation) will return an object from the in-memory cache. Reading the deploy generation will perform the following steps:

1. Fetch the current upper of the shard.
2. If the upper is the minimum timestamp, the catalog is uninitialized; return `None`.
3. Otherwise, read the shard directly as of the upper and extract the deploy generation.

Reading the deploy generation needs to work before the catalog has been opened and before migrations have been run, which is why it cannot be served from the in-memory cache.

- `transaction`: Use objects from the in-memory cache to initialize a new `Transaction`.
- `commit_transaction`: Compare-and-append the transaction's changes to the shard at the cached upper, then advance the cached upper and update the in-memory cache.
- `set_X`: Targeted write methods like `set_system_object_mapping`; implemented in terms of a `Transaction`.
- `confirm_leadership`: Check that the persist shard's upper is equal to the upper cached in memory. This will use a linearized version of the `fetch_recent_upper` method in persist, which requires fetching the latest state from consensus and is therefore a potentially expensive operation.
- `dump`: Convert the in-memory state to a JSON string.

The current catalog transactions will remain mostly the same: a transaction works by reading the entire durable catalog state into memory, making changes to the state in memory, then committing all changes at once to the backing durable storage. The few changes needed are:

- Have the `DurableCatalogState` commit the `Transaction`, instead of the `Transaction` committing itself.
- `Transaction` will not hold onto a mutable reference to a `Stash`, though it will need a mutable reference to some part of the `DurableCatalogState` to prevent multiple concurrent transactions.
- Update `TableTransaction`s to use `Cow`s.
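A minimal sketch of the resulting flow is below; `remove_cluster` and `insert_cluster` are hypothetical stand-ins for whatever mutation helpers `Transaction` actually exposes:

```rust
/// Hypothetical sketch of the post-change transaction flow.
fn replace_cluster(
    catalog: &mut dyn DurableCatalogState,
    old_id: ClusterId,
    new_cluster: Cluster,
) -> Result<(), Error> {
    // The transaction is seeded from the in-memory cache and only
    // mutates its own copy of the state.
    let mut tx = catalog.transaction();
    tx.remove_cluster(old_id)?;
    tx.insert_cluster(new_cluster)?;
    // Nothing is durable until the catalog commits the transaction
    // atomically; note that the catalog, not the transaction, commits.
    catalog.commit_transaction(tx)
}
```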
The catalog debug tool is modeled after the stash debug tool and allows users to inspect and modify the durable catalog state. It will provide the following functionality (descriptions are mostly taken from the existing stash debug tool):
- `dump`: Dumps the catalog contents in a human readable format, including JSON for each key and value that can be hand edited and then passed to the `edit` or `delete` commands. Args:
  - `target`: Write output to specified path. Defaults to stdout.
- `edit`: Edits a single object in the catalog. Args:
  - `object-type`: Name of the object type to edit.
  - `key`: JSON encoded key of the object.
  - `value`: JSON encoded value of the object.
- `delete`: Deletes a single object from the catalog. Args:
  - `object-type`: Name of the object type to delete.
  - `key`: JSON encoded key of the object.
- `upgrade-check`: Checks whether the catalog could be upgraded to the catalog version of this binary, printing a success or error message. Can be used against a running `environmentd`; operates without interfering with it or committing any data to that catalog. Args:
  - `cluster-replica-sizes`: Valid cluster replica sizes.

The tool will have two modes: one that connects to a stash backed catalog and one that connects to a persist backed catalog.
In both modes, the tool will initialize a `Box<dyn DurableCatalogState>` and use the trait methods to implement each piece of functionality; `dyn DurableCatalogState` is safe because the trait is designed to be object safe. For example, the `dump` functionality will use the `dump` method, and the `upgrade-check` functionality will use the `check_open` method.

NB: There is an open issue that you need two different versions of the stash debug tool to perform upgrade checks: https://github.com/MaterializeInc/database-issues/issues/6355. This proposal does not fix that issue and suffers from the exact same problem.
This section is currently hand-wavy and needs the most additional design work.
Stash migration ownership is currently tangled between the adapter crate and the stash crate (see "stash: Untangle the stash config collection"). Ideally, the adapter would be in charge of how and why to migrate the catalog contents, while the stash provides the primitives needed to migrate a stash collection.
Once the migrations are untangled, at a high level they will take the following steps:
1. Implement `impl From<objects_v(X - 1)::(Object)> for objects_v(X)::(Object)` to convert every object from its old format into its new format (see the sketch after this list).
2. Use `mz_adapter::catalog::storage::Transaction`, instead of `mz_stash::transaction::Transaction`, to perform any deletions, insertions, or updates that require read/write semantics.
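For concreteness, a conversion might look like the following sketch, where the version numbers and the `Cluster` fields are hypothetical:

```rust
/// Hypothetical migration converting a protobuf object from snapshot
/// version 42 to version 43, which adds an `owner_id` field.
impl From<objects_v42::Cluster> for objects_v43::Cluster {
    fn from(cluster: objects_v42::Cluster) -> Self {
        objects_v43::Cluster {
            id: cluster.id,
            name: cluster.name,
            // New in v43: backfill a default for pre-existing objects.
            owner_id: None,
        }
    }
}
```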
As a bonus, these migrations start to look very similar to the existing catalog migrations. We might be able to combine them into a single logical step, instead of having two separate migration frameworks (not counting the builtin migration framework).
Open questions:

- How should we implement a linearized version of `fetch_recent_upper` in persist?
- Should `DurableCatalogState` be broken up into two traits: a read only trait and a read/write trait?
- Should we store objects as `(tag: u64, raw: bytes)` and take advantage of persist filtering based on the tag?