/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.hadoopbackport;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 *
 * This is an identical copy of o.a.h.mapreduce.lib.partition.TotalOrderPartitioner
 * from Hadoop trunk at r910774.
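 *
 * A minimal usage sketch (the job name and partition-file path are
 * illustrative, not prescribed by this class):
 * <pre>
 *   Job job = new Job(conf, "sorted-job");
 *   Path partitionFile = new Path("/tmp/_partition.lst");
 *   TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
 *   job.setPartitionerClass(TotalOrderPartitioner.class);
 *   // The file must hold getNumReduceTasks() - 1 keys, sorted with the
 *   // job's sort comparator.
 * </pre>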
 */
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;

  public TotalOrderPartitioner() { }

  /**
   * Read in the partition file and build indexing data structures.
   * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt> is not false,
   * a trie of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * (default 200) bytes will be built. Otherwise, keys will be located using
   * a binary search of the partition keyset using the
   * {@link org.apache.hadoop.io.RawComparator} defined for this job.
   * The input file must be sorted with the same comparator and contain
   * {@link Job#getNumReduceTasks()} - 1 keys.
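   *
   * For example, the trie can be disabled in favor of binary search with:
   * <pre>
   *   conf.setBoolean(TotalOrderPartitioner.NATURAL_ORDER, false);
   * </pre>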
   */
  @SuppressWarnings("unchecked") // keytype from conf not static
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)     // assume in DistributedCache
        : partFile.getFileSystem(conf);

      Job job = new Job(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset:"
            + splitPoints.length);
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // Now that blocks of identical splitless trie nodes are
            // represented reentrantly, and we develop a leaf for any trie
            // node with only one split point, the only reason for a depth
            // limit is to prevent stack overflow or bloat in the pathological
            // case where the split points are long and mostly look like bytes
            // iii...iixii...iii.  Therefore, we make the default depth
            // limit large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  // By construction, we know whether our key type is memcmp-able and
  // whether it uses the trie.
  @SuppressWarnings("unchecked")
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration, Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }

  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate a partition in the keyset K, such that [K_i, K_{i+1}) defines
     * a partition, with implicit K_0 = -inf, K_n = +inf, and
     * |K| = #partitions - 1.
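     *
     * For example, with split points {"bbb", "fff"}: keys below "bbb" map
     * to partition 0, keys in ["bbb", "fff") map to partition 1, and keys
     * at or above "fff" map to partition 2.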
     */
    int findPartition(T key);
  }

  /**
   * Base class for trie nodes. If the keytype is memcmp-able, this builds
   * tries of the first <tt>mapreduce.totalorderpartitioner.trie.maxdepth</tt>
   * bytes.
   */
  abstract static class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
   * where the natural order is disabled via
   * <tt>mapreduce.totalorderpartitioner.naturalorder</tt>,
   * search the partition keyset with a binary search.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
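      // Arrays.binarySearch returns the match index, or
      // (-(insertion point) - 1) when the key is absent.  After adding 1,
      // negating a negative result recovers the insertion point, so a key
      // below splitPoints[0] maps to partition 0 and a key equal to
      // splitPoints[i] maps to partition i + 1.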
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  /**
   * An inner trie node that contains 256 children based on the next
   * character.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
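      // A key exhausted at this depth sorts before every key in any
      // non-zero child, so it is routed through the 0x00 child; the leaf
      // reached there performs an exact comparison.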
      int level = getLevel();
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level        the tree depth at this node
   * @param splitPoints  the full split point vector, which holds
   *                     the split point or points this leaf node
   *                     should contain
   * @param lower        first INcluded element of splitPoints
   * @param upper        first EXcluded element of splitPoints
   * @return  a leaf node.  They come in three kinds: no split points
   *          [and findPartition returns a canned index], one split
   *          point [and we compare with a single comparand], or more
   *          than one [and we do a binary search].  The last case is
   *          rare.
   */
  private TrieNode LeafTrieNodeFactory(int level, BinaryComparable[] splitPoints,
      int lower, int upper) {
    switch (upper - lower) {
    case 0:
      return new UnsplitTrieNode(level, lower);

    case 1:
      return new SinglySplitTrieNode(level, splitPoints, lower);

    default:
      return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these now, since we usually continue trie-ing
   * when more than one split point remains at this level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }

  /**
   * Read the cut points from the given sequence file.
   * @param fs the file system
   * @param p the path to read
   * @param keyClass the map output key class
   * @param conf the job configuration
   * @return the cut points
   * @throws IOException
   */
                                 // matching key types enforced by passing in
  @SuppressWarnings("unchecked") // map output key class
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    NullWritable value = NullWritable.get();
    while (reader.next(key, value)) {
      parts.add(key);
      key = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }

  /**
   * This object contains a TrieNodeRef if there is such a thing that
   * can be repeated.  Two adjacent trie node slots that contain no
   * split points can be filled with the same trie node, even if they
   * are not on the same level.  See buildTrieRec, below.
   */
  private class CarriedTrieNodeRef {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }

  /**
   * Given a sorted set of cut points, build a trie that will find the correct
   * partition quickly.
   * @param splits the list of cut points
   * @param lower the first split index to consider, inclusive
   * @param upper the last split index to consider, exclusive
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec
             (splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
  }

  /**
   * This is the core of buildTrie.  The interface, and stub, above, just adds
   * an empty CarriedTrieNodeRef.
   *
   * We build trie nodes in depth first order, which is also in key space
   * order.  Every leaf node is referenced as a slot in a parent internal
   * node.  If two adjacent slots [in the DFO] hold leaf nodes that have
   * no split point, then they are not separated by a split point either,
   * because there's no place in key space for that split point to exist.
   *
   * When that happens, the leaf nodes would be semantically identical, and
   * we reuse the object.  A single CarriedTrieNodeRef "ref" lives for the
   * duration of the tree-walk.  ref carries a potentially reusable, unsplit
   * leaf node for such reuse until a leaf node with a split arises, which
   * breaks the chain until we need to make a new unsplit leaf node.
   *
   * Note that this use of CarriedTrieNodeRef means that if this code is
   * modified in any way, we still need to make or fill in the subnodes of
   * internal nodes in key space order.
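   *
   * For example, with split points {"g", "m"} and maxDepth 1, the root's
   * child slots 0x00..0x66 share one unsplit leaf returning 0, slot 0x67
   * compares against "g", slots 0x68..0x6C share one leaf returning 1,
   * slot 0x6D compares against "m", and slots 0x6E..0xFF share one leaf
   * returning 2.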
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for
    // no split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    // Append an extra byte onto the prefix.
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
    int currentBound = lower;
    for (int ch = 0; ch < 0xFF; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch] =
          buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // Pick up the rest.
    trial[depth] = (byte) 0xFF;
    result.child[0xFF] =
        buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);

    return result;
  }
}