org.apache.hadoop.io.Text.getBytes()を使用するときの注意

以前Hadoop入力データの文字エンコーディングに対応するため以下のようなコードを書いた

public class CustomMapper extends Mapper<LongWritable, Text, Text, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();
		String encoding = conf.get(Const.CONF_CHAR_ENCODING, "UTF-8");
		String line = new String(value.getBytes(), Charset.forName(encoding));
		// エンコードした文字列で処理
	}
}

テストしていたら変数「line」に前の行のデータが残る現象が発生
正確には同じMapタスクの直前にTextInputFormatで読み込んだデータが後ろに残る
下のような感じ

入力ファイル

AAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBB
CCCCCCCCCC
DDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEE

MapタスクAの入力

AAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBB

MapタスクBの入力

CCCCCCCCCC
DDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEE

MapタスクAの変数「line」

AAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBAAAAA

MapタスクBの変数「line」

CCCCCCCCCC
DDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEDDDDD

1.0.3のjavadocを確認するとgetBytes()で取得した値で正しいのはgetLength()の位置までと書いてある
実際に問題が発生する場合はgetBytes()で取得した配列のサイズがgetLength()の値を超えている

Text (Hadoop 1.0.3 API)

よって以下のように修正

public class CustomMapper extends Mapper<LongWritable, Text, Text, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();
		String encoding = conf.get(Const.CONF_CHAR_ENCODING, "UTF-8");
		byte[] lineArray = ArrayUtils.subarray(
				value.getBytes(), 0, value.getLength());
		String line = new String(lineArray, Charset.forName(encoding));
		// エンコードした文字列で処理
	}
}

2.0.1-alphaではcopyBytes()メソッドが追加されそちらを使うように書かれていた。

Text (Apache Hadoop Main 2.0.1-alpha API)

SQSを使って定期的にAPIをコールする仕組みを作ってみる

今回作る仕組みの概要
f:id:yustam:20120924182041p:image

  1. SQSにあらかじめメッセージを1つ登録しておきCloudWatchがそれを監視する
  2. 取得可能なメッセージ(Available)が閾値(1個)を超えるとAlarmを発生
  3. SNSを通してEC2インスタンスへHTTPリクエストを送信
  4. 受信したAPIでSQSのメッセージを取得しステータスを変更
  5. 一定時間経つとメッセージのステータスが戻り2に戻る
  6. SQSのメッセージを削除すると止まる

APIの実装

{
    "Type": "SubscriptionConfirmation",
    "MessageId": "aad797f5-0d16-47c8-95a0-8dbe40410640",
    "Token": "…",
    "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:topicName",
    "Message": "You have chosen to subscribe to the topic arn:aws:sns:ap-northeast-1:123456789012:topicName.\n
        To confirm the subscription, visit the SubscribeURL included in this message.",
    "SubscribeURL": "https://sns.ap-northeast-1.amazonaws.com/
        ?Action=ConfirmSubscription&TopicArn=arn:aws:sns:ap-northeast-1:123456789012:topicName&Token=…",
    "Timestamp": "2012-09-24T09:13:29.588Z",
    "SignatureVersion": "1",
    "Signature": "…",
    "SigningCertURL": "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-….pem"
}

SNSの宛先に登録すると上のようなJSON文字列をBodyに持つPOSTリクエストが飛んでくるので
上のJSONを受信したらデータ内の「SubscribeURL」をGETするロジックを書いておく
簡単に書くと下のような感じ。「request」はHttpServletRequestを使用しています。

String requestLine = IOUtils.toString(request.getInputStream());
Map<String, String> requestBody = (Map<String, String>) JSON.parse(requestLine);
// SNSの承認がまだの場合は「SubscribeURL」を持つ
if (requestBody.get("SubscribeURL") != null) {
	try {
		// SNSの承認を行う
		URL url = new URL(requestBody.get("SubscribeURL"));
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		conn.connect();
		if (conn.getResponseCode() == HttpStatus.SC_OK) {
			/* 成功 */
		} else {
			/* 失敗 */
		}
		conn.disconnect();
	} catch (Exception e) {
		e.printStackTrace();
	}
} else {
	/** 承認済みの場合はこちら */
	String accessKey = "【AWSアクセスキー】";
	String secretAccessKey = "【AWSシークレットキー】";
	AWSCredentials cre = new BasicAWSCredentials(accessKey, secretAccessKey);
	AmazonSQSClient client = new AmazonSQSClient(cre);
	client.setEndpoint("sqs.ap-northeast-1.amazonaws.com");
	ListQueuesResult queue = client.listQueues(new ListQueuesRequest()
		.withQueueNamePrefix("【SQSキュー名】"));
	// SQSキューのURLを取得
	String queueUrl = queue.getQueueUrls().get(0);
	// SQSメッセージを受信
	ReceiveMessageResult result = client.receiveMessage(new ReceiveMessageRequest()
		.withQueueUrl(queueUrl));
	/**
	 * 以下SNSのメッセージを受信したタイミングで実行したい処理を書く
	 */
}

