GitHub issue: Issue 7256
As an operational data warehouse, Materialize sits upstream of a traditional data warehouse for our customers. The data brought into Materialize will serve not only operational workloads, but ultimately analytical ones. This means our customers can benefit from a cheap and user-friendly method to share their data or send it on to the tools purpose-built for their analytical workloads, while reducing the need to duplicate transformation logic across operational and analytical data warehouses. A full copy of data from Materialize into S3 is a simple and intuitive way to enable this for our customers.
In Materialize we already have some support for `COPY` commands. We should expand upon them, keeping the syntax as close to Postgres as possible.
The SQL would look like the following.
```sql
COPY <object_name or select_query>   -- name of a table/mv/source/view or a valid SELECT query
TO <scalar_expr_for_s3_path>         -- the scalar_expr should give a path like 's3://prefix'
WITH (
    AWS CONNECTION = aws_conn,       -- previously created AWS connection
    FORMAT = 'csv',                  -- file format name
    MAX FILE SIZE = 16777216         -- max file size in bytes, to limit the size of files in S3
);
```
This option is the most similar to the Postgres `COPY` command syntax. Note, though, that there is no `COPY` in the SQL standard, and Postgres itself supported a different syntax in older versions.
For reference, this is what equivalent SQL looks like in other data warehouses and in Postgres:
```sql
-- Redshift
UNLOAD ('select * from venue')
TO 's3://mybucket/unload/'
IAM_ROLE 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
CSV DELIMITER AS '|';

-- Snowflake
COPY INTO 's3://mybucket/unload/'
FROM mytable
STORAGE_INTEGRATION = myint  -- ~ equivalent to a Materialize connection
FILE_FORMAT = (TYPE = csv FIELD_DELIMITER = '|');

-- Postgres
COPY table_name
TO 'path/to/file'
WITH (FORMAT CSV);
```
Note: We will not support an `ORDER BY` clause in the SELECT query because it would be very expensive. As a workaround, when users consume the data as an external table they can create a view on top with the required ordering.
For the S3 path, users should be able to provide an expression like `'s3://prefix/path/' || mz_now()` to help generate the path. This helps because they can run the query on a schedule without having to modify the output path for each run. For individual file names within the directory, refer to the S3 file names section below.
MAX FILE SIZE
For `MAX FILE SIZE` we can take different approaches. The following is how other data warehouses and databases deal with this:

- Snowflake: Snowflake accepts a similar parameter called `MAX_FILE_SIZE`, which only accepts an integer.
- Redshift: For its `UNLOAD` command, Redshift has `MAXFILESIZE [AS] max-size [ MB | GB ]`, where the size can only be given in MB or GB. It's not entirely clear what the multiplier is here; we could not find any Redshift-specific documentation for it. Curiously, the S3 documentation defines 1 GB as 2^30 bytes (i.e. a 1024 multiplier), but the EBS documentation correctly specifies that 1 GB is 0.931323 GiB.
- Postgres: Postgres does not have any file size option in its `COPY` command. However, its config settings accept both a number and a quoted string specifying the unit, as per its documentation below:
> Numeric with Unit: Some numeric parameters have an implicit unit, because they describe quantities of memory or time. The unit might be bytes, kilobytes, blocks (typically eight kilobytes), milliseconds, seconds, or minutes. An unadorned numeric value for one of these settings will use the setting's default unit, which can be learned from pg_settings.unit. For convenience, settings can be given with a unit specified explicitly, for example '120 ms' for a time value, and they will be converted to whatever the parameter's actual unit is. Note that the value must be written as a string (with quotes) to use this feature. The unit name is case-sensitive, and there can be whitespace between the numeric value and the unit.
>
> Valid memory units are B (bytes), kB (kilobytes), MB (megabytes), GB (gigabytes), and TB (terabytes). The multiplier for memory units is 1024, not 1000.
We can take a combination of both approaches: accept `MAX FILE SIZE` in bytes as well as a string like `'5 GB'`, similar to Redshift/Postgres. So `MAX FILE SIZE` can be either an integer or a parseable string, and this will be Postgres compatible as well. Note: even though Postgres writes MB not MiB and GB not GiB, the multiplier is 1024, not 1000 (so `'5 GB'` means 5 × 1024³ = 5368709120 bytes). To keep things consistent we should use the same multipliers and document it.
Users will use a previously created AWS connection in `COPY TO ... s3://...` and can check with `VALIDATE CONNECTION` that it's working. The connection needs the following S3 permissions:

- `s3:ListBucket`: We need this permission to make sure that we are writing to an empty path.
- `s3:PutObject`: This is required to actually upload files to S3.

If a user runs a `COPY` to S3 without all the permissions, the operation should fail with an error. Note that running `VALIDATE CONNECTION` does not guarantee that the specific S3 permissions are set up correctly; these can only be validated when the actual `COPY` command is run.

While a `COPY` command is running, to keep things simple we'll block the UI until the operation is complete, similar to any other query.

The `COPY` command should return the number of rows written to S3, similar to the Postgres `COPY` output of `COPY <row_count>`.

Users can provide a path expression like `'s3://prefix/path/' || mz_now()` so that they can run the same query again and again without needing to update the path or clean up previous data. We should accept the S3 path as an arbitrary scalar expression which we later convert to a string in the planner, roughly:

`Uri::from_str(lower_uncorrelated().eval().unwrap_str())`
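As a rough illustration of that plan-time conversion, here is a minimal sketch. The `eval_path_expr` helper is an assumption standing in for the `lower_uncorrelated().eval()` call above; only `http::Uri` is a real dependency, and the real planner code may structure the validation differently:

```rust
use std::str::FromStr;

use http::Uri;

/// Hypothetical stand-in for lowering and evaluating the user's scalar
/// expression at plan time (e.g. `'s3://prefix/path/' || mz_now()`).
fn eval_path_expr() -> String {
    "s3://my-bucket/exports/2023-11-28/".to_string()
}

/// Validate the evaluated path and split it into a bucket and key prefix.
fn plan_s3_path() -> Result<(String, String), String> {
    let path = eval_path_expr();
    let uri = Uri::from_str(&path).map_err(|e| format!("invalid S3 path: {e}"))?;
    if uri.scheme_str() != Some("s3") {
        return Err("the path must start with 's3://'".to_string());
    }
    let bucket = uri.host().ok_or("missing bucket name")?.to_string();
    let prefix = uri.path().trim_start_matches('/').to_string();
    Ok((bucket, prefix))
}

fn main() {
    match plan_s3_path() {
        Ok((bucket, prefix)) => println!("bucket={bucket} prefix={prefix}"),
        Err(e) => eprintln!("{e}"),
    }
}
```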
MAX FILE SIZE Option

We should introduce a `Memory` type which can take either an integer or a parseable string like `'2 GB'`, similar to the implementation for `Duration`. The multiplier should be 1024 to keep it consistent with Postgres. If the value is not provided, we should select an appropriate default.
Snowflake uses 16 MB as the default for its `MAX_FILE_SIZE`, whereas Redshift defaults to 6.2 GB for its similar `MAXFILESIZE` parameter. Also, Snowflake recommends that for data in S3 to be used in external tables, file sizes should be under 256 MB. Given that, 256 MB can be the default for our `MAX FILE SIZE`.
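As a rough sketch of the option handling, here is one way the parsing could look. The `parse_max_file_size` helper and its exact shape are assumptions; the real implementation would be a proper `Memory` type living alongside the existing `Duration` option parsing:

```rust
/// Default MAX FILE SIZE, per the discussion above: 256 MB with a 1024 multiplier.
const DEFAULT_MAX_FILE_SIZE: u64 = 256 * 1024 * 1024;

/// Parse a MAX FILE SIZE value that is either a bare integer (bytes) or a
/// string like '2 GB'. Units use a 1024 multiplier to match Postgres.
fn parse_max_file_size(value: Option<&str>) -> Result<u64, String> {
    let Some(value) = value else {
        return Ok(DEFAULT_MAX_FILE_SIZE);
    };
    let value = value.trim();
    // Split the numeric part from an optional unit suffix.
    let split = value
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(value.len());
    let (digits, unit) = value.split_at(split);
    let number: u64 = digits
        .parse()
        .map_err(|_| format!("invalid MAX FILE SIZE: {value}"))?;
    let multiplier: u64 = match unit.trim() {
        "" | "B" => 1,
        "kB" => 1024,
        "MB" => 1024 * 1024,
        "GB" => 1024 * 1024 * 1024,
        "TB" => 1024u64.pow(4),
        other => return Err(format!("unknown unit: {other}")),
    };
    Ok(number * multiplier)
}

fn main() {
    assert_eq!(parse_max_file_size(None).unwrap(), 256 * 1024 * 1024);
    assert_eq!(parse_max_file_size(Some("16777216")).unwrap(), 16_777_216);
    assert_eq!(parse_max_file_size(Some("2 GB")).unwrap(), 2 * 1024 * 1024 * 1024);
}
```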
The currently supported `COPY ... TO STDOUT` command fetches the result to `environmentd`. This is not advisable for `COPY ... TO S3`, as the data size could be large. Therefore, we should trigger a dataflow in a `clusterd` which eventually writes to S3 using the user's AWS connection details. The next question is whether it should be a Storage cluster or a Compute cluster (at least until cluster unification is done).
We should use the Compute cluster. Compute already has a concept of "Compute Sinks", which include Subscribe and Persist (for materialized views). We can create another Compute S3 sink, which will create the required dataflow and then write to S3. Keeping this in a Compute cluster also means we can more naturally support `SELECT` queries in a `COPY ... TO S3` command if things are already in Compute.

The other alternative would have been to use a Storage cluster. If we think of `COPY ... TO S3` as a one-time sink, then it might also make sense to have it in the Storage cluster, especially if the initial scope did not include `SELECT` queries, which would have required spinning up Compute dataflows.
Reasons not to do so:

- We plan to support `SELECT` queries in `COPY ... TO S3`. The only reason it would have made sense to do this in Storage would be as a quick stopgap if the effort were drastically less; it does not appear to be so.
- Supporting `SELECT` queries from Storage would require first saving the query as a temporary materialized view and then using the Storage cluster to actually do the writes to S3.

CopyResponse
Similar to `PeekResponse`, we should have the copy command return a `CopyResponse` as shown below. A successful response returns the number of rows copied; in case of errors it returns the error string to be shown to the user.
```rust
pub enum CopyResponse {
    /// Returns number of rows copied.
    Rows(u64),
    /// Error of an unsuccessful copy.
    Error(String),
    /// The copy was cancelled.
    Cancelled,
}
```
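For illustration, here is a hedged sketch of how the adapter side might turn a `CopyResponse` into the command tag or error surfaced to the client, mirroring the Postgres `COPY <row_count>` tag. The `copy_command_tag` function is an assumption, not an existing Materialize API; the enum is repeated so the snippet stands alone:

```rust
pub enum CopyResponse {
    /// Returns number of rows copied.
    Rows(u64),
    /// Error of an unsuccessful copy.
    Error(String),
    /// The copy was cancelled.
    Cancelled,
}

/// Hypothetical translation into the result reported to the SQL client.
fn copy_command_tag(response: CopyResponse) -> Result<String, String> {
    match response {
        CopyResponse::Rows(count) => Ok(format!("COPY {count}")),
        CopyResponse::Error(msg) => Err(msg),
        CopyResponse::Cancelled => Err("query canceled".to_string()),
    }
}

fn main() {
    assert_eq!(copy_command_tag(CopyResponse::Rows(42)).unwrap(), "COPY 42");
}
```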
We should start with a single file format and add more if there's more user interest. Currently there have been requests for Parquet, but CSV support would be the easiest to build.
Postgres supports `COPY ... TO` with a CSV file, and we should mimic that file structure for our export as well. With appropriate care to escape NULLs and multi-line values, we should be able to have a lossless round trip using the CSV format.
Eventually we'll need to add some additional formatting options to the SQL, like `DELIMITER` (default is comma), `QUOTE` (default is double quotes), etc. In the initial version we should stick to the default values of the Postgres COPY command.
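To make the escaping rules concrete, here is a minimal sketch of the Postgres-compatible CSV defaults described above (comma delimiter, double-quote quoting, NULL as an unquoted empty field, a quoted empty string for an actual empty value). The helper names are ours for illustration, not an existing API:

```rust
/// Encode one field following the Postgres CSV defaults: NULL becomes an
/// unquoted empty field, an empty string is quoted, and values containing
/// the delimiter, quotes, or newlines are quoted with embedded quotes doubled.
fn encode_csv_field(field: Option<&str>) -> String {
    match field {
        None => String::new(),
        Some("") => "\"\"".to_string(),
        Some(value) => {
            let needs_quoting = value.chars().any(|c| matches!(c, ',' | '"' | '\n' | '\r'));
            if needs_quoting {
                format!("\"{}\"", value.replace('"', "\"\""))
            } else {
                value.to_string()
            }
        }
    }
}

/// Join the encoded fields of one row with the default comma delimiter.
fn encode_csv_row(fields: &[Option<&str>]) -> String {
    fields
        .iter()
        .map(|f| encode_csv_field(*f))
        .collect::<Vec<_>>()
        .join(",")
}

fn main() {
    // NULL, an empty string, and a multi-line value all stay distinguishable.
    let row = encode_csv_row(&[None, Some(""), Some("a,\"b\"\nc")]);
    assert_eq!(row, ",\"\",\"a,\"\"b\"\"\nc\"");
    println!("{row}");
}
```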
I had wrongly assumed that when reading directly from Persist we could use backpressure, similar to what we do in upsert. As @guswynn correctly pointed out, that wouldn't work.
One possibility could be that we actually use disk (with rocksdb) in Compute to keep state similar to upsert and consolidate the data there before writing the snapshot to S3. This seems complicated and I don't think it will be worth the effort. Also, given that we will have arrangements spilling to disk in the future, we'll probably get this for free.
Consolidation on read from Persist is not going to be available in this timeline. But once it's available we should be able to update this to use that painlessly. In the meantime we should be careful that we don't introduce any new uses of memory in the implementation for COPY to S3.
Also, since backpressure is not feasible right off the bat when reading from a materialized view as I had assumed, there's no reason to limit the scope to only materialized views/tables; we can support arbitrary SELECT queries as well.
While writing to S3, we'll need to split the data into multiple files so that each file is under the configured `MAX FILE SIZE`. Multiple timely workers can do writes in parallel, where each worker does multi-part uploads to S3 as follows (a code sketch of this per-worker loop follows the list below):
- The worker makes a `CreateMultipartUpload` API call to start writing data to an S3 file in the given path for a part ID.
- The worker keeps a running total `bytes_to_write` to keep track of the aggregate bytes it will write.
- There will be a limit `max_multi_part_bytes_in_memory`, which can default to 100MB. This is the maximum amount of data the worker will keep in memory before uploading a part to the multi-part upload.
- As rows arrive, the worker checks whether the buffered data exceeds `max_multi_part_bytes_in_memory`. If so, the data should be split to keep within the limit. The worker then adds the size of the data to `bytes_to_write` to see if it will exceed `MAX FILE SIZE`.
- If `MAX FILE SIZE` is not exceeded, the worker uploads the buffered data with an `UploadPart` call and keeps the returned `ETag` along with the part number in memory.
- If `MAX FILE SIZE` would be exceeded, the worker makes a `CompleteMultipartUpload` call to finish the upload of a single file and provides the part numbers and ETags in the request.
- The worker then starts a new multi-part upload for the next file, resets `bytes_to_write`, clears the part number to ETag mapping in memory, and uploads the part for the new multi-part call.

With multiple workers writing data to S3 in parallel, the file names can follow a pattern like `<prefix>/part-<part-id>-0001.csv`, `<prefix>/part-<part-id>-0002.csv`.
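Here is the per-worker loop sketched in code, as referenced above. This is a simplified illustration under stated assumptions: the `S3Uploader` trait and all names are placeholders standing in for the real AWS SDK multipart-upload calls, and error handling and part-size minimums are ignored:

```rust
/// Placeholder for the real AWS SDK multipart-upload calls; each method maps
/// onto the API call named in its doc comment.
trait S3Uploader {
    /// CreateMultipartUpload: start a new file, returning its upload ID.
    fn start_file(&mut self, key: &str) -> String;
    /// UploadPart: upload one buffered part, returning its ETag.
    fn upload_part(&mut self, upload_id: &str, part_number: i32, data: Vec<u8>) -> String;
    /// CompleteMultipartUpload: finish the file from the collected (part number, ETag) pairs.
    fn complete_file(&mut self, upload_id: &str, parts: Vec<(i32, String)>);
}

struct WorkerUploadState {
    max_file_size: u64,                  // MAX FILE SIZE from the COPY options
    max_multi_part_bytes_in_memory: u64, // defaults to 100MB per the text above
    prefix: String,
    worker_id: usize,
    file_index: u64,
    bytes_to_write: u64, // aggregate bytes written to the current file
    buffer: Vec<u8>,     // rows buffered before the next UploadPart
    // Upload ID of the in-progress CreateMultipartUpload; set via `start_file`
    // when the worker starts and again after each completed file.
    upload_id: String,
    parts: Vec<(i32, String)>, // (part number, ETag) for the current file
}

impl WorkerUploadState {
    /// File names follow `<prefix>/part-<part-id>-<file-index>.csv`.
    fn current_key(&self) -> String {
        format!("{}/part-{}-{:04}.csv", self.prefix, self.worker_id, self.file_index)
    }

    fn append_row(&mut self, uploader: &mut impl S3Uploader, row: &[u8]) {
        let row_len = row.len() as u64;
        // The row would push the current file over MAX FILE SIZE: flush the
        // remaining buffer, complete the file, and start a new one.
        if self.bytes_to_write + row_len > self.max_file_size && self.bytes_to_write > 0 {
            self.flush_part(uploader);
            uploader.complete_file(&self.upload_id, std::mem::take(&mut self.parts));
            self.file_index += 1;
            self.bytes_to_write = 0;
            self.upload_id = uploader.start_file(&self.current_key());
        }
        self.buffer.extend_from_slice(row);
        self.bytes_to_write += row_len;
        // Keep at most max_multi_part_bytes_in_memory buffered before uploading a part.
        if self.buffer.len() as u64 >= self.max_multi_part_bytes_in_memory {
            self.flush_part(uploader);
        }
    }

    /// UploadPart for whatever is currently buffered and remember its ETag.
    fn flush_part(&mut self, uploader: &mut impl S3Uploader) {
        if self.buffer.is_empty() {
            return;
        }
        let part_number = self.parts.len() as i32 + 1;
        let etag =
            uploader.upload_part(&self.upload_id, part_number, std::mem::take(&mut self.buffer));
        self.parts.push((part_number, etag));
    }

    /// Called once the worker has no more rows: upload the remaining buffer
    /// and complete the final file.
    fn finish(&mut self, uploader: &mut impl S3Uploader) {
        self.flush_part(uploader);
        uploader.complete_file(&self.upload_id, std::mem::take(&mut self.parts));
    }
}
```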
We should put COPY TO S3 behind a feature flag.
As mentioned earlier, there's already user interest in Parquet. That can be a follow-up to this feature. We'll need a comprehensive type mapping between Parquet and Materialize types, and we should do a separate design doc for that. Ideally it can be reused for `COPY FROM S3` as well, if we ever want to support that.
We'll dogfood this feature internally with the following use cases.
Postgres reports the progress of an ongoing `COPY` in the `pg_stat_progress_copy` view. We could have a similar table to show progress, which the user can query to see how far along the copy is.
Create a `COPY` job, like `CREATE JOB copy_job ...` or `CREATE COPY copy_job ...`, which will create a named catalog object that can survive restarts.
We could support partitioning the data in separate folders based upon given partition keys. Depending upon the query pattern on the data warehouse, partitioning can greatly improve performance.
Given that this will not be a continuous sink but rather a one-shot copy, there's not much progress tracking to be done. Also, we are going to prevent users from running the `COPY` command against a non-empty directory, so there's less chance of the data getting clobbered by separate instances of the `COPY` command. Nonetheless, it might be beneficial to keep a manifest file like `progress_[0]-[<as_of>+1].json` listing the names of the part files in the folder. We can resolve this during implementation.
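If we do keep a manifest, here is a minimal sketch of what it might contain, using serde purely for illustration; the field names and layout are assumptions to be settled during implementation:

```rust
use serde::Serialize;

/// Hypothetical contents of a manifest file such as `progress_[0]-[<as_of>+1].json`.
#[derive(Serialize)]
struct CopyManifest {
    /// The as_of timestamp the snapshot was taken at.
    as_of: u64,
    /// Total rows written across all part files.
    rows: u64,
    /// Names of the part files written into the folder.
    part_files: Vec<String>,
}

fn main() -> Result<(), serde_json::Error> {
    let manifest = CopyManifest {
        as_of: 1700000000000,
        rows: 42,
        part_files: vec!["part-0-0001.csv".into(), "part-0-0002.csv".into()],
    };
    println!("{}", serde_json::to_string_pretty(&manifest)?);
    Ok(())
}
```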