# Copy to S3 ## The Problem Github issue: [Issue 7256](https://github.com/MaterializeInc/database-issues/issues/2269) As an operational data warehouse, Materialize sits upstream of a traditional data warehouse for our customers. The data brought in to Materialize will serve not only operational workloads, but ultimately analytical ones. This means our customers can benefit from a cheap and user-friendly method to share their data or send it on to the tools purpose-built for their analytical workloads while reducing the need to duplicate transformation logic across operational and analytical data warehouses. A full copy of data from Materialize into S3 is a simple and intuitive way to enable this for our customers. ## Goals * Allow batch exports of data in Materialize to S3 in a format that can be easily ingested into analytical data warehouses. * Provide basic internal and external observability into the frequency, reliability, and size of the batch exports. * Tightly constrain scope so that the initial feature can reach public preview in early Q1 next year. ## Non-goals * Supporting continuous or scheduled exports. * Integrating directly with batch data warehouses, without writing to S3. ## Solution Proposal ### SQL In materialize we already have some support for `COPY` commands. We should expand upon them, keeping them as close to the [postgres syntax](https://www.postgresql.org/docs/current/sql-copy.html). The SQL would look like the following. ```sql COPY -- name of table/mv/source/view or a valid SELECT query TO -- the scalar_expr should give a path like 's3://prefix' WITH ( AWS CONNECTION = aws_conn -- previously created aws connection FORMAT = 'csv', -- file format name MAX FILE SIZE = 16777216 -- max file size in bytes, to limit size of files in s3 ) ``` This option is the most similar to the Postgres `COPY` command syntax. Though note, there is no `COPY` in the SQL standard and Postgres itself used to support a different syntax in the older versions. For reference, this is what an example SQL looks like in other data warehouses and postgres. [Redshift](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) ```sql -- Redshift UNLOAD ('select * from venue') TO 's3://mybucket/unload/' IAM_ROLE 'arn:aws:iam::0123456789012:role/MyRedshiftRole' CSV DELIMITER AS '|'; ``` [Snowflake](https://docs.snowflake.com/en/sql-reference/sql/copy-into-location) ```sql -- Snowflake COPY INTO 's3://mybucket/unload/' FROM mytable STORAGE_INTEGRATION = myint -- ~ equivalent to a materialize connection FILE_FORMAT = (TYPE = csv FIELD_DELIMITER = '|'); ``` [Postgres](https://www.postgresql.org/docs/current/sql-copy.html) ```sql COPY table_name TO 'path/to/file' WITH (FORMAT CSV); ``` Note: We'll not support `ORDER BY` clause in the select query because it will be very expensive. As a workaround when users are using the data as an external table, they can create a view on top with required ordering. #### S3 Path For the s3 path, users should be able to provide an expression like `'s3://prefix/path/' || mz_now()` to help generate the path. This would help as they can run the query in a schedule without having to modify the output path for each run. For individual file names within the directory refer [S3 file names below](#s3-file-names). #### Option `MAX FILE SIZE` For the `MAX FILE SIZE` we can take different approaches. Following is a list of how other data warehouses or databases deal with this. **Snowflake:** Snowflake accepts a similar parameter called [`MAX_FILE_SIZE`](https://docs.snowflake.com/en/sql-reference/sql/copy-into-location#copy-options-copyoptions) where they only accept an integer. **Redshift:** For their `UNLOAD` command Redshift also has [`MAXFILESIZE [AS] max-size [ MB | GB ]`](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html#unload-parameters) where they expect the size to be only in MB or GB. It's not entirely clear what's the multiplier here, could not find any Redshift specific documentation for this. Curiously for S3 documentation, they define [1 GB as 2^30 bytes](https://aws.amazon.com/s3/pricing/) (i.e. 1024 multiplier) but in EBS documentation, they do specify correctly that [1 GB is 0.931323 GiB](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-custom-ami-root-volume-size.html). **Postgres:** Postgres does not have any file size option in their `COPY` command. But looking at their [config settings](https://www.postgresql.org/docs/current/config-setting.html#CONFIG-SETTING-NAMES-VALUES), they accept both a number and a string with quotes specifying the unit as per their documentation below. > Numeric with Unit: Some numeric parameters have an implicit unit, because they describe quantities of memory or time. The unit might be bytes, kilobytes, blocks (typically eight kilobytes), milliseconds, seconds, or minutes. An unadorned numeric value for one of these settings will use the setting's default unit, which can be learned from pg_settings.unit. For convenience, settings can be given with a unit specified explicitly, for example '120 ms' for a time value, and they will be converted to whatever the parameter's actual unit is. Note that the value must be written as a string (with quotes) to use this feature. The unit name is case-sensitive, and there can be whitespace between the numeric value and the unit. > Valid memory units are B (bytes), kB (kilobytes), MB (megabytes), GB (gigabytes), and TB (terabytes). The multiplier for memory units is 1024, not 1000. We can take a combination of both the approaches of just accepting the MAX FILE SIZE in bytes along with supporting a string like `'5 GB'` similar to Redshift/Postgres. So the `MAX FILE SIZE` can be either an integer or a parseable string and this will be Postgres compatible as well. Note: Even though Postgres specifies MB not MiB, GB not GiB, the multiplier is 1024 not 1000. To keep things consistent we should have the same multipliers and document it. ### User Experience - User will need either an AWS IAM user (to use credentials) or a AWS IAM role (to use AssumeRole) on their end to set up an [AWS connection](../design/20231110_aws_connections.md). - User needs to create the AWS connection in Materialize to use for `COPY TO ... s3://...` and check with `VALIDATE CONNECTION` that it's working. - User will also need to give the AWS user/role for Materialize the following access to the s3 prefixes so that we can write the data. - `s3:ListBucket`: We need this permission to make sure that we are writing to an empty path. - `s3:PutObject`: This is required to actually upload files to S3. - If they try to run a `COPY` to S3 without all the permissions, then the operation should fail with an error. Note, running a `VALIDATE CONNECTION` does not guarantee that the specific S3 permissions are set up correctly. These can only be validated when the actual `COPY` command is run. - If the S3 path given already has some data in it, we should fail the operation. - When they run the `COPY` command, to keep things simple we'll block the UI till the operation is complete, similar to any other query. - If successful, the output of the `COPY` command should return the number of rows written to S3 similar to a [Postgres `COPY` output](https://www.postgresql.org/docs/current/sql-copy.html) of `COPY `. - If the operation fails midway for some reason or is cancelled by the user, we should show a notice to the users that incomplete data could have been written to the S3 path and will need to be cleaned up. - Since we'll support scalar expressions in the s3 path, user can give a path like .`'s3://prefix/path/' || mz_now()` so that they can run the same query again and again without needing to update the path or clean up previous data. ## Implementation ### Parser #### S3 path We should accept the S3 path as an arbitrary scalar expression which we later convert to a string in the planner. ``` Uri::from_str(lower_uncorrelated().eval().unwrap_str()) ``` #### `MAX FILE SIZE` Option We should introduce a `Memory` type which can take either an integer or a parseable string like `'2 GB'`, similar to the implementation for `Duration`. The multiplier should be 1024 to keep it consistent with Postgres. If the value is not provided we should select an appropriate default. Snowflake uses 16MB as default for their `MAX_FILE_SIZE` whereas Redshift defaults to 6.2GB for similar `MAXFILESIZE` parameter. Also, Snowflake recommends for data in S3 to be used in external tables, file sizes should be under 256MB. Given that, 256MB can be the default for our `MAX FILE SIZE`. ### Which cluster to run things on? Currently existing supported `COPY ... TO STDOUT` command fetches the result to `environmentd`. This will not be advisable for `COPY ... TO S3` as the data size could be large. Therefore, we should trigger a dataflow in a `clusterd` which should eventually write to S3 using user's aws connection details. The next question being, should it be a Storage cluser or a Compute cluster (at least till the time the cluster unification is done). We should use the Compute cluster. Compute already has a concept of ["Compute Sinks"](https://github.com/MaterializeInc/materialize/blob/v0.77.1/src/compute-types/src/sinks.rs#L100-L103), which include Subscribe and Persist (for materialized views). We can create another Compute S3 sink, which will create the required dataflow and then write to S3. Keeping this in a compute cluster also means, - We can easily support `SELECT` queries in a `COPY ... TO S3` command if things are already in compute. - Via the Persist Compute Sink, there's already precedence to talking to S3, we should be able to leverage some existing code. - This also will be consistent with the behaviour that a user can select the compute cluster where they want their query to run like they do for other queries. - Note, in Compute, clusters can have multiple replicas unlike Storage. So all the replicas for a cluster will be running the same dataflow to write to S3 and we should make sure that we are writing the same data and corresponding files from each of them. This will also mean, depending upon the number of replicas we'll end up doing multiple overwrites of the files in S3. Refer [Writing to S3 deterministically](#writing-to-s3-deterministically) below. #### Other Alternative: Storage cluster The other alternative would have been to actually make use of a Storage cluster. If we think of `COPY ... TO S3` as a one time sink, then it might also make sense to have it in the Storage cluster. Especially if the initial scope did not include `SELECT` queries which would have required spinning up compute dataflows. Reasons to not do so, - As mentioned above, we do have to support `SELECT` queries in `COPY ... TO S3`. The only reason it would have made sense to do this in storage would be to do as a quick stop gap if the effort is drastically less. It does not appear to be so. - There could be a possible workaround of running arbitrary `SELECT` queries, if we first save that query as a temporary materialized view first and then use the storage cluster to actually do the writes to S3. - Unlike Compute, Storage does not have a concept of running temporary/one-off dataflows. Everything which runs, like sources or sinks, do so continuously once scheduled till they are dropped. ### `CopyResponse` Similar to [`PeekResponse`](https://github.com/MaterializeInc/materialize/blob/main/src/compute-client/src/protocol/response.rs#L210-L217) we should have the copy command return a `CopyResponse` like shown below. For successful response, it will return the number of rows copied, and return the error string to be shown to the user in case of errors. ``` pub enum CopyResponse { /// Returns number of rows copied. Rows(u64), /// Error of an unsuccessful copy. Error(String), /// The copy was cancelled. Cancelled, } ``` ### Output file format We should start with a single file format and add more if there's more user interest. Currently, there has been request for parquet but CSV support would be easiest to build. Postgres supports `COPY ... TO` with a csv file and we should mimic the file structure for our export as well. With appropriate care to escape NULLs and multi-line values we should be able to have a lossless roundtrip using the CSV format. Eventually we'll need to add some additional formatting options to the SQL like `DELIMITER` (default will be comma), `QUOTE` (default is double quotes) etc. In the initial version we should stick to the default values as per the [Postgres COPY command](https://www.postgresql.org/docs/current/sql-copy.html). ### Is bounded memory possible? I had wrongly assumed that with reading directly from persist we could use backpressure similar to what we do in upsert. As @guswynn correctly pointed out, that wouldn't work due to the following: * For upsert, we read configured amount of data from persist, but we eventually consolidate in rocksdb (or in memory if disk is not enabled) which maintains the upsert state. * In compute, we may be able to fetch data in chunks from persist, but it will still need to be consolidated for the entire shard and this will have to happen in memory. One possibility could be that we actually use disk (with rocksdb) in Compute to keep state similar to upsert and consolidate the data there before writing the snapshot to S3. This seems complicated and I don't think it will be worth the effort. Also, given that we will have arrangements spilling to disk in the future, we'll probably get this for free. Consolidation on read from Persist is not going to be available in this timeline. But once it's available we should be able to update this to use that painlessly. In the meantime we should be careful that we don't introduce any new uses of memory in the implementation for COPY to S3. Also, since backpressure is not feasible right off the bat when reading from materialized view as I had assumed, there's no reason to limit the scope to only support materialized views/tables and we can support arbitrary SELECT queries as well. ### Writing to S3 deterministically * We should make sure that we write and upload the output files to S3 deterministically since each cluster replica would run the dataflow. * To do so, the file name and the contents of the file, both should be deterministic. * We should pick a minimum number of part files, and tell the compute sink how many parts it should have. The number could be the number of workers in a replica. * Then assign records based on a hash function to parts and then exchange the data based on the part to workers. * Each worker will then upload the data in the parts to S3 as mentioned in [Multi part uploads to S3](#multi-part-uploads-to-s3) below. ### Multi part uploads to S3 While writing to S3, we'll need to split the data into multiple files so that each file is under the configured MAX FILE SIZE. Multiple timely workers can do writes in parallel, where each worker would do the [multi part uploads](https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html) to S3. * Each worker starts a multi part upload with a `CreateMultipartUpload` API call to start writing data to a S3 file in the given path for a part ID. * The worker should keep a counter, say `bytes_to_write` to keep track of the aggregate bytes it will write. * The worker should also keep some state to track the number of parts in the upload and the corresponding etags returned from AWS for the uploaded parts. * There should be a configurable value `max_multi_part_bytes_in_memory` which can default to 100MB. This is the max value of the data which the worker will keep in memory before uploading a part to the multi-part upload. * The worker receives data to be written to S3, it checks if the size of the data exceeds configured `max_multi_part_bytes_in_memory`. If so, then the data should be split to keep within the limit. The worker then adds the size of the data to `bytes_to_write` to see if it will exceed MAX FILE SIZE. * If less than MAX FILE SIZE, the worker assigns an incremental part number starting from 1, makes a `UploadPart` call and keeps the returned `ETag` along with the part number in memory. * If more than MAX FILE SIZE, the worker makes a `CompleteMultipartUpload` call to finish the upload of a single file and provides the part number and etags in the request. * Then the worker creates a new multi part request to write to a new S3, resets the `bytes_to_write`, clears the part number with etag mapping in memory and uploads the part for the new multi part call. #### S3 file names With multiple workers writing data to S3 in parallel, the file names can follow a pattern like `/part--0001.csv`, `/part--0002.csv`. ## Rollout and Testing We should put COPY TO S3 behind a feature flag. #### Testing after code is merged - Switch on the feature flag for the staging environment and do an end to end copy to s3 flow in staging. - Try out a scenario where we cancel the copy midway. - Benchmark with material amount of data and measure how long does the COPY take. ## Possible Future work ### Parquet output format As mentioned earlier, there's already user interest for parquet. That can be a follow-up to this feature. We'll need a comprehensive type mapping between parquet and materialize types and we should do a separate design doc for that. Ideally that can be re-used for `COPY FROM S3` as well if we ever want to support that. ### Internal use cases We'll do dogfooding of this feature internally with the following use cases * [Copying out catalog data in the extractor](https://github.com/MaterializeInc/cloud/issues/8261) * [Copying out enriched operational data to our analytical data warehouse](https://github.com/MaterializeInc/operational-analytics/issues/4) ### More observability Postgres reports the progress of an ongoing `COPY` to a `pg_stat_progress_copy`. We can have a similar table to show progress which the user can query to see how far along they are. ### Survive restarts Create a `COPY` job like ``` CREATE JOB copy_job ... CREATE COPY copy_job ... ``` which will create a named catalog object which can survive restarts. ### Partitions We could support partitioning the data in separate folders based upon given partition keys. Depending upon the query pattern on the data warehouse, partitioning can greatly improve performance. ## Open Questions ### Should we write a manifest/progress file? Given this will not be a continuous sink but rather a one-shot copy there's not much progress tracking to be done. Also, we are going to prevent users from running the `COPY` command to a non-empty directory, so there's less chance of the data getting clobbered by separate instances of `COPY` command as well. Nonetheless, it might be beneficial to keep a manifest file like `progress_[0]-[+1].json` listing the name of the part files in the folder. We can resolve this during implementation.