SNSの設定

適当なトピックを作成し上で作成したAPIへのURLをSubscriptionに設定するだけ
AWS Management Consoleから登録した場合は「Subscription ID」を見て
ARNが表示されれば正常に登録されたことが確認できる

CloudWatchの設定

AWS Management Consoleで作業します
CloudWatch -> Metrics -> SQSを選択し以下のMetricsのアラームを作成する
最低が1分なので間隔を1分にしておく

QueueName (SQSキュー名)
MetricName ApproximateNumberOfMessagesVisible
Period 1 Minute
Statistic Maximum
Take action Send Notification
Action details (SNSトピック名)

最後に下のようになってればOK
f:id:yustam:20120924185857j:image

確認してみる

SQSのロック期間が30秒でAlarmの発生条件が1分なので1分30秒毎にAPIが呼ばれるはず
と思ったのだけど実際は5分おきにリクエストが飛んでくる

CloudWatchのステータスが「INSUFFICIENT_DATA」になってるときがあるので、
SQSのデータは5分間隔でしか取れないのかもしれない

2012/09/25追記

SQSのメッセージは4時間で削除される(キューの設定で1分〜14日までの範囲で指定可能)
ので上のような使い方ではダメでAPI側では処理完了後にメッセージを削除しSNSでは
SQSへメッセージを送信するようにする必要がある。
普通はこんな面倒くさい使い方しないと思うけど念のためメモ。

本当はキューに10個以上メッセージがたまったらCloudWatch/SNSAPIに通知して
APIがキューのメッセージを順次処理する、みたいな使い方が正しいと思う。

Amazon SNSからSQSへメッセージを送信する

通知サービスSNSからSQSへは無料でメッセージを送れるみたいです
SNSのメッセージは8KBの制限があるためSQSへメッセージを直接転送できるなら
そっちの方が良い。主にCloudWatchと連携して使うことになると思います

SQSキューを作成

前回と同様名前を入力するだけ。

SNSトピックを作成

こちらも名前を入力するだけ。

SNSトピックに宛先を追加

Protocol Amazpn SQS
Endpoint arn:aws:sqs:ap-northeast-1:123456789012:queueName

SQSキューの権限を変更

Effect Allow
Principals Everybody(*)
Action SQS:SendMessage
Condition ArnEquals aws:SourceArn: "arn:aws:sns:ap-northeast-1:123456789012:topicName"

以上の設定でSNSのTopicを配信するとSQSにメッセージが追加される
キューから取得できるメッセージは以下のような形式

{
  "Type" : "Notification",
  "MessageId" : "6047f93e-4496-4974-92f2-b29e2c120e0c",
  "TopicArn" : "arn:aws:sns:ap-northeast-1:123456789012:topicName",
  "Subject" : "【SNSで入力した件名】",
  "Message" : "【SNSで入力した本文】",
  "Timestamp" : "2012-09-24T01:27:38.058Z",
  "SignatureVersion" : "1",
  "Signature" : "…",
  "SigningCertURL" : "https://sns.ap-northeast-1.amazonaws.com/….pem",
  "UnsubscribeURL" : "https://sns.ap-northeast-1.amazonaws.com/…"
}

Amazon SQSのステータス遷移を確認する

Amazon Simple Queue Service(Amazon SQS)を少し触ってみたのでメモ
64KBまでの文字列を登録するだけのシンプルなサービス

メッセージを追加して取得してみる

キューの作成は名前を入力するだけなので省略。作成するとURLが生成される
AWS-SDKforJavaにてメッセージを追加/取得するコードは以下

