S3 sources -- and, in the future, multi-file, GCS, and other sources -- consist of multiple independent objects that may each have their own headers.
For example, imagine an S3 bucket that has historical data encoded as CSV-formatted files, one file per day. Each CSV file has its own header line, which is semantically meaningful but should not be included as a data row.
This feature adds support for decoding multiple CSV objects without garbage from headers making it into query results, and lays the groundwork for future file types (e.g. Avro OCF, Parquet) that carry their own headers.
Support decoding multiple CSV headers within a framework that will also function for future multi-object sources.
The first version of this will only support the CSV format; Avro OCF objects should be supportable by extending the current work, but will be prioritized separately.
User-facing changes are limited to an expansion of the ability to combine existing formats with S3 sources, for example FORMAT CSV WITH HEADER or Avro Object Container Files.
When a user specifies a source format that has a header, we will obtain an arbitrary file from the declared source and inspect its header, making it the canonical schema. All files must have exactly the same schema -- if any are encountered that do not match, the dataflow will be put into an error state. If it is not immediately possible to determine the header, the CREATE SOURCE invocation will fail.
Internally, there are only a few changes that need to be made.
We will extend the purification strategy used for existing formats to their combination with S3 sources, introducing new syntax for the CSV format that explicitly lists all column names; this explicit form will be the purification target for all CSV format clauses.
The new syntax is WITH HEADER COLUMNS (colname (, colname)*). See the pseudo-grammar sketch and the examples below.
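As a sketch, the resulting pseudo-grammar for the CSV format clause would look roughly like the following (only the clauses discussed in this document are shown; the exact notation is an assumption, not the final grammar):

FORMAT CSV WITH
    ( HEADER [ COLUMNS (colname (, colname)*) ]
    | n COLUMNS )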
A CSV source containing files with header lines like id,value, declared as:
CREATE SOURCE example
FROM S3 DISCOVER OBJECTS USING BUCKET SCAN 'bucket'
FORMAT CSV WITH HEADER;
will be rewritten via purify to:
CREATE SOURCE example
FROM S3 DISCOVER OBJECTS USING BUCKET SCAN 'bucket'
FORMAT CSV WITH HEADER COLUMNS (id, value);
We preserve column aliases if present, so the following statement, with the same file containing the id,value header line:
CREATE SOURCE example (a, b)
FROM S3 DISCOVER OBJECTS USING BUCKET SCAN 'bucket'
FORMAT CSV WITH HEADER;
will be rewritten to:
CREATE SOURCE example (a, b)
FROM S3 DISCOVER OBJECTS USING BUCKET SCAN 'bucket'
FORMAT CSV WITH HEADER COLUMNS (id, value);
and the columns will have the names "a" and "b". If a source file or object is encountered with a header column that does not match the COLUMNS (name, ...) declaration, the dataflow will be put in an error state; see future work below for how we may handle this more gracefully.
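As a hypothetical illustration (the amount column is invented for this example), given a declaration of COLUMNS (id, value), an object that begins:

id,amount
1,10
2,20

would put the dataflow into the error state, because the header column amount does not match the declared column value.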
As part of purification, any source that must read a header will fail immediately if it is not possible to determine the header. For example, the following CREATE SOURCE statement will be rejected if the headers cannot immediately be determined because there is no object in the queue:
CREATE SOURCE example
FROM S3 DISCOVER OBJECTS USING SQS NOTIFICATIONS 'queuename'
FORMAT CSV WITH HEADER;
Instead, users will be required to specify the column names using the following syntax:
CREATE SOURCE example
FROM S3 DISCOVER OBJECTS USING SQS NOTIFICATIONS 'queuename'
FORMAT CSV WITH HEADER COLUMNS (id, value);
All syntaxes for CSV-formatted sources (WITH n COLUMNS without aliases, WITH n COLUMNS with aliases, and WITH HEADER) will always be rewritten to include their column names in the catalog. This will use either the column aliases for the bare numbered-column syntax, or the HEADER COLUMNS syntax for files with headers.
This means that the following CREATE SOURCE statement:
CREATE SOURCE example
FROM ...
FORMAT CSV WITH 2 COLUMNS;
will be rewritten to:
CREATE SOURCE example (column1, column2)
FROM ...
FORMAT CSV WITH 2 COLUMNS;
If both column aliases and the named-column syntax are used simultaneously, the header columns will be verified against the source files/objects, and the columns presented to the dataflow will take their names from the column aliases.
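For example, the purified statement from the alias example above combines both forms:

CREATE SOURCE example (a, b)
FROM S3 DISCOVER OBJECTS USING BUCKET SCAN 'bucket'
FORMAT CSV WITH HEADER COLUMNS (id, value);

Here id,value is verified against each object's header line, while the dataflow exposes the columns as a and b.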
For some use cases -- imagine an ETL job that has added or dropped columns over time -- it would be useful to support only a few required fields, or to allow a subset or reordering of fields.
Supporting this is not planned for the initial implementation, but may be considered if it is requested.
In particular, we may add modes that support any combination of the following (see the sketch after this list):

- FORMAT CSV WITH COLUMNS (a, b) will support CSV files with headers like a,b or b,a
- FORMAT CSV WITH COLUMNS (a, b) will support CSV files with headers like a,b,c,d
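As a speculative sketch of the first mode (this syntax is not part of the initial implementation):

CREATE SOURCE example
FROM S3 DISCOVER OBJECTS USING BUCKET SCAN 'bucket'
FORMAT CSV WITH COLUMNS (a, b);

would accept objects whose header line is either a,b or b,a, mapping values to the declared column names.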
For S3 sources, it's possible to specify that the only objects to ingest are those specified in an SQS queue. For this use case, determining the header columns requires reading from the SQS queue, and it's possible that the SQS queue does not have any objects in it at source creation time.
Currently, we intend to require that in this case the schema or header be included literally in the CREATE SOURCE declaration; the CREATE SOURCE statement will error if it is not immediately possible to determine column names.
If the S3 source encounters a CSV object that does not match the CREATE SOURCE schema, we will put its dataflow into an error state to prevent users from reading invalid data. Currently there is no way to recover from this state. There are a few options:
Neither of those can handle a CSV object that has a subset of rows that are incorrect, like the following:
a,b
1,2
3,4
5,6,7
8,9
I don't believe that we have any plans for resolving this specific kind of invalid data, though.