SQSを使って定期的にAPIをコールする仕組みを作ってみる

今回作る仕組みの概要
f:id:yustam:20120924182041p:image

  1. SQSにあらかじめメッセージを1つ登録しておきCloudWatchがそれを監視する
  2. 取得可能なメッセージ(Available)が閾値(1個)を超えるとAlarmを発生
  3. SNSを通してEC2インスタンスへHTTPリクエストを送信
  4. 受信したAPIでSQSのメッセージを取得しステータスを変更
  5. 一定時間経つとメッセージのステータスが戻り2に戻る
  6. SQSのメッセージを削除すると止まる

APIの実装

{
    "Type": "SubscriptionConfirmation",
    "MessageId": "aad797f5-0d16-47c8-95a0-8dbe40410640",
    "Token": "…",
    "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:topicName",
    "Message": "You have chosen to subscribe to the topic arn:aws:sns:ap-northeast-1:123456789012:topicName.\n
        To confirm the subscription, visit the SubscribeURL included in this message.",
    "SubscribeURL": "https://sns.ap-northeast-1.amazonaws.com/
        ?Action=ConfirmSubscription&TopicArn=arn:aws:sns:ap-northeast-1:123456789012:topicName&Token=…",
    "Timestamp": "2012-09-24T09:13:29.588Z",
    "SignatureVersion": "1",
    "Signature": "…",
    "SigningCertURL": "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-….pem"
}

SNSの宛先に登録すると上のようなJSON文字列をBodyに持つPOSTリクエストが飛んでくるので
上のJSONを受信したらデータ内の「SubscribeURL」をGETするロジックを書いておく
簡単に書くと下のような感じ。「request」はHttpServletRequestを使用しています。

String requestLine = IOUtils.toString(request.getInputStream());
Map<String, String> requestBody = (Map<String, String>) JSON.parse(requestLine);
// SNSの承認がまだの場合は「SubscribeURL」を持つ
if (requestBody.get("SubscribeURL") != null) {
	try {
		// SNSの承認を行う
		URL url = new URL(requestBody.get("SubscribeURL"));
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.connect();
		if (conn.getResponseCode() == HttpStatus.SC_OK) {
			/* 成功 */
		} else {
			/* 失敗 */
		}
		conn.disconnect();
	} catch (Exception e) {
		e.printStackTrace();
	}
} else {
	/** 承認済みの場合はこちら */
	String accessKey = "【AWSアクセスキー】";
	String secretAccessKey = "【AWSシークレットキー】";
	AWSCredentials cre = new BasicAWSCredentials(accessKey, secretAccessKey);
	AmazonSQSClient client = new AmazonSQSClient(cre);
	client.setEndpoint("sqs.ap-northeast-1.amazonaws.com");
	ListQueuesResult queue = client.listQueues(new ListQueuesRequest()
		.withQueueNamePrefix("【SQSキュー名】"));
	// SQSキューのURLを取得
	String queueUrl = queue.getQueueUrls().get(0);
	// SQSメッセージを受信
	ReceiveMessageResult result = client.receiveMessage(new ReceiveMessageRequest()
		.withQueueUrl(queueUrl));
	/**
	 * 以下SNSのメッセージを受信したタイミングで実行したい処理を書く
	 */
}

SNSの設定

適当なトピックを作成し上で作成したAPIへのURLをSubscriptionに設定するだけ
AWS Management Consoleから登録した場合は「Subscription ID」を見て
ARNが表示されれば正常に登録されたことが確認できる

CloudWatchの設定

AWS Management Consoleで作業します
CloudWatch -> Metrics -> SQSを選択し以下のMetricsのアラームを作成する
最低が1分なので間隔を1分にしておく

QueueName (SQSキュー名)
MetricName ApproximateNumberOfMessagesVisible
Period 1 Minute
Statistic Maximum
Take action Send Notification
Action details (SNSトピック名)

最後に下のようになってればOK
f:id:yustam:20120924185857j:image

確認してみる

SQSのロック期間が30秒でAlarmの発生条件が1分なので1分30秒毎にAPIが呼ばれるはず
と思ったのだけど実際は5分おきにリクエストが飛んでくる

CloudWatchのステータスが「INSUFFICIENT_DATA」になってるときがあるので、
SQSのデータは5分間隔でしか取れないのかもしれない

2012/09/25追記

SQSのメッセージは4時間で削除される(キューの設定で1分〜14日までの範囲で指定可能)
ので上のような使い方ではダメでAPI側では処理完了後にメッセージを削除しSNSでは
SQSへメッセージを送信するようにする必要がある。
普通はこんな面倒くさい使い方しないと思うけど念のためメモ。

本当はキューに10個以上メッセージがたまったらCloudWatch/SNSAPIに通知して
APIがキューのメッセージを順次処理する、みたいな使い方が正しいと思う。