Thursday, January 26, 2023

Streaming Big Data Files from Cloud Storage

Methods for efficient consumption of large files

Photo by Aron Visuals on Unsplash

Direct Download from Amazon S3

Metrics for Comparative Performance Measurement

Toy Example

import os
import time

import boto3

# Binary size units used by the benchmarks below, expressed in bytes.
KB = 2 ** 10   # one kibibyte
MB = KB ** 2   # one mebibyte

def write_and_upload():
    """Create a 2 GB file of random bytes and upload it to S3.

    The file is written to '2GB.bin' in the current working directory in
    1 MB chunks and then uploaded with the boto3 S3 client. The bucket and
    key are placeholders that must be filled in before running.
    """
    # write 2 GB file: 2 * KB chunks of MB bytes each.
    # (Iterating 2 * MB times, as before, would have produced 2 TB.)
    with open('2GB.bin', 'wb') as f:
        for _ in range(2 * KB):
            f.write(os.urandom(MB))

    # upload to S3
    bucket = '<s3 bucket name>'
    key = '<s3 key>'
    s3 = boto3.client('s3')
    s3.upload_file('2GB.bin', bucket, key)

def read_sequential(f, t0):
    """Read a binary stream front to back in MB-sized chunks and print timings.

    Args:
        f: Object exposing ``read(size)`` that returns a bytes-like value and
            an empty value at end of stream.
        t0: Timestamp (as returned by ``time.time()``) taken by the caller
            when the overall measurement started.

    Prints the time taken by the first read, the time elapsed since ``t0``
    once the first chunk is available, and the average time of the
    subsequent non-empty reads.
    """
    t1 = time.time()
    f.read(MB)
    print(f'time of first sample: {time.time() - t1}')
    print(f'total to first sample: {time.time() - t0}')

    t1 = time.time()
    count = 0
    while True:
        x = f.read(MB)
        if len(x) == 0:
            break
        count += 1

    # A stream of at most one chunk yields no further reads; skip the average
    # in that case instead of dividing by zero.
    if count:
        print(f'time of avg read: {(time.time() - t1)/count}')

def fast_forward(f):
    """Time reads of MB bytes at ten offsets spaced 100 MB apart.

    Args:
        f: Object exposing ``seek(offset)`` and ``read(size)``.

    Prints the average time of one seek-and-read over all ten offsets.
    """
    total = 10
    # Start the clock once, before the loop. Restarting it on every iteration
    # would make the printed value the last read's time divided by `total`.
    t1 = time.time()
    for i in range(total):
        f.seek(i * 100 * MB)
        f.read(MB)
    print(f'time of avg random read: {(time.time() - t1)/total}')

Downloading from S3 to Local Disk

Boto3 File Download

import boto3, time

# Placeholders: fill in the S3 object to fetch and where to store it locally.
bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'

s3 = boto3.client('s3')

# Transfer settings handed to download_file below.
config = boto3.s3.transfer.TransferConfig(
    multipart_threshold=8 * MB,
    multipart_chunksize=8 * MB,
    io_chunksize=256 * KB,
    max_concurrency=10,
    max_io_queue=100,
    num_download_attempts=5,
    use_threads=True,
    max_bandwidth=None,
)

# The clock starts before the download so that every figure reported
# relative to t0 includes the time spent copying the file to local disk.
t0 = time.time()
s3.download_file(Bucket=bucket, Key=key, Filename=local_path, Config=config)

# Traverse the downloaded copy.
with open(local_path, 'rb') as f:
    read_sequential(f, t0)

print(f'total time: {time.time()-t0}')

AWS CLI

import shlex, time
from subprocess import Popen

# Placeholders: fill in the S3 object to fetch and where to store it locally.
bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'

# Start the clock before launching the download; read_sequential and the
# final print both report times relative to t0.
t0 = time.time()

# Copy the object to local disk with the AWS CLI and block until it exits.
# p holds the value returned by wait().
cmd = f'aws s3 cp s3://{bucket}/{key} {local_path}'
p = Popen(shlex.split(cmd)).wait()

# Traverse the downloaded copy.
with open(local_path, 'rb') as f:
    read_sequential(f, t0)

