001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.TreeSet;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.ArrayBackedTag;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.CellComparator;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.KeyValueUtil;
033import org.apache.hadoop.hbase.Tag;
034import org.apache.hadoop.hbase.TagType;
035import org.apache.hadoop.hbase.TagUtil;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.exceptions.DeserializationException;
039import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
040import org.apache.hadoop.hbase.security.visibility.CellVisibility;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.mapreduce.Reducer;
043import org.apache.hadoop.util.StringUtils;
044
045/**
046 * Emits sorted Puts.
047 * Reads in all Puts from passed Iterator, sorts them, then emits
048 * Puts in sorted order.  If lots of columns per row, it will use lots of
049 * memory sorting.
050 * @see HFileOutputFormat2
051 * @see CellSortReducer
052 */
053@InterfaceAudience.Public
054public class PutSortReducer extends
055    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
056  // the cell creator
057  private CellCreator kvCreator;
058
059  @Override
060  protected void
061      setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
062          throws IOException, InterruptedException {
063    Configuration conf = context.getConfiguration();
064    this.kvCreator = new CellCreator(conf);
065  }
066
067  @Override
068  protected void reduce(
069      ImmutableBytesWritable row,
070      java.lang.Iterable<Put> puts,
071      Reducer<ImmutableBytesWritable, Put,
072              ImmutableBytesWritable, KeyValue>.Context context)
073      throws java.io.IOException, InterruptedException
074  {
075    // although reduce() is called per-row, handle pathological case
076    long threshold = context.getConfiguration().getLong(
077        "putsortreducer.row.threshold", 1L * (1<<30));
078    Iterator<Put> iter = puts.iterator();
079    while (iter.hasNext()) {
080      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.getInstance());
081      long curSize = 0;
082      // stop at the end or the RAM threshold
083      List<Tag> tags = new ArrayList<>();
084      while (iter.hasNext() && curSize < threshold) {
085        // clear the tags
086        tags.clear();
087        Put p = iter.next();
088        long t = p.getTTL();
089        if (t != Long.MAX_VALUE) {
090          // add TTL tag if found
091          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
092        }
093        byte[] acl = p.getACL();
094        if (acl != null) {
095          // add ACL tag if found
096          tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
097        }
098        try {
099          CellVisibility cellVisibility = p.getCellVisibility();
100          if (cellVisibility != null) {
101            // add the visibility labels if any
102            tags.addAll(kvCreator.getVisibilityExpressionResolver()
103                .createVisibilityExpTags(cellVisibility.getExpression()));
104          }
105        } catch (DeserializationException e) {
106          // We just throw exception here. Should we allow other mutations to proceed by
107          // just ignoring the bad one?
108          throw new IOException("Invalid visibility expression found in mutation " + p, e);
109        }
110        for (List<Cell> cells: p.getFamilyCellMap().values()) {
111          for (Cell cell: cells) {
112            // Creating the KV which needs to be directly written to HFiles. Using the Facade
113            // KVCreator for creation of kvs.
114            KeyValue kv = null;
115            TagUtil.carryForwardTags(tags, cell);
116            if (!tags.isEmpty()) {
117              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
118                cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
119                cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
120                cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
121                cell.getValueOffset(), cell.getValueLength(), tags);
122            } else {
123              kv = KeyValueUtil.ensureKeyValue(cell);
124            }
125            if (map.add(kv)) {// don't count duplicated kv into size
126              curSize += kv.heapSize();
127            }
128          }
129        }
130      }
131      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
132          + "(" + StringUtils.humanReadableInt(curSize) + ")");
133      int index = 0;
134      for (KeyValue kv : map) {
135        context.write(row, kv);
136        if (++index % 100 == 0)
137          context.setStatus("Wrote " + index);
138      }
139
140      // if we have more entries to process
141      if (iter.hasNext()) {
142        // force flush because we cannot guarantee intra-row sorted order
143        context.write(null, null);
144      }
145    }
146  }
147}