We want to make it easier for folks to get data into Materialize. Currently the only way is via one of our supported sources, which, depending on the environment, can be pretty hard to set up. A common method of publishing data from an application is via webhooks. When some event occurs, e.g. a new analytics event from Segment or a new Pull Request on a GitHub repository, an HTTP endpoint you define gets called so you can receive the event in near real-time.

Supporting webhooks in Materialize allows users to easily get their data into MZ, and the low latency of the events works very well with the premise of being an "operational" data warehouse.

Note: To achieve the goal of "make it easier to get data into Materialize" we also considered building support for Fivetran. We may still do this in the future, but the amount of work to get webhooks working is significantly less, and the frequency of data is better suited to the goals of Materialize.
Concretely, we want to support:

- A new kind of source, `CREATE SOURCE <name> FROM WEBHOOK`, that allows users to generate the aforementioned URLs.
- Validation of requests sent to `WEBHOOK` sources.

The following are all features we want to support, but are out of scope in the immediate term:

- `UPSERT`-like behavior.

Our implementation will mostly exist in the adapter, and we can re-use existing APIs on the `storage-client` to handle the persistence of data.
We'll update the SQL parser to support our new kind of `WEBHOOK` source, and add a new endpoint to `environmentd`'s HTTP server: `/api/webhook/:database/:schema/:name`. When a user creates this new kind of source, we'll persist the new `Source` in the catalog and use the `storage-client` to create a new collection.
When we receive an event, we'll use the `mz_adapter::Client` that the HTTP server has to send the raw `headers` and `body` to the coordinator, pack the data into a row, and append it to the necessary collection using a new monotonic append API on the `StorageController`.
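As a rough end-to-end sketch of that flow, the HTTP handler could look something like the following. The `AdapterClient` type and its `append_webhook` method are placeholders standing in for the real adapter API described later in this document:

```rust
use axum::{
    body::Bytes,
    extract::{Path, State},
    http::{HeaderMap, StatusCode},
};

// Placeholder for the handle on `mz_adapter::Client` that the HTTP server holds.
#[derive(Clone)]
struct AdapterClient;

impl AdapterClient {
    // Hypothetical method; the real API is sketched later in this document.
    async fn append_webhook(
        &self,
        _path: (String, String, String),
        _headers: HeaderMap,
        _body: Bytes,
    ) -> Result<(), StatusCode> {
        // The coordinator resolves the source, packs the request into a `Row`, and
        // appends it to the source's collection via the storage controller.
        Ok(())
    }
}

// Rough sketch of the handler for `/api/webhook/:database/:schema/:name`.
async fn handle_webhook(
    State(client): State<AdapterClient>,
    Path(path): Path<(String, String, String)>,
    headers: HeaderMap,
    body: Bytes,
) -> StatusCode {
    match client.append_webhook(path, headers, body).await {
        Ok(()) => StatusCode::OK,
        Err(status) => status,
    }
}
```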
We'll update the parser to support creating a new kind of Source, `WEBHOOK`, for which you can specify a `BODY FORMAT`, which denotes how we decode the body of a request. To start we'll support three formats: `BYTES`, `JSON`, or `TEXT`. By default we will not include the headers of the request; they can optionally be added by specifying `INCLUDE HEADERS`.
```sql
CREATE SOURCE <name> FROM WEBHOOK
BODY FORMAT [BYTES | JSON | TEXT]
(INCLUDE HEADERS)?
;
```
After executing this statement, we'll create a source with the following columns:

| name | type | optional? |
|---|---|---|
| `body` | `bytea`, `jsonb`, or `text` | No |
| `headers` | `map[text => text]` | Yes, present if `INCLUDE HEADERS` is specified |
The `headers` map, if present, maps the name of each header in the request, converted to lowercase, to its value.
We'll add a new endpoint to the existing `base_router` in `environmentd` with the path `/api/webhook/:database/:schema/:name`. This follows the existing pattern of our other two endpoints, whose paths also start with `/api`. Users can then send events to their source using its fully qualified name.
Note: The existing WebSockets API is at `/api/experimental/ws`; we could move the webhooks API under the path `/api/experimental/webhook` if we think there will be significant breaking changes in the future.
We'll also add a new API, `mz_adapter::Client::append_webhook`, which is very similar to the existing `insert_rows` on the `SessionClient`. This new API will send the following to the `Coordinator` via a new `Command`:

- The `:database`, `:schema`, and `:name` from the URL path.
- The raw `headers` and `body` of the request.
- A `oneshot::Sender` that can be used to send a response.

Note: the reason we'll use the `mz_adapter::Client` and not the `SessionClient` is that to push data to a source we don't need a `Session`. Creating and tearing down a `Session` adds non-trivial overhead and also possibly results in unnecessary writes to CockroachDB.
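A minimal sketch of what that new `Command` variant could carry; the variant and field names here are illustrative placeholders rather than the final API:

```rust
use tokio::sync::oneshot;

// Placeholder error type standing in for whatever the adapter ends up returning.
#[derive(Debug)]
struct AppendWebhookError;

// A hypothetical new variant on the adapter's `Command` enum carrying everything the
// coordinator needs: the source's name from the URL path, the raw request, and a
// channel on which to send the response.
enum Command {
    AppendWebhook {
        database: String,
        schema: String,
        name: String,
        headers: Vec<(String, String)>,
        body: Vec<u8>,
        tx: oneshot::Sender<Result<(), AppendWebhookError>>,
    },
    // ... existing variants elided ...
}
```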
In the coordinator we'll do the following:
1. Resolve the `database`, `schema`, and `name` to a webhook source using the `Catalog`, erroring if no object can be found or the object is not a webhook source.
2. Get an appender for the source's collection from the `StorageController`. This would be using the new `StorageController::monotonic_appender(...)` detailed below.
3. Spawn a `tokio::task`, and do all further work in this task and off the `Coordinator` thread.
4. Decode the body of the request according to the specified `BODY FORMAT`.
5. Pack the decoded data into a `Row`.
6. Send `(Row, 1)` to the `StorageController`, and wait for the data to be committed.
7. Use the `oneshot::Sender` to respond with success or failure.

Today we expose a method `append` on the `StorageController` trait; I would like to add a new method:
```rust
pub async fn monotonic_appender(&self, id: GlobalId) -> CollectionAppender;
```
This new method would return a struct called `CollectionAppender`, which essentially acts as a `oneshot::Sender<Vec<(Row, Diff)>>`. It's a `Send`-able and non-`Clone`-able struct that has one method, `async fn append(self, ...)`, which takes ownership of `self` so it can only be used once. This would be implemented as part of `collection_mgmt` and would approximately be structured like:
```rust
struct CollectionAppender {
    id: GlobalId,
    tx: mpsc::Sender<(GlobalId, Vec<(Row, Diff)>)>,
}
```
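To make the single-use semantics concrete, here is a rough sketch of how `append` could be implemented, folding in the result channel described just below. All of the types here are simplified placeholders for the real ones in the storage crates:

```rust
use tokio::sync::{mpsc, oneshot};

// Simplified placeholders for the real storage types.
#[derive(Clone, Copy, Debug)]
struct GlobalId(u64);
#[derive(Debug)]
struct Row;
type Diff = i64;
#[derive(Debug)]
struct StorageError;

// Each append request also carries a oneshot sender so the CollectionManager can
// report back whether the updates were committed.
type AppendRequest = (GlobalId, Vec<(Row, Diff)>, oneshot::Sender<Result<(), StorageError>>);

struct CollectionAppender {
    id: GlobalId,
    tx: mpsc::Sender<AppendRequest>,
}

impl CollectionAppender {
    // Takes `self` by value, so an appender can be used at most once.
    async fn append(self, updates: Vec<(Row, Diff)>) -> Result<(), StorageError> {
        let (result_tx, result_rx) = oneshot::channel();
        // Hand the updates to the CollectionManager's background task.
        self.tx
            .send((self.id, updates, result_tx))
            .await
            .map_err(|_| StorageError)?;
        // Wait for the CollectionManager to report whether the updates were committed.
        result_rx.await.map_err(|_| StorageError)?
    }
}
```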
In addition to the `CollectionAppender`, we'll update the `CollectionManager` to receive a `oneshot::Sender<Result<(), StorageError>>` that it can use to optionally notify any listeners when and if the updates were successfully committed.
Adding the `CollectionAppender` introduces a possible correctness issue though:

- A caller gets a `CollectionAppender` for `GlobalId(A)`.
- The source backing `GlobalId(A)` is then dropped, but the appender remains usable.

To protect against this we can change the `CollectionManager` to return an error via the `oneshot::Sender` if the provided `GlobalId` no longer exists.
Adding this new API gets us two important properties that the existing `append` API does not have:

- We can append to the `StorageController` from a thread other than the main `Coordinator` thread.
- It allows the `StorageController` to pick the timestamp for the update, instead of the `Coordinator`, which aligns with our long term goal of having a storage owned "push source" in the future.

Every `WEBHOOK` source will be open to the entire internet, and as such we need to include some sort of validation so we can verify that requests are legitimate. In general, webhooks are validated by HMAC-ing the request body using SHA256 and validating the result against a signature provided in the request headers. An issue, though, is that every application does this just a little bit differently.
As such, we'll need to support some custom logic for validation. What we can do is support a `CHECK` constraint, like PostgreSQL supports on tables, which validates the incoming HTTP request by executing a Boolean-valued scalar expression. As an example:
```sql
CHECK (
  WITH (
    SECRET db.schema.webhook_secret,
    HEADERS,
    BODY
  )
  headers['x-signature'] = hmac('sha256', webhook_secret, body)
)
```
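For reference, the comparison in the example above is the common HMAC-SHA256 scheme. A minimal Rust sketch of that check, using the `hmac`, `sha2`, and `hex` crates and assuming the signature header carries a hex-encoded tag, looks roughly like:

```rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

// Returns true if `provided_hex` is a valid HMAC-SHA256 tag for `body` under `secret`.
fn signature_is_valid(secret: &[u8], body: &[u8], provided_hex: &str) -> bool {
    let Ok(provided) = hex::decode(provided_hex) else {
        return false;
    };
    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts keys of any length");
    mac.update(body);
    // `verify_slice` performs a constant-time comparison of the computed tag.
    mac.verify_slice(&provided).is_ok()
}
```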
The full syntax for the `CHECK` constraint is as follows:

```
check-option = 'CHECK' '('
    ['WITH' '(' check-with (',' check-with)* ')']
    scalar-expr
')' ;

check-with = (
    'SECRET' object-name ['AS' ident] ['BYTES'] |
    'HEADERS' ['AS' ident] ['BYTES'] |
    'BODY' ['AS' ident] ['BYTES']
) ;
```
The `WITH` clause, designed to be reminiscent of a common table expression in a `SELECT` statement, allows the check constraint to gain access to secrets and to the properties of the incoming HTTP request:

- The `SECRET` clause makes the specified secret available to the constraint expression via the specified name. If no name is specified for a given secret, the secret is made available under its item name in the catalog (e.g., a secret named `db.sch.sek` is made available with name `sek`). If the `BYTES` option is specified, the secret has type `bytea`; otherwise the secret has type `text`.
- The `HEADERS` clause makes the HTTP request's headers available to the constraint expression via the specified name, or `headers` if no name is specified. If the `BYTES` option is specified, the headers have type `map[text => bytea]`; otherwise the headers have type `map[text => text]`. Like with `INCLUDE HEADERS`, the keys of the map are lowercase header names.
- The `BODY` clause makes the HTTP request's body available to the constraint expression via the specified name, or `body` if no name is specified. If the `BYTES` option is specified, the body has type `bytea`; otherwise the body has type `text`.

Note: The `BYTES` option may be deferred to future work.
If the constraint expression evaluates to `true`, the incoming HTTP request is accepted. If the constraint expression evaluates to `false` or `NULL`, or produces an error, the incoming HTTP request is rejected with a 403 Forbidden status code.
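In Rust terms, the mapping from the expression's result to an HTTP status is small; the sketch below models `true`/`false`/`NULL` as `Option<bool>` and is purely illustrative:

```rust
use axum::http::StatusCode;

// `Ok(Some(true))` models `true`, `Ok(Some(false))` models `false`, `Ok(None)` models
// `NULL`, and `Err(_)` models an evaluation error.
fn check_result_to_status<E>(result: Result<Option<bool>, E>) -> StatusCode {
    match result {
        Ok(Some(true)) => StatusCode::OK,
        // `false`, `NULL`, and errors are all rejected without further detail, so that
        // secret values cannot leak through error messages.
        Ok(Some(false)) | Ok(None) | Err(_) => StatusCode::FORBIDDEN,
    }
}
```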
The components of the `WITH` clause may be specified in any order, and even multiple times. For example, the following are all valid:

```sql
CHECK (WITH (HEADERS, BODY) ...)
CHECK (WITH (BODY, HEADERS) ...)
CHECK (WITH (BODY AS b1, BODY AS b2) ...)
CHECK (WITH (BODY AS btext, BODY AS bbytes BYTES) ...)
```
However, to avoid user error, `WITH` clauses that define the same name multiple times will be rejected. For example, the following are all invalid:

```sql
CHECK (WITH (HEADERS AS body, BODY) ...)
CHECK (WITH (BODY, BODY) ...)
CHECK (WITH (SECRETS (schema1.foo, schema2.foo)) ...)
```
Note that we decouple the `BODY FORMAT` and `INCLUDE HEADERS` options, which control the request properties that are ultimately stored by the source, from the `WITH` options, which control which request properties are available to the constraint expression. This is intentional. In typical usage, the constraint expression needs access to an `x-signature` header and the body as `text`, while the source itself wants to decode the body as JSON and discard all headers. Decoupling these options allows the constraint expression to access these request properties in its desired format, without committing the source to persisting them in that format.
To ensure that secrets referenced by the expression are not exposed, Materialize will not provide details about why a request failed validation, neither via the HTTP response to the webhook request, nor via system catalog tables. This is necessary because SQL expressions that error might include the secret contents in the error message. Consider `WITH (SECRET s) s::int4`, which will produce an error message like `invalid input syntax for type integer: invalid digit found in string: "<secret value>"`.
Internally, the provided expression will be turned into a `MirScalarExpr` which will be used to evaluate each request. This evaluation will happen off the main coordinator thread.
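As a sketch of keeping that evaluation off the coordinator's thread, the handler could push the work onto a blocking thread pool. `CheckExpr::eval` below is a stand-in for evaluating the compiled `MirScalarExpr` against the request's body, headers, and secrets:

```rust
use std::sync::Arc;

// Stand-in for the compiled CHECK expression.
struct CheckExpr;

impl CheckExpr {
    // Stand-in for expression evaluation; `Ok(Some(true))` means the check passed.
    fn eval(&self, _body: &[u8], _headers: &[(String, String)]) -> Result<Option<bool>, String> {
        Ok(Some(true))
    }
}

async fn request_is_valid(
    expr: Arc<CheckExpr>,
    body: Vec<u8>,
    headers: Vec<(String, String)>,
) -> bool {
    // Run the potentially CPU-heavy evaluation on tokio's blocking thread pool so it
    // never occupies the coordinator's main task.
    tokio::task::spawn_blocking(move || matches!(expr.eval(&body, &headers), Ok(Some(true))))
        .await
        // If the evaluation task panicked, treat the request as invalid.
        .unwrap_or(false)
}
```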
To start, we'll aim to support at least 100 concurrent requests; in other words, 10 sources could at one time be processing 10 requests each. We can probably scale further, but this seems like a sensible limit to start at. We'll do this by adding an `axum::middleware` to enforce this limit on our new endpoint. This should be a pretty easy way to prevent a misbehaving source from taking down all of `environmentd`.
We'll also introduce a maximum size for the body of each request; the default limit will be 2MB. Both the maximum number of concurrent requests and the max body size will be configurable via LaunchDarkly, and later we can introduce SQL syntax so the user can `ALTER` these parameters on a per-source basis.
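One way to wire both limits up is with off-the-shelf tower/axum layers. A minimal sketch, with a stubbed-out handler and the default limits described above:

```rust
use axum::{extract::DefaultBodyLimit, http::StatusCode, routing::post, Router};
use tower::limit::ConcurrencyLimitLayer;

// Stub handler; the real one forwards the request to the adapter as described above.
async fn handle_webhook(body: axum::body::Bytes) -> StatusCode {
    let _ = body;
    StatusCode::OK
}

fn webhook_router() -> Router {
    Router::new()
        .route("/api/webhook/:database/:schema/:name", post(handle_webhook))
        // Cap the number of in-flight webhook requests (100 to start).
        .layer(ConcurrencyLimitLayer::new(100))
        // Reject request bodies larger than the 2MB default limit.
        .layer(DefaultBodyLimit::max(2 * 1024 * 1024))
}
```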
Note: Some applications provide batching of webhook events. To reduce scope we are not going to support this; our current thought is that handling batched requests should come naturally with support for `TRANSFORM USING`.
There are three metrics we should track to measure the success and performance of this new source:

- The number of `WEBHOOK` sources created, e.g. `SELECT count(*) FROM mz_sources WHERE type = 'webhook';`.
- The number of requests made to the `/api/webhook` endpoint, via a new metric, `mz_webhook_source_requests`.
- The duration of requests to the `/api/webhook` endpoint, via a new metric, `mz_webhook_source_request_duration_ms`.

A second way to measure success, and get feedback on this new feature, is to use it internally. To start, we plan to dogfood the new source with a few of our own internal applications.
This internal usage will help us quickly determine if we built the right thing, and how to prioritize further improvements.
For the idea of "making it easier to get data into Materialize", an alternative approach we considered was building Fivetran support. While this is feasible, and something we might pursue in the future, we're not pursuing this path for the two reasons noted above: webhooks require significantly less work, and the frequency of the data is better suited to the goals of Materialize.
`environmentd` vs `clusterd`

In the proposed design, `environmentd` handles all of the incoming requests, which is not good for scalability. An alternative is to have a `clusterd` handle requests for a given Source, like every other Source does today. The reason we chose `environmentd` to primarily handle webhook requests is that it is already set up to handle incoming requests from an external network. All of our other sources are pull based: we send a request and ask for data. Webhooks, on the other hand, are push based: we receive a request when there is new data available. Exposing a `clusterd` outside of our network may not even be something we want to do (e.g. in the future maybe there is a reverse proxy that talks to `clusterd`), and would require significant lift from the Cloud Team. We also have a separate project in the works that will allow `environmentd` to scale horizontally, which will mitigate these concerns.
An alternative approach is to use a `TABLE` for these new sources, and "translate" any incoming requests to `INSERT` statements. There are two reasons we did not choose this approach:

1. How we would later migrate from a `TABLE` to this new kind of Source isn't entirely clear.
2. `INSERT`s into a table are linearizable, whereas requests from a webhook should not be. We can update `TABLE`s to make `INSERT`s optionally linearizable, but this work probably isn't worth it given we also have reason 1.

The second alternative is to build this new kind of storage owned Source, which gets installed on a cluster and can do more complex operations every time a request is received. We plan to build something like this in the future, but at the moment this would require a large amount of work, and we first want to validate that webhooks are a feature users actually want.
There is plenty of future work that can be done for Webhooks, but by limiting our scope and shipping quickly, we can collect feedback from users to help inform our priorities.
- Moving to an architecture where `clusterd` primarily handles requests. This would improve our ability to scale, e.g. 100s of QPS per source, and allow us to build the more complex features listed below.
- Support for `TRANSFORM USING` a given request. Being able to map a JSON (or Avro, protobuf, etc.) body to a relation at the time of receiving a request would be a win for the user, as it's more ergonomic, and a win for us, as it reduces the amount of storage used.
- Making the request limits configurable with `ALTER <source>`, i.e. rate and size.
- Stable URLs that aren't derived from the source name, e.g. `/api/webhook/abcdefg`. This would allow users to drop and re-create sources without needing to update external systems. See https://github.com/MaterializeInc/materialize/pull/20002#discussion_r1234726035 for more details.
- Application-specific validation helpers, e.g. `VALIDATE STRIPE (SECRET ...)`, which would handle HMAC-ing the body of the request and matching it against the exact header that Stripe expects.
- Including headers selectively, e.g.:
```sql
CREATE SOURCE src FROM WEBHOOK (
  INCLUDE HEADERS (
    'header-1' AS header_1,
    'header-2' AS header_2 BYTES
  ),
  CHECK (
    WITH (HEADER 'x-signature' AS x_signature)
    x_signature ...
  )
)
```
Selectively including headers yields efficiency in two ways. First, unneeded headers don't need to be packed into a `map` for the `CHECK` constraint, which saves a bit of CPU and memory. Second, unneeded headers don't need to be stored on disk.
As a rough sketch, the north star for this feature in terms of SQL syntax could be:
```sql
CREATE SOURCE <name> FROM WEBHOOK
  -- parse the body as JSON.
  BODY FORMAT JSON,
  INCLUDE HEADERS,
  -- reject the request if this expression returns False.
  CHECK (
    WITH (HEADERS)
    SELECT headers->>'signature' = sha256hash(body)
  ),
  -- transform the body and headers into a well formed relation, before persisting it.
  AS (
    SELECT
      body->>'id' as id,
      body->>'name' as name,
      body->>'event_type' as event_type,
      body->>'country' as country,
      headers->>'X-Application' as application
  ),
  -- create a subsource that contains specific info.
  SUBSOURCE <name> (
    SELECT
      body->>'id' as id,
      body->>'ip_addr' as ip_address
  );
```