amazon-rds.md 23 KB


title: "Ingest data from Amazon RDS" description: "How to stream data from Amazon RDS for PostgreSQL to Materialize" aliases:

  • /guides/cdc-postgres/
  • /integrations/cdc-postgres/
  • /connect-sources/cdc-postgres-direct/
  • /ingest-data/postgres-amazon-rds/ menu: main: parent: "postgresql" name: "Amazon RDS" identifier: "pg-amazon-rds" ---

This page shows you how to stream data from Amazon RDS for PostgreSQL to Materialize using the PostgreSQL source.

{{< tip >}} {{< guided-tour-blurb-for-ingest-data >}} {{< /tip >}}

Before you begin

{{% postgres-direct/before-you-begin %}}

A. Configure Amazon RDS

1. Enable logical replication

Materialize uses PostgreSQL's logical replication protocol to track changes in your database and propagate them to Materialize.

As a first step, you need to make sure logical replication is enabled.

  1. As a user with the rds_superuser role, use psql (or your preferred SQL client) to connect to your database.

  2. Check if logical replication is enabled:

    SELECT name, setting
      FROM pg_settings
      WHERE name = 'rds.logical_replication';
    

            name             | setting
    -------------------------+---------
    rds.logical_replication  | off
    (1 row)
    
  3. Using the AWS Management Console, create a DB parameter group in RDS.

    • Set Parameter group family to your PostgreSQL version.
    • Set Type to DB Parameter Group.
    • Set Engine type to PostgreSQL.
  4. Edit the new parameter group and set the rds.logical_replication parameter to 1.

  5. Associate the DB parameter group with your database.

    Use the Apply Immediately option to immediately reboot your database and apply the change. Keep in mind that rebooting the RDS instance can affect database performance.

    Do not move on to the next step until the database Status is Available in the RDS Console.

  6. Back in the SQL client connected to PostgreSQL, verify that replication is now enabled:

    SELECT name, setting
      FROM pg_settings
      WHERE name = 'rds.logical_replication';
    

            name             | setting
    -------------------------+---------
    rds.logical_replication  | on
    (1 row)
    

    If replication is still not enabled, reboot the database.

2. Create a publication and a replication user

{{% postgres-direct/create-a-publication-aws %}}

B. (Optional) Configure network security

{{< note >}} If you are prototyping and your RDS instance is publicly accessible, you can skip this step. For production scenarios, we recommend configuring one of the network security options below. {{< /note >}}

There are various ways to configure your database's network to allow Materialize to connect:

  • Allow Materialize IPs: If your database is publicly accessible, you can configure your database's security group to allow connections from a set of static Materialize IP addresses.

  • Use AWS PrivateLink: If your database is running in a private network, you can use AWS PrivateLink to connect Materialize to the database. For details, see AWS PrivateLink.

  • Use an SSH tunnel: If your database is running in a private network, you can use an SSH tunnel to connect Materialize to the database.

{{< tabs >}}

{{< tab "Allow Materialize IPs">}}

  1. In the SQL Shell, or your preferred SQL client connected to Materialize, find the static egress IP addresses for the Materialize region you are running in:

    SELECT * FROM mz_egress_ips;
    
  2. In the AWS Management Console, add an inbound rule to your RDS security group for each IP address from the previous step.

    In each rule:

    • Set Type to PostgreSQL.
    • Set Source to the IP address in CIDR notation.

{{< /tab >}}

{{< tab "Use AWS PrivateLink">}}

AWS PrivateLink lets you connect Materialize to your RDS instance without exposing traffic to the public internet. To use AWS PrivateLink, you create a network load balancer in the same VPC as your RDS instance and a VPC endpoint service that Materialize connects to. The VPC endpoint service then routes requests from Materialize to RDS via the network load balancer.

{{< note >}} Materialize provides a Terraform module that automates the creation and configuration of AWS resources for a PrivateLink connection. For more details, see the Terraform module repository. {{</ note >}}

  1. Get the IP address of your RDS instance. You'll need this address to register your RDS instance as the target for the network load balancer in the next step.

    To get the IP address of your RDS instance:

    1. Select your database in the RDS Console.

    2. Find your RDS endpoint under Connectivity & security.

    3. Use the dig or nslooklup command to find the IP address that the endpoint resolves to:

      dig +short <RDS_ENDPOINT>
      
  2. Create a dedicated target group for your RDS instance.

    • Choose the IP addresses type.

    • Set the protocol and port to TCP and 5432.

    • Choose the same VPC as your RDS instance.

    • Use the IP address from the previous step to register your RDS instance as the target.

    Warning: The IP address of your RDS instance can change without notice. For this reason, it's best to set up automation to regularly check the IP of the instance and update your target group accordingly. You can use a lambda function to automate this process - see Materialize's Terraform module for AWS PrivateLink for an example. Another approach is to configure an EC2 instance as an RDS router for your network load balancer.

  3. Create a network load balancer.

    • For Network mapping, choose the same VPC as your RDS instance and select all of the availability zones and subnets that you RDS instance is in.

    • For Listeners and routing, set the protocol and port to TCP and 5432 and select the target group you created in the previous step.

  4. In the security group of your RDS instance, allow traffic from the network load balancer.

    If client IP preservation is disabled, the easiest approach is to add an inbound rule with the VPC CIDR of the network load balancer. If you don't want to grant access to the entire VPC CIDR, you can add inbound rules for the private IP addresses of the load balancer subnets.

    • To find the VPC CIDR, go to your network load balancer and look under Network mapping.
    • To find the private IP addresses of the load balancer subnets, go to Network Interfaces, search for the name of the network load balancer, and look on the Details tab for each matching network interface.
  5. Create a VPC endpoint service.

    • For Load balancer type, choose Network and then select the network load balancer you created in the previous step.

    • After creating the VPC endpoint service, note its Service name. You'll use this service name when connecting Materialize later.

    Remarks: By disabling Acceptance Required, while still strictly managing who can view your endpoint via IAM, Materialze will be able to seamlessly recreate and migrate endpoints as we work to stabilize this feature.

  6. Go back to the target group you created for the network load balancer and make sure that the health checks are reporting the targets as healthy.

