View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.TreeSet;
26
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.ArrayBackedTag;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellComparator;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.Tag;
34  import org.apache.hadoop.hbase.TagType;
35  import org.apache.hadoop.hbase.TagUtil;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.exceptions.DeserializationException;
40  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
41  import org.apache.hadoop.hbase.security.visibility.CellVisibility;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.mapreduce.Reducer;
44  import org.apache.hadoop.util.StringUtils;
45
46  /**
47   * Emits sorted Puts.
48   * Reads in all Puts from passed Iterator, sorts them, then emits
49   * Puts in sorted order.  If lots of columns per row, it will use lots of
50   * memory sorting.
51   * @see HFileOutputFormat2
52   * @see KeyValueSortReducer
53   */
54  @InterfaceAudience.Public
55  @InterfaceStability.Stable
56  public class PutSortReducer extends
57      Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
58    // the cell creator
59    private CellCreator kvCreator;
60
61    @Override
62    protected void
63        setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)
64            throws IOException, InterruptedException {
65      Configuration conf = context.getConfiguration();
66      this.kvCreator = new CellCreator(conf);
67    }
68
69    @Override
70    protected void reduce(
71        ImmutableBytesWritable row,
72        java.lang.Iterable<Put> puts,
73        Reducer<ImmutableBytesWritable, Put,
74                ImmutableBytesWritable, KeyValue>.Context context)
75        throws java.io.IOException, InterruptedException
76    {
77      // although reduce() is called per-row, handle pathological case
78      long threshold = context.getConfiguration().getLong(
79          "putsortreducer.row.threshold", 1L * (1<<30));
80      Iterator<Put> iter = puts.iterator();
81      while (iter.hasNext()) {
82        TreeSet<KeyValue> map = new TreeSet<KeyValue>(CellComparator.COMPARATOR);
83        long curSize = 0;
84        // stop at the end or the RAM threshold
85        List<Tag> tags = new ArrayList<Tag>();
86        while (iter.hasNext() && curSize < threshold) {
87          // clear the tags
88          tags.clear();
89          Put p = iter.next();
90          long t = p.getTTL();
91          if (t != Long.MAX_VALUE) {
92            // add TTL tag if found
93            tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t)));
94          }
95          byte[] acl = p.getACL();
96          if (acl != null) {
97            // add ACL tag if found
98            tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl));
99          }
100         try {
101           CellVisibility cellVisibility = p.getCellVisibility();
102           if (cellVisibility != null) {
103             // add the visibility labels if any
104             tags.addAll(kvCreator.getVisibilityExpressionResolver()
105                 .createVisibilityExpTags(cellVisibility.getExpression()));
106           }
107         } catch (DeserializationException e) {
108           // We just throw exception here. Should we allow other mutations to proceed by
109           // just ignoring the bad one?
110           throw new IOException("Invalid visibility expression found in mutation " + p, e);
111         }
112         for (List<Cell> cells: p.getFamilyCellMap().values()) {
113           for (Cell cell: cells) {
114             // Creating the KV which needs to be directly written to HFiles. Using the Facade
115             // KVCreator for creation of kvs.
116             KeyValue kv = null;
117             TagUtil.carryForwardTags(tags, cell);
118             if (!tags.isEmpty()) {
119               kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(),
120                 cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(),
121                 cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(),
122                 cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(),
123                 cell.getValueOffset(), cell.getValueLength(), tags);
124             } else {
125               kv = KeyValueUtil.ensureKeyValue(cell);
126             }
127             if (map.add(kv)) {// don't count duplicated kv into size
128               curSize += kv.heapSize();
129             }
130           }
131         }
132       }
133       context.setStatus("Read " + map.size() + " entries of " + map.getClass()
134           + "(" + StringUtils.humanReadableInt(curSize) + ")");
135       int index = 0;
136       for (KeyValue kv : map) {
137         context.write(row, kv);
138         if (++index % 100 == 0)
139           context.setStatus("Wrote " + index);
140       }
141
142       // if we have more entries to process
143       if (iter.hasNext()) {
144         // force flush because we cannot guarantee intra-row sorted order
145         context.write(null, null);
146       }
147     }
148   }
149 }