おれさまラボ

実際に手を動かして理解を深めるブログ。

Amazon Athenaを使ってVPC FlowLogsを分析してみる

はじめに

AWS VPC内のネットワークフロー情報を保存する仕組みとしてVPC Flow Logsというものがあります。

VPC フローログは、VPC のネットワークインターフェイスとの間で行き来する IP トラフィックに関する情報をキャプチャできるようにする機能です。フローログデータは Amazon CloudWatch Logs と Amazon S3 に発行できます。フローログを作成すると、選択した送信先でそのデータを取得して表示できます。

出典:https://docs.aws.amazon.com/ja_jp/vpc/latest/userguide/flow-logs.html

VPC Flow Logsは2015年頃にGAされた機能のようです。最近ではログ保存タイミングを1分間隔まで縮めることができるようになり再び注目を集めています。

今回は、そんなVPC Flow LogsをAmazon Athenaで読み込んで、SQLクエリを使った解析を行ってみようと思います。

用意するもの

  • PC
  • ブラウザ
  • AWSアカウント

検証環境構築

以下のステップで検証環境の構築を行います。

  1. S3バケット作成
  2. VPC FlowLogsの有効化
  3. Athenaの設定

S3バケット作成

S3の管理コンソール画面からバケットを作成するという青いボタンを押下します。

任意の名前をつけます。ここでは以下のパラメーターを入力しました。

  • バケット名:test-vpc-flowlogs-20200224
  • リージョン:アジアパシフィック(東京)
  • 既存のバケットから設定をコピー:N/A

入力したら作成ボタンを押下します。

S3バケットが作成されたことを確認します。

ここでバケットARNをコピーするを押下してバケットARNをクリップボードにコピーしておきましょう。

VPC FlowLogsの有効化

VPCダッシュボードに移動して、VPC FlowLogsを有効にしたいVPCを選択します。

下ペインのフローログタブを選択し、フローログの作成という青いボタンを押下します。

フローログの設定を行います。

  • リソース:対象のVPCであることを確認しましょう。
  • フィルタ:許可/拒否/すべての3種類から選べます。今回はすべてを選択しました。
  • Mximum aggregation interval:ログ取得間隔時間を指定します。今回は1 minuteとしました。
  • 送信先:今回はS3バケットへの送信を選択します。
  • S3バケットARN:S3の作成で作ったS3バケットのARNを入力します。
  • Log record format:AWS default formatを選択します。Custom formatにするとログフォーマットを自分好みに変えることができます。

パラメーターの入力が終わったら作成ボタンを押下します。

次のフローログが作成されました。と表示されれば成功です。

VPCダッシュボードに戻ると下ペインのフローログタブにフローログが作成されています。

しばらくおいてからS3バケットを見ると、フローログが作成されています。

フォルダ階層は{バケット名}/AWSLogs/{アカウント番号}/vpcflowlogs/ap-northeast-1/{年(西暦)}/{月}/{日}となります。

Athenaの設定

つづいて、取得したVPC FlowLogsに対してSQLクエリを発行できるよう、Athenaの設定をします。

ちなみに、AthenaはAWSマネージドのクエリエンジンです。

https://aws.amazon.com/jp/athena/

Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準 SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。

サービスからAthenaを選択し、Get Startedという青いボタンを押下します。

データベースの作成

右ペインのNew query 1タブにクエリを入力し、データベースを作成します。データベース名は任意です。

CREATE DATABASE test_vpc_flowlogs

クエリを入力したら、Run queryという青いボタンを押下します。

テーブルの作成

左ペインのプルダウンメニューから先程作成したデータベース(test_vpc_flowlogs)を選択し、New query 1タブ(+ボタンから違うタブを生成してもOK)にクエリを入力します。

{AWS ID}にはご自身のAWSアカウントIDを入力してください。

CREATE EXTERNAL TABLE IF NOT EXISTS test_vpc_flowlogs.test_table_vpc_flowlogs (
 version int,
 account string,
 interfaceid string,
 sourceaddress string,
 destinationaddress string,
 sourceport int,
 destinationport int,
 protocol int,
 numpackets int,
 numbytes bigint,
 starttime int,
 endtime int,
 action string,
 logstatus string
) 
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION 's3://test-vpc-flowlogs-20200224/AWSLogs/{AWS ID}/vpcflowlogs/ap-northeast-1/'
TBLPROPERTIES ("skip.header.line.count"="1");

クエリを入力したら、Run queryという青いボタンを押下します。

