Speed matters! In data pipelines, as in anywhere else.
Dealing with massive datasets is an everyday thing for most data professionals. There would be no issue if they streamed into a database or a warehouse.
But there are instances where we need to upload batch data that are heavy enough to keep our workstation useless for hours. Take a walk, and have some water. It’s good for you!
But if you really want to keep this task shorter, we need the most optimal way to load data to databases.
If it’s a pre-formatted file, I’d prefer to use the client libraries to do this. For instance, the following shell command can upload your CSV file to a remote Postgres database.
psql \
-h $hostname -d $dbname -U $username \
-c "\copy mytable (column1, column2) from '/path/to/local/file.csv' with delimiter as ','"
But this is rarely the case, at least in my case.
Since Python is my primary programming language, I have to upload them from Python, possibly after some pre-processing. Therefore I did a little experiment to see the fastest.
I found the fastest, and it’s not what I regularly use.
The ugly: How not to upload data
Although I’m now confident I should never use this method, I thought this might help initially.
Here’s what we do. I’ve got a dataset that’s about 500MB on the disk. First, I tried to load it with the insert command in the psycopg2 module.
import psycopg2
import csv
import time
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"Function {func.__name__} took {end_time - start_time} seconds to run.")
return result
return wrapper
# Establish a connection to the PostgreSQL database
conn = psycopg2.connect(
host="localhost",
database="playground",
user="<your-user-name>",
password="<your-password>",
)
# Define the path to the CSV file and the name of the table to load it into
csv_file_path = "./data.csv"
table_name = "temp_table"
@timing_decorator
def load_csv_with_insert():
# Create a cursor object
cur = conn.cursor()
# Open the CSV file and read its contents
with open(csv_file_path, 'r') as f:
csv_reader = csv.reader(f)
next(csv_reader) # skip header row
# Define the query to insert data into the table
insert_query = f"INSERT INTO {table_name} VALUES ({','.join(['%s']*len(next(csv_reader)))})"
# Iterate over the CSV rows and execute the insert query for each row
for row in csv_reader:
cur.execute(insert_query, row)
# Commit the changes to the database
conn.commit()
# Close the cursor and connection
cur.close()
load_csv_with_insert()
conn.close()
I use the timing decorator to measure how long it takes to load. It’s one of my five favorite python decorators.
Further, I intentionally kept my database in my local host. Thus, bandwidth is out of the equation.
Earlier, I genuinely thought this might be the fastest way to load data. That’s because we use a cursor to insert data and commit simultaneously. But the whole process took this much time:
Function load_csv_with_insert took 1046.109834432602 seconds to run.
Okay, how do we know this is too slow without a reference?
Let’s try out the most popular way.
The bad: Using Pandas to_sql to load massive datasets.
If you use Pandas and it’s to_sql
API frequently, this might surprise you.
I’ve been using this, and I’m continuing to use it. But this is still not the best way to go about for large datasets.
Here’s my code. I’m using the same database and CSV as before. I’ve truncated the table before I start loading the dataset.
import pandas as pd
from sqlalchemy import create_engine
import time
...
conn = create_engine("postgresql+psycopg2://thuwa:Flora1990@localhost:5432/playground")
@timing_decorator
def load_csv_with_pandas():
df = pd.read_csv(csv_file_path)
df.to_sql(table_name, conn, if_exists="append", chunksize=100, index=False)
load_csv_with_pandas()
In the above code, I’m not using the popular method
argument. Setting them to multi
would speed up data loading to an analytical database such as Redshift. But in our case, we use a transactional database.
Returning to the main point, this method took us 376 seconds to load.
Function load_csv_with_pandas took 376.70790338516235 seconds to run.
This makes Pandas way better than using a cursor to load data. It’s about 3X gain.
But what makes it not the fastest? After all, this is my favorite and most used method.
The good: Using the COPY method
There’s a native way to upload text data to SQL tables. And libraries such as psycopg2 offer this out of the box.
Yes, we can copy a file as it is to SQL tables in Python.
...
@timing_decorator
def load_csv_with_copy():
# Create a cursor object
cur = conn.cursor()
# Use the COPY command to load the CSV file into the table
with open(csv_file_path, "r") as f:
next(f) # skip header row
cur.copy_from(f, table_name, sep=",")
conn.commit()
# Close the cursor and connection
cur.close()
The copy_from
method in the cursor uses the COPY
method in SQL client API and directly uploads the file to SQL. Along with it, we pass additional arguments.
Here, in this case, we specify that a comma separates the columns. We also used the next
method to skip the first row, which is the table header.
Here are the results:
Function load_csv_with_copy took 50.60594058036804 seconds to run.
An astonishing 50 seconds — 20X faster than using a cursor and almost 8X faster than the to_sql
Pandas method.
But wait!
I said I use Python because often do data pre-processing. The other two methods are straightforward. But how will we upload an existing dataset in Python runtime in this method?
Writing a dataset existing in the memory using COPY
This is where we can benefit from the buffer method. Here’s the code fast load an existing Pandas dataframe to a SQL database.
import io
...
@timing_decorator
def load_dataframe_with_copy(df):
# Create a cursor object
cur = conn.cursor()
# Convert the DataFrame to a CSV file-like object
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False, header=False)
# Use the COPY command to load the CSV file into the table
csv_buffer.seek(0)
cur.copy_from(csv_buffer, table_name, sep=",")
conn.commit()
# Close the cursor and connection
cur.close()
df = pd.read_csv(csv_file_path)
# Do data processing on df
load_dataframe_with_copy(df)
This code creates an StringIO
object called csv_buffer
, which is a file-like object that behaves like a CSV file. The DataFrame is written to this object using the to_csv()
method with index=False
and header=False
to exclude the index and header from the CSV output.
The seek(0)
method is then called on the csv_buffer
object to move the file pointer back to the beginning of the file-like object.
Conclusion
Working with large datasets is not the same as working with an ordinary dataset. And one of the challenging tasks is to load such gigantic datasets into databases.
Unless no pre-processing is required, I’d almost always use the Pandas’ to_sql method. Yet, my experiment suggests this isn’t the best way for large datasets.
The COPY method is the fastest I’ve seen so far. While I suggest this for batch loading in a controlled environment, for day-to-day tasks, to_sql
gives a fantastic interface for tweaking several upload behaviors.
No comments:
Post a Comment