Hadoopの入力にShift-JISのファイルを使用する

Hadoopのinputに指定したファイルはMapperに渡され、Mapperの入力に来たときorg.apache.hadoop.io.Text型になっている。
これがどうやらUTF-8になってしまうらしく日本語の文字列比較などが上手に行えない。
エンコーディング指定する方法を探したが見つからなかったのでJobConfに指定する方法で回避した。

public class Sample extends Configured implements Tool {

	public int run(String[] args) throws Exception {
		if(args.length != 3){
			System.err.printf("Usage: %s [generic options] <input> <output> <encoding>\n", getClass().getSimpleName());
			ToolRunner.printGenericCommandUsage(System.err);
			return -1;
		}

		Configuration conf = getConf();
		// 引数のエンコーディングをJobConfに設定
		conf.setStrings("jp.yustam.hadoop.encoding", args[2]);

		JobConf jobConf = new JobConf(conf, getClass());
		jobConf.setJobName("Journalizing");

		FileInputFormat.addInputPath(jobConf, new Path(args[0]));
		FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

		// 〜省略〜

		JobClient.runJob(jobConf);

		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new Sample(), args);
		System.exit(exitCode);
	}
}
public class MapperSample extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, Text> {

	private JobConf conf;

	public void map(LongWritable key, Text value,
			OutputCollector<Text, Text> output, Reporter reporter)
			throws IOException {

		// エンコーディングを取得
		String encoding = conf.get("jp.yustam.hadoop.encoding", "UTF-8");
		Charset charset = Charset.forName(encoding);

		// エンコーディングを指定してvalueを変換
		String tmpValue = new String(value.getBytes(), charset);

		// TODO tmpValueを使って処理
		if (tmpValue.startsWith("東京")) {
			output.collect(new Text("東京"), value);
		}
	}

	@Override
	public void configure(JobConf conf) {
		this.conf = conf;
	}
}

Cygwinで実行する場合はこんな感じ

$ hadoop jar $(cygpath -w /usr/local/hadoop-0.20.2/mapreduce-0.0.1.jar) mapreduce input output Shift-JIS