Snowflake × AWS でリアルタイムデータ基盤を作る — センシングデータ収集から可視化まで

Snowflake は優れたデータウェアハウスだが、リアルタイム性については正直に言うと苦手な部分がある。この記事では Snowflake のリアルタイム性を冷静に評価したうえで、AWS サービスで補完するアーキテクチャパターンと、センシングデータの収集・可視化の全体像を解説する。

Snowflake のリアルタイム性 — 正直な評価

Snowflake は バッチ処理・分析クエリ に最適化されたウェアハウスであり、リアルタイム性については以下の制約がある。

項目実態
最小ロード間隔Snowpipe で数十秒〜数分のラグが発生
クエリレイテンシコールドスタート時は数秒〜数十秒かかる
行ごとの UPSERTマイクロバッチでも INSERT が前提で高頻度更新は重い
ストリーミングSnowflake Streaming API(旧 Kafka Connector)でも秒単位のラグ

結論: Snowflake は「準リアルタイム(数分以内)」には対応できるが、「数百ミリ秒以内に表示」が必要なユースケースには向かない。

センシングデータ(IoT センサー・機器ログ等)のように 高頻度・低レイテンシ が求められる場合は、AWS のリアルタイム向けサービスと組み合わせるのが現実的な設計だ。

AWS でリアルタイム性を解決する 3 パターン

パターン 1: Kinesis Data Streams + Lambda(イベント駆動)

最もシンプルな構成。センサーデータをストリームに流し、Lambda で即時処理する。

センサー → Kinesis Data Streams → Lambda → DynamoDB / RDS
                                         ↓(バッチ)
                                    S3 → Snowflake(分析用)

向いているユースケース: アラート検知・異常値の即時通知・シンプルな集計

# Lambda でのストリーム処理例
import json
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('sensor_realtime')

def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(record['kinesis']['data'])
        # リアルタイムストアに書き込み
        table.put_item(Item={
            'device_id': payload['device_id'],
            'timestamp': payload['timestamp'],
            'value': payload['value'],
        })

パターン 2: Amazon Timestream(時系列データ専用)

IoT・センシングデータに特化した時系列データベース。ミリ秒単位の書き込みと高速クエリに対応。

センサー → IoT Core → Timestream(リアルタイム参照)
                    ↓(エクスポート)
               S3 → Snowflake(長期分析)

向いているユースケース: 機器の稼働監視・温湿度ログ・時系列グラフの表示

import boto3
from datetime import datetime

client = boto3.client('timestream-write', region_name='ap-northeast-1')

def write_sensor_data(device_id, metric_name, value):
    current_time = str(int(datetime.now().timestamp() * 1000))
    client.write_records(
        DatabaseName='sensor_db',
        TableName='sensor_metrics',
        Records=[{
            'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
            'MeasureName': metric_name,
            'MeasureValue': str(value),
            'MeasureValueType': 'DOUBLE',
            'Time': current_time,
            'TimeUnit': 'MILLISECONDS',
        }]
    )

パターン 3: RDS / Aurora(リレーショナルが必要な場合)

最新値や集計済みサマリーをリレーショナルDBに持ち、Snowflake は履歴分析専用にする構成。

センサー → Kinesis → Lambda → RDS Aurora(最新値・直近データ)
                             ↓(CDC または定期エクスポート)
                        Snowflake(過去データの分析・ML)

向いているユースケース: 既存の RDBMS 資産がある場合・JOIN が必要な複雑なクエリ

センシングデータの収集アーキテクチャ

エッジから Snowflake までの全体像を示す。

[ センサー / デバイス ]
        ↓ MQTT / HTTP
[ AWS IoT Core ]
        ↓ ルールエンジン
   ┌────┴────────────┐
   ↓                ↓
[ Kinesis ]      [ S3(生データ保存)]
   ↓                ↓
[ Lambda ]      [ Glue ETL ]
   ↓                ↓
[ Timestream ]  [ Snowflake ]
(リアルタイム)  (バッチ分析)

IoT Core のルール設定例

IoT Core のルールで Kinesis と S3 に同時に振り分けることで、リアルタイム用とバッチ用を分離する。

-- IoT Core ルールクエリ(SQL ライク)
SELECT
  topic(2) AS device_id,
  timestamp() AS ts,
  temperature,
  humidity
FROM 'sensors/+/data'
WHERE temperature IS NOT NULL

アクションは以下の 2 つを並列で設定する。

アクション送信先用途
Kinesis Data Streamssensor-streamリアルタイム処理
S3s3://sensor-raw/バッチ・Snowflake 取り込み

