Ryota Kondo

Ryota Kondo

2024/01/04

AWS SDK for pandas|やりたいことから引く!逆引きS3関数の使い方まとめ

PythonライブラリのAWS SDK for pandas (awswrangler)は、AWSのデータやデータ分析サービスとpandasのDataframeとのやり取りを容易にしますが、それだけではなく、各AWSサービスへの操作もBoto3に比べてシンプルに実装できるようになっています。

今回はこのAWS SDK for pandasのS3関数について、やりたいことから引く逆引き形式でまとめてみました。

下の目次をクリックすると、説明に素早くジャンプできます。

目次

オブジェクト操作

オブジェクトをコピーしたい

awswrangler.s3.copy_objects()を使う。

サンプルコード
import awswrangler as wr

# sample01.csvとsample02.csvをコピー
wr.s3.copy_objects(
    paths=[
        "s3://<バケット名>/<コピー元フォルダ>/sample01.csv",
        "s3://<バケット名>/<コピー元フォルダ>/sample02.csv"
    ],
    source_path="s3://<バケット名>/<コピー元フォルダ>/",
    target_path="s3://<バケット名>/<コピー先フォルダ>/"
)

オブジェクトを削除したい

awswrangler.s3.delete_objects()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.csvとsample02.csvを削除
wr.s3.delete_objects([
    "s3://<バケット名>/sample01.csv", 
    "s3://<バケット名>/sample02.csv"
])

# folder01配下のオブジェクトを削除
wr.s3.delete_objects("s3://<バケット名>/folder01/")

データセットをマージしたい

awswrangler.s3.merge_datasets()を使う。

サンプルコード
import awswrangler as wr

# 追記モードでデータセットをマージ
wr.s3.merge_datasets(
    source_path="s3://<バケット名>/<マージ元フォルダ>/",
    target_path="s3://<バケット名>/<マージ先フォルダ>/",
    mode="append"
)

オブジェクト情報取得

オブジェクトのメタデータを取得したい

awswrangler.s3.describe_objects()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.csvとsample02.csvのメタデータを取得
metadatas = wr.s3.describe_objects([
    "s3://<バケット名>/sample01.csv", 
    "s3://<バケット名>/sample02.csv"
])

# folder01配下のオブジェクトのメタデータを取得
metadatas = wr.s3.describe_objects("s3://<バケット名>/folder01/")

オブジェクトが存在しているか判定したい

awswrangler.s3.does_object_exist()を使う。

サンプルコード
import awswrangler as wr

# sample01.csvが存在しているか判定
if wr.s3.does_object_exist("s3://<バケット名>/sample01.csv"):
    print("sample01.csvあり")
else:
    print("sample01.csvなし")

バケットがあるリージョンのコードを取得したい

awswrangler.s3.get_bucket_region()を使う。

サンプルコード
import awswrangler as wr

# バケットがあるリージョンのコードを取得
region = wr.s3.get_bucket_region("<バケット名>")

バケットの一覧を取得したい

awswrangler.s3.list_buckets()を使う。

サンプルコード
import awswrangler as wr

# バケットの一覧を取得
buckets = wr.s3.list_buckets()

フォルダの一覧を取得したい

awswrangler.s3.list_directories()を使う。

対象を指定する引数pathにはS3 URIのプレフィックスを指定可能。

サンプルコード
import awswrangler as wr

# folder01配下のフォルダ一覧を取得
folders = wr.s3.list_directories("s3://<バケット名>/folder01/")

オブジェクトの一覧を取得したい

awswrangler.s3.list_objects()を使う。

対象を指定する引数pathにはS3 URIのプレフィックスを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

boto3のS3.Client.list_objects()と異なり、オブジェクト数が1000件を超えても取得可能。

サンプルコード
import awswrangler as wr

# folder01配下のオブジェクト一覧を取得
objects = wr.s3.list_objects("s3://<バケット名>/folder01/")

オブジェクトのサイズを取得したい

awswrangler.s3.size_objects()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

サンプルコード
import awswrangler as wr

# sample01.csvとsample02.csvのサイズを取得
object_sizes = wr.s3.size_objects([
    "s3://<バケット名>/sample01.csv", 
    "s3://<バケット名>/sample02.csv"
])