String accessKey = "【AWSアクセスキー】";
String secretAccessKey = "【AWSシークレットキー】";
AWSCredentials cre = new BasicAWSCredentials(accessKey, secretAccessKey);
AmazonSQSClient client = new AmazonSQSClient(cre);
client.setEndpoint("sqs.ap-northeast-1.amazonaws.com");

// メッセージを送信
client.sendMessage(new SendMessageRequest()
	.withQueueUrl("【キューのURL】")
	.withMessageBody("Hello, world!"));

// メッセージを取得
ReceiveMessageResult receive1 = client.receiveMessage(
	new ReceiveMessageRequest().withQueueUrl("【キューのURL】"));
ReceiveMessageResult receive2 = client.receiveMessage(
	new ReceiveMessageRequest().withQueueUrl("【キューのURL】"));

確認のため2回実行してみたが片方からしかメッセージは取れない。(空が返ってくる)
メッセージの内容は以下のようなもの

{
    MessageId: "502ebed6-a02b-4b2a-87e3-3de0d59p9d43",
    ReceiptHandle: "【300文字くらいの長い文字列】",
    MD5OfBody: "f0c75976a003b4690235c3o23a5d41c4",
    Body: "Hello, world!",        
}

ロック期間を延長する

メッセージを取得した際にロックされるが一定時間経過すると解除される
キューの設定に規定のロック期間を設定することができる

Default Visibility Timeout 取得したメッセージをロックする時間(規定値30秒)

メッセージを取得したクライアントからはロック期間の延長を要求することが可能

client.changeMessageVisibility(new ChangeMessageVisibilityRequest()
	.withQueueUrl("【キューのURL】")
	.withReceiptHandle("【ReceiptHandle】")
	.withVisibilityTimeout("【ロック期間(秒)】"));

ステータスを確認する

AWS Management Consoleから見るとキューの中身は以下の2つになっている

Messages Available 取得可能なメッセージ
Massages in Flight ロック中のメッセージ

メッセージは削除しないと元に戻ってしまうため1回きりの処理の場合は
取得したクライアントがメッセージの削除を行う必要がある
あえて削除せずにロック期間毎に実行する処理を作るのに利用できるかも
f:id:yustam:20120923231032p:image

HadoopでJSONデータを扱う

JavaでJSONを扱う際に外部ライブラリを必要としますが、
Hadoop1.0.3にはjacksonのライブラリが含まれているみたい
Jackson JSON Processor - Home

hadoop-core-1.0.3.pomの一部
<dependency>
   <groupId>org.codehaus.jackson</groupId>
   <artifactId>jackson-mapper-asl</artifactId>
   <version>1.0.1</version>
</dependency>

注意しなければいけないのはバージョンが1.0.1であるということ
(現時点でjacksonの最新は1.9.9)

依存関係に違うバージョンのjacksonのライブラリを含めると競合してしまうので
Hadoop1.0.3でjacksonを使用する場合は1.0.1を使います
以下コマンドライン引数にJSONデータを渡し設定を取得する例

{
   "char_encoding" : "UTF-8"
}
Configuration conf = getConf();

ObjectMapper mapper = new ObjectMapper();
JsonParser parser = new JsonFactory().createJsonParser(args[2]);
JsonNode json = mapper.readTree(parser);
conf.setStrings("jp.yustam.encoding", json.get("char_encoding").getValueAsText());

Amazon ElasticMapReduceのjarからS3上のファイルを参照する

Hadoopの制御を設定の変更で行う場合にコマンドライン引数で文字列を渡すことはできるが
パラメータが増えると引数が長くなってしまうのでファイルから設定を読み込むようにしたい

ここでは実行するjarファイルからS3上に配置したプロパティファイル読み込んでみました

public class Sample extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		String confPath = "s3n://【バケット名】/【バケット配下のパス】";
		FileSystem fs = FileSystem.get(URI.create(confPath), conf);
		Properties props = new Properties();
		props.load(fs.open(new Path(confPath)));
		/* 〜省略〜 */
	}
}

参考:File System Configuration - Amazon Elastic MapReduce

2012/09/20追記

Amazon EMRのHadoopでなくローカルのHadoop環境で実行する場合は以下のように
アクセスキーを指定することで「s3n://〜」から始まるURIにアクセスできる

