Snowflake × AWS でリアルタイムデータ基盤を作る — センシングデータ収集から可視化まで
Snowflake は優れたデータウェアハウスだが、リアルタイム性については正直に言うと苦手な部分がある。この記事では Snowflake のリアルタイム性を冷静に評価したうえで、AWS サービスで補完するアーキテクチャパターンと、センシングデータの収集・可視化の全体像を解説する。
Snowflake のリアルタイム性 — 正直な評価
Snowflake は バッチ処理・分析クエリ に最適化されたウェアハウスであり、リアルタイム性については以下の制約がある。
| 項目 | 実態 |
|---|---|
| 最小ロード間隔 | Snowpipe で数十秒〜数分のラグが発生 |
| クエリレイテンシ | コールドスタート時は数秒〜数十秒かかる |
| 行ごとの UPSERT | マイクロバッチでも INSERT が前提で高頻度更新は重い |
| ストリーミング | Snowflake Streaming API(旧 Kafka Connector)でも秒単位のラグ |
結論: Snowflake は「準リアルタイム(数分以内)」には対応できるが、「数百ミリ秒以内に表示」が必要なユースケースには向かない。
センシングデータ(IoT センサー・機器ログ等)のように 高頻度・低レイテンシ が求められる場合は、AWS のリアルタイム向けサービスと組み合わせるのが現実的な設計だ。
AWS でリアルタイム性を解決する 3 パターン
パターン 1: Kinesis Data Streams + Lambda(イベント駆動)
最もシンプルな構成。センサーデータをストリームに流し、Lambda で即時処理する。
センサー → Kinesis Data Streams → Lambda → DynamoDB / RDS
↓(バッチ)
S3 → Snowflake(分析用)
向いているユースケース: アラート検知・異常値の即時通知・シンプルな集計
# Lambda でのストリーム処理例
import json
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sensor_realtime')
def lambda_handler(event, context):
for record in event['Records']:
payload = json.loads(record['kinesis']['data'])
# リアルタイムストアに書き込み
table.put_item(Item={
'device_id': payload['device_id'],
'timestamp': payload['timestamp'],
'value': payload['value'],
})
パターン 2: Amazon Timestream(時系列データ専用)
IoT・センシングデータに特化した時系列データベース。ミリ秒単位の書き込みと高速クエリに対応。
センサー → IoT Core → Timestream(リアルタイム参照)
↓(エクスポート)
S3 → Snowflake(長期分析)
向いているユースケース: 機器の稼働監視・温湿度ログ・時系列グラフの表示
import boto3
from datetime import datetime
client = boto3.client('timestream-write', region_name='ap-northeast-1')
def write_sensor_data(device_id, metric_name, value):
current_time = str(int(datetime.now().timestamp() * 1000))
client.write_records(
DatabaseName='sensor_db',
TableName='sensor_metrics',
Records=[{
'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
'MeasureName': metric_name,
'MeasureValue': str(value),
'MeasureValueType': 'DOUBLE',
'Time': current_time,
'TimeUnit': 'MILLISECONDS',
}]
)
パターン 3: RDS / Aurora(リレーショナルが必要な場合)
最新値や集計済みサマリーをリレーショナルDBに持ち、Snowflake は履歴分析専用にする構成。
センサー → Kinesis → Lambda → RDS Aurora(最新値・直近データ)
↓(CDC または定期エクスポート)
Snowflake(過去データの分析・ML)
向いているユースケース: 既存の RDBMS 資産がある場合・JOIN が必要な複雑なクエリ
センシングデータの収集アーキテクチャ
エッジから Snowflake までの全体像を示す。
[ センサー / デバイス ]
↓ MQTT / HTTP
[ AWS IoT Core ]
↓ ルールエンジン
┌────┴────────────┐
↓ ↓
[ Kinesis ] [ S3(生データ保存)]
↓ ↓
[ Lambda ] [ Glue ETL ]
↓ ↓
[ Timestream ] [ Snowflake ]
(リアルタイム) (バッチ分析)
IoT Core のルール設定例
IoT Core のルールで Kinesis と S3 に同時に振り分けることで、リアルタイム用とバッチ用を分離する。
-- IoT Core ルールクエリ(SQL ライク)
SELECT
topic(2) AS device_id,
timestamp() AS ts,
temperature,
humidity
FROM 'sensors/+/data'
WHERE temperature IS NOT NULL
アクションは以下の 2 つを並列で設定する。
| アクション | 送信先 | 用途 |
|---|---|---|
| Kinesis Data Streams | sensor-stream | リアルタイム処理 |
| S3 | s3://sensor-raw/ | バッチ・Snowflake 取り込み |
Snowflake への取り込み(Snowpipe)
S3 に保存されたデータは Snowpipe で自動的に Snowflake に取り込む。
-- ステージの作成
CREATE STAGE sensor_stage
URL = 's3://sensor-raw/'
CREDENTIALS = (AWS_ROLE = 'arn:aws:iam::123456789:role/snowflake-role');
-- パイプの作成
CREATE PIPE sensor_pipe AUTO_INGEST = TRUE AS
COPY INTO sensor_raw
FROM @sensor_stage
FILE_FORMAT = (TYPE = 'JSON');
S3 バケットに SQS 通知を設定すると、ファイル到着から数十秒以内に自動取り込みが動く。
表示系の選択肢
Grafana(リアルタイム監視向け)
Timestream や Kinesis のデータを秒単位で更新するダッシュボード。Grafana の Timestream プラグインを使うと設定が簡単。
# docker-compose.yml(ローカル確認用)
version: '3'
services:
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
volumes:
- grafana-data:/var/lib/grafana
Timestream データソースの接続設定:
{
"type": "grafana-timestream-datasource",
"access": "proxy",
"jsonData": {
"defaultRegion": "ap-northeast-1",
"defaultDatabase": "sensor_db",
"defaultTable": "sensor_metrics"
}
}
Amazon QuickSight(ビジネス分析向け)
Snowflake を直接データソースとして接続し、経営層・非エンジニア向けのレポートを作る場合に適している。リアルタイム性より分析の深さと共有のしやすさが強み。
QuickSight ↔ Snowflake の接続設定:
データソース種別: Snowflake
サーバー: <account>.snowflakecomputing.com
データベース: SENSOR_DW
ウェアハウス: ANALYTICS_WH
カスタム UI(Next.js + API Gateway)
独自のダッシュボードが必要な場合は API Gateway → Lambda → Timestream/RDS の構成でリアルタイムデータを取得する。
// pages/api/sensor/[deviceId].ts
import { TimestreamQueryClient, QueryCommand } from '@aws-sdk/client-timestream-query';
const client = new TimestreamQueryClient({ region: 'ap-northeast-1' });
export default async function handler(req, res) {
const { deviceId } = req.query;
const command = new QueryCommand({
QueryString: `
SELECT time, measure_value::double AS value
FROM "sensor_db"."sensor_metrics"
WHERE device_id = '${deviceId}'
AND measure_name = 'temperature'
AND time > ago(1h)
ORDER BY time DESC
LIMIT 100
`,
});
const result = await client.send(command);
const rows = result.Rows?.map(row => ({
time: row.Data?.[0].ScalarValue,
value: parseFloat(row.Data?.[1].ScalarValue ?? '0'),
}));
res.json(rows);
}
この構成で Snowflake を使う利点
リアルタイム部分を AWS に任せることで、Snowflake は得意なことだけに集中できる。
| Snowflake が担う役割 | 具体例 |
|---|---|
| 長期履歴の保存・クエリ | 過去 1 年分のセンサーログ分析 |
| 複雑な集計・BI 連携 | デバイスごとの月次稼働率・異常傾向分析 |
| ML 用の特徴量作成 | Snowpark で Python による前処理 |
| データ共有 | Snowflake Data Sharing で他チームへ提供 |
逆に Snowflake に やらせないこと を明確にするのが設計のポイントだ。
- 1秒以内のリアルタイム表示 → Timestream / DynamoDB
- 高頻度の行更新(UPSERT) → RDS / DynamoDB
- アラート・イベント駆動処理 → Lambda
役割分担まとめ
| レイヤー | サービス | 役割 |
|---|---|---|
| 収集 | AWS IoT Core | デバイスからのデータ受信(MQTT/HTTP) |
| ストリーム | Kinesis Data Streams | リアルタイムデータの転送バッファ |
| リアルタイム処理 | Lambda | 異常検知・アラート・即時書き込み |
| リアルタイムストア | Timestream / DynamoDB | 直近データの高速参照 |
| 生データ保管 | S3 | バッチ処理・再処理用の永続化 |
| バッチ ETL | Glue | S3 → Snowflake の変換・ロード |
| 分析ウェアハウス | Snowflake | 長期履歴・複雑分析・BI 連携 |
| リアルタイム可視化 | Grafana | 秒単位の監視ダッシュボード |
| ビジネス分析 | QuickSight | 非エンジニア向けレポート |
| カスタム UI | Next.js + API GW | 独自ダッシュボード |
設計の原則: 「Snowflake がリアルタイムに弱い」のは欠点ではなく、専門特化の結果だ。AWS のリアルタイム向けサービスと役割分担することで、それぞれの強みを最大限に活かしたデータ基盤が構築できる。