
RDS PostgreSQL Migration and Splitting a Monolithic Database into Multiple RDS Instances

Written by:

Igor Gorovyy
DevOps Engineer Lead & Senior Solutions Architect

Migrating Amazon RDS PostgreSQL databases and splitting a monolithic PostgreSQL database into multiple RDS instances can significantly enhance performance, scalability, and manageability. In this article, we will explore best practices for RDS PostgreSQL migration and database splitting, covering challenges, strategies, and tools to streamline the process.

How to migrate and split AWS RDS databases

Why Migrate or Split RDS PostgreSQL?

Reasons for Migration:

Cost Optimization: Moving from on-demand pricing to reserved instances or using a more cost-efficient database engine.

Performance Improvement: Upgrading to a larger instance type or a different database engine version.

High Availability & Disaster Recovery: Migrating to a multi-AZ setup or cross-region replication for redundancy.

Compliance Requirements: Moving to a region that meets data sovereignty regulations.

Reasons for Splitting a Monolithic RDS PostgreSQL Database:

Performance Bottlenecks: Large databases may lead to slower queries and increased latency.

Scalability: Decoupling services and reducing the load on a single instance.

Microservices Architecture: Different services might need their own databases for isolation and optimized access.

Easier Maintenance: Reducing the impact of schema changes or updates.

The Importance of Python for DevOps Engineers

Python is an essential tool for DevOps engineers, providing automation, integration, and flexibility in managing cloud infrastructure, CI/CD pipelines, and monitoring systems. Some key benefits include:

Infrastructure as Code (IaC): Python helps manage infrastructure through tools like boto3 for AWS automation and scripted Terraform workflows.

Automation & Scripting: Automating database migrations, backups, deployments, and monitoring using Python scripts.

Cloud Management: Managing AWS resources (EC2, RDS, S3, etc.) efficiently with boto3.

CI/CD Integration: Using Python to create deployment scripts, integrate with Jenkins, GitHub Actions, and Kubernetes.

Monitoring & Logging: Parsing logs, monitoring metrics with Prometheus, and automating alerts with Python.

For database migration and splitting, Python plays a crucial role in automating the process: the AWS SDK for Python (boto3) manages connections, snapshots, and cross-region transfers, while PostgreSQL utilities (pg_dump, pg_restore) driven from scripts handle export and import.
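
As a small taste of that automation, the sketch below lists the RDS instances in a region along with their engine and status; the region name is an example value.

import boto3

# List every RDS instance in the region with its engine and current status
rds = boto3.client('rds', region_name='eu-central-1')
for db in rds.describe_db_instances()['DBInstances']:
    print(db['DBInstanceIdentifier'], db['Engine'], db['DBInstanceStatus'])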

RDS PostgreSQL Migration Strategies

1. Same-Engine Migration (Minimal Downtime)

If you are migrating within PostgreSQL:

- AWS DMS (Database Migration Service): Supports near-zero-downtime migration.
- Logical Replication: Set up logical replication for a seamless cutover.
- Snapshots & Restores: Create a snapshot and restore it in the new environment.
- pg_dump / pg_restore: Manually export and import data for smaller databases.
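
If the logical replication route fits, the following minimal sketch shows the core setup. It assumes psycopg2 is installed, rds.logical_replication is enabled in the source parameter group, the replicating role has the rds_replication grant, and the target schema already exists (logical replication does not copy DDL); hosts and credentials are placeholders.

import psycopg2

# Placeholder connection settings for the source and target instances
SOURCE = {"host": "old-db.eu-central-1.rds.amazonaws.com", "dbname": "appdb",
          "user": "replicator", "password": "***"}
TARGET = {"host": "new-db.eu-central-1.rds.amazonaws.com", "dbname": "appdb",
          "user": "replicator", "password": "***"}

# Publish all tables on the source
src = psycopg2.connect(**SOURCE)
src.autocommit = True
with src.cursor() as cur:
    cur.execute("CREATE PUBLICATION migration_pub FOR ALL TABLES;")
src.close()

# Subscribe from the target; CREATE SUBSCRIPTION cannot run in a transaction
tgt = psycopg2.connect(**TARGET)
tgt.autocommit = True
with tgt.cursor() as cur:
    conninfo = ("host={host} dbname={dbname} user={user} "
                "password={password}").format(**SOURCE)
    cur.execute(f"CREATE SUBSCRIPTION migration_sub "
                f"CONNECTION '{conninfo}' PUBLICATION migration_pub;")
tgt.close()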

2. Cross-Engine Migration

If migrating from another engine (e.g., MySQL → PostgreSQL), you will need:

- AWS DMS: Migrates the data, with optional ongoing replication.
- AWS Schema Conversion Tool (SCT): Translates the source schema and code objects to PostgreSQL.
- Manual Export/Import: Dump from the source engine, convert, and load into PostgreSQL.
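
A minimal boto3 sketch of kicking off a DMS task follows. It assumes the replication instance and the source/target endpoints already exist; all ARNs are placeholders, and in practice you would wait for the task to reach 'ready' before starting it.

import json
import boto3

dms = boto3.client('dms', region_name='eu-central-1')

# Migrate every table in the public schema
table_mappings = {
    "rules": [{
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "include-public",
        "object-locator": {"schema-name": "public", "table-name": "%"},
        "rule-action": "include"
    }]
}

task = dms.create_replication_task(
    ReplicationTaskIdentifier='mysql-to-postgres-task',
    SourceEndpointArn='arn:aws:dms:eu-central-1:123456789012:endpoint:SOURCE',
    TargetEndpointArn='arn:aws:dms:eu-central-1:123456789012:endpoint:TARGET',
    ReplicationInstanceArn='arn:aws:dms:eu-central-1:123456789012:rep:INSTANCE',
    MigrationType='full-load-and-cdc',  # full load plus ongoing replication
    TableMappings=json.dumps(table_mappings)
)

dms.start_replication_task(
    ReplicationTaskArn=task['ReplicationTask']['ReplicationTaskArn'],
    StartReplicationTaskType='start-replication'
)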

3. Cross-Region Migration

If moving a database across regions:

- Create a cross-region read replica and promote it at cutover.
- Use AWS DMS for continuous replication.
- Leverage S3 (or cross-region snapshot copies) for data transfer.
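
For the read-replica route, a brief boto3 sketch (identifiers and ARNs are placeholders; encrypted instances additionally need a KmsKeyId valid in the target region):

import boto3

target = boto3.client('rds', region_name='eu-west-1')

# Create the replica in the target region from the source instance's ARN
target.create_db_instance_read_replica(
    DBInstanceIdentifier='appdb-replica',
    SourceDBInstanceIdentifier='arn:aws:rds:eu-central-1:123456789012:db:appdb',
    SourceRegion='eu-central-1',  # lets boto3 presign the cross-region request
)

# After the replica catches up and writes stop on the source, promote it
target.promote_read_replica(DBInstanceIdentifier='appdb-replica')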

4. Automated Migration Approach with boto3

An automated approach to migrating RDS PostgreSQL instances across regions can be implemented using Python and the boto3 library. The key steps include:

  1. Identify source RDS instances
  2. Create snapshots for each instance
  3. Copy snapshots to the target region using copy_db_snapshot
  4. Restore instances in the target region
  5. Validate the migration

A visual representation of the migration process:

sequenceDiagram
    participant User
    participant Script
    participant SourceRDS as Source RDS
    participant TargetRDS as Target RDS
    participant KMS as AWS KMS

    User->>Script: Start migration
    Script->>SourceRDS: List DB instances
    SourceRDS-->>Script: Return instances list
    loop For each instance
        Script->>SourceRDS: Create snapshot
        SourceRDS-->>Script: Snapshot created
        Script->>KMS: Get KMS key (if encrypted)
        KMS-->>Script: KMS key
        Script->>TargetRDS: Copy snapshot to target region
        TargetRDS-->>Script: Snapshot copied
        Script->>TargetRDS: Restore DB from snapshot
        TargetRDS-->>Script: DB restored
    end
    Script-->>User: Migration complete

Example Python implementation:

import boto3
import time
import concurrent.futures
import botocore.exceptions

# Configuration
source_region = 'eu-central-1'  # Source region
target_region = 'eu-west-1'  # Target region
availability_zone = f'{target_region}a'  # Availability zone in the target region
multi_az = False  # Set to True for Multi-AZ deployment
publicly_accessible = False  # Set to False if the instance should not be publicly accessible
# Security group ID to be applied to the restored DB instance
security_group_ids = ['sg-0fea3798bf54e9fe6']

# Specify your KMS key ID in the target region (required for encrypted snapshots)
kms_key_id = '18d******d-89df-4690-82a1-f2**********ccb'  # Ireland
db_subnet_group_name = 'multiregion-dev-vpc-blue'  # Your specific DB subnet group name

# Create a client for the source region
rds_client_source = boto3.client('rds', region_name=source_region)

# Create a client for the target region
rds_client_target = boto3.client('rds', region_name=target_region)

# Get the account ID for ARN generation
account_id = boto3.client('sts').get_caller_identity().get('Account')

# Define an exclude list for instance identifiers
exclude_list = ['env-dev-msv1', 'env-dev-msv2', 'env-dev-msv3', 'env-msv4', 'env-msv5', 'env-msv6', 'env-msv7']  # Add your specific instance IDs to exclude

# Record the total start time
total_start_time = time.time()

def migrate_rds_instance(db_instance):
    try:
        db_instance_identifier = db_instance['DBInstanceIdentifier']

        if db_instance_identifier in exclude_list:
            print(f"Skipping excluded RDS instance: {db_instance_identifier}")
            return

        instance_start_time = time.time()

        db_instance_class = db_instance['DBInstanceClass']
        storage_type = db_instance.get('StorageType', 'Not specified')
        engine = db_instance['Engine']
        allocated_storage = db_instance['AllocatedStorage']

        print(f"Processing RDS Instance: {db_instance_identifier}")
        print(f" - Instance Class: {db_instance_class}")
        print(f" - Storage Type: {storage_type}")
        print(f" - Engine: {engine}")
        print(f" - Allocated Storage: {allocated_storage} GiB")

        snapshot_identifier = f"{db_instance_identifier}-snapshot-{int(time.time())}"
        target_db_instance_identifier = f"{db_instance_identifier}-migrated"

        print(f"\nCreating snapshot for RDS instance: {db_instance_identifier}")
        snapshot_start_time = time.time()

        response_snapshot = rds_client_source.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_identifier,
            DBInstanceIdentifier=db_instance_identifier
        )

        print("Waiting for snapshot to be available...")
        snapshot_waiter = rds_client_source.get_waiter('db_snapshot_available')
        snapshot_waiter.wait(
            DBSnapshotIdentifier=snapshot_identifier
        )

        snapshot_end_time = time.time()
        snapshot_duration = snapshot_end_time - snapshot_start_time
        print(f"Snapshot {snapshot_identifier} is now available for {db_instance_identifier}. Time taken: {snapshot_duration:.2f} seconds.")

        print(f"Copying snapshot to the target region: {target_region}")
        copy_start_time = time.time()

        snapshot_details = rds_client_source.describe_db_snapshots(
            DBSnapshotIdentifier=snapshot_identifier
        )['DBSnapshots'][0]

        encrypted = snapshot_details['Encrypted']

        copy_params = {
            'SourceDBSnapshotIdentifier': f"arn:aws:rds:{source_region}:{account_id}:snapshot:{snapshot_identifier}",
            'TargetDBSnapshotIdentifier': snapshot_identifier,
            'SourceRegion': source_region
        }

        if encrypted:
            copy_params['KmsKeyId'] = kms_key_id
            print(f"Snapshot is encrypted. Using KMS key: {kms_key_id}")

        response_copy = rds_client_target.copy_db_snapshot(**copy_params)

        print("Waiting for the snapshot to be available in the target region...")
        copy_waiter = rds_client_target.get_waiter('db_snapshot_completed')

        max_attempts = 60
        delay = 30

        try:
            copy_waiter.wait(
                DBSnapshotIdentifier=snapshot_identifier,
                WaiterConfig={
                    'Delay': delay,
                    'MaxAttempts': max_attempts
                }
            )
        except botocore.exceptions.WaiterError as e:
            print(f"Error waiting for snapshot copy: {e}")
            return

        copy_end_time = time.time()
        copy_duration = copy_end_time - copy_start_time
        print(f"Snapshot {snapshot_identifier} copied successfully to {target_region} for {db_instance_identifier}. Time taken: {copy_duration:.2f} seconds.")

        print(f"Restoring DB instance from snapshot {snapshot_identifier} in {target_region}")
        restore_start_time = time.time()

        restore_params = {
            'DBInstanceIdentifier': target_db_instance_identifier,
            'DBSnapshotIdentifier': snapshot_identifier,
            'DBInstanceClass': db_instance_class,
            'DBSubnetGroupName': db_subnet_group_name,
            'MultiAZ': multi_az,
            'PubliclyAccessible': publicly_accessible,
            'VpcSecurityGroupIds': security_group_ids
        }

        if not multi_az:
            restore_params['AvailabilityZone'] = availability_zone

        response_restore = rds_client_target.restore_db_instance_from_db_snapshot(**restore_params)

        print("Waiting for the new DB instance to be available...")
        instance_waiter = rds_client_target.get_waiter('db_instance_available')
        instance_waiter.wait(
            DBInstanceIdentifier=target_db_instance_identifier
        )

        restore_end_time = time.time()
        restore_duration = restore_end_time - restore_start_time
        print(f"DB instance {target_db_instance_identifier} is now available in {target_region}. Time taken: {restore_duration:.2f} seconds.")

        instance_end_time = time.time()
        instance_duration = instance_end_time - instance_start_time
        print(f"Migration of {db_instance_identifier} completed. Total time taken: {instance_duration:.2f} seconds.\n")

    except Exception as e:
        print(f"Error migrating instance {db_instance['DBInstanceIdentifier']}: {e}")

def main():
    # Note: describe_db_instances returns at most 100 instances per call;
    # use a paginator if your account has more
    response = rds_client_source.describe_db_instances()
    db_instances = response['DBInstances']

    if not db_instances:
        print("No RDS instances found in the source region.")
        return

    print(f"Found {len(db_instances)} RDS instances.")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(migrate_rds_instance, db_instances)

    total_end_time = time.time()
    total_duration = total_end_time - total_start_time
    print(f"All RDS instances have been processed for migration. Total time taken: {total_duration:.2f} seconds.")

if __name__ == '__main__':
    main()
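
Step 5, validating the migration, can begin with a simple availability check before deeper verification (row counts, application smoke tests). A minimal sketch, assuming the '-migrated' naming convention used by the script above:

import boto3

def validate_migration(target_region='eu-west-1', suffix='-migrated'):
    rds = boto3.client('rds', region_name=target_region)
    paginator = rds.get_paginator('describe_db_instances')
    migrated = [db for page in paginator.paginate()
                for db in page['DBInstances']
                if db['DBInstanceIdentifier'].endswith(suffix)]
    for db in migrated:
        endpoint = db.get('Endpoint', {}).get('Address', 'pending')
        print(f"{db['DBInstanceIdentifier']}: {db['DBInstanceStatus']} ({endpoint})")
    return bool(migrated) and all(
        db['DBInstanceStatus'] == 'available' for db in migrated)

print("All migrated instances available:", validate_migration())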

Splitting a Monolithic RDS PostgreSQL Database

To split a monolithic database into multiple RDS instances, automating pg_dump and pg_restore with Python simplifies the process.

Automating the Split Process with Python

  1. Read mapping file for old and new database connections
  2. Export each source database using pg_dump
  3. Import into the corresponding new database using pg_restore
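
For reference, the mapping file read in step 1 is assumed to hold one pair of connection URLs per line, source and target separated by @@ (the delimiter the script below expects); hosts and credentials are placeholders:

postgres://app:secret@monolith.eu-central-1.rds.amazonaws.com:5432/orders@@postgres://app:secret@orders-db.eu-central-1.rds.amazonaws.com:5432/orders
postgres://app:secret@monolith.eu-central-1.rds.amazonaws.com:5432/billing@@postgres://app:secret@billing-db.eu-central-1.rds.amazonaws.com:5432/billing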

A visual representation of the splitting process:

sequenceDiagram
    participant Script
    participant Mapping as Mapping File
    participant SourceDB as Source DB
    participant TargetDB as Target DB
    participant Filesystem

    Script->>Mapping: Read DB mappings
    Mapping-->>Script: Return mappings
    loop For each mapping
        Script->>Script: Parse DB URLs
        Script->>SourceDB: pg_dump export
        SourceDB-->>Script: Database dump
        Script->>Filesystem: Save dump file
        Script->>TargetDB: pg_restore import
        TargetDB-->>Script: Import complete
    end
    Script->>Script: Calculate total time

Example Python script:

import os
import subprocess
import time
from urllib.parse import urlparse, parse_qs

def read_mapping_file(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()

    mappings = [line.strip().split('@@') for line in lines if line.strip()]
    return mappings

def parse_db_url(db_url):
    parsed_url = urlparse(db_url)
    username = parsed_url.username
    password = parsed_url.password
    host = parsed_url.hostname
    port = parsed_url.port or 5432  # default PostgreSQL port when the URL omits it
    dbname = parsed_url.path.lstrip('/')
    query_params = parse_qs(parsed_url.query)

    return {
        "user": username,
        "password": password,
        "host": host,
        "port": str(port),
        "dbname": dbname,
        "options": query_params
    }

def export_database(config, dump_file):
    print(f"Starting database export for {config['dbname']}...")
    start_time = time.time()

    dump_command = [
        "/usr/bin/pg_dump",
        "-h", config['host'],
        "-p", config['port'],
        "-U", config['user'],
        "-d", config['dbname'],
        "-F", "c",
        "-f", dump_file
    ]

    env = {"PGPASSWORD": config['password']}

    try:
        subprocess.run(dump_command, check=True, env=env)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Database exported to {dump_file} in {elapsed_time:.2f} seconds.")
    except subprocess.CalledProcessError as e:
        print("Error exporting database:", e)

def import_database(config, dump_file):
    print(f"Starting database import for {config['dbname']}...")
    start_time = time.time()

    import_command = [
        "/usr/bin/pg_restore",
        "-h", config['host'],
        "-p", config['port'],
        "-U", config['user'],
        "-d", config['dbname'],
        "--clean",
        dump_file
    ]

    env = {"PGPASSWORD": config['password']}

    try:
        subprocess.run(import_command, check=True, env=env)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Database imported from {dump_file} in {elapsed_time:.2f} seconds.")
    except subprocess.CalledProcessError as e:
        print("Error importing database:", e)

def process_mappings(file_path):
    mappings = read_mapping_file(file_path)

    total_start_time = time.time()
    for old_db_url, new_db_url in mappings:
        old_db_config = parse_db_url(old_db_url)
        new_db_config = parse_db_url(new_db_url)

        print(f"Processing mapping: {new_db_config['dbname']}")
        dump_file = f"{old_db_config['dbname']}.dump"

        export_database(old_db_config, dump_file)
        import_database(new_db_config, dump_file)

    total_end_time = time.time()
    total_elapsed_time = total_end_time - total_start_time
    print(f"Total process completed in {total_elapsed_time:.2f} seconds.")

if __name__ == '__main__':
    mapping_file_path = "db_mapping.txt"
    process_mappings(mapping_file_path)

With export and import chained per mapping, each source database lands in its corresponding new RDS PostgreSQL instance in a single automated pass.


Conclusion

Migrating and splitting an RDS PostgreSQL database can significantly enhance performance, scalability, and manageability. Python plays a crucial role in automating infrastructure management, database migration, and DevOps workflows. Leveraging AWS tools, boto3, and PostgreSQL utilities minimizes downtime and ensures efficient data transfer.