ormat;

/**
 * Driver for the "Hotel" MapReduce job.
 *
 * <p>Reads records from {@code /tmp/hotel}, maps every line to a {@link Hotel}
 * key via {@link HotelMap}, and writes the reducer output to
 * {@code /tmp/hotel001}. Partitioning, grouping and reducing are delegated to
 * {@code HotelPartitioner}, {@code HotelGrouping} and {@code HotelReducer},
 * which are declared outside this section.
 */
public class ctrip001 {

    /**
     * Configures the job, submits it and waits for it to finish.
     *
     * <p>The output path is deleted recursively before the job is submitted.
     * If the job reports failure, the JVM exits with status 1 so that calling
     * scripts can detect it; on success the method simply returns.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if configuring, cleaning up or running the job fails
     */
    public static void main(String[] args) throws Exception {
        String inputDir = "/tmp/hotel";
        String outputDir = "/tmp/hotel001";

        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "m04.ct1.r01.hdp:9001");

        Job job = new Job(conf, "Hotel");
        job.setJarByClass(ctrip001.class);

        // Recursively delete the output path before it is registered as the
        // job's output.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(outputDir), true);

        FileInputFormat.addInputPath(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        job.setMapperClass(HotelMap.class);
        job.setMapOutputKeyClass(Hotel.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(HotelReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Scale the number of reducers up or down with the number of rows of
        // hotel/landmark data. To keep the job fast, 2+ billion rows need
        // roughly 500 to 1000 reducers (adjust for the machine type in use).
        job.setNumReduceTasks(5);

        job.setGroupingComparatorClass(HotelGrouping.class);
        job.setPartitionerClass(HotelPartitioner.class);

        // Surface a failed job through the process exit status instead of
        // silently discarding the result.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }

    /**
     * Mapper that turns each input line into a {@link Hotel} key with a
     * {@link NullWritable} value.
     *
     * <p>Every record is emitted once as parsed. Records whose
     * {@code getType()} is 0 (hotels, as opposed to landmarks) are emitted
     * eight more times, once after each {@code setXYa} call listed in
     * {@link #NEIGHBOR_OFFSETS}.
     */
    public static class HotelMap extends Mapper<LongWritable, Text, Hotel, NullWritable> {

        /**
         * Argument pairs passed to {@code Hotel.setXYa}, in emission order.
         * The order is kept exactly as in the original hand-written sequence.
         */
        private static final int[][] NEIGHBOR_OFFSETS = {
            {1, 0}, {1, 1}, {1, -1},
            {-1, 0}, {-1, 1}, {-1, -1},
            {0, 1}, {0, -1}
        };

        /**
         * Emits the parsed record and, for hotels, eight redundant copies.
         *
         * @param key     input key supplied by the framework (not used)
         * @param value   the input line, handed to {@code Hotel.setHotel}
         * @param context sink for the emitted key/value pairs
         * @throws java.io.IOException  if writing to the context fails
         * @throws InterruptedException if the write is interrupted
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            Hotel hotel = new Hotel();
            hotel.setHotel(value);

            // Hotel or landmark: always emit the record itself.
            context.write(hotel, NullWritable.get());

            // Only hotels (type 0) get the redundant center points.
            if (hotel.getType() != 0) {
                return;
            }

            // Emit 8 redundant center points for the hotel, reusing the same
            // key object and re-positioning it before each write.
            for (int[] offset : NEIGHBOR_OFFSETS) {
                hotel.setXYa(offset[0], offset[1]);
                context.write(hotel, NullWritable.get());
            }
        }
    }

    // [Page navigation residue from the source web page:
    //  previous page [1] [2] [3] [4] [5] [6] next page.
    //  NOTE(review): the closing brace of ctrip001 is not visible here; the
    //  class presumably continues on the following page - confirm.]
|