{{< /tab >}}

{{< tab "Use an SSH tunnel">}}

To create an SSH tunnel from Materialize to your database, you launch an instance to serve as an SSH bastion host, configure the bastion host to allow traffic only from Materialize, and then configure your database's private network to allow traffic from the bastion host.

{{< note >}} Materialize provides a Terraform module that automates the creation and configuration of resources for an SSH tunnel. For more details, see the Terraform module repository. {{</ note >}}

  1. Launch an EC2 instance to serve as your SSH bastion host.

    • Make sure the instance is publicly accessible and in the same VPC as your RDS instance.
    • Add a key pair and note the username. You'll use this username when connecting Materialize to your bastion host.

    Warning: Auto-assigned public IP addresses can change in certain cases.

    For this reason, it's best to associate an elastic IP address to your bastion host.

  2. Configure the SSH bastion host to allow traffic only from Materialize.

    1. In the Materialize console's SQL Shell, or your preferred SQL client connected to Materialize, get the static egress IP addresses for the Materialize region you are running in:

      SELECT * FROM mz_egress_ips;
      
    2. For each static egress IP, add an inbound rule to your SSH bastion host's security group.

      In each rule:

      • Set Type to PostgreSQL.
      • Set Source to the IP address in CIDR notation.
  3. In the security group of your RDS instance, add an inbound rule to allow traffic from the SSH bastion host.

    • Set Type to All TCP.
    • Set Source to Custom and select the bastion host's security group.

{{< /tab >}}

{{< /tabs >}}

C. Ingest data in Materialize

1. (Optional) Create a cluster

{{< note >}} If you are prototyping and already have a cluster to host your PostgreSQL source (e.g. quickstart), you can skip this step. For production scenarios, we recommend separating your workloads into multiple clusters for resource isolation. {{< /note >}}

{{% postgres-direct/create-a-cluster %}}

2. Start ingesting data

Now that you've configured your database network and created an ingestion cluster, you can connect Materialize to your PostgreSQL database and start ingesting data. The exact steps depend on your networking configuration, so start by selecting the relevant option.

{{< tabs >}}

{{< tab "Allow Materialize IPs">}}

  1. In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE SECRET command to securely store the password for the materialize PostgreSQL user you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';
    
  2. Use the CREATE CONNECTION command to create a connection object with access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER 'materialize',
      PASSWORD SECRET pgpass,
      SSL MODE 'require',
      DATABASE '<database>'
      );
    
    • Replace <host> with your RDS endpoint. To find your RDS endpoint, select your database in the RDS Console, and look under Connectivity & security.

    • Replace <database> with the name of the database containing the tables you want to replicate to Materialize.

  3. Use the CREATE SOURCE command to connect Materialize to your RDS instance and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;
    

    By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause. To ingest data from specific schemas or tables in your publication, use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.

  4. After source creation, you can handle upstream schema changes for specific replicated tables using the ALTER SOURCE...ADD SUBSOURCE and DROP SOURCE syntax.

{{< /tab >}}