Snowflake への取り込み(Snowpipe)

S3 に保存されたデータは Snowpipe で自動的に Snowflake に取り込む。

-- ステージの作成
CREATE STAGE sensor_stage
  URL = 's3://sensor-raw/'
  CREDENTIALS = (AWS_ROLE = 'arn:aws:iam::123456789:role/snowflake-role');

-- パイプの作成
CREATE PIPE sensor_pipe AUTO_INGEST = TRUE AS
  COPY INTO sensor_raw
  FROM @sensor_stage
  FILE_FORMAT = (TYPE = 'JSON');

S3 バケットに SQS 通知を設定すると、ファイル到着から数十秒以内に自動取り込みが動く。

表示系の選択肢

Grafana(リアルタイム監視向け)

Timestream や Kinesis のデータを秒単位で更新するダッシュボード。Grafana の Timestream プラグインを使うと設定が簡単。

# docker-compose.yml(ローカル確認用)
version: '3'
services:
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
    volumes:
      - grafana-data:/var/lib/grafana

Timestream データソースの接続設定:

{
  "type": "grafana-timestream-datasource",
  "access": "proxy",
  "jsonData": {
    "defaultRegion": "ap-northeast-1",
    "defaultDatabase": "sensor_db",
    "defaultTable": "sensor_metrics"
  }
}

Amazon QuickSight(ビジネス分析向け)

Snowflake を直接データソースとして接続し、経営層・非エンジニア向けのレポートを作る場合に適している。リアルタイム性より分析の深さと共有のしやすさが強み。

QuickSight ↔ Snowflake の接続設定:

データソース種別: Snowflake
サーバー: <account>.snowflakecomputing.com
データベース: SENSOR_DW
ウェアハウス: ANALYTICS_WH

カスタム UI(Next.js + API Gateway)

独自のダッシュボードが必要な場合は API Gateway → Lambda → Timestream/RDS の構成でリアルタイムデータを取得する。

// pages/api/sensor/[deviceId].ts
import { TimestreamQueryClient, QueryCommand } from '@aws-sdk/client-timestream-query';

const client = new TimestreamQueryClient({ region: 'ap-northeast-1' });

export default async function handler(req, res) {
  const { deviceId } = req.query;

  const command = new QueryCommand({
    QueryString: `
      SELECT time, measure_value::double AS value
      FROM "sensor_db"."sensor_metrics"
      WHERE device_id = '${deviceId}'
        AND measure_name = 'temperature'
        AND time > ago(1h)
      ORDER BY time DESC
      LIMIT 100
    `,
  });

  const result = await client.send(command);
  const rows = result.Rows?.map(row => ({
    time: row.Data?.[0].ScalarValue,
    value: parseFloat(row.Data?.[1].ScalarValue ?? '0'),
  }));

  res.json(rows);
}

この構成で Snowflake を使う利点

リアルタイム部分を AWS に任せることで、Snowflake は得意なことだけに集中できる。

Snowflake が担う役割具体例
長期履歴の保存・クエリ過去 1 年分のセンサーログ分析
複雑な集計・BI 連携デバイスごとの月次稼働率・異常傾向分析
ML 用の特徴量作成Snowpark で Python による前処理
データ共有Snowflake Data Sharing で他チームへ提供

逆に Snowflake に やらせないこと を明確にするのが設計のポイントだ。

  • 1秒以内のリアルタイム表示 → Timestream / DynamoDB
  • 高頻度の行更新(UPSERT) → RDS / DynamoDB
  • アラート・イベント駆動処理 → Lambda

役割分担まとめ

レイヤーサービス役割
収集AWS IoT Coreデバイスからのデータ受信(MQTT/HTTP)
ストリームKinesis Data Streamsリアルタイムデータの転送バッファ
リアルタイム処理Lambda異常検知・アラート・即時書き込み
リアルタイムストアTimestream / DynamoDB直近データの高速参照
生データ保管S3バッチ処理・再処理用の永続化
バッチ ETLGlueS3 → Snowflake の変換・ロード
分析ウェアハウスSnowflake長期履歴・複雑分析・BI 連携
リアルタイム可視化Grafana秒単位の監視ダッシュボード
ビジネス分析QuickSight非エンジニア向けレポート
カスタム UINext.js + API GW独自ダッシュボード

設計の原則: 「Snowflake がリアルタイムに弱い」のは欠点ではなく、専門特化の結果だ。AWS のリアルタイム向けサービスと役割分担することで、それぞれの強みを最大限に活かしたデータ基盤が構築できる。