org.apache.hadoop.io.Text.getBytes()を使用するときの注意
以前Hadoop入力データの文字エンコーディングに対応するため以下のようなコードを書いた
public class CustomMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); String encoding = conf.get(Const.CONF_CHAR_ENCODING, "UTF-8"); String line = new String(value.getBytes(), Charset.forName(encoding)); // エンコードした文字列で処理 } }
テストしていたら変数「line」に前の行のデータが残る現象が発生
正確には同じMapタスクの直前にTextInputFormatで読み込んだデータが後ろに残る
下のような感じ
入力ファイル
AAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBB
CCCCCCCCCC
DDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEE
MapタスクAの入力
AAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBB
MapタスクBの入力
CCCCCCCCCC
DDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEE
MapタスクAの変数「line」
AAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBAAAAA
MapタスクBの変数「line」
CCCCCCCCCC
DDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEDDDDD
1.0.3のjavadocを確認するとgetBytes()で取得した値で正しいのはgetLength()の位置までと書いてある
実際に問題が発生する場合はgetBytes()で取得した配列のサイズがgetLength()の値を超えている
Text (Hadoop 1.0.3 API)
よって以下のように修正
public class CustomMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); String encoding = conf.get(Const.CONF_CHAR_ENCODING, "UTF-8"); byte[] lineArray = ArrayUtils.subarray( value.getBytes(), 0, value.getLength()); String line = new String(lineArray, Charset.forName(encoding)); // エンコードした文字列で処理 } }
2.0.1-alphaではcopyBytes()メソッドが追加されそちらを使うように書かれていた。
Text (Apache Hadoop Main 2.0.1-alpha API)
SQSを使って定期的にAPIをコールする仕組みを作ってみる
- SQSにあらかじめメッセージを1つ登録しておきCloudWatchがそれを監視する
- 取得可能なメッセージ(Available)が閾値(1個)を超えるとAlarmを発生
- SNSを通してEC2インスタンスへHTTPリクエストを送信
- 受信したAPIでSQSのメッセージを取得しステータスを変更
- 一定時間経つとメッセージのステータスが戻り2に戻る
- SQSのメッセージを削除すると止まる
APIの実装
{ "Type": "SubscriptionConfirmation", "MessageId": "aad797f5-0d16-47c8-95a0-8dbe40410640", "Token": "…", "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:topicName", "Message": "You have chosen to subscribe to the topic arn:aws:sns:ap-northeast-1:123456789012:topicName.\n To confirm the subscription, visit the SubscribeURL included in this message.", "SubscribeURL": "https://sns.ap-northeast-1.amazonaws.com/ ?Action=ConfirmSubscription&TopicArn=arn:aws:sns:ap-northeast-1:123456789012:topicName&Token=…", "Timestamp": "2012-09-24T09:13:29.588Z", "SignatureVersion": "1", "Signature": "…", "SigningCertURL": "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-….pem" }
SNSの宛先に登録すると上のようなJSON文字列をBodyに持つPOSTリクエストが飛んでくるので
上のJSONを受信したらデータ内の「SubscribeURL」をGETするロジックを書いておく
簡単に書くと下のような感じ。「request」はHttpServletRequestを使用しています。
String requestLine = IOUtils.toString(request.getInputStream()); Map<String, String> requestBody = (Map<String, String>) JSON.parse(requestLine); // SNSの承認がまだの場合は「SubscribeURL」を持つ if (requestBody.get("SubscribeURL") != null) { try { // SNSの承認を行う URL url = new URL(requestBody.get("SubscribeURL")); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.connect(); if (conn.getResponseCode() == HttpStatus.SC_OK) { /* 成功 */ } else { /* 失敗 */ } conn.disconnect(); } catch (Exception e) { e.printStackTrace(); } } else { /** 承認済みの場合はこちら */ String accessKey = "【AWSアクセスキー】"; String secretAccessKey = "【AWSシークレットキー】"; AWSCredentials cre = new BasicAWSCredentials(accessKey, secretAccessKey); AmazonSQSClient client = new AmazonSQSClient(cre); client.setEndpoint("sqs.ap-northeast-1.amazonaws.com"); ListQueuesResult queue = client.listQueues(new ListQueuesRequest() .withQueueNamePrefix("【SQSキュー名】")); // SQSキューのURLを取得 String queueUrl = queue.getQueueUrls().get(0); // SQSメッセージを受信 ReceiveMessageResult result = client.receiveMessage(new ReceiveMessageRequest() .withQueueUrl(queueUrl)); /** * 以下SNSのメッセージを受信したタイミングで実行したい処理を書く */ }
SNSの設定
適当なトピックを作成し上で作成したAPIへのURLをSubscriptionに設定するだけ
AWS Management Consoleから登録した場合は「Subscription ID」を見て
ARNが表示されれば正常に登録されたことが確認できる
CloudWatchの設定
AWS Management Consoleで作業します
CloudWatch -> Metrics -> SQSを選択し以下のMetricsのアラームを作成する
最低が1分なので間隔を1分にしておく
QueueName | (SQSキュー名) |
---|---|
MetricName | ApproximateNumberOfMessagesVisible |
Period | 1 Minute |
Statistic | Maximum |
Take action | Send Notification |
Action details | (SNSトピック名) |
確認してみる
SQSのロック期間が30秒でAlarmの発生条件が1分なので1分30秒毎にAPIが呼ばれるはず
と思ったのだけど実際は5分おきにリクエストが飛んでくる
CloudWatchのステータスが「INSUFFICIENT_DATA」になってるときがあるので、
SQSのデータは5分間隔でしか取れないのかもしれない
Amazon SNSからSQSへメッセージを送信する
通知サービスSNSからSQSへは無料でメッセージを送れるみたいです
SNSのメッセージは8KBの制限があるためSQSへメッセージを直接転送できるなら
そっちの方が良い。主にCloudWatchと連携して使うことになると思います
SQSキューを作成
前回と同様名前を入力するだけ。
SNSトピックを作成
こちらも名前を入力するだけ。
SNSトピックに宛先を追加
Protocol | Amazpn SQS |
---|---|
Endpoint | arn:aws:sqs:ap-northeast-1:123456789012:queueName |
SQSキューの権限を変更
Effect | Allow |
---|---|
Principals | Everybody(*) |
Action | SQS:SendMessage |
Condition | ArnEquals aws:SourceArn: "arn:aws:sns:ap-northeast-1:123456789012:topicName" |
以上の設定でSNSのTopicを配信するとSQSにメッセージが追加される
キューから取得できるメッセージは以下のような形式
{ "Type" : "Notification", "MessageId" : "6047f93e-4496-4974-92f2-b29e2c120e0c", "TopicArn" : "arn:aws:sns:ap-northeast-1:123456789012:topicName", "Subject" : "【SNSで入力した件名】", "Message" : "【SNSで入力した本文】", "Timestamp" : "2012-09-24T01:27:38.058Z", "SignatureVersion" : "1", "Signature" : "…", "SigningCertURL" : "https://sns.ap-northeast-1.amazonaws.com/….pem", "UnsubscribeURL" : "https://sns.ap-northeast-1.amazonaws.com/…" }
Amazon SQSのステータス遷移を確認する
Amazon Simple Queue Service(Amazon SQS)を少し触ってみたのでメモ
64KBまでの文字列を登録するだけのシンプルなサービス
メッセージを追加して取得してみる
キューの作成は名前を入力するだけなので省略。作成するとURLが生成される
AWS-SDKforJavaにてメッセージを追加/取得するコードは以下
String accessKey = "【AWSアクセスキー】"; String secretAccessKey = "【AWSシークレットキー】"; AWSCredentials cre = new BasicAWSCredentials(accessKey, secretAccessKey); AmazonSQSClient client = new AmazonSQSClient(cre); client.setEndpoint("sqs.ap-northeast-1.amazonaws.com"); // メッセージを送信 client.sendMessage(new SendMessageRequest() .withQueueUrl("【キューのURL】") .withMessageBody("Hello, world!")); // メッセージを取得 ReceiveMessageResult receive1 = client.receiveMessage( new ReceiveMessageRequest().withQueueUrl("【キューのURL】")); ReceiveMessageResult receive2 = client.receiveMessage( new ReceiveMessageRequest().withQueueUrl("【キューのURL】"));
確認のため2回実行してみたが片方からしかメッセージは取れない。(空が返ってくる)
メッセージの内容は以下のようなもの
{ MessageId: "502ebed6-a02b-4b2a-87e3-3de0d59p9d43", ReceiptHandle: "【300文字くらいの長い文字列】", MD5OfBody: "f0c75976a003b4690235c3o23a5d41c4", Body: "Hello, world!", }
ロック期間を延長する
メッセージを取得した際にロックされるが一定時間経過すると解除される
キューの設定に規定のロック期間を設定することができる
Default Visibility Timeout | 取得したメッセージをロックする時間(規定値30秒) |
---|
メッセージを取得したクライアントからはロック期間の延長を要求することが可能
client.changeMessageVisibility(new ChangeMessageVisibilityRequest() .withQueueUrl("【キューのURL】") .withReceiptHandle("【ReceiptHandle】") .withVisibilityTimeout("【ロック期間(秒)】"));
HadoopでJSONデータを扱う
JavaでJSONを扱う際に外部ライブラリを必要としますが、
Hadoop1.0.3にはjacksonのライブラリが含まれているみたい
Jackson JSON Processor - Home
hadoop-core-1.0.3.pomの一部
<dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.0.1</version> </dependency>
注意しなければいけないのはバージョンが1.0.1であるということ
(現時点でjacksonの最新は1.9.9)
依存関係に違うバージョンのjacksonのライブラリを含めると競合してしまうので
Hadoop1.0.3でjacksonを使用する場合は1.0.1を使います
以下コマンドライン引数にJSONデータを渡し設定を取得する例
{ "char_encoding" : "UTF-8" }
Configuration conf = getConf(); ObjectMapper mapper = new ObjectMapper(); JsonParser parser = new JsonFactory().createJsonParser(args[2]); JsonNode json = mapper.readTree(parser); conf.setStrings("jp.yustam.encoding", json.get("char_encoding").getValueAsText());
Amazon ElasticMapReduceのjarからS3上のファイルを参照する
Hadoopの制御を設定の変更で行う場合にコマンドライン引数で文字列を渡すことはできるが
パラメータが増えると引数が長くなってしまうのでファイルから設定を読み込むようにしたい
ここでは実行するjarファイルからS3上に配置したプロパティファイル読み込んでみました
public class Sample extends Configured implements Tool { public int run(String[] args) throws Exception { Configuration conf = getConf(); String confPath = "s3n://【バケット名】/【バケット配下のパス】"; FileSystem fs = FileSystem.get(URI.create(confPath), conf); Properties props = new Properties(); props.load(fs.open(new Path(confPath))); /* 〜省略〜 */ } }
参考:File System Configuration - Amazon Elastic MapReduce
2012/09/20追記
Amazon EMRのHadoopでなくローカルのHadoop環境で実行する場合は以下のように
アクセスキーを指定することで「s3n://〜」から始まるURIにアクセスできる
Configuration conf = getConf(); conf.set("fs.s3n.awsAccessKeyId", "【AWSアクセスキー】"); conf.set("fs.s3n.awsSecretAccessKey", "【AWSシークレットキー】"); String confPath = "s3n://【バケット名】/【バケット配下のパス】"; FileSystem fs = FileSystem.get(URI.create(confPath), conf); Properties props = new Properties(); props.load(fs.open(new Path(confPath)));
S3の制限付きダウンロードURLにIPアドレス制限をかける
前回のソースでは有効期限のみの制限となるのでリンクを知っている人なら
誰でもダウンロードできてしまう問題がある
もう一段回セキュリティを強化する方法として時間の制限に加えてIPアドレスの制限をかける
IPアドレスの制限はIAMのポリシーで行うことができるのでダウンロードさせたいIPアドレスを
ポリシーに設定したユーザを作成し、そのAWSアクセスキーでURLを生成すればよい
ポリシーは以下のようになる
{ "Statement": [ { "Sid": "【適当な半角英数字の文字列(記号不可)】", "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::【バケット名】/【バケット配下のパス】【ファイル名】" ], "Condition": { "IpAddress": { "aws:SourceIp": [ "【IPアドレス】/【サブネットマスク】" ] } } } ] }
ダウンロードなのでActionは「s3:GetObject」のみ
IPアドレス固定の場合「xxx.xxx.xxx.xxx/32」でOK
AWSのJavaSDKでユーザを作成する場合はこんな感じ
// 適当なユーザ名を作る SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS"); String userName = "user" + sdf.format(Calendar.getInstance().getTime()); String policyName = "policy_" + userName; String sid = "sid" + userName; String ipAddress = "【IPアドレス】/【サブネットマスク】"; // ダウンロードしたいファイルを指定 String bucketName = "【バケット名】"; String key = "【バケット配下のパス】【ファイル名】"; // IAMの操作権限を持つアカウントでクライアントを生成 AWSCredentials creAdmin = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY); AmazonIdentityManagementClient client = new AmazonIdentityManagementClient(creAdmin); // ユーザ作成 CreateUserRequest createUserRequest = new CreateUserRequest(userName); User user = client.createUser(createUserRequest).getUser(); // アクセスキー作成 AccessKey accessKey = client.createAccessKey( new CreateAccessKeyRequest().withUserName(userName)) .getAccessKey(); System.out.println(accessKey.getAccessKeyId()); // <- AWSアクセスキー System.out.println(accessKey.getSecretAccessKey()); // <- AWSシークレットキー // ポリシー設定 Resource s3Resource = new S3BucketResource(bucketName + "/" + key); Condition condition = new IpAddressCondition(ipAddress); Statement statement = new Statement(Effect.Allow).withId(sid) .withActions(S3Actions.GetObject).withResources(s3Resource) .withConditions(condition); Policy policy = new Policy().withStatements(statement); PutUserPolicyRequest putUserPolicyRequest = new PutUserPolicyRequest( userName, policyName, policy.toJson()); client.putUserPolicy(putUserPolicyRequest);
あとは作成したユーザのAWSアクセスキー/AWSシークレットキーでダウンロードURLを作成するだけ
ユーザ作成からAWSアクセスキーが使用可能になるまで少々時間がかかるらしく
5〜8秒くらい待つと大丈夫みたい(URLの生成はできるがURLをGETした際に403エラーとなる)
IP制限/時間制限付きダウンロードURLを返すRESTサービス
ダウンロードURLを生成するRESTサービスをJerseyで作成するとこんな感じになると思います
リクエストからリモートIPアドレスを取得してIAMの権限にセットしているので同じクライアント(IPアドレス)
からしかダウンロードできないよう制限をかけることができる
@GET @Produces( { MediaType.APPLICATION_JSON }) @Path("download/{bucket}/{key}/") public Response generateDownloadUrl(@Context HttpServletRequest request, @PathParam("bucket") String bucket, @PathParam("key") String key) { // 適当なユーザ名を作る SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS"); String userName = "user" + sdf.format(Calendar.getInstance().getTime()); String policyName = "policy_" + userName; String sid = "sid" + userName; String ipAddress = request.getRemoteAddr() + "/32"; // IAMの操作権限を持つアカウントでクライアントを生成 AWSCredentials creAdmin = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY); AmazonIdentityManagementClient clientIAM = new AmazonIdentityManagementClient(creAdmin); // ユーザ作成 CreateUserRequest createUserRequest = new CreateUserRequest(userName); User user = clientIAM.createUser(createUserRequest).getUser(); // アクセスキー作成 AccessKey accessKey = clientIAM.createAccessKey( new CreateAccessKeyRequest().withUserName(userName)) .getAccessKey(); System.out.println(accessKey.getAccessKeyId()); System.out.println(accessKey.getSecretAccessKey()); // ポリシー設定 Resource s3Resource = new S3BucketResource(bucket + "/" + key); Condition condition = new IpAddressCondition(ipAddress); Statement statement = new Statement(Effect.Allow).withId(sid) .withActions(S3Actions.GetObject).withResources(s3Resource) .withConditions(condition); Policy policy = new Policy().withStatements(statement); System.out.println(policy.toJson()); PutUserPolicyRequest putUserPolicyRequest = new PutUserPolicyRequest( user.getUserName(), policyName, policy.toJson()); clientIAM.putUserPolicy(putUserPolicyRequest); // すぐにはユーザが反映されないらしいのでちょっと待つ TimeUnit.SECONDS.sleep(5); // 作成したアカウントでクライアントを生成 AWSCredentials cre = new BasicAWSCredentials( accessKey.getAccessKeyId(), accessKey.getSecretAccessKey()); AmazonS3Client client = new AmazonS3Client(cre); // 有効期限(5分) Calendar cal = Calendar.getInstance(); cal.add(Calendar.MINUTE, 5); Date limit = cal.getTime(); // URLを生成 URL url = client.generatePresignedUrl(new GeneratePresignedUrlRequest( bucket, key).withExpiration(limit)); // 生成したURLを返す Response.ResponseBuilder responseBuilder = Response.ok(); responseBuilder.entity(url.toString()); return responseBuilder.build(); }
実際に使おうと思ったらユーザの有効チェックやファイルの存在チェックを入れてもいいと思います。
あとユーザ情報が残るので有効期限が切れたら削除してあげるような仕組みを作っておいた方が良いかも
使い終わったユーザを削除する
こんな感じのコードを生成処理と同じところに入れておけば良いと思います
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); FutureTask<String> task = new FutureTask<String>(new DeleteUserTask( creAdmin, userName, policyName, accessKey.getAccessKeyId())); // 有効期限に合わせる executor.schedule(task, 5, TimeUnit.MINUTES); executor.shutdown();
class DeleteUserTask implements Callable<String> { private AWSCredentials cre; private String userName; private String policyName; private String userAccessKeyId; public DeleteUserTask(AWSCredentials cre, String userName, String policyName, String userAccessKeyId) { this.cre = cre; this.userName = userName; this.policyName = policyName; this.userAccessKeyId = userAccessKeyId; } public String call() throws Exception { // IAMの操作権限を持つアカウントでクライアントを生成 AmazonIdentityManagementClient client = new AmazonIdentityManagementClient(cre); // ポリシー削除(ユーザ削除の前に行わないとエラー) client.deleteUserPolicy(new DeleteUserPolicyRequest(userName, policyName)); // アクセスキー削除(ユーザ削除の前に行わないとエラー) client.deleteAccessKey(new DeleteAccessKeyRequest(userAccessKeyId).withUserName(userName)); // ユーザ削除 client.deleteUser(new DeleteUserRequest(userName)); return "ok"; } }
2012/09/15追記
毎回IAMユーザを作成するのは面倒だし反映されるまで毎回待たされるのも問題
Amazon IAMはユーザを作っても作りっぱなしにしても料金は掛からないので、
使い終わったユーザが残ってても気にしないのであれば消さずに再利用するのが良さそう
AWS Identity and Access Management (IAM) Preview Beta | アマゾン ウェブ サービス(AWS 日本語)