{{< tab "Use AWS PrivateLink">}}

  1. In the SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CONNECTION command to create an in-region or cross-region AWS PrivateLink connection.

    ↕️ In-region connections

    To connect to an AWS PrivateLink endpoint service in the same region as your Materialize environment:

      CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
        SERVICE NAME 'com.amazonaws.vpce.<region_id>.vpce-svc-<endpoint_service_id>',
        AVAILABILITY ZONES ('use1-az1', 'use1-az2', 'use1-az4')
      );
    
    • Replace the SERVICE NAME value with the service name you noted earlier.

    • Replace the AVAILABILITY ZONES list with the IDs of the availability zones in your AWS account. For in-region connections the availability zones of the NLB and the consumer VPC must match.

    To find your availability zone IDs, select your database in the RDS Console and click the subnets under Connectivity & security. For each subnet, look for Availability Zone ID (e.g., use1-az6), not Availability Zone (e.g., us-east-1d).

    ↔️ Cross-region connections

    To connect to an AWS PrivateLink endpoint service in a different region to the one where your Materialize environment is deployed:

      CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
        SERVICE NAME 'com.amazonaws.vpce.us-west-1.vpce-svc-<endpoint_service_id>',
        -- For now, the AVAILABILITY ZONES clause **is** required, but will be
        -- made optional in a future release.
        AVAILABILITY ZONES ()
      );
    
    • Replace the SERVICE NAME value with the service name you noted earlier.

    • The service name region refers to where the endpoint service was created. You do not need to specify AVAILABILITY ZONES manually — these will be optimally auto-assigned when none are provided.

  2. Retrieve the AWS principal for the AWS PrivateLink connection you just created:

    SELECT principal
    FROM mz_aws_privatelink_connections plc
    JOIN mz_connections c ON plc.id = c.id
    WHERE c.name = 'privatelink_svc';
    

                                     principal
    ---------------------------------------------------------------------------
     arn:aws:iam::664411391173:role/mz_20273b7c-2bbe-42b8-8c36-8cc179e9bbc3_u1
    
  3. Update your VPC endpoint service to accept connections from the AWS principal.

  4. If your AWS PrivateLink service is configured to require acceptance of connection requests, manually approve the connection request from Materialize.

    Note: It can take some time for the connection request to show up. Do not move on to the next step until you've approved the connection.

  5. Validate the AWS PrivateLink connection you created using the VALIDATE CONNECTION command:

    VALIDATE CONNECTION privatelink_svc;
    

    If no validation error is returned, move to the next step.

  6. Use the CREATE SECRET command to securely store the password for the materialize PostgreSQL user you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';
    
  7. Use the CREATE CONNECTION command to create another connection object, this time with database access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER 'materialize',
      PASSWORD SECRET pgpass,
      DATABASE '<database>',
      AWS PRIVATELINK privatelink_svc
      );
    
    • Replace <host> with your RDS endpoint. To find your RDS endpoint, select your database in the RDS Console, and look under Connectivity & security.

    • Replace <database> with the name of the database containing the tables you want to replicate to Materialize.

  8. Use the CREATE SOURCE command to connect Materialize to your RDS instance via AWS PrivateLink and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;
    

    By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause. To ingest data from specific schemas or tables in your publication, use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.

{{< /tab >}}

{{< tab "Use an SSH tunnel">}}

  1. In the Materialize console's SQL Shell, or your preferred SQL client connected to Materialize, use the CREATE CONNECTION command to create an SSH tunnel connection:

    CREATE CONNECTION ssh_connection TO SSH TUNNEL (
        HOST '<SSH_BASTION_HOST>',
        PORT <SSH_BASTION_PORT>,
        USER '<SSH_BASTION_USER>'
    );
    
    • Replace <SSH_BASTION_HOST> and <SSH_BASTION_PORT> with the public IP address and port of the SSH bastion host you created earlier.

    • Replace <SSH_BASTION_USER> with the username for the key pair you created for your SSH bastion host.

  2. Get Materialize's public keys for the SSH tunnel connection you just created:

    SELECT
        mz_connections.name,
        mz_ssh_tunnel_connections.*
    FROM
        mz_connections JOIN
        mz_ssh_tunnel_connections USING(id)
    WHERE
        mz_connections.name = 'ssh_connection';
    
  3. Log in to your SSH bastion host and add Materialize's public keys to the authorized_keys file, for example:

    # Command for Linux
    echo "ssh-ed25519 AAAA...76RH materialize" >> ~/.ssh/authorized_keys
    echo "ssh-ed25519 AAAA...hLYV materialize" >> ~/.ssh/authorized_keys
    
  4. Back in the SQL client connected to Materialize, validate the SSH tunnel connection you created using the VALIDATE CONNECTION command:

    VALIDATE CONNECTION ssh_connection;
    

    If no validation error is returned, move to the next step.

  5. Use the CREATE SECRET command to securely store the password for the materialize PostgreSQL user you created earlier:

    CREATE SECRET pgpass AS '<PASSWORD>';
    
  6. Use the CREATE CONNECTION command to create another connection object, this time with database access and authentication details for Materialize to use:

    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER 'materialize',
      PASSWORD SECRET pgpass,
      DATABASE '<database>',
      SSH TUNNEL ssh_connection
      );
    
    • Replace <host> with your RDS endpoint. To find your RDS endpoint, select your database in the RDS Console, and look under Connectivity & security.

    • Replace <database> with the name of the database containing the tables you want to replicate to Materialize.

  7. Use the CREATE SOURCE command to connect Materialize to your RDS instance and start ingesting data from the publication you created earlier:

    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;
    

    By default, the source will be created in the active cluster; to use a different cluster, use the IN CLUSTER clause. To ingest data from specific schemas or tables in your publication, use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.

{{< /tab >}}

{{< /tabs >}}

3. Monitor the ingestion status

{{% postgres-direct/check-the-ingestion-status %}}

4. Right-size the cluster

{{% postgres-direct/right-size-the-cluster %}}

D. Explore your data

{{% postgres-direct/next-steps %}}

Considerations

{{% include-md file="shared-content/postgres-considerations.md" %}}