# folder01配下のオブジェクトのサイズを取得
object_sizes = wr.s3.size_objects("s3://<バケット名>/folder01/")

ダウンロード・アップロード

オブジェクトをローカルにダウンロードしたい

awswrangler.s3.download()を使う。

サンプルコード
import awswrangler as wr

# sample01.csvをローカルにダウンロード
wr.s3.download(
    path="s3://<バケット名>/sample01.csv",
    local_file="./sample01.csv"
)

ローカルファイルをS3にアップロードしたい

awswrangler.s3.upload()を使う。

サンプルコード
import awswrangler as wr

# ローカルのsample01.csvをS3にアップロード
wr.s3.upload(
    local_file="./sample01.csv",
    path="s3://<バケット名>/sample01.csv"
)

待機

オブジェクトができるまで待機したい

awswrangler.s3.wait_objects_exist()を使う。

状態チェックを行う間隔と最大試行回数を引数で設定可能。デフォルトは5秒ごとに最大20回試行する。

サンプルコード
import awswrangler as wr

# sample01.csvとsample02.csvができるまで待機
wr.s3.wait_objects_exist([
    "s3://<バケット名>/sample01.csv", 
    "s3://<バケット名>/sample02.csv"
])

オブジェクトが無くなるまで待機したい

awswrangler.s3.wait_objects_not_exist()を使う。

状態チェックを行う間隔と最大試行回数を引数で設定可能。デフォルトは5秒ごとに最大20回試行する。

サンプルコード
import awswrangler as wr

# sample01.csvとsample02.csvが無くなるまで待機
wr.s3.wait_objects_not_exist([
    "s3://<バケット名>/sample01.csv", 
    "s3://<バケット名>/sample02.csv"
])

CSVファイル

CSVファイルをDataFrameとして読み込みたい

awswrangler.s3.read_csv()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.csvをDataFrameとして読み込み
df = wr.s3.read_csv("s3://<バケット名>/sample01.csv")

DataFrameをCSVファイルとして出力したい

awswrangler.s3.to_csv()を使う。

サンプルコード
import awswrangler as wr

# DataFrameをCSVファイルとして出力
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
wr.s3.to_csv(df, path="s3://<バケット名>/sample01.csv")

CSVファイルをSQL文でフィルターした内容でDataFrameとして読み込みたい

awswrangler.s3.select_query()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.csvをSQL文でフィルターした内容でDataFrameとして読み込み
df = wr.s3.select_query(
    sql="SELECT * FROM s3object",
    path="s3://<バケット名>/sample01.csv",
    input_serialization="CSV",
    input_serialization_params={
        "FileHeaderInfo": "Use",
    },
)

EXCELファイル

EXCELファイルをDataFrameとして読み込みたい

awswrangler.s3.read_excel()を使う。

サンプルコード
import awswrangler as wr

# sample01.xlsxをDataFrameとして読み込み
df = wr.s3.read_excel("s3://<バケット名>/sample01.xlsx")

DataFrameをEXCELファイルとして出力したい

awswrangler.s3.to_excel()を使う。

サンプルコード
import awswrangler as wr

# DataFrameをEXCELファイルとして出力
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
wr.s3.to_excel(df, path="s3://<バケット名>/sample01.xlsx")

固定長ファイル

固定長ファイルをDataFrameとして読み込みたい

awswrangler.s3.read_fwf()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.txtをDataFrameとして読み込み
df = wr.s3.read_fwf(
    path="s3://<バケット名>/sample01.txt",
    widths=[1, 3],
    names=["col1", "col2"]
)

JSONファイル

JSONファイルをDataFrameとして読み込みたい

awswrangler.s3.read_json()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.jsonをDataFrameとして読み込み
df = wr.s3.read_json("s3://<バケット名>/sample01.json")

DataFrameをJSONファイルとして出力したい

awswrangler.s3.to_json()を使う。

サンプルコード
import awswrangler as wr

# DataFrameをJSONファイルとして出力
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
wr.s3.to_json(df, path="s3://<バケット名>/sample01.json")

JSONファイルをSQL文でフィルターした内容でDataFrameとして読み込みたい

awswrangler.s3.select_query()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.jsonをSQL文でフィルターした内容でDataFrameとして読み込み
df = wr.s3.select_query(
    sql="SELECT * FROM s3object",
    path="s3://<バケット名>/sample01.json",
    input_serialization="JSON",
    input_serialization_params={
        "Type": "Document",
    },
)

