001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.util.TreeSet;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.hadoop.hbase.CellComparatorImpl;
025import org.apache.hadoop.hbase.KeyValue;
026import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
027import org.apache.hadoop.mapreduce.Reducer;
028
029/**
030 * Emits sorted KeyValues.
031 * Reads in all KeyValues from passed Iterator, sorts them, then emits
032 * KeyValues in sorted order.  If lots of columns per row, it will use lots of
033 * memory sorting.
034 * @see HFileOutputFormat2
035 * @deprecated Use {@link CellSortReducer}. Will be removed from
036 * 3.0 onwards
037 */
038@Deprecated
039@InterfaceAudience.Public
040public class KeyValueSortReducer
041    extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
042  protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,
043      Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
044  throws java.io.IOException, InterruptedException {
045    TreeSet<KeyValue> map = new TreeSet<>(CellComparatorImpl.COMPARATOR);
046    for (KeyValue kv: kvs) {
047      try {
048        map.add(kv.clone());
049      } catch (CloneNotSupportedException e) {
050        throw new java.io.IOException(e);
051      }
052    }
053    context.setStatus("Read " + map.getClass());
054    int index = 0;
055    for (KeyValue kv: map) {
056      context.write(row, kv);
057      if (++index % 100 == 0) context.setStatus("Wrote " + index);
058    }
059  }
060}