Configuration conf = getConf();

conf.set("fs.s3n.awsAccessKeyId", "【AWSアクセスキー】");
conf.set("fs.s3n.awsSecretAccessKey", "【AWSシークレットキー】");

String confPath = "s3n://【バケット名】/【バケット配下のパス】";
FileSystem fs = FileSystem.get(URI.create(confPath), conf);
Properties props = new Properties();
props.load(fs.open(new Path(confPath)));

S3の制限付きダウンロードURLにIPアドレス制限をかける

前回のソースでは有効期限のみの制限となるのでリンクを知っている人なら
誰でもダウンロードできてしまう問題がある

もう一段回セキュリティを強化する方法として時間の制限に加えてIPアドレスの制限をかける
IPアドレスの制限はIAMのポリシーで行うことができるのでダウンロードさせたいIPアドレス
ポリシーに設定したユーザを作成し、そのAWSアクセスキーでURLを生成すればよい

ポリシーは以下のようになる

{
    "Statement": [
        {
            "Sid": "【適当な半角英数字の文字列(記号不可)】",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::【バケット名】/【バケット配下のパス】【ファイル名】"
            ],
            "Condition": {
                "IpAddress": {
                    "aws:SourceIp": [
                        "【IPアドレス】/【サブネットマスク】"
                    ]
                }
            }
        }
    ]
}

ダウンロードなのでActionは「s3:GetObject」のみ
IPアドレス固定の場合「xxx.xxx.xxx.xxx/32」でOK

AWSのJavaSDKでユーザを作成する場合はこんな感じ

// 適当なユーザ名を作る
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
String userName = "user" + sdf.format(Calendar.getInstance().getTime());
String policyName = "policy_" + userName;
String sid = "sid" + userName;
String ipAddress = "【IPアドレス】/【サブネットマスク】";

// ダウンロードしたいファイルを指定
String bucketName = "【バケット名】";
String key = "【バケット配下のパス】【ファイル名】";

// IAMの操作権限を持つアカウントでクライアントを生成
AWSCredentials creAdmin = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY);
AmazonIdentityManagementClient client = new AmazonIdentityManagementClient(creAdmin);

// ユーザ作成
CreateUserRequest createUserRequest = new CreateUserRequest(userName);
User user = client.createUser(createUserRequest).getUser();

// アクセスキー作成
AccessKey accessKey = client.createAccessKey(
		new CreateAccessKeyRequest().withUserName(userName))
		.getAccessKey();
System.out.println(accessKey.getAccessKeyId());		// <- AWSアクセスキー
System.out.println(accessKey.getSecretAccessKey());	// <- AWSシークレットキー

// ポリシー設定
Resource s3Resource = new S3BucketResource(bucketName + "/" + key);
Condition condition = new IpAddressCondition(ipAddress);
Statement statement = new Statement(Effect.Allow).withId(sid)
		.withActions(S3Actions.GetObject).withResources(s3Resource)
		.withConditions(condition);
Policy policy = new Policy().withStatements(statement);
PutUserPolicyRequest putUserPolicyRequest = new PutUserPolicyRequest(
		userName, policyName, policy.toJson());
client.putUserPolicy(putUserPolicyRequest);

あとは作成したユーザのAWSアクセスキー/AWSシークレットキーでダウンロードURLを作成するだけ

ユーザ作成からAWSアクセスキーが使用可能になるまで少々時間がかかるらしく
5〜8秒くらい待つと大丈夫みたい(URLの生成はできるがURLをGETした際に403エラーとなる)

IP制限/時間制限付きダウンロードURLを返すRESTサービス

ダウンロードURLを生成するRESTサービスをJerseyで作成するとこんな感じになると思います
リクエストからリモートIPアドレスを取得してIAMの権限にセットしているので同じクライアント(IPアドレス)
からしかダウンロードできないよう制限をかけることができる

