
Webhook Source

Associated

Context

We want to make it easier for folks to get data into Materialize. Currently the only way is via one of our supported sources, which, depending on the environment, can be pretty hard to set up. A common method of publishing data from an application is via webhooks. When some event occurs, e.g. a new analytics event from Segment, or a new Pull Request on a GitHub repository, an HTTP endpoint you define gets called so you can receive the event in near real-time.

Supporting webhooks in Materialize allows users to easily get their data into MZ, and the low latency of the events works very well with the premise of being an "operational" data warehouse.

Note To achieve the goal of "make it easier to get data into Materialize" we also considered building support for Fivetran. We may still do this in the future, but the amount of work to get webhooks working is significantly less, and the frequency of the data is better suited to the goals of Materialize.

Goals

  1. Provide URLs that users can push data to.
  2. Support an extensible syntax similar to CREATE SOURCE <name> FROM WEBHOOK that allows users to generate the aforementioned URLs.
  3. Handle at least a total of 100 requests per second across all WEBHOOK sources.
  4. Code complete by August 15th, 2023.

Non-Goals

The following are all features we want to support, but are out of scope in the immediate term.

  • Extracting keys from the data for UPSERT-like behavior.
  • Custom filtering of headers.
  • Manipulating the incoming data, splitting a single request into any kind of "sub-sources".
  • Handling batch requests.

Overview

Our implementation will mostly exist in the adapter, and we can re-use existing APIs on the storage-client to handle the persistence of data.

We'll update the SQL parser to support our new kind of WEBHOOK source, and add a new endpoint to environmentd's HTTP server: /api/webhook/:database/:schema/:name. When a user creates this new kind of source, we'll persist the new Source in the catalog and use the storage-client to create a new collection.

When we receive an event, we'll use the mz_adapter::Client that the HTTP server has to send the raw headers and body to the coordinator, pack the data into a row, and append it to the appropriate collection using a new monotonic append API on the StorageController.

Detailed description

We'll update the parser to support creating a new kind of Source, WEBHOOK, for which you can specify a BODY FORMAT that denotes how we decode the body of a request. To start we'll support three formats: BYTES, JSON, and TEXT. By default we will not include the headers of the request; they can optionally be added by specifying INCLUDE HEADERS.

CREATE SOURCE <name> FROM WEBHOOK
  BODY FORMAT [BYTES | JSON | TEXT]
  (INCLUDE HEADERS)?
;

After executing this statement, we'll create a source with the following columns:

name      type                     optional?
body      bytea, jsonb, or text    No
headers   map[text => text]        Yes, present if INCLUDE HEADERS is specified

The headers map, if present, maps the name of each header in the request, converted to lower case, to its value.

We'll add a new endpoint to the existing base_router in environmentd with the path: /api/webhook/:database/:schema/:name. This follows the existing pattern of our other two endpoints, whose paths also start with /api. Users can then send events to their source, using its fully qualified name.

Note: The existing Web Sockets API is at /api/experimental/ws, we could move the webhooks API under the path /api/experimental/webhook, if we think there will be significant breaking changes in the future.
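
For illustration, a minimal sketch of wiring this route up in axum; the handler and router function names here are hypothetical, not the actual environmentd code:

// Hypothetical sketch of the new route using axum extractors; `handle_webhook`
// and `webhook_router` are illustrative names, not the real environmentd code.
use axum::{extract::Path, http::HeaderMap, response::IntoResponse, routing::post, Router};
use bytes::Bytes;

async fn handle_webhook(
    Path((database, schema, name)): Path<(String, String, String)>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    // Forward the captured path components, headers, and body to the adapter
    // via the `append_webhook` API described below.
    let _ = (database, schema, name, headers, body);
    axum::http::StatusCode::OK
}

fn webhook_router() -> Router {
    Router::new().route("/api/webhook/:database/:schema/:name", post(handle_webhook))
}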

We'll also add a new API, mz_adapter::Client::append_webhook, which is very similar to the existing insert_rows on the SessionClient. This new API will send the following to the Coordinator via a new Command:

  1. The captured :database, :schema, and :name from the URL path.
  2. The body and optionally headers of the request.
  3. A oneshot::Sender that can be used to send a response.

Note: the reason we'll use the mz_adapter::Client and not the SessionClient is that pushing data to a source doesn't require a Session. Creating and tearing down a Session adds non-trivial overhead and possibly results in unnecessary writes to CockroachDB.
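
As a rough sketch, the new Command could carry something like the following; the type, field, and error names are hypothetical:

// Hypothetical shape of the data sent to the Coordinator for a webhook append;
// type, field, and error names are illustrative only.
pub struct AppendWebhookRequest {
    pub database: String,
    pub schema: String,
    pub name: String,
    pub body: bytes::Bytes,
    pub headers: Option<std::collections::BTreeMap<String, String>>,
    // Used to report success or failure back to the HTTP handler.
    pub tx: tokio::sync::oneshot::Sender<Result<(), AppendWebhookError>>,
}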

In the coordinator we'll do the following (a rough sketch of steps 3 through 7 follows the list):

  1. Map the database, schema, and name to a webhook source using the Catalog, returning an error if no object can be found or the object is not a webhook source.
  2. Get the sending side of a channel that can be used to send new data to the StorageController. This uses the new StorageController::monotonic_appender(...) detailed below.
  3. Spawn a new tokio::task and do all further work in that task, off the Coordinator thread.
  4. Decode the body of the request according to BODY FORMAT.
  5. Pack our data into a Row.
  6. Send a tuple of (Row, 1) to the StorageController and wait for the data to be committed.
  7. Use the provided oneshot::Sender to respond with success or failure.
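
A rough sketch of steps 3 through 7, where pack_webhook_row is a hypothetical helper that decodes the body per the source's BODY FORMAT and packs it (along with the optional headers) into a Row, appender is the proposed CollectionAppender, and tx is the request's oneshot::Sender; error handling for a failed decode is elided:

// Sketch of steps 3-7. `pack_webhook_row` is a hypothetical helper, `appender`
// is the proposed CollectionAppender, and `tx` is the request's oneshot::Sender.
tokio::spawn(async move {
    // Steps 4 & 5: decode the body according to the source's BODY FORMAT and
    // pack it, along with the optional headers, into a Row.
    let row = pack_webhook_row(&body, headers.as_ref());
    // Step 6: hand (Row, 1) to the StorageController and wait for the commit.
    let result = appender.append(vec![(row, 1)]).await;
    // Step 7: report success or failure back to the HTTP handler.
    let _ = tx.send(result);
});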

Today we expose an append method on the StorageController trait; I would like to add a new method:

pub async fn monotonic_appender(&self, id: GlobalId) -> CollectionAppender;

This new method would return a struct called CollectionAppender, which essentially acts as a oneshot::Sender<Vec<(Row, Diff)>>. It's a Send-able, non-Clone-able struct with one method, async fn append(self, ...), that takes ownership of self so it can only be used once. This would be implemented as part of collection_mgmt and would be structured approximately like:

struct CollectionAppender {
  id: GlobalId,
  tx: mpsc::Sender<(GlobalId, Vec<(Row, Diff)>)>,
}

In addition to the CollectionAppender, we'll update the CollectionManager to receive a oneshot::Sender<Result<(), StorageError>> that it can use to optionally notify any listeners when and if the updates were successfully committed.
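
A sketch of how CollectionAppender::append could work on top of that channel, assuming its message type grows a third element carrying the response sender; the error variants used here are illustrative:

// Sketch of the one-shot append, assuming `tx` is now an
// mpsc::Sender<(GlobalId, Vec<(Row, Diff)>, oneshot::Sender<Result<(), StorageError>>)>.
impl CollectionAppender {
    /// Appends `updates` to the collection, consuming `self` so the appender
    /// can only be used once. Resolves once the CollectionManager reports the result.
    pub async fn append(self, updates: Vec<(Row, Diff)>) -> Result<(), StorageError> {
        let (notify_tx, notify_rx) = tokio::sync::oneshot::channel();
        // Hand the updates and the response channel to the CollectionManager task.
        self.tx
            .send((self.id, updates, notify_tx))
            .await
            .map_err(|_| StorageError::IdentifierMissing(self.id))?;
        // Wait for the CollectionManager to report whether the write committed.
        // (The exact error variants used here are illustrative.)
        notify_rx
            .await
            .map_err(|_| StorageError::IdentifierMissing(self.id))?
    }
}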

Adding the CollectionAppender introduces a possible correctness issue though:

  1. Create a CollectionAppender for GlobalId(A).
  2. Drop source associated with GlobalId(A).
  3. Try to append data.

To protect against this we can change the CollectionManager to return an error via the oneshot::Sender if the provided GlobalId no longer exists.
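
A rough sketch of that guard in the CollectionManager's write loop, where known_collections and write_to_persist stand in for the manager's real state and write path:

// Sketch of the guard in the CollectionManager's write loop. `known_collections`
// and `write_to_persist` are stand-ins for the manager's real state and write path.
while let Some((id, updates, notify_tx)) = rx.recv().await {
    if !known_collections.contains(&id) {
        // The collection backing this GlobalId was dropped after the
        // CollectionAppender was handed out; report an error instead of writing.
        let _ = notify_tx.send(Err(StorageError::IdentifierMissing(id)));
        continue;
    }
    let result = write_to_persist(id, updates).await;
    let _ = notify_tx.send(result);
}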

Adding this new API gets us two important properties that the existing append API does not have:

  1. The ability to send updates to the StorageController from a thread other than the main Coordinator thread.
  2. Allows the StorageController to pick the timestamp for the update, instead of the Coordinator, which aligns with our long term goal of having a storage-owned "push source" in the future.

Request Validation

Every WEBHOOK source will be open to the entire internet, so we need to include some sort of validation to verify that requests are legitimate. In general webhooks are validated by HMAC-ing the request body using SHA256, and comparing the result against a signature provided in the request headers. The issue is that everyone does this a little bit differently (a sketch of one variant follows the list):

  • Segment signature = HMAC of just the body of the request
  • Buildkite signature = HMAC of "#{timestamp}.#{body}"
  • GitHub signature = "sha256=" + HMAC of {body}
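
For reference, a sketch of what the GitHub-style scheme looks like in Rust, using the hmac, sha2, and hex crates; this is purely illustrative, since the design below expresses the equivalent check as a SQL expression:

// Illustrative only: GitHub-style signature verification using the `hmac`,
// `sha2`, and `hex` crates.
use hmac::{Hmac, Mac};
use sha2::Sha256;

fn github_signature_matches(secret: &[u8], body: &[u8], header_value: &str) -> bool {
    let mut mac = Hmac::<Sha256>::new_from_slice(secret).expect("HMAC accepts any key length");
    mac.update(body);
    let expected = format!("sha256={}", hex::encode(mac.finalize().into_bytes()));
    // A production implementation should use a constant-time comparison.
    expected == header_value
}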

As such we'll need to support some custom logic for validation. What we can do is support a CHECK constraint, like PostgreSQL supports on tables, which validates the incoming HTTP request by executing a Boolean-valued scalar expression. As an example:

CHECK (
  WITH (
    SECRET db.schema.webhook_secret,
    HEADERS,
    BODY
  )
  headers['x-signature'] = hmac('sha256', webhook_secret, body)
)

The full syntax for the CHECK constraint is as follows:

check-option = 'CHECK' '('
    ['WITH' '(' check-with (',' check-with)* ')']
    scalar-expr
')' ;

check-with = (
    'SECRET' object-name [AS ident] [BYTES] |
    'HEADERS' ['AS' ident] [BYTES] |
    'BODY' ['AS' ident] [BYTES]
) ;

The WITH clause, designed to be reminiscent of a common table expression in a SELECT statement, allows the check constraint to gain access to secrets and to the properties of the incoming HTTP request:

  • The SECRET clause makes the specified secret available to the constraint expression via the specified name. If no name is specified for a given secret, the secret is made available under its item name in the catalog (e.g., a secret named db.sch.sek is made available with name sek). If the BYTES option is specified, the secret has type bytea; otherwise the secret has type text.
  • The HEADERS clause makes the HTTP request's headers available to the constraint expression via the specified name, or headers if no name is specified. If the BYTES option is specified, the headers have type map[text => bytea]; otherwise the headers have type map[text => text]. Like with INCLUDE HEADERS, the keys of the map are lowercase header names.
  • The BODY clause makes the HTTP request's body available to the constraint expression via the specified name, or body if no name is specified. If the BYTES option is specified, the body has type bytea; otherwise the body has type text.

Note

The BYTES option may be deferred to future work.

If the constraint expression evaluates to true, the incoming HTTP request is accepted. If the constraint expression evaluates to false or NULL, or produces an error, the incoming HTTP request is rejected with a 403 Forbidden status code.

The components of the WITH clause may be specified in any order, and even multiple times. For example, the following are all valid:

  • CHECK (WITH (HEADERS, BODY) ...)
  • CHECK (WITH (BODY, HEADERS) ...)
  • CHECK (WITH (BODY AS b1, BODY AS b2) ...)
  • CHECK (WITH (BODY AS btext, BODY AS bbytes BYTES) ...)

However, to avoid user error, WITH clauses that define the same name multiple times will be rejected. For example, the following are all invalid:

  • CHECK (WITH (HEADERS AS body, BODY) ...)
  • CHECK (WITH (BODY, BODY) ...)
  • CHECK (WITH (SECRETS (schema1.foo, schema2.foo)) ...)

Note that we decouple the BODY FORMAT and INCLUDE HEADERS options, which control the request properties that are ultimately stored by the source, from the WITH options which control which request properties are available to the constraint expression. This is intentional. In typical usage, the constraint expression needs access to an x-signature header and the body as text, while the source itself wants to decode the body as JSON and discard all headers. Decoupling these options allows the constraint expression to access these request properties in its desired format, without committing the source to persisting them in that format.

To ensure that secrets referenced by the expression are not exposed, Materialize will not provide details about why a request failed validation, neither via the HTTP response to the webhook request, nor via system catalog tables. This is necessary because SQL expressions that error can include the secret contents in the error message. Consider WITH (SECRET s) s::int4, which will produce an error message like invalid input syntax for type integer: invalid digit found in string: "<secret value>".

Internally the provided expression will be turned into a MirScalarExpr which will be used to evaluate each request. This evaluation will happen off the main coordinator thread.

Request Limits

To start we'll aim to support at least 100 concurrent requests; in other words, 10 sources could each be processing 10 requests at one time. We can probably scale further, but this seems like a sensible limit to start at. We'll do this by adding an axum::middleware to enforce the limit on our new endpoint. This should be a pretty easy way to prevent a misbehaving source from taking down all of environmentd.

We'll also introduce a maximum size for the body of each request; the default limit will be 2MB.

Both the maximum number of concurrent requests and the maximum body size will be configurable via LaunchDarkly, and later we can introduce SQL syntax so the user can ALTER these parameters on a per-source basis.
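
A minimal sketch of enforcing both limits with off-the-shelf tower/axum layers, reusing the hypothetical handle_webhook handler from the earlier sketch; the real implementation may differ, e.g. to read the limits from LaunchDarkly-backed configuration:

// Minimal sketch: cap the request body size and the number of in-flight
// webhook requests using off-the-shelf layers.
use axum::{extract::DefaultBodyLimit, routing::post, Router};
use tower::limit::GlobalConcurrencyLimitLayer;

fn limited_webhook_router() -> Router {
    Router::new()
        .route("/api/webhook/:database/:schema/:name", post(handle_webhook))
        // Reject request bodies larger than 2MB with `413 Payload Too Large`.
        .layer(DefaultBodyLimit::max(2 * 1024 * 1024))
        // Allow at most 100 webhook requests to be in flight at once; additional
        // requests wait for a slot (a load-shed layer could reject them instead).
        .layer(GlobalConcurrencyLimitLayer::new(100))
}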

Note: Some applications provide batching of webhook events. To reduce scope we are not going to support this; our current thinking is that handling batched requests should come naturally with support for TRANSFORM USING.

Metrics

There are three metrics we should track to measure the success and performance of this new source (a sketch of the two new Prometheus metrics follows the list):

  1. Number of WEBHOOK sources created.
    • This will be collected via the Prometheus SQL exporter running a query like: SELECT count(*) FROM mz_sources WHERE type = 'webhook';.
  2. Number of requests made to the /api/webhook endpoint.
    • Subdivided by number of successes and failures.
    • This will be a new Prometheus counter called mz_webhook_source_requests.
  3. Response time of the /api/webhook endpoint.
    • This will be a new Prometheus histogram called mz_webhook_source_request_duration_ms.
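
For concreteness, the two new Prometheus metrics could be shaped roughly as follows; this sketch uses the prometheus crate directly, whereas the real definitions would go through Materialize's metrics registry:

// Illustrative definitions of the two new metrics using the `prometheus` crate;
// the real code would register these with Materialize's metrics registry.
use prometheus::{register_histogram, register_int_counter_vec, Histogram, IntCounterVec};

fn webhook_metrics() -> (IntCounterVec, Histogram) {
    let requests: IntCounterVec = register_int_counter_vec!(
        "mz_webhook_source_requests",
        "Count of requests made to the /api/webhook endpoint.",
        &["status"] // e.g. "success" or "failure"
    )
    .expect("can register metric");
    let duration: Histogram = register_histogram!(
        "mz_webhook_source_request_duration_ms",
        "Response time of the /api/webhook endpoint, in milliseconds."
    )
    .expect("can register metric");
    (requests, duration)
}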

A second way to measure success, and get feedback on this new feature, is to use it internally. To start, we plan to dogfood the new source with the following applications:

  1. Segment
  2. GitHub
  3. Slack
  4. Buildkite

This internal usage will help us quickly determine if we built the right thing, and how to prioritize further improvements.

Alternatives

Fivetran

For the idea of "making it easier to get data into Materialize", an alternative approach we considered was building Fivetran support. While this is feasible, and something we might pursue in the future, we're not pursuing this path for two reasons:

  1. It requires more work to build.
  2. The at-best 5 minute sync time of Fivetran does not align well with being an "operational" data warehouse.

environmentd vs clusterd

In the proposed design, environmentd handles all of the incoming requests, which is not good for scalability. An alternative is to have a clusterd handle requests for a given Source, like every other Source does today. The reason we chose environmentd to primarily handle webhook requests is that it is already set up to handle incoming requests from an external network. All of our other sources are pull based: we send a request and ask for data. Webhooks are push based: we receive a request when there is new data available. Exposing a clusterd outside of our network may not even be something we want to do (e.g. in the future there may be a reverse proxy that talks to clusterd), and it would require a significant lift from the Cloud Team. We also have a separate project in the works that will allow environmentd to scale horizontally, which will mitigate these concerns.

Tables vs Sources vs What we Chose

An alternative approach is to use a TABLE for these new sources, and "translate" any incoming requests into INSERT statements. There are two reasons we did not choose this approach:

  1. In the future to support features like validation or custom mapping of the request body, we'll need a storage owned Source so we can run this logic. The migration path from a TABLE to this new kind of Source isn't entirely clear.
  2. INSERTs into a table are linearizable, whereas requests from a webhook should not be. We can update TABLEs to make INSERTs optionally linearizable, but this work probably isn't worth it given we also have reason 1.

The second alternative is to build this new kind of storage-owned Source, which gets installed on a cluster and can do more complex operations every time a request is received. We plan to build something like this in the future, but at the moment it would require a large amount of work, and first we want to validate that webhooks are a feature users actually want.

Future

There is plenty of future work that can be done for Webhooks, but by limiting our scope and shipping quickly, we can collect feedback from users to help inform our priorities.

  1. A clusterd primarily handles requests. This would improve our ability to scale, e.g. 100s of QPS per source, and allow us to build the more complex features listed below.
  2. TRANSFORM USING a given request. Being able to map a JSON (or Avro, protobuf, etc.) body to a relation at the time of receiving a request would be a win for the user, as it's more ergonomic, and a win for us, as it reduces the amount of storage used.
  3. Subsources and batching. Being able to map a single event to multiple independent sources could be a big win for the ergonomics of the feature.
  4. User configuration of request limits via ALTER <source>, i.e. rate and size.
  5. Consider using some "external ID" that users can use to reference Webhook sources, e.g. /api/webhook/abcdefg. This would allow users to drop and re-create sources without needing to update external systems. See https://github.com/MaterializeInc/materialize/pull/20002#discussion_r1234726035 for more details.
  6. Supporting "recipes" for request validation for common webhook applications. For example, VALIDATE STRIPE (SECRET ...) would handle HMAC-ing the body of the request, and matching it against the exact header that Stripe expects.
  7. Including headers selectively, e.g.:

    CREATE SOURCE src FROM WEBHOOK (
       INCLUDE HEADERS (
           'header-1' AS header_1,
           'header-2' AS header_2 BYTES
       ),
       CHECK (
           WITH (HEADER 'x-signature' AS x_signature)
           x_signature ...
       )
    )
    

Selectively including headers yields efficiency in two ways. First, unneeded headers don't need to be packed into a map for the CHECK constraint, which saves a bit of CPU and memory. Second, unneeded headers don't need to be stored on disk.

As a rough sketch, the north star for this feature in terms of SQL syntax could be:

CREATE SOURCE <name> FROM WEBHOOK
  -- parse the body as JSON.
  BODY FORMAT JSON,
  INCLUDE HEADERS,
  -- reject the request if this expression returns False.
  CHECK (
    WITH (HEADERS)
    SELECT headers->>'signature' = sha256hash(body)
  ),
  -- transform the body and headers into a well formed relation, before persisting it.
  AS (
    SELECT
        body->>'id' as id,
        body->>'name' as name,
        body->>'event_type' as event_type,
        body->>'country' as country,
        headers->>'X-Application' as application
  ),
  -- create a subsource that contains specific info.
  SUBSOURCE <name> (
    SELECT
      body->>'id' as id,
      body->>'ip_addr' as ip_address
  );

Open questions

  • Security. Users can now push data to Materialize via a publicly accessible address. This opens up a new attack vector that we probably need to lock down. For example, maybe we support an allow list or block list of IP addresses. Do we need DDoS protection higher up in the stack? We should at least come up with a plan here so we know how we're exposed and can prioritize from there.