Result画面にQuery successfulと表示されれば成功です。

SQLクエリの発行

この状態でSELECTクエリを発行してみます。

SELECT * FROM test_vpc_flowlogs.test_table_vpc_flowlogs limit 10;

するとZero records returned.と結果が表示されました。これはパーティションが適切ではないために示されます。パーティションは、クエリでスキャンするデータ量を制限するために有効です。

パーティション設定を変更するにはALTER TABLEクエリを発行します。

{AWS ID}にはご自身のAWSアカウントIDを、日付も任意の値を入力してください。

ALTER TABLE test_vpc_flowlogs.test_table_vpc_flowlogs
ADD PARTITION (dt='2020/02/24')
location 's3://test-vpc-flowlogs-20200224/AWSLogs/{AWS ID}/vpcflowlogs/ap-northeast-1/2020/02';

Query successfulとResultに表示されれば成功です。

この状態でもう一度SELECTクエリを発行してみます。

SELECT * FROM test_vpc_flowlogs.test_table_vpc_flowlogs limit 10;

すると、今度はデータを表示することができました。

ユースケース

ユースケースに沿っていくつかSQLクエリを発行してみたいと思います。データベースは素人なので変なクエリ打ってるかもしれません。あくまで参考まで。

ログ数をカウントしたい

count()関数を使います。

SELECT count(*) FROM test_table_vpc_flowlogs;

5847行のログがあるようです。

Denyされている通信を可視化したい

WHERE句でaction列がREJECTであるものを指定します。

SELECT *
FROM test_table_vpc_flowlogs
WHERE action = 'REJECT' LIMIT 10;

REJECTのログだけ抽出することができました。

逆にPermitされたものを指定したい場合はACCEPTを指定すればOKです。

特定のNICの通信を可視化したい

以下のようなSQL文を発行します。

SELECT DISTINCT interfaceid,
         protocol,
         sourceaddress,
         sourceport,
         destinationaddress,
         destinationport
FROM test_table_vpc_flowlogs
WHERE interfaceid = 'eni-xxxxxxxxxxxxxxxxx'
        AND action = 'ACCEPT'
ORDER BY  sourceaddress, sourceport, destinationaddress, destinationport;

たとえばポート番号の10000未満だけ表示したければWHERE句で<演算子を使います。

SELECT DISTINCT interfaceid,
         protocol,
         sourceaddress,
         sourceport,
         destinationaddress,
         destinationport
FROM test_table_vpc_flowlogs
WHERE interfaceid = 'eni-xxxxxxxxxxxxxxxxx'
        AND action = 'ACCEPT'
        AND sourceport < 10000
        AND destinationport < 10000
ORDER BY  sourceaddress, sourceport, destinationaddress, destinationport;

エフェメラルポート(一時ポート)を排除したい場合はNOT BETWEENを使って排除します。

SELECT DISTINCT interfaceid,
         protocol,
         sourceaddress,
         sourceport,
         destinationaddress,
         destinationport
FROM test_table_vpc_flowlogs
WHERE interfaceid = 'eni-xxxxxxxxxxxxxxxxx'
        AND action = 'ACCEPT'
        AND sourceport NOT BETWEEN 32768 AND 61000
        AND destinationport NOT BETWEEN 32768 AND 61000
ORDER BY  sourceaddress, sourceport, destinationaddress, destinationport;

エフェメラルポートは以下表を参照してください。

種別 FROM TO
RFC 6056 1024 65535
IANA 49152 65535
FreeBSD 49152 65535
Linuxカーネル 32768 61000
Windows XP 1025 5000
Windows Server 2003 1025 5000
Windows Vista以降 49152 65535
Windows Server 2008以降 49152 65535

こんな感じで結果が返ってきます。

豆知識

知っておくと便利な情報を書いておきます。

結果を保存する

クエリを発行した結果はCSVでダウンロードできます。SQLに慣れていない場合は、ある程度の情報だけ抽出してCSVをダウンロード、エクセルでゴリゴリみたいなこともできますね。

ショートカットキー

Run queryCtrl + Enterでも実行できます。

料金

最後に料金について。

半日遊びましたが、こんなもんです。

Athenaはコストパフォーマンスに大変すぐれています。個人で検証する分にはほぼお金がかからないと言えるでしょう。

スキャンされたデータ 1 TB あたり 5.00USD

参考:https://aws.amazon.com/jp/athena/pricing/

インフラを何も用意する必要がないのに気軽にSQLを触れる環境は本当に楽ちんだな~と思いました。

以上