Parquetファイル

ParquetファイルをDataFrameとして読み込みたい

awswrangler.s3.read_parquet()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.parquetをDataFrameとして読み込み
df = wr.s3.read_parquet("s3://<バケット名>/sample01.parquet")

DataFrameをParquetファイルとして出力したい

awswrangler.s3.to_parquet()を使う。

サンプルコード
import awswrangler as wr

# DataFrameをParquetファイルとして出力
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
wr.s3.to_parquet(df, path="s3://<バケット名>/sample01.parquet")

ParquetファイルをSQL文でフィルターした内容でDataFrameとして読み込みたい

awswrangler.s3.select_query()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.parquetをSQL文でフィルターした内容でDataFrameとして読み込み
df = wr.s3.select_query(
    sql="SELECT * FROM s3object",
    path="s3://<バケット名>/sample01.parquet",
    input_serialization="Parquet",
)

Parquetファイルのメタデータを取得したい

awswrangler.s3.read_parquet_metadata()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

サンプルコード
import awswrangler as wr

# sample01.parquetのメタデータを取得
columns_types, partitions_types = wr.s3.read_parquet_metadata(
    "s3://<バケット名>/sample01.parquet"
)

AWS Glue Data Catalogに登録されているParquetテーブルをDataFrameとして読み込みたい

awswrangler.s3.read_parquet_table()を使う。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

サンプルコード
import awswrangler as wr

# ParquetテーブルををDataFrameとして読み込み
df = wr.s3.read_parquet_table(database="<データベース>", table="<テーブル>")

ParquetファイルをAWS Glue Data Catalogに登録したい

awswrangler.s3.store_parquet_metadata()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)を指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

サンプルコード
import awswrangler as wr

# sample01.parquetをAWS Glue Data Catalogに登録
columns_types, partitions_types, partitions_values = wr.s3.store_parquet_metadata(
    path="s3://<バケット名>/sample01.parquet",
    database="<データベース>",
    table="<テーブル>",
)

ORCファイル

ORCファイルをDataFrameとして読み込みたい

awswrangler.s3.read_orc()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

対象とするオブジェクトの最終更新日時(LastModified)を引数で範囲指定可能。

サンプルコード
import awswrangler as wr

# sample01.orcをDataFrameとして読み込み
df = wr.s3.read_orc("s3://<バケット名>/sample01.orc")

DataFrameをORCファイルとして出力したい

awswrangler.s3.to_orc()を使う。

サンプルコード
import awswrangler as wr

# DataFrameをORCファイルとして出力
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
wr.s3.to_orc(df, path="s3://<バケット名>/sample01.orc")

ORCファイルのメタデータを取得したい

awswrangler.s3.read_orc_metadata()を使う。

対象を指定する引数pathにはS3 URIのプレフィックス(Unix のシェル形式のワイルドカードに対応)、もしくはS3 URIのリストを指定可能。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

サンプルコード
import awswrangler as wr

# sample01.orcのメタデータを取得
columns_types, partitions_types = wr.s3.read_orc_metadata(
    "s3://<バケット名>/sample01.orc"
)

AWS Glue Data Catalogに登録されているORCテーブルをDataFrameとして読み込みたい

awswrangler.s3.read_orc_table()を使う。

対象とする、もしくは対象外とするオブジェクトのサフィックスを引数で指定可能。

サンプルコード
import awswrangler as wr

# ORCテーブルををDataFrameとして読み込み
df = wr.s3.read_orc_table(database="<データベース>", table="<テーブル>")

Delta Lakeテーブル

Delta LakeテーブルをDataFrameとして読み込みたい

awswrangler.s3.read_deltalake()を使う。

サンプルコード
import awswrangler as wr

# Delta LakeテーブルをDataFrameとして読み込み
df = wr.s3.read_deltalake("s3://<バケット名>/<テーブルまでのパス>")

参考

この記事は以下の情報を参考にしました。

関連タグの記事

Ryota Kondo
Ryota Kondo

システムエンジニア・プログラマー|このブログサイトの運営もしており、思いついたことをまとめて記事を書いています💡|Twitterのフォローはお気軽に