Streaming data from MySQL to Postgres.

Streaming data from a MySQL database to a PostgreSQL database can be a useful way to move data between systems or to create a real-time replica of a database for reporting and analysis. One way to accomplish this is through the use of Change Data Capture (CDC) tools.

CDC is a method of tracking changes made to a database and capturing them in a separate stream. This stream can then be used to replicate the changes to another database.
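To make the idea concrete, here is a minimal sketch of replaying a change stream against a replica. The event shape (table, op, key, row) is an assumption made for illustration and is not the format DBConvert Streams uses internally.

```python
from typing import Any

# A change event as a CDC stream might carry it; this shape is illustrative only.
Event = dict[str, Any]


def apply_event(replica: dict[str, dict[int, dict]], event: Event) -> None:
    """Apply one captured change to an in-memory replica keyed by table and primary key."""
    table = replica.setdefault(event["table"], {})
    op, key = event["op"], event["key"]
    if op in ("insert", "update"):
        table[key] = event["row"]
    elif op == "delete":
        table.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op}")


stream = [
    {"table": "product", "op": "insert", "key": 1, "row": {"name": "Product 1"}},
    {"table": "product", "op": "insert", "key": 2, "row": {"name": "Product 2"}},
    {"table": "product", "op": "update", "key": 1, "row": {"name": "Product 1a"}},
    {"table": "product", "op": "delete", "key": 2},
]

replica: dict[str, dict[int, dict]] = {}
for event in stream:  # replaying the stream in order reproduces the source state
    apply_event(replica, event)

print(replica)  # {'product': {1: {'name': 'Product 1a'}}}
```

The order of events matters: applying the same changes in a different order could leave the replica in a different state than the source.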

DBConvert Streams replicates your MySQL data to PostgreSQL in real time. It captures data changes from a source MySQL database and applies them to a target PostgreSQL database. To do so, its source reader reads the binary log of the MySQL database and transforms the changes into a format that the target PostgreSQL database can consume.


This blog post will provide a comprehensive guide on how to stream data from MySQL to Postgres.

The DBConvert Streams GitHub repository contains several examples of using DBConvert Streams, including configuration files and Docker Compose files.

To begin, let's clone the GitHub repository containing the MySQL to PostgreSQL streaming example.

git clone git@github.com:slotix/dbconvert-streams-public.git && cd dbconvert-streams-public/examples/mysql2postgres/sales-db/

Docker Compose configuration.

Since DBConvert Streams relies on multiple services, the most convenient way to start the containers is with Docker Compose.

The docker-compose.yml file from the repository is provided below.

version: '3.9'
services:
  dbs-api:
    container_name: api
    image: slotix/dbs-api
    entrypoint:
      - ./dbs-api
      - --nats=nats:4222
      - --source=source-reader:8021
      - --target=target-writer:8022
    ports:
      - 8020:8020
    depends_on:
      - nats
    volumes:
      - ./mysql2pg.json:/mysql2pg.json:ro

  dbs-source-reader:
    container_name: source-reader
    image: slotix/dbs-source-reader
    entrypoint:
      - ./dbs-source-reader
      - --nats=nats:4222
    ports:
      - 8021:8021
    depends_on:
      - dbs-api

  dbs-target-writer:
    container_name: target-writer
    image: slotix/dbs-target-writer
    entrypoint:
      - ./dbs-target-writer
      - --nats=nats:4222
      - --prometheus=http://prometheus:9090
    ports:
      - 8022:8022
    depends_on:
      - dbs-source-reader

  nats:
    container_name: nats
    image: nats
    entrypoint: /nats-server
    command: "--jetstream -m 8222 --store_dir /data/nats-server"
    ports:
      - 4222:4222
      - 8222:8222

  prometheus:
    image: slotix/dbs-prometheus:latest
    container_name: prom
    user: root
    ports:
      - 9090:9090

  mysql-source:
    container_name: mysql-source
    build: ./source 
    environment:
      - MYSQL_ROOT_PASSWORD=123456
    ports:
      - '3306:3306'


  postgres-target:
    container_name: postgres-target
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command: postgres
    ports:
      - '5432:5432'

Docker-compose file for DBConvert Streams

This docker-compose file brings up multiple services and links them together so they can communicate with each other. With this setup, replication takes place between the mysql-source and postgres-target databases and is controlled by the dbs-api, dbs-source-reader, dbs-target-writer and nats services.

DBConvert Streams services.

  • dbs-api service is the entry point of DBConvert Streams. Requests with configuration settings for the source and target databases are sent to it. Its startup options specify how to reach the other components of the system: nats, source-reader and target-writer.

  • dbs-source-reader service is responsible for monitoring and capturing changes in the source database, then sending batches of records to the Event Hub.

  • dbs-target-writer service receives changes from the Event Hub and applies them to the target database.

  • nats service is the core of the Event Hub. It provides communication between the other DBS services.

  • prometheus service is used for monitoring the metrics of DBS services.
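The division of labour between the reader, the Event Hub and the writer can be sketched in a few lines. This is only an analogy: an in-process queue stands in for NATS, the batch size is an arbitrary value picked for the demo, and none of the names below come from DBConvert Streams itself.

```python
import queue
import threading

BATCH_SIZE = 3   # hypothetical batch size, chosen only for the demo
SENTINEL = None  # marks the end of the stream


def source_reader(changes, hub):
    """Group captured changes into batches and publish them to the hub."""
    batch = []
    for change in changes:
        batch.append(change)
        if len(batch) == BATCH_SIZE:
            hub.put(batch)
            batch = []
    if batch:  # flush the last, possibly smaller, batch
        hub.put(batch)
    hub.put(SENTINEL)


def target_writer(hub, target_rows):
    """Consume batches from the hub and apply them to the target."""
    while True:
        batch = hub.get()
        if batch is SENTINEL:
            break
        target_rows.extend(batch)


hub = queue.Queue()  # stand-in for the Event Hub
target_rows = []
changes = [f"row {i}" for i in range(1, 8)]

writer = threading.Thread(target=target_writer, args=(hub, target_rows))
writer.start()
source_reader(changes, hub)
writer.join()

print(len(target_rows))  # 7
```

Because the reader and the writer only talk to the hub, either side can be restarted or scaled without the other knowing about it, which is the main reason for placing a message broker in the middle.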

Database services.

The structure of the tables that will be used in our example is depicted in the diagram below.

Entity Relationship (ER) Diagram of Database.

The structure of MySQL source tables is adapted from the jdaarevalo/docker_postgres_with_data GitHub repository.

Source and Target Databases.

The mysql-source database image is based on slotix/dbs-mysql:8, which includes all the settings needed to enable MySQL CDC replication. The image also contains the initdb.sql script, which creates tables with the structure shown above.

postgres-target database, on the other hand, is based on the official lightweight postgres:15-alpine image. postgres-target database will receive all changes made to the mysql-source database.

Both of these databases, mysql-source and postgres-target, are typically located on separate physical servers in a production environment. However, in this example, we will run them on the same machine within distinct containers for demonstration purposes.

Execution.

To start all services described above, execute the following command:

docker-compose up --build -d

This command will use the docker-compose.yml file to build and start the necessary containers in detached mode (-d option). The --build flag will force the rebuild of the images before starting the containers.


To check if the MySQL database inside the running container has all tables created successfully by the script on start, you can run the command:

docker exec -it mysql-source mysql -uroot -p123456 source -e 'SHOW TABLES;'

By running the above command, you can see a list of tables created in the source database inside the container, which will confirm if the script has created all the tables successfully on start.

+------------------+
| Tables_in_source |
+------------------+
| city             |
| country          |
| order_status     |
| product          |
| sale             |
| status_name      |
| store            |
| users            |
+------------------+

List of tables created on source

Stream configuration

The stream configuration file mysql2pg.json, shown below, is used to set up the database replication process.

{
  "source": {
    "type": "mysql",
    "connection": "root:123456@tcp(mysql-source:3306)/source",
    "filter": {
      "tables": [
        { "name": "product", "operations": ["insert"]},
        { "name": "country", "operations": ["insert"]},
        { "name": "city", "operations": ["insert"]},
        { "name": "store", "operations": ["insert"]},
        { "name": "users", "operations": ["insert"]},
        { "name": "status_name", "operations": ["insert"]},
        { "name": "sale", "operations": ["insert"]},
        { "name": "order_status", "operations": ["insert"]}
      ]
    }
  },
  "target": {
    "type": "postgresql",
    "connection": "postgres://postgres:postgres@postgres-target:5432/postgres"
  },
  "limits": {
    "numberOfEvents": 0,
    "elapsedTime": 0
  }
}

  • The source field specifies the type of the source database as "mysql" and the connection details to connect to the database, including username and password as well as the host and port.

  • The filter field within the source field specifies that only certain tables and their corresponding operations (in this case "insert") will be replicated.

  • The target field specifies the type of the target database as "postgresql" and the connection details to connect to the target database, including username and password as well as the host and port.

  • The limits field sets both the number of events and the elapsed time to zero (0), which means that no limits are placed on this replication process.

Note that with this configuration only insert operations are captured for the tables listed in the filter field. Updates and deletes made on the source are not replicated to the target.
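One way to read the filter section is as a lookup from table name to allowed operations. The sketch below encodes that reading for two of the tables. The is_captured helper is hypothetical and only illustrates the semantics described above; it is not part of DBConvert Streams.

```python
import json

# A trimmed copy of the filter section from mysql2pg.json.
CONFIG = json.loads("""
{
  "source": {
    "type": "mysql",
    "filter": {
      "tables": [
        {"name": "product", "operations": ["insert"]},
        {"name": "sale", "operations": ["insert"]}
      ]
    }
  }
}
""")


def is_captured(config, table, operation):
    """Return True if the filter lists this operation for this table."""
    for entry in config["source"]["filter"]["tables"]:
        if entry["name"] == table:
            return operation in entry["operations"]
    return False  # tables missing from the filter are assumed to be ignored


print(is_captured(CONFIG, "product", "insert"))    # True
print(is_captured(CONFIG, "product", "update"))    # False
print(is_captured(CONFIG, "audit_log", "insert"))  # False
```

Here audit_log is a made-up table name used to show the case of a table that the filter does not mention.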

Send configuration to DBConvert Streams API.

docker run -t --rm \
    --network sales-db_default \
    curlimages/curl  \
    --request POST \
    --url http://dbs-api:8020/api/v1/streams\?file=./mysql2pg.json

This command starts a temporary container from the curlimages/curl image and attaches it to the sales-db_default network with the --network option, so that it can reach the dbs-api service by name. Docker Compose derives this network name from the project directory, sales-db.

Inside the container, curl sends an HTTP POST request to http://dbs-api:8020/api/v1/streams?file=./mysql2pg.json. The dbs-api service listens on port 8020, and the file query parameter tells it which configuration file to load. The request creates a new stream with the configuration specified in mysql2pg.json.

Note that the command assumes the sales-db_default network and the dbs-api service are already up and running. The curl container does not upload the file: only its path is passed in the URL, so the file must be readable by dbs-api. The Compose file above takes care of this by mounting ./mysql2pg.json from the example directory into the api container.
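The same request can be prepared from a script. The sketch below only builds the request with the Python standard library. It assumes the API is reachable from the host at localhost:8020, which relies on the 8020:8020 port mapping in the Compose file. The build_stream_request helper is a name made up for this example.

```python
import urllib.parse
import urllib.request


def build_stream_request(base_url, config_path):
    """Build the POST request that asks dbs-api to create a stream from a config file."""
    # Keep "." and "/" unescaped so the path looks the same as in the curl call.
    query = urllib.parse.urlencode({"file": config_path}, safe="./")
    url = f"{base_url}/api/v1/streams?{query}"
    return urllib.request.Request(url, method="POST")


req = build_stream_request("http://localhost:8020", "./mysql2pg.json")
print(req.get_method(), req.full_url)
# POST http://localhost:8020/api/v1/streams?file=./mysql2pg.json

# To actually send it (requires the services to be running):
# with urllib.request.urlopen(req) as resp:
#     print(resp.read().decode())
```

Building the request separately from sending it makes it easy to log or inspect the URL before anything touches the running services.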

This is a JSON response indicating that the stream creation was successful.

{"status":"success",
    "data":{
        "id":"2KGlt8BCHLT0lXklrs5wqM6n7BQ",
        "source":{...},
        "target":{...},
        "limits":{}
     }
}

It contains the following fields:

  • status: This field indicates the status of the request, in this case "success"

  • data: This field contains the details of the stream that was created. The id, source, target and limits fields described below are nested inside it.

  • id: This field contains a unique identifier for the stream, in this case "2KGlt8BCHLT0lXklrs5wqM6n7BQ"

  • source: This field contains the details of the source database, including the type, connection details, and filter settings.

  • target: This field contains the details of the target database, including the type and connection details.

  • limits: This field contains the limits for the replication process, such as number of events and elapsed time.

Note that the contents of the source and target fields are omitted here for brevity and shown as {...}.
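If you script the setup, you will probably want to check the status and keep the stream id. A minimal sketch, assuming the response has the shape shown above; the source and target fields are left out because their contents are elided in this post, and treating any status other than "success" as a failure is an assumption.

```python
import json

# The response from above, without the elided source and target fields.
response_body = (
    '{"status":"success",'
    '"data":{"id":"2KGlt8BCHLT0lXklrs5wqM6n7BQ","limits":{}}}'
)


def stream_id(body):
    """Return the id of the created stream, or raise if the API did not report success."""
    payload = json.loads(body)
    if payload.get("status") != "success":
        raise RuntimeError(f"stream creation failed: {payload}")
    return payload["data"]["id"]


print(stream_id(response_body))  # 2KGlt8BCHLT0lXklrs5wqM6n7BQ
```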

Check if tables on the target are created successfully.

💡 DBConvert Streams creates tables with the same structure as the source on the target if they are missing. At this point, all tables specified in the filter should exist on the Postgres target database.

To connect to the postgres-target Docker container and check if tables exist, you can run the following command:

docker exec -it postgres-target psql -U postgres -d postgres -c '\dt'

By running the above command, you can see a list of tables created in the postgres-target database, which will confirm if DBConvert Streams has created all the tables successfully.

            List of relations
 Schema |     Name     | Type  |  Owner
--------+--------------+-------+----------
 public | city         | table | postgres
 public | country      | table | postgres
 public | order_status | table | postgres
 public | product      | table | postgres
 public | sale         | table | postgres
 public | status_name  | table | postgres
 public | store        | table | postgres
 public | users        | table | postgres
(8 rows)

Populate the source with sample data.

Now that the mysql-source and postgres-target databases have identical table sets with the same structure, it is time to find out if the streaming of data works properly. This can be done by inserting data into the mysql-source database and observing if the same data is replicated to the postgres-target database.

-- Set params
SET @number_of_sales = '100';
SET @number_of_users = '100';
SET @number_of_products = '100';
SET @number_of_stores = '100';
SET @number_of_countries = '100';
SET @number_of_cities = '30';
SET @status_names = '5';
SET @start_date = '2023-01-01 00:00:00';
SET @end_date = '2023-02-01 00:00:00';

USE source;

TRUNCATE TABLE city ;
TRUNCATE TABLE product ;
TRUNCATE TABLE country ;
TRUNCATE TABLE status_name;
TRUNCATE TABLE users;
TRUNCATE TABLE order_status;
TRUNCATE TABLE sale;
TRUNCATE TABLE store;

-- Filling of products
INSERT INTO product
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_products
)
SELECT id, CONCAT_WS(' ','Product', id)
FROM t;


-- Filling of countries
INSERT INTO country
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_countries
)
SELECT id, CONCAT('Country ', id)
FROM t;


-- Filling of cities
INSERT INTO city
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_cities
)
SELECT id
    , CONCAT('City ', id)
    , FLOOR(RAND() * (@number_of_countries + 1))
FROM t;


-- Filling of stores
INSERT INTO store
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_stores
)
SELECT id
    , CONCAT('Store ', id)
    , FLOOR(RAND() * (@number_of_cities + 1))
FROM t;

-- Filling of users
INSERT INTO users
WITH RECURSIVE t(id) AS (
    SELECT 1
    UNION ALL
    SELECT id + 1
    FROM t
    WHERE id + 1 <= @number_of_users
)
SELECT id
    , CONCAT('User ', id)
FROM t;

-- Filling of status_names
INSERT INTO status_name
WITH RECURSIVE t(status_name_id) AS (
    SELECT 1
    UNION ALL
    SELECT status_name_id + 1
    FROM t
    WHERE status_name_id + 1 <= @status_names
)
SELECT status_name_id
    , CONCAT('Status Name ', status_name_id)
FROM t;

-- Filling of sales  
INSERT INTO sale
WITH RECURSIVE t(sale_id) AS (
    SELECT 1
    UNION ALL
    SELECT sale_id + 1
    FROM t
    WHERE sale_id + 1 <= @number_of_sales
)
SELECT UUID() AS sale_id
    , ROUND(RAND() * 10, 3) AS amount
    , DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS date_sale
    , FLOOR(RAND() * (@number_of_products + 1)) AS product_id
    , FLOOR(RAND() * (@number_of_users + 1)) AS user_id
    , FLOOR(RAND() * (@number_of_stores + 1)) AS store_id
FROM  t;

-- Filling of order_status
INSERT INTO order_status
WITH RECURSIVE t(order_status_id) AS (
    SELECT 1
    UNION ALL
    SELECT order_status_id + 1
    FROM t
    WHERE order_status_id + 1 <= @number_of_sales
)
SELECT UUID() AS order_status_id
    , DATE_ADD(@start_date, INTERVAL RAND() * 5 DAY) AS update_at
    , FLOOR(RAND() * (@number_of_sales + 1)) AS sale_id
    , FLOOR(RAND() * (@status_names + 1)) AS status_name_id
FROM t;

SQL to fill data with random data

The script above first truncates all tables in the source database to clear any previous data and then inserts sample data into them. The number of rows generated for each table is controlled by the parameters set at the top of the script.

To execute this SQL script, assuming it is saved as fill_tables.sql in the current directory, run the following command:

docker exec -i \
    mysql-source \
    mysql -uroot -p123456 -D source < $PWD/fill_tables.sql
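A side note on the random references generated by the script: they are drawn with FLOOR(RAND() * (n + 1)). MySQL's RAND() returns a value v with 0 <= v < 1, so the expression yields integers from 0 to n inclusive, while the ids generated by the recursive queries start at 1. The Python sketch below mirrors the expression to show both ends of the range. The random_ref helper is a name made up for this example.

```python
import math
import random


def random_ref(n, rand=random.random):
    """Python equivalent of MySQL's FLOOR(RAND() * (n + 1)) for RAND() in [0, 1)."""
    return math.floor(rand() * (n + 1))


# Force the smallest and (almost) largest values RAND() can return.
lowest = random_ref(100, rand=lambda: 0.0)
highest = random_ref(100, rand=lambda: 0.999999)
print(lowest, highest)  # 0 100
```

A reference of 0 therefore matches no generated row. Whether that matters depends on whether the tables declare foreign keys, which this post does not show. For row counts it makes no difference, and if you need references that always resolve, FLOOR(1 + RAND() * n) would give the range 1 to n instead.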

Comparing number of records in source and target databases.

Let's compare the number of rows in the tables of the source and target databases.

docker exec -it mysql-source mysql -uroot -p123456 -D source -e "SELECT (SELECT COUNT(*) FROM product) as 'product_count',(SELECT COUNT(*) FROM country) as 'country_count',(SELECT COUNT(*) FROM city) as 'city_count',(SELECT COUNT(*) FROM store) as 'store_count',(SELECT COUNT(*) FROM users) as 'users_count',(SELECT COUNT(*) FROM status_name) as 'status_name_count',(SELECT COUNT(*) FROM sale) as 'sale_count',(SELECT COUNT(*) FROM order_status) as 'order_status_count';"

How many records are in each table of mysql-source db?

COUNTs for mysql-source tables.

docker exec -it postgres-target psql -U postgres -d postgres -c "SELECT (SELECT COUNT(*) FROM product) as product_count,(SELECT COUNT(*) FROM country) as country_count,(SELECT COUNT(*) FROM city) as city_count,(SELECT COUNT(*) FROM store) as store_count,(SELECT COUNT(*) FROM users) as users_count,(SELECT COUNT(*) FROM status_name) as status_name_count,(SELECT COUNT(*) FROM sale) as sale_count,(SELECT COUNT(*) FROM order_status) as order_status_count;"

How many records are in each table of postgres-target db?

COUNTs for postgres-target tables.

As you can see from the resulting output, each table has the same number of records in the source and target databases.
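If you repeat this check often, comparing the numbers by eye gets tedious. Below is a small sketch of an automated comparison. The counts are the ones implied by the parameters at the top of the fill script rather than values copied from the output, and diff_counts is a helper invented for this example.

```python
# Row counts implied by the parameters of the fill script.
EXPECTED = {
    "product": 100, "country": 100, "city": 30, "store": 100,
    "users": 100, "status_name": 5, "sale": 100, "order_status": 100,
}


def diff_counts(source, target):
    """Return {table: (source_count, target_count)} for tables whose counts differ."""
    tables = source.keys() | target.keys()
    return {
        t: (source.get(t), target.get(t))
        for t in sorted(tables)
        if source.get(t) != target.get(t)
    }


source_counts = dict(EXPECTED)
target_counts = dict(EXPECTED)
print(diff_counts(source_counts, target_counts))  # {}

target_counts["sale"] = 99  # simulate a target that is one row behind
print(diff_counts(source_counts, target_counts))  # {'sale': (100, 99)}
```

An empty result means the two databases agree on every table. Keep in mind that equal counts do not prove the rows themselves are identical.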

Check statistics.

The following command sends a GET request to the DBConvert Streams API endpoint /api/v1/streams/stat, which returns the statistics of the current data stream. The output is piped to jq, which must be installed on the host, to format the JSON for better readability.

docker run -t --rm \
    --network sales-db_default \
    curlimages/curl  \
    --request GET \
    --url http://dbs-api:8020/api/v1/streams/stat | jq

{
  "streamID": "2KHTjpsZAUCb8y1BZny3YKoX5qO",
  "source": {
    "counter": 635,
    "elapsed": "0s",
    "started": "2023-01-13T17:24:50.089257721Z",
    "status": "RUNNING"
  },
  "target": {
    "counter": 635,
    "elapsed": "0s",
    "started": "2023-01-13T17:24:50.089649312Z",
    "status": "RUNNING"
  }
}

The above output shows the statistics of the current data stream. The "source" field shows the statistics of the source database, and "target" field shows the statistics of the target database.

The "counter" field shows the number of events processed by the stream, the "elapsed" field shows the time elapsed since the stream started, the "started" field shows the date and time when the stream was started, and the "status" field shows the current status of the stream.
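The counter value of 635 can be cross-checked against the fill script: its parameters produce 100 + 100 + 30 + 100 + 100 + 5 + 100 + 100 = 635 rows, which is consistent with one event per inserted row. The sketch below performs the same check on the statistics shown above, with the timestamps left out for brevity.

```python
import json

# The statistics from above, reduced to the fields used in the check.
stat = json.loads("""
{
  "streamID": "2KHTjpsZAUCb8y1BZny3YKoX5qO",
  "source": {"counter": 635, "status": "RUNNING"},
  "target": {"counter": 635, "status": "RUNNING"}
}
""")

# Rows inserted per table, as implied by the parameters of the fill script.
rows_inserted = {
    "product": 100, "country": 100, "city": 30, "store": 100,
    "users": 100, "status_name": 5, "sale": 100, "order_status": 100,
}

total = sum(rows_inserted.values())
in_sync = stat["source"]["counter"] == stat["target"]["counter"]
matches_total = stat["target"]["counter"] == total

print(total, in_sync, matches_total)  # 635 True True
```

When the source counter is ahead of the target counter, the writer is still catching up. When both match the number of rows you inserted, every captured event has been applied.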

Prometheus metrics.

Prometheus is a monitoring system that scrapes metrics data from various sources and stores them in a time-series database. DBConvert Streams collects its internal metrics in Prometheus format, allowing you to explore and visualize live data in dashboards. To view the collected metrics, visit http://127.0.0.1:9090 in a web browser to access the Prometheus UI.

Conclusion

Change Data Capture (CDC) systems like DBConvert Streams can be used to stream data from a MySQL database to a PostgreSQL database in real time, allowing you to keep the two systems in sync and take advantage of the unique features and capabilities of each database.

This guide covered streaming data in one direction, from the MySQL binlog to PostgreSQL. The DBConvert Streams GitHub repository contains more examples of configuring data streams, including from the PostgreSQL WAL to MySQL and other configurations. These examples can serve as a starting point for setting up your own data stream, tailored to your specific needs and use case.

Your feedback about DBConvert Streams is essential for the development team to improve the software. By sharing your ideas, reporting bugs, and requesting new features, you can actively participate in its development and help make it more robust and useful for everyone.

Originally posted at DBConvert Blog