Python|Amazon S3にある大容量CSVファイルを少ないメモリ使用量で値ごとに分割する方法
今回は、Pythonを使用してAmazon S3にある大容量のCSVファイルを読み込み、グループIDなどの値ごとにファイルを分割する際に、メモリ消費量を抑える方法について説明します。
AWS Lambdaの様にメモリ容量がコストに影響したり、メモリ不足で落ちたりするサービスでの処理に活用できるかと思います。
概要
メモリ消費を抑える一般的な方法として、ファイルをチャンクごとに読み込み、処理を行った後にチャンクデータを破棄する方法があります。しかし、今回のように値ごとにファイルを分割する場合、チャンクごとに読み込んで処理すると、出力ファイルがチャンクごとにばらばらになってしまうという問題が発生します。
この問題を解決するために、PythonのAWS用ライブラリであるAWS SDK for pandas (awswrangler)の機能を使用します。
実装方法
データ出力
awswrangler.s3.to_csvには、データを通常のファイルではなくデータセットとして出力する機能があります。この機能を有効にすると、出力を上書きではなく追記できるようになります。これを活用すると、チャンクごとに読み込んだデータを追記出力して破棄することにより、メモリ消費を抑えつつデータをひと塊のデータセットとして出力することが可能になります。
また、データセット機能を有効にすると指定した列でパーティションを分ける(= S3のフォルダを分ける)ことも可能になります。このパーティションを利用して値ごとに出力先を分割します。
実際にグループID (group_id
)ごとに分割するコードは下の通りです。
import awswrangler as wr
file_path = "s3://<バケット名>/sample.csv"
output_path = "s3://<バケット名>/output/"
# チャンクごとにファイルを読み込みメモリ消費を抑える
dfs = wr.s3.read_csv(
file_path,
chunksize=100_000,
)
for df in dfs:
# chunkごとにファイルを書き込み
# パーティション用に分割したい値をもつ列をコピー。
# これをしないと実際に出力されるファイルにgroup_id列が出力されない。
df["partition_group_id"] = df["group_id"]
wr.s3.to_csv(
df,
output_path,
dataset=True, # Datasetとしての書き込みを有効化
mode="append", # 追記モードを設定
sanitize_columns=False, # カラム名が変更されないようFalseを設定
partition_cols=["partition_group_id"], # パーティションを分ける列を設定
)
データ読込
データは通常のファイルではなくデータセットとして出力されますので、読み込む側でも考慮が必要になります。awswrangler.s3.read_csvにもデータセット機能がありますのでこちらを有効にします。有効化により、どのパーティションのデータを読み込むかをpartition_filter
で条件指定ができるようになります。
実際に特定のグループID (group_id
)のデータを読み込むコードは下の通りです。
import awswrangler as wr
group_id_value = "<取得したいグループIDの値>"
# partition_filterに取得したい値の条件を設定し読込み
df = wr.s3.read_csv(
f"s3://<バケット名>/output/",
dataset=True,
partition_filter=lambda x: True
if x["partition_group_id"] == group_id_value
else False,
)
# dataset読込みしたことによる不要な列(パーティション情報列)を削除
df = df.drop(["partition_group_id"], axis=1)
データセット機能の詳細
データセットとして出力したものを、実際にS3で確認すると下の様にグループIDごとにS3のフォルダが分かれる形となります。追記するとそのたびに新しいCSVファイルが作成されるため、疑似的に複数のファイルを1つのデータの塊としてみなすのがデータセット機能のようです。
# group_idが"A"のデータ
s3://<バケット名>/output/partition_group_id=A/<自動生成される番号>.csv
s3://<バケット名>/output/partition_group_id=A/<自動生成される番号>.csv ←追記されるごとにファイルが増える
# group_idが"B"のデータ
s3://<バケット名>/output/partition_group_id=B/<自動生成される番号>.csv
# group_idが"C"のデータ
s3://<バケット名>/output/partition_group_id=C/<自動生成される番号>.csv
また、awswrangler.s3.to_csvのpartition_cols
で指定した列は実際のCSVファイルから除外されますが、awswrangler.s3.read_csvでデータセットとして読み込めばpartition_cols
で指定した列も含めて読み込まれます。上のサンプルコードでは、列順が変わってしまうのとファイルにも残しておいた方が何かと良いので、パーティション用の列を追加し読込み後に削除する形としました。
恐らく、S3のフォルダのパーティション列と実際のCSVの列の名前が重複するとAWS Glueでうまく読み込めないので、partition_cols
で指定した列は実際のCSVファイルから除外されるのだと思います。