@GET
@Produces( { MediaType.APPLICATION_JSON })
@Path("download/{bucket}/{key}/")
public Response generateDownloadUrl(@Context HttpServletRequest request,
		@PathParam("bucket") String bucket, @PathParam("key") String key) {

	// 適当なユーザ名を作る
	SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
	String userName = "user" + sdf.format(Calendar.getInstance().getTime());
	String policyName = "policy_" + userName;
	String sid = "sid" + userName;
	String ipAddress = request.getRemoteAddr() + "/32";

	// IAMの操作権限を持つアカウントでクライアントを生成
	AWSCredentials creAdmin = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY);
	AmazonIdentityManagementClient clientIAM = new AmazonIdentityManagementClient(creAdmin);

	// ユーザ作成
	CreateUserRequest createUserRequest = new CreateUserRequest(userName);
	User user = clientIAM.createUser(createUserRequest).getUser();

	// アクセスキー作成
	AccessKey accessKey = clientIAM.createAccessKey(
			new CreateAccessKeyRequest().withUserName(userName))
			.getAccessKey();
	System.out.println(accessKey.getAccessKeyId());
	System.out.println(accessKey.getSecretAccessKey());

	// ポリシー設定
	Resource s3Resource = new S3BucketResource(bucket + "/" + key);
	Condition condition = new IpAddressCondition(ipAddress);
	Statement statement = new Statement(Effect.Allow).withId(sid)
			.withActions(S3Actions.GetObject).withResources(s3Resource)
			.withConditions(condition);
	Policy policy = new Policy().withStatements(statement);
	System.out.println(policy.toJson());
	PutUserPolicyRequest putUserPolicyRequest = new PutUserPolicyRequest(
			user.getUserName(), policyName, policy.toJson());
	clientIAM.putUserPolicy(putUserPolicyRequest);

	// すぐにはユーザが反映されないらしいのでちょっと待つ
	TimeUnit.SECONDS.sleep(5);

	// 作成したアカウントでクライアントを生成
	AWSCredentials cre = new BasicAWSCredentials(
			accessKey.getAccessKeyId(), accessKey.getSecretAccessKey());
	AmazonS3Client client = new AmazonS3Client(cre);

	// 有効期限(5分)
	Calendar cal = Calendar.getInstance();
	cal.add(Calendar.MINUTE, 5);
	Date limit = cal.getTime();

	// URLを生成
	URL url = client.generatePresignedUrl(new GeneratePresignedUrlRequest(
			bucket, key).withExpiration(limit));

	// 生成したURLを返す
	Response.ResponseBuilder responseBuilder = Response.ok();
	responseBuilder.entity(url.toString());
	return responseBuilder.build();
}

実際に使おうと思ったらユーザの有効チェックやファイルの存在チェックを入れてもいいと思います。
あとユーザ情報が残るので有効期限が切れたら削除してあげるような仕組みを作っておいた方が良いかも

使い終わったユーザを削除する

こんな感じのコードを生成処理と同じところに入れておけば良いと思います

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
FutureTask<String> task = new FutureTask<String>(new DeleteUserTask(
		creAdmin, userName, policyName, accessKey.getAccessKeyId()));
// 有効期限に合わせる
executor.schedule(task, 5, TimeUnit.MINUTES);
executor.shutdown();
class DeleteUserTask implements Callable<String> {

	private AWSCredentials cre;
	private String userName;
	private String policyName;
	private String userAccessKeyId;

	public DeleteUserTask(AWSCredentials cre, String userName,
			String policyName, String userAccessKeyId) {
		this.cre = cre;
		this.userName = userName;
		this.policyName = policyName;
		this.userAccessKeyId = userAccessKeyId;
	}

	public String call() throws Exception {
		// IAMの操作権限を持つアカウントでクライアントを生成
		AmazonIdentityManagementClient client = new AmazonIdentityManagementClient(cre);
		// ポリシー削除(ユーザ削除の前に行わないとエラー)
		client.deleteUserPolicy(new DeleteUserPolicyRequest(userName, policyName));
		// アクセスキー削除(ユーザ削除の前に行わないとエラー)
		client.deleteAccessKey(new DeleteAccessKeyRequest(userAccessKeyId).withUserName(userName));
		// ユーザ削除
		client.deleteUser(new DeleteUserRequest(userName));
		return "ok";
	}
}

2012/09/15追記

毎回IAMユーザを作成するのは面倒だし反映されるまで毎回待たされるのも問題
Amazon IAMはユーザを作っても作りっぱなしにしても料金は掛からないので、
使い終わったユーザが残ってても気にしないのであれば消さずに再利用するのが良さそう
AWS Identity and Access Management (IAM) Preview Beta | アマゾン ウェブ サービス(AWS 日本語)