Python, Apache Beam, Relational Databases (MySQL, PostgreSQL): the Struggle and the Solution

Background

Most of us are fans of Python programming because of how little development effort the language demands.

Apache Beam is an SDK for developing data processing pipelines over batch and streaming data. When it comes to practical use of the Apache Beam SDK in the real world, we often run up against the limits of its built-in connectors: only certain types of sources are supported out of the box.

The Apache Beam SDK for Python supports only a limited set of database connectors: Google BigQuery, Google Cloud Datastore, Google Cloud Bigtable (write only), and MongoDB. The real world, however, also depends on MySQL and PostgreSQL, the most widely used relational databases across all domains and all levels of software development.

Apache Beam does provide a guide for developing your own I/O connector, but writing one is not easy. You need to take care of many factors: distributing your queries across the Beam workers, collecting the records, and, most importantly, designing your I/O connector so that fellow developers can call it easily and specify the table or SQL query to read.

Motivation

There is a good chance you will have to work with either MySQL or PostgreSQL on every third or fourth project, although the depth of engagement with these databases can differ. Here is where my story starts.

I was working on a customer project that has its databases on AWS and was mainly using Redshift. As we know, Redshift can be used with a PostgreSQL connector as well; my understanding is that Redshift is built on top of PostgreSQL.

As I started developing my Apache Beam pipeline and tried to read data from an Amazon RDS database, my Dataflow pipeline struggled to scale to multiple workers.

Why the Apache Beam pipeline was not scaling

With no I/O connector available to read from an AWS RDS database (in particular PostgreSQL or MySQL), I wrote a ParDo function that created its own connection to RDS and, inside the DoFn, read rows with a SQL query. Because of how ParDo works, reading from a data source this way is not correct in Apache Beam, or in any other scalable data processing pipeline: the entire read happens in a single unit of work, so it cannot be split across workers.

As I dug into how the built-in I/O connectors are coded in Apache Beam, I came to know that writing an I/O connector for Apache Beam is not so easy.

Anyway, I ended up writing a new I/O connector to read from PostgreSQL and MySQL databases. The code is available at https://github.com/yesdeepakverma/pysql-beam and can be installed with the

pip install pysql-beam

command as well.

How to use this package

  1. Install the package with the pip install pysql-beam command
  2. Import the package in your Python Apache Beam pipeline

3. Create a PTransform object


4. The pipeline options are defined as below


And use the PTransform in your pipeline like this:


How this works behind the scenes

  1. The user passes a table name or a SQL query.
  2. If a table name is passed, the SQL query is generated as SELECT * FROM TABLE_NAME.
  3. The number of records the query will return is found with SELECT COUNT(1) FROM TABLE_NAME.
  4. Paginated SQL queries are generated using the pagination feature available in both MySQL and PostgreSQL. Based on the batch size passed by the user, this step produces total_records / batch_size queries of the form SELECT * FROM TABLE_NAME LIMIT BATCH_SIZE OFFSET ((PAGE_NUM - 1) * BATCH_SIZE).
  5. These paginated SQL queries are distributed to the Apache Beam workers.
  6. Reading and processing are then performed on the distributed workers.
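The pagination step above can be sketched in plain Python (the helper name is illustrative, not the package's actual internals); note the LIMIT … OFFSET … form, which both MySQL and PostgreSQL accept.

```python
def paginated_queries(table_name, total_records, batch_size):
    """Split a full-table read into LIMIT/OFFSET page queries."""
    queries = []
    num_pages = -(-total_records // batch_size)  # ceiling division
    for page_num in range(1, num_pages + 1):
        offset = (page_num - 1) * batch_size
        queries.append(
            "SELECT * FROM {} LIMIT {} OFFSET {}".format(
                table_name, batch_size, offset))
    return queries


# 25 records with a batch size of 10 yields three page queries
# (OFFSET 0, OFFSET 10, OFFSET 20), one per worker unit of work.
print(paginated_queries("users", 25, 10))
```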

This Python package solves the issue you hit when reading from a database inside a ParDo function, where the data pipeline is unable to scale. Instead, this solution scales your pipeline based on the batch size you pass when building it.

I hope this solves the long-standing problem of reading SQL databases from a Python Apache Beam pipeline.

Note: Support for MSSQL is coming soon (thanks to jac2130 for adding support for the MSSQL database). For the updated code, please refer to the https://github.com/yesdeepakverma/pysql-beam.git GitHub repo.

Google Cloud Platform | Machine Learning | Kubernetes | Cloud Architect | Python
