001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.TreeSet;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ArrayBackedTag;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellComparator;
029import org.apache.hadoop.hbase.KeyValue;
030import org.apache.hadoop.hbase.KeyValueUtil;
031import org.apache.hadoop.hbase.Tag;
032import org.apache.hadoop.hbase.TagType;
033import org.apache.hadoop.hbase.TagUtil;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.exceptions.DeserializationException;
036import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
037import org.apache.hadoop.hbase.security.visibility.CellVisibility;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.mapreduce.Reducer;
040import org.apache.hadoop.util.StringUtils;
041import org.apache.yetus.audience.InterfaceAudience;
042
043/**
044 * Emits sorted Puts. Reads in all Puts from passed Iterator, sorts them, then emits Puts in sorted
045 * order. If lots of columns per row, it will use lots of memory sorting.
046 * @see HFileOutputFormat2
047 * @see CellSortReducer
048 */
049@InterfaceAudience.Public
050public class PutSortReducer
051  extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
052  // the cell creator
053  private CellCreator kvCreator;
054
055  @Override
056  protected void
057    setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
058      throws IOException, InterruptedException {
059    Configuration conf = context.getConfiguration();
060    this.kvCreator = new CellCreator(conf);
061  }
062
063  @Override
064  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<Put> puts,
065    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
066    throws java.io.IOException, InterruptedException {
067    // although reduce() is called per-row, handle pathological case
068    long threshold =
069      context.getConfiguration().getLong("putsortreducer.row.threshold", 1L * (1 << 30));
070    Iterator<Put> iter = puts.iterator();
071    while (iter.hasNext()) {
072      TreeSet<KeyValue> map = new TreeSet<>(CellComparator.getInstance());
073      long curSize = 0;
074      // stop at the end or the RAM threshold
075      List<Tag> tags = new ArrayList<>();
076      while (iter.hasNext() && curSize < threshold) {
077        // clear the tags
078        tags.clear();
079        Put p = iter.next();
080        long t = p.getTTL();
081        if (t != Long.MAX_VALUE) {
082          // add TTL tag if found
083          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
084        }
085        byte[] acl = p.getACL();
086        if (acl != null) {
087          // add ACL tag if found
088          tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
089        }
090        try {
091          CellVisibility cellVisibility = p.getCellVisibility();
092          if (cellVisibility != null) {
093            // add the visibility labels if any
094            tags.addAll(kvCreator.getVisibilityExpressionResolver()
095              .createVisibilityExpTags(cellVisibility.getExpression()));
096          }
097        } catch (DeserializationException e) {
098          // We just throw exception here. Should we allow other mutations to proceed by
099          // just ignoring the bad one?
100          throw new IOException("Invalid visibility expression found in mutation " + p, e);
101        }
102        for (List<Cell> cells : p.getFamilyCellMap().values()) {
103          for (Cell cell : cells) {
104            // Creating the KV which needs to be directly written to HFiles. Using the Facade
105            // KVCreator for creation of kvs.
106            KeyValue kv = null;
107            TagUtil.carryForwardTags(tags, cell);
108            if (!tags.isEmpty()) {
109              kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
110                cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
111                cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
112                cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
113                cell.getValueOffset(), cell.getValueLength(), tags);
114            } else {
115              kv = KeyValueUtil.ensureKeyValue(cell);
116            }
117            if (map.add(kv)) {// don't count duplicated kv into size
118              curSize += kv.heapSize();
119            }
120          }
121        }
122      }
123      context.setStatus("Read " + map.size() + " entries of " + map.getClass() + "("
124        + StringUtils.humanReadableInt(curSize) + ")");
125      int index = 0;
126      for (KeyValue kv : map) {
127        context.write(row, kv);
128        if (++index % 100 == 0) context.setStatus("Wrote " + index);
129      }
130
131      // if we have more entries to process
132      if (iter.hasNext()) {
133        // force flush because we cannot guarantee intra-row sorted order
134        context.write(null, null);
135      }
136    }
137  }
138}