public static class HotelReducer extends Reducer<Hotel, NullWritable, Text, NullWritable> { @SuppressWarnings("unused") public void reduce(Hotel key, Iterable<NullWritable> value, Context context) throws IOException, InterruptedException { int count = 0; ArrayList<Hotel> MyHotel = new ArrayList<Hotel>(); // 一个group中可能会有多个酒店,用arraylist作为酒店容器 for (NullWritable x : value) { if (count == 0 && key.getType() != 0) break; if (key.getType() == 0) { try { MyHotel.add((Hotel) key.clone()); } catch (CloneNotSupportedException e) {} } else { for (Hotel h : MyHotel) { double dis = Math.sqrt(Math.pow(h.getX() - key.getX(), 2) + Math.pow(h.getY() - key.getY(), 2)); if (dis <= 1000) { // 输出格式 :酒店ID 地标ID 距离(测试用)。抛弃没有酒店关系的地标数据 context.write(new Text(h.getID() + "\t" + key.getID() + "\t" + (int) dis), NullWritable.get()); } } } ++count; } } } } // 把酒店地标数据重新划分到不同的reduce class HotelPartitioner extends Partitioner<Hotel, NullWritable> { public int getPartition(Hotel key, NullWritable value, int numPartitions) { return Math.abs((int) ((key.getXa() + key.getYa()) % numPartitions)); }; } //自定义Group 根据1000*1000的块重新洗牌聚合 class HotelGrouping extends WritableComparator { protected HotelGrouping() { super(Hotel.class, true); } @SuppressWarnings("rawtypes") @Override // 比较 WritableComparables. public int compare(WritableComparable w1, WritableComparable w2) { Hotel h1 = (Hotel) w1; Hotel h2 = (Hotel) w2; int r1x = h1.getXa(); int r1y = h1.getYa(); 上一页 [1] [2] [3] [4] [5] [6] 下一页
|