print(f'total time: {time.time()-t0}')

S5cmd

import shlex, time
from subprocess import Popen

# Placeholders: fill in the S3 object to fetch and where to store it locally.
bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'

# Start the clock before launching the download; read_sequential and the
# final print both report times relative to t0.
t0 = time.time()

# Copy the object to local disk with s5cmd and block until it exits.
# The command string built here is the one that must be executed; passing a
# different variable to Popen would run something other than s5cmd.
s5cmd = f's5cmd cp --concurrency 10 s3://{bucket}/{key} {local_path}'
p = Popen(shlex.split(s5cmd)).wait()

# Traverse the downloaded copy.
with open(local_path, 'rb') as f:
    read_sequential(f, t0)

print(f'total time: {time.time()-t0}')

Multi-part vs. Single-threaded Download

The Art of Prefetching Data Files

Streaming Data from S3

import os, boto3, time, multiprocessing as mp

# Placeholders: fill in the S3 object to stream and the path of the FIFO.
bucket = '<s3 bucket name>'
key = '<s3 key>'
local_path = '<local path>'


def stream_file():
    """Download the S3 object and write it into the FIFO at local_path."""
    s3 = boto3.client('s3')
    with open(local_path, 'wb') as f:
        s3.download_fileobj(bucket, key, f)


# Guard the driver code: multiprocessing start methods that re-import the
# main module in the child (spawn, forkserver) would otherwise re-run
# os.mkfifo and start another process from inside the child.
if __name__ == '__main__':
    t0 = time.time()

    # Named pipe through which the writer process feeds the reader below.
    os.mkfifo(local_path)

    # The download runs in a separate process that writes into the FIFO.
    proc = mp.Process(target=stream_file)
    proc.start()

    # Consume the data from the FIFO.
    with open(local_path, 'rb') as f:
        read_sequential(f, t0)

    print(f'total time: {time.time()-t0}')

    # Reap the writer process and remove the FIFO so that a later run does
    # not fail in os.mkfifo because the path already exists.
    proc.join()
    os.remove(local_path)

Reading Data from Arbitrary Offsets

import boto3, time

# Placeholders: fill in the S3 object to read.
bucket = '<s3 bucket name>'
key = '<s3 key>'

s3 = boto3.client('s3')

# stream entire file
t0 = time.time()
response = s3.get_object(
    Bucket=bucket,
    Key=key
)
f = response['Body']
read_sequential(f, t0)

print(f'total time: {time.time()-t0}')

# fast forward: fetch MB bytes at ten offsets spaced 100 MB apart
total = 10
t0 = time.time()

for i in range(total):
    start = i * 100 * MB
    # HTTP byte ranges include both end points, so the last byte of an
    # MB-sized window is start + MB - 1 (start + MB would fetch MB + 1 bytes).
    response = s3.get_object(
        Bucket=bucket,
        Key=key,
        Range=f'bytes={start}-{start + MB - 1}'
    )
    f = response['Body']
    x = f.read()

print(f'time of avg random read: {(time.time() - t0)/total}')

Filtering Data with Amazon S3 Select

Mounting S3 Data Using Goofys

goofys -o ro -f <s3 bucket name> <local path>
# Placeholders: fill in the S3 object and the directory where the bucket
# is mounted.
bucket = '<s3 bucket name>'
key = '<s3 key>'
mount_dir = '<local goofys mount>'
sequential = True # toggle flag to run fast_forward

t0 = time.time()
# The object is opened as a regular file underneath the mount directory.
with open(f'{mount_dir}/{key}', 'rb') as f:
    if not sequential:
        # Offset-read benchmark.
        fast_forward(f)
    else:
        # Front-to-back benchmark, followed by the overall elapsed time.
        read_sequential(f, t0)
        print(f'total time: {time.time()-t0}')

Comparative Results

Comparative Results of Pulling 2 GB File from S3 (by Author)

Summary

No comments:

Must Watch YouTube Videos for Databricks Platform Administrators

  While written word is clearly the medium of choice for this platform, sometimes a picture or a video can be worth 1,000 words. Below are  ...