Building an ETL flow with asyncio, multiprocessing and asyncpg
Renato Vieira • 31 March 2021
This post explains how to implement a concurrent ETL (Extract, Transform, Load) flow combining Python asyncio with multiprocessing to get the best of both worlds. ETL itself is a procedure that starts with data extraction from sources such as a database (or many databases). Following extraction, we apply functions to the data in order to modify it; these transformation functions are generally used for cleaning or converting the extracted data. Finally, the result is loaded into a destination. This flow is particularly useful for data warehousing.
Database operations are at the core of many applications. Many of these procedures deal with tables containing millions of rows, which can hugely impact performance. In Python, we'd most commonly use the psycopg2 library to perform operations on a PostgreSQL database.
To speed up ETL operations, we can benefit from concurrent code. Python provides us with asyncio, a built-in library that allows developers to write concurrent code without dealing with low-level details such as thread management. In asyncio, functions that are executed concurrently are called coroutines. For our ETL context, we'll wrap those coroutines into tasks and wait for all of them to be done with the asyncio.gather function. Its behavior means our code waits for every concurrent task to be completed before it can advance, similar to the barriers used to synchronize threads running concurrently.
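As a quick illustration (a toy sketch, not the ETL code itself; the stage and run_all coroutines are made up), asyncio.gather only returns once every awaited task has finished:

import asyncio

async def stage(name, delay):
    # Simulate some asynchronous work
    await asyncio.sleep(delay)
    print(f"{name} done")

async def run_all():
    # gather() only returns once every task has completed,
    # acting as a synchronization barrier for the stages
    await asyncio.gather(
        asyncio.create_task(stage("extract", 0.2)),
        asyncio.create_task(stage("transform", 0.1)),
    )

asyncio.run(run_all())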
For our context, we are going to implement the following steps to model ETL:
- Fetch records from a database (extract), each row having a name and an age;
- Perform an operation over the obtained data (transform) to split the name from each record into first_name and last_name, while keeping the age value unaltered;
- Write the transformed data into a database (load).
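Concretely, a single record flows through the pipeline roughly like this (illustrative values only):

# Illustrative only: a row as extracted, and its transformed counterpart
extracted_row = {"name": "Ada Lovelace", "age": 36}
transformed_row = ("Ada", "Lovelace", 36)  # (first_name, last_name, age)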
Since psycopg2 doesn't support asyncio, we need a library that implements asynchronous database operations; one of the available options is asyncpg. We'll use it to implement extract and load, the steps that deal with database access.
According to the asyncpg developers, the library is on average three times faster than psycopg2. It also provides tools to perform database operations, such as prepared statements and cursors. There are, however, some missing features, such as a built-in mechanism to prevent SQL injection (there's a trick to work around it, and we will discuss it later in the text).
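For instance, prepared statements and cursors are exposed directly on the connection object. Here is a minimal sketch, assuming an already open asyncpg connection and the same input_table used later in this post (the preview function name is made up):

async def preview(connection):
    # Prepared statement: planned once, executed many times with different arguments
    statement = await connection.prepare("SELECT name, age FROM input_table WHERE age > $1")
    adults = await statement.fetch(18)

    # Server-side cursor: must run inside a transaction
    async with connection.transaction():
        async for record in connection.cursor("SELECT name, age FROM input_table"):
            print(record["name"], record["age"])

    return adults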
Synchronizing all tasks
In order to implement an ETL flow, we must provide functions to represent each stage. In non-concurrent code, we'd have functions such as fetch_from_db, transform_data and write_into_db that would be called sequentially. In a concurrent run, we want each stage to run concurrently, starting its computation as soon as there's data to work with and handing data over to the next stage as soon as its own work on a piece of data is done.
To establish this kind of synchronization between stages, one approach is to implement a producer & consumer architecture, with a queue being used to pass data between them.
We begin structuring our code by writing the etl function:
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

async def etl():
    with ProcessPoolExecutor(
        max_workers=multiprocessing.cpu_count(),
    ) as pool:
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue(maxsize=1000)
        await asyncio.gather(
            asyncio.create_task(producer(queue)),
            asyncio.create_task(consumer(loop, pool, queue)),
            return_exceptions=False,
        )

def main():
    asyncio.run(etl())
Firstly, our etl
function must be asynchronous (through the async
keyword) in order to fully work with asyncio
.
We'll be using a ProcessPoolExecutor to perform our transformation procedures. This keeps the CPU-bound data processing from blocking the rest of the flow. Setting the number of workers in the pool to the number of CPUs our machine has helps speed things up by fully using the available resources. The usage of these structures will be further explained in a later section.
Notice that when instantiating our queue, we’ve used the maxsize
parameter. Why do we do that?
First of all, using an unbounded queue risks depleting memory resources. Secondly, the producer would keep filling the queue and the consumer would only start running once the producer had nothing else to do, which would effectively make the code synchronous.
Defining a queue size limit makes the consumer start earlier, as soon as the queue reaches the set limit for the first time. When space is freed up, the producer continues its job and adds more elements to the queue, making the code run concurrently. This means the queue is acting as a backpressure mechanism. Such a mechanism is valuable whenever the production speed is faster than the consumption speed, which happens in many cases. For more details regarding these issues, please check this article.
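The effect is easy to see in isolation (a toy sketch unrelated to the ETL code): with maxsize set, await queue.put() suspends the producer whenever the queue is full, handing control to the consumer.

import asyncio

async def fill(queue):
    for i in range(5):
        # Suspends here whenever the queue is full (maxsize=2),
        # giving the event loop a chance to run the consumer
        await queue.put(i)
        print(f"produced {i}")
    await queue.put(None)

async def drain(queue):
    while (item := await queue.get()) is not None:
        print(f"consumed {item}")

async def demo():
    queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(fill(queue), drain(queue))

asyncio.run(demo())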
main
will be responsible for invoking etl
. While main
itself is not asynchronous, it uses asyncio.run
to execute an asynchronous function. After etl
is over, any code after asyncio.run(etl())
would be executed as regular synchronous code.
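For instance (the print call below is just an illustrative addition, not part of the original main):

def main():
    asyncio.run(etl())
    # Anything placed here runs as plain synchronous code,
    # only after the whole ETL flow has completed
    print("ETL finished")

if __name__ == "__main__":
    main()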
Producer has a fairly straightforward implementation:
async def producer(queue):
    connection = await get_asyncpg_connection()
    async for record in extract(connection):
        await queue.put(record)
    await queue.put(None)
    await connection.close()
We start by grabbing an asyncpg connection object. Once obtained, we loop over the records that the extract function fetches from our database. Each record is put into the queue, where the consumer will pick it up, until there is no more data to fetch. To signal that its job is done, the producer then inserts a None value into the queue, which must be handled gracefully by the consumer. Lastly, we close the asyncpg connection.
Now let's dive into the implementations of both the get_asyncpg_connection and extract functions.
import asyncpg

async def get_asyncpg_connection():
    db_conf = {
        "user": "user_name",
        "password": "password",
        "database": "db_name",
        "host": "host",
        "port": "port",
    }
    connection = await asyncpg.connect(**db_conf)
    return connection

async def extract(connection):
    async with connection.transaction():
        query = "SELECT name, age FROM input_table"
        async for record in connection.cursor(query, prefetch=1000):
            yield record
get_asyncpg_connection is the only part of the code that explicitly interacts with the asyncpg module. It creates a connection with our database using the asyncpg.connect method, whose parameters are quite similar to the ones found in psycopg2.connect. To simplify this post, we opted to instantiate a db_conf variable inside get_asyncpg_connection, using dummy values for the connection parameters. When adapting this to your own context, you would simply replace them with real values.
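In a real project you would likely read these values from the environment instead of hard-coding them; a minimal sketch (the environment variable names are made up):

import os

def get_db_conf():
    # Hypothetical environment variable names; adjust to your deployment
    return {
        "user": os.environ["DB_USER"],
        "password": os.environ["DB_PASSWORD"],
        "database": os.environ["DB_NAME"],
        "host": os.environ.get("DB_HOST", "localhost"),
        "port": os.environ.get("DB_PORT", "5432"),
    }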
The extract function is a generator that receives an open asyncpg connection as its single parameter. It's responsible for accessing a table in our database (in our case, the input_table table) via the connection's cursor method, which fetches records from the database according to the query defined in extract; each grabbed row is then yielded.
Now it's time to talk about the consumer implementation:
def transform(batch):
    transformed_batch = []
    for record in batch:
        first_name, last_name = record["name"].split(" ", 1)
        age = record["age"]
        transformed_batch.append((first_name, last_name, age))
    return transformed_batch

async def load(batch, connection):
    async with connection.transaction():
        columns = ("first_name", "last_name", "age")
        await connection.copy_records_to_table(
            "output_table", records=batch, columns=columns
        )

async def task_set_load_helper(task_set, connection):
    for future in task_set:
        await load(await future, connection)

async def consumer(loop, pool, queue):
    connection = await get_asyncpg_connection()
    task_set = set()
    batch = []
    while True:
        record = await queue.get()
        if record is not None:
            record = dict(record)
            batch.append(record)
        if queue.empty():
            task = loop.run_in_executor(pool, transform, batch)
            task_set.add(task)
            if len(task_set) >= pool._max_workers:
                done_set, task_set = await asyncio.wait(
                    task_set, return_when=asyncio.FIRST_COMPLETED
                )
                await task_set_load_helper(done_set, connection)
            batch = []
        if record is None:
            break
    if task_set:
        await task_set_load_helper(
            asyncio.as_completed(task_set), connection
        )
    await connection.close()
The consumer function is responsible for grabbing the records our producer stores in the queue and adding them to a batch. We must remember to convert each record into a dict before adding it to the batch so the data can be processed later - not doing so would raise a TypeError: can't pickle asyncpg.Record objects exception. Whenever the queue becomes empty, the consumer starts running the transformation step over the batch it built.
The transform function receives a batch from the consumer, which is nothing more than a list of dict objects fetched from the database. Since we want to split the full name from each record into first and last names, we start by looping over the batch and grabbing every record. Our context is simple, so the transformation is pretty straightforward. After obtaining the transformed data, we create a tuple containing the values and finally append it to a list representing the modified batch.
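A quick usage example of transform with made-up values:

batch = [{"name": "Ada Lovelace", "age": 36}, {"name": "Alan Mathison Turing", "age": 41}]
transform(batch)
# [("Ada", "Lovelace", 36), ("Alan", "Mathison Turing", 41)]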
In more complex applications involving multiple output tables, it's recommended to keep track of the columns and of which table each record should be inserted into. This can be achieved by using a dict to group records by table, as sketched below. Since our use case involves a single output table, a list of tuple objects is enough.
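In that scenario, the transformed batch could be grouped by destination table; here is a hypothetical sketch (the audit_table and the transform_multi_table name are made up and not used in the rest of this post):

def transform_multi_table(batch):
    # Hypothetical: group transformed records by destination table
    transformed = {"output_table": [], "audit_table": []}
    for record in batch:
        first_name, last_name = record["name"].split(" ", 1)
        transformed["output_table"].append((first_name, last_name, record["age"]))
        transformed["audit_table"].append((record["name"],))
    return transformed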
When we call the run_in_executor method, our transformation step runs inside a given executor (in our case it runs in pool, the ProcessPoolExecutor instance we created in etl). Since the transformation function is CPU-bound, if we executed transform(batch) directly here we would halt the other coroutines until the batch had been fully processed, dragging out the execution time.
Moreover, run_in_executor returns a Future object, not a processed batch. According to the Python docs, "a Future represents an eventual result of an asynchronous operation", meaning that our data won't necessarily be ready when run_in_executor returns, but will be at some point in the future. This means transform will be running in the background, so we can move on and process the next batches concurrently.
The conditional block that compares len(task_set) with the pool's number of workers prevents the application from running out of memory by accumulating too many pending transform tasks. Whenever task_set holds at least as many objects as our pool has workers, we process some tasks to obtain transformation results earlier. Since they're Future objects, we must wait for them to be completed. By calling asyncio.wait, we wait for at least one task to become ready. When this happens, we grab a new done_set containing all completed tasks and update task_set to keep track only of the pending ones.
As we now have completed tasks, we can store the results in the database. Since we are going to use our storing procedures elsewhere, we created a task_set_load_helper
helper function to avoid code repetition. It’s responsible for iterating over a set of completed tasks and calling the load
function that will effectively write into the database.
Whenever load
is called from within task_set_load_helper
, it will receive a list
with transformed data and store it into output_table
in our database. To achieve this, we use the copy_records_to_table
method from asyncpg
connection object to bulk insert the records, which is faster than inserting each record individually. It receives a table to write data into and a list
of tuple
objects representing the records to insert. It may additionally receive an iterable with the table's columns; it's important to note that the order of the columns in this iterable must match the order of the values in each tuple passed through the records parameter.
When there are no more records to process, we finish creating batches and move on to obtain the last transformation results, in case there are still some objects to work with. We perform a procedure similar to the one used when task_set got full, but now we have to pass the entire set with the remaining tasks, which could still be pending. That means we must apply the asyncio.as_completed function over task_set to be sure that we'll be iterating exclusively over completed tasks, as soon as they become available.
Caveats
SQL injection is one of the main concerns when using asyncpg. With psycopg2, we can avoid this issue by composing our queries with the psycopg2.sql module objects, as in the example below:
from psycopg2 import sql
table_name = "input_table"
column_name = "name"
value = "foo"
query = sql.SQL(
    "SELECT * FROM {table_name} WHERE {column_name}={value}"
).format(
    table_name=sql.Identifier(table_name),
    column_name=sql.Identifier(column_name),
    value=sql.Literal(value),
)
By using sql.Identifier and sql.Literal we can sanitize identifiers (i.e. table and column names) and literals (i.e. strings and integers), respectively, to build parameterized queries, while being assured that the resulting query won't harm our database due to a maliciously crafted input.
asyncpg allows us to build parameterized queries using a syntax similar to native PostgreSQL, with the $n placeholders providing query arguments. However, since PostgreSQL queries don't allow us to parameterize table and column names, we are stuck with Python's string formatting to compose queries with dynamic identifiers. Without proper care, this can lead to catastrophic results if these identifiers are user-provided: a malicious user could send a value to our system that, when processed, leads to a DROP DATABASE statement being executed.
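In other words (a hedged illustration, not part of the original code; the find_by_name function is made up): values can go through $n placeholders, but identifiers end up interpolated into the query string.

async def find_by_name(connection, table_name, user_value):
    # Safe: the value is passed as a query argument via $1
    # Dangerous: table_name is interpolated directly; if it comes from
    # user input it must be sanitized first (see below)
    query = f"SELECT * FROM {table_name} WHERE name = $1"
    return await connection.fetch(query, user_value)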
Build safe systems. Keep the user's profile in mind whenever designing permissions. Allowing admins to dynamically choose tables may be sound, but granting an end user the same privileges can be compromising.
In order to work around these issues, we can look at how psycopg2 works behind the scenes with sql.Identifier.
async def sanitize_identifier(identifier, connection):
    sanitized = await connection.fetchval("SELECT quote_ident($1)", identifier)
    return sanitized

async def sanitize_table(table, connection):
    try:
        dirty_schema, dirty_table = table.split(".")
        sanitized_schema = await sanitize_identifier(dirty_schema, connection)
        sanitized_table = await sanitize_identifier(dirty_table, connection)
        sanitized = f"{sanitized_schema}.{sanitized_table}"
    except ValueError:
        sanitized = await sanitize_identifier(table, connection)
    return sanitized
The magic happens in sanitize_identifier, where we use the database itself (via an asyncpg connection object) to retrieve the result of the quote_ident PostgreSQL function, which safely quotes the input. Notice that because the identifier is being passed to quote_ident as a query argument - a literal, not an identifier, at that point - we can use the $n syntax to provide the dynamic value to our query.
sanitize_table builds upon the previous function to cover situations where our table name is prefixed by a schema name. We have to sanitize both components separately, not as a single string. After cleaning, we can safely merge them back into a single value using Python's string formatting.
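For instance (an illustrative sketch; the demo_sanitize name is made up and the exact output is whatever quote_ident returns):

async def demo_sanitize(connection):
    # A plain name passes through mostly untouched...
    print(await sanitize_table("public.users", connection))              # e.g. public.users
    # ...while a malicious value comes back quoted, so it can only ever
    # be read as an (odd-looking) identifier, never as extra SQL
    print(await sanitize_identifier('x"; DROP TABLE y;--', connection))  # e.g. "x""; DROP TABLE y;--"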
Below is an example of a more robust extract function, which allows us to fetch data from a dynamically chosen table, filtering the results on a dynamically chosen column.
async def extract(table_name, column_name, value, connection):
    async with connection.transaction():
        sanitized_table_name = await sanitize_table(table_name, connection)
        sanitized_column_name = await sanitize_identifier(column_name, connection)
        query = f"SELECT * FROM {sanitized_table_name} WHERE {sanitized_column_name}=$1"
        async for record in connection.cursor(query, value):
            yield dict(record)
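Calling it would look something like this (hypothetical values, and a made-up run_extract wrapper):

async def run_extract(connection):
    async for row in extract("public.input_table", "name", "foo", connection):
        print(row)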
More info
Below are some third-party solutions that implement approaches related to our topic:
- aiomultiprocess - a Python lib that also combines asyncio and multiprocessing, implementing a solution quite similar to ours. There's also a PyCon 2018 talk that introduces this library.
- pypeln - a Python lib that abstracts away the details of creating a concurrent data pipeline.
Special thanks to Rafael Carício who kindly reviewed this post.