View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.codec.prefixtree.encode;
20  
21  import java.io.IOException;
22  import java.io.OutputStream;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.CellUtil;
29  import org.apache.hadoop.hbase.KeyValueUtil;
30  import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeBlockMeta;
31  import org.apache.hadoop.hbase.codec.prefixtree.encode.column.ColumnSectionWriter;
32  import org.apache.hadoop.hbase.codec.prefixtree.encode.other.CellTypeEncoder;
33  import org.apache.hadoop.hbase.codec.prefixtree.encode.other.ColumnNodeType;
34  import org.apache.hadoop.hbase.codec.prefixtree.encode.other.LongEncoder;
35  import org.apache.hadoop.hbase.codec.prefixtree.encode.row.RowSectionWriter;
36  import org.apache.hadoop.hbase.codec.prefixtree.encode.tokenize.Tokenizer;
37  import org.apache.hadoop.hbase.io.CellOutputStream;
38  import org.apache.hadoop.hbase.util.ArrayUtils;
39  import org.apache.hadoop.hbase.util.ByteRange;
40  import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
41  import org.apache.hadoop.hbase.util.byterange.ByteRangeSet;
42  import org.apache.hadoop.hbase.util.byterange.impl.ByteRangeHashSet;
43  import org.apache.hadoop.hbase.util.byterange.impl.ByteRangeTreeSet;
44  import org.apache.hadoop.hbase.util.vint.UFIntTool;
45  import org.apache.hadoop.io.WritableUtils;
46  /**
47   * This is the primary class for converting a CellOutputStream into an encoded byte[]. As Cells are
48   * added they are completely copied into the various encoding structures. This is important because
49   * usually the cells being fed in during compactions will be transient.<br/>
50   * <br/>
51   * Usage:<br/>
52   * 1) constructor<br/>
53   * 4) append cells in sorted order: write(Cell cell)<br/>
54   * 5) flush()<br/>
55   */
56  @InterfaceAudience.Private
57  public class PrefixTreeEncoder implements CellOutputStream {
58  
59    /**************** static ************************/
60  
61    protected static final Log LOG = LogFactory.getLog(PrefixTreeEncoder.class);
62  
63    //future-proof where HBase supports multiple families in a data block.
64    public static final boolean MULITPLE_FAMILIES_POSSIBLE = false;
65  
66    private static final boolean USE_HASH_COLUMN_SORTER = true;
67    private static final int INITIAL_PER_CELL_ARRAY_SIZES = 256;
68    private static final int VALUE_BUFFER_INIT_SIZE = 64 * 1024;
69  
70  
71    /**************** fields *************************/
72  
73    protected long numResets = 0L;
74  
75    protected OutputStream outputStream;
76  
77    /*
78     * Cannot change during a single block's encoding. If false, then substitute incoming Cell's
79     * mvccVersion with zero and write out the block as usual.
80     */
81    protected boolean includeMvccVersion;
82  
83    /*
84     * reusable ByteRanges used for communicating with the sorters/compilers
85     */
86    protected ByteRange rowRange;
87    protected ByteRange familyRange;
88    protected ByteRange qualifierRange;
89    protected ByteRange tagsRange;
90  
91    /*
92     * incoming Cell fields are copied into these arrays
93     */
94    protected long[] timestamps;
95    protected long[] mvccVersions;
96    protected byte[] typeBytes;
97    protected int[] valueOffsets;
98    protected int[] tagsOffsets;
99    protected byte[] values;
100   protected byte[] tags;
101 
102   protected PrefixTreeBlockMeta blockMeta;
103 
104   /*
105    * Sub-encoders for the simple long/byte fields of a Cell.  Add to these as each cell arrives and
106    * compile before flushing.
107    */
108   protected LongEncoder timestampEncoder;
109   protected LongEncoder mvccVersionEncoder;
110   protected CellTypeEncoder cellTypeEncoder;
111 
112   /*
113    * Structures used for collecting families and qualifiers, de-duplicating them, and sorting them
114    * so they can be passed to the tokenizers. Unlike row keys where we can detect duplicates by
115    * comparing only with the previous row key, families and qualifiers can arrive in unsorted order
116    * in blocks spanning multiple rows. We must collect them all into a set to de-duplicate them.
117    */
118   protected ByteRangeSet familyDeduplicator;
119   protected ByteRangeSet qualifierDeduplicator;
120   protected ByteRangeSet tagsDeduplicator;
121   /*
122    * Feed sorted byte[]s into these tokenizers which will convert the byte[]s to an in-memory
123    * trie structure with nodes connected by memory pointers (not serializable yet).
124    */
125   protected Tokenizer rowTokenizer;
126   protected Tokenizer familyTokenizer;
127   protected Tokenizer qualifierTokenizer;
128   protected Tokenizer tagsTokenizer;
129 
130   /*
131    * Writers take an in-memory trie, sort the nodes, calculate offsets and lengths, and write
132    * all information to an output stream of bytes that can be stored on disk.
133    */
134   protected RowSectionWriter rowWriter;
135   protected ColumnSectionWriter familyWriter;
136   protected ColumnSectionWriter qualifierWriter;
137   protected ColumnSectionWriter tagsWriter;
138 
139   /*
140    * Integers used for counting cells and bytes.  We keep track of the size of the Cells as if they
141    * were full KeyValues because some parts of HBase like to know the "unencoded size".
142    */
143   protected int totalCells = 0;
144   protected int totalUnencodedBytes = 0;//numBytes if the cells were KeyValues
145   protected int totalValueBytes = 0;
146   protected int totalTagBytes = 0;
147   protected int maxValueLength = 0;
148   protected int maxTagLength = 0;
149   protected int totalBytes = 0;//
150 
151 
152   /***************** construct ***********************/
153 
154   public PrefixTreeEncoder(OutputStream outputStream, boolean includeMvccVersion) {
155     // used during cell accumulation
156     this.blockMeta = new PrefixTreeBlockMeta();
157     this.rowRange = new SimpleMutableByteRange();
158     this.familyRange = new SimpleMutableByteRange();
159     this.qualifierRange = new SimpleMutableByteRange();
160     this.timestamps = new long[INITIAL_PER_CELL_ARRAY_SIZES];
161     this.mvccVersions = new long[INITIAL_PER_CELL_ARRAY_SIZES];
162     this.typeBytes = new byte[INITIAL_PER_CELL_ARRAY_SIZES];
163     this.valueOffsets = new int[INITIAL_PER_CELL_ARRAY_SIZES];
164     this.values = new byte[VALUE_BUFFER_INIT_SIZE];
165 
166     // used during compilation
167     this.familyDeduplicator = USE_HASH_COLUMN_SORTER ? new ByteRangeHashSet()
168         : new ByteRangeTreeSet();
169     this.qualifierDeduplicator = USE_HASH_COLUMN_SORTER ? new ByteRangeHashSet()
170         : new ByteRangeTreeSet();
171     this.timestampEncoder = new LongEncoder();
172     this.mvccVersionEncoder = new LongEncoder();
173     this.cellTypeEncoder = new CellTypeEncoder();
174     this.rowTokenizer = new Tokenizer();
175     this.familyTokenizer = new Tokenizer();
176     this.qualifierTokenizer = new Tokenizer();
177     this.rowWriter = new RowSectionWriter();
178     this.familyWriter = new ColumnSectionWriter();
179     this.qualifierWriter = new ColumnSectionWriter();
180     initializeTagHelpers();
181 
182     reset(outputStream, includeMvccVersion);
183   }
184 
185   public void reset(OutputStream outputStream, boolean includeMvccVersion) {
186     ++numResets;
187     this.includeMvccVersion = includeMvccVersion;
188     this.outputStream = outputStream;
189     valueOffsets[0] = 0;
190     familyDeduplicator.reset();
191     qualifierDeduplicator.reset();
192     tagsDeduplicator.reset();
193     tagsWriter.reset();
194     tagsTokenizer.reset();
195     rowTokenizer.reset();
196     timestampEncoder.reset();
197     mvccVersionEncoder.reset();
198     cellTypeEncoder.reset();
199     familyTokenizer.reset();
200     qualifierTokenizer.reset();
201     rowWriter.reset();
202     familyWriter.reset();
203     qualifierWriter.reset();
204 
205     totalCells = 0;
206     totalUnencodedBytes = 0;
207     totalValueBytes = 0;
208     maxValueLength = 0;
209     totalBytes = 0;
210   }
211 
212   protected void initializeTagHelpers() {
213     this.tagsRange = new SimpleMutableByteRange();
214     this.tagsDeduplicator = USE_HASH_COLUMN_SORTER ? new ByteRangeHashSet()
215     : new ByteRangeTreeSet();
216     this.tagsTokenizer = new Tokenizer();
217     this.tagsWriter = new ColumnSectionWriter();
218   }
219 
220   /**
221    * Check that the arrays used to hold cell fragments are large enough for the cell that is being
222    * added. Since the PrefixTreeEncoder is cached between uses, these arrays may grow during the
223    * first few block encodings but should stabilize quickly.
224    */
225   protected void ensurePerCellCapacities() {
226     int currentCapacity = valueOffsets.length;
227     int neededCapacity = totalCells + 2;// some things write one index ahead. +2 to be safe
228     if (neededCapacity < currentCapacity) {
229       return;
230     }
231 
232     int padding = neededCapacity;//this will double the array size
233     timestamps = ArrayUtils.growIfNecessary(timestamps, neededCapacity, padding);
234     mvccVersions = ArrayUtils.growIfNecessary(mvccVersions, neededCapacity, padding);
235     typeBytes = ArrayUtils.growIfNecessary(typeBytes, neededCapacity, padding);
236     valueOffsets = ArrayUtils.growIfNecessary(valueOffsets, neededCapacity, padding);
237   }
238 
239   /******************** CellOutputStream methods *************************/
240 
241   /**
242    * Note: Unused until support is added to the scanner/heap
243    * <p/>
244    * The following method are optimized versions of write(Cell cell). The result should be
245    * identical, however the implementation may be able to execute them much more efficiently because
246    * it does not need to compare the unchanged fields with the previous cell's.
247    * <p/>
248    * Consider the benefits during compaction when paired with a CellScanner that is also aware of
249    * row boundaries. The CellScanner can easily use these methods instead of blindly passing Cells
250    * to the write(Cell cell) method.
251    * <p/>
252    * The savings of skipping duplicate row detection are significant with long row keys. A
253    * DataBlockEncoder may store a row key once in combination with a count of how many cells are in
254    * the row. With a 100 byte row key, we can replace 100 byte comparisons with a single increment
255    * of the counter, and that is for every cell in the row.
256    */
257 
258   /**
259    * Add a Cell to the output stream but repeat the previous row. 
260    */
261   //@Override
262   public void writeWithRepeatRow(Cell cell) {
263     ensurePerCellCapacities();//can we optimize away some of this?
264 
265     //save a relatively expensive row comparison, incrementing the row's counter instead
266     rowTokenizer.incrementNumOccurrencesOfLatestValue();
267     addFamilyPart(cell);
268     addQualifierPart(cell);
269     addAfterRowFamilyQualifier(cell);
270   }
271 
272 
273   @Override
274   public void write(Cell cell) {
275     ensurePerCellCapacities();
276 
277     rowTokenizer.addSorted(CellUtil.fillRowRange(cell, rowRange));
278     addFamilyPart(cell);
279     addQualifierPart(cell);
280     addTagPart(cell);
281     addAfterRowFamilyQualifier(cell);
282   }
283 
284 
285   private void addTagPart(Cell cell) {
286     CellUtil.fillTagRange(cell, tagsRange);
287     tagsDeduplicator.add(tagsRange);
288   }
289 
290   /***************** internal add methods ************************/
291 
292   private void addAfterRowFamilyQualifier(Cell cell){
293     // timestamps
294     timestamps[totalCells] = cell.getTimestamp();
295     timestampEncoder.add(cell.getTimestamp());
296 
297     // memstore timestamps
298     if (includeMvccVersion) {
299       mvccVersions[totalCells] = cell.getMvccVersion();
300       mvccVersionEncoder.add(cell.getMvccVersion());
301       totalUnencodedBytes += WritableUtils.getVIntSize(cell.getMvccVersion());
302     }else{
303       //must overwrite in case there was a previous version in this array slot
304       mvccVersions[totalCells] = 0L;
305       if(totalCells == 0){//only need to do this for the first cell added
306         mvccVersionEncoder.add(0L);
307       }
308       //totalUncompressedBytes += 0;//mvccVersion takes zero bytes when disabled
309     }
310 
311     // types
312     typeBytes[totalCells] = cell.getTypeByte();
313     cellTypeEncoder.add(cell.getTypeByte());
314 
315     // values
316     totalValueBytes += cell.getValueLength();
317     // double the array each time we run out of space
318     values = ArrayUtils.growIfNecessary(values, totalValueBytes, 2 * totalValueBytes);
319     CellUtil.copyValueTo(cell, values, valueOffsets[totalCells]);
320     if (cell.getValueLength() > maxValueLength) {
321       maxValueLength = cell.getValueLength();
322     }
323     valueOffsets[totalCells + 1] = totalValueBytes;
324 
325     // general
326     totalUnencodedBytes += KeyValueUtil.length(cell);
327     ++totalCells;
328   }
329 
330   private void addFamilyPart(Cell cell) {
331     if (MULITPLE_FAMILIES_POSSIBLE || totalCells == 0) {
332       CellUtil.fillFamilyRange(cell, familyRange);
333       familyDeduplicator.add(familyRange);
334     }
335   }
336 
337   private void addQualifierPart(Cell cell) {
338     CellUtil.fillQualifierRange(cell, qualifierRange);
339     qualifierDeduplicator.add(qualifierRange);
340   }
341 
342 
343   /****************** compiling/flushing ********************/
344 
345   /**
346    * Expensive method.  The second half of the encoding work happens here.
347    *
348    * Take all the separate accumulated data structures and turn them into a single stream of bytes
349    * which is written to the outputStream.
350    */
351   @Override
352   public void flush() throws IOException {
353     compile();
354 
355     // do the actual flushing to the output stream.  Order matters.
356     blockMeta.writeVariableBytesToOutputStream(outputStream);
357     rowWriter.writeBytes(outputStream);
358     familyWriter.writeBytes(outputStream);
359     qualifierWriter.writeBytes(outputStream);
360     tagsWriter.writeBytes(outputStream);
361     timestampEncoder.writeBytes(outputStream);
362     mvccVersionEncoder.writeBytes(outputStream);
363     //CellType bytes are in the row nodes.  there is no additional type section
364     outputStream.write(values, 0, totalValueBytes);
365   }
366 
367   /**
368    * Now that all the cells have been added, do the work to reduce them to a series of byte[]
369    * fragments that are ready to be written to the output stream.
370    */
371   protected void compile(){
372     blockMeta.setNumKeyValueBytes(totalUnencodedBytes);
373     int lastValueOffset = valueOffsets[totalCells];
374     blockMeta.setValueOffsetWidth(UFIntTool.numBytes(lastValueOffset));
375     blockMeta.setValueLengthWidth(UFIntTool.numBytes(maxValueLength));
376     blockMeta.setNumValueBytes(totalValueBytes);
377     totalBytes += totalTagBytes + totalValueBytes;
378 
379     //these compile methods will add to totalBytes
380     compileTypes();
381     compileMvccVersions();
382     compileTimestamps();
383     compileTags();
384     compileQualifiers();
385     compileFamilies();
386     compileRows();
387 
388     int numMetaBytes = blockMeta.calculateNumMetaBytes();
389     blockMeta.setNumMetaBytes(numMetaBytes);
390     totalBytes += numMetaBytes;
391   }
392 
393   /**
394    * The following "compile" methods do any intermediate work necessary to transform the cell
395    * fragments collected during the writing phase into structures that are ready to write to the
396    * outputStream.
397    * <p/>
398    * The family and qualifier treatment is almost identical, as is timestamp and mvccVersion.
399    */
400 
401   protected void compileTypes() {
402     blockMeta.setAllSameType(cellTypeEncoder.areAllSameType());
403     if(cellTypeEncoder.areAllSameType()){
404       blockMeta.setAllTypes(cellTypeEncoder.getOnlyType());
405     }
406   }
407 
408   protected void compileMvccVersions() {
409     mvccVersionEncoder.compile();
410     blockMeta.setMvccVersionFields(mvccVersionEncoder);
411     int numMvccVersionBytes = mvccVersionEncoder.getOutputArrayLength();
412     totalBytes += numMvccVersionBytes;
413   }
414 
415   protected void compileTimestamps() {
416     timestampEncoder.compile();
417     blockMeta.setTimestampFields(timestampEncoder);
418     int numTimestampBytes = timestampEncoder.getOutputArrayLength();
419     totalBytes += numTimestampBytes;
420   }
421 
422   protected void compileQualifiers() {
423     blockMeta.setNumUniqueQualifiers(qualifierDeduplicator.size());
424     qualifierDeduplicator.compile();
425     qualifierTokenizer.addAll(qualifierDeduplicator.getSortedRanges());
426     qualifierWriter.reconstruct(blockMeta, qualifierTokenizer, ColumnNodeType.QUALIFIER);
427     qualifierWriter.compile();
428     int numQualifierBytes = qualifierWriter.getNumBytes();
429     blockMeta.setNumQualifierBytes(numQualifierBytes);
430     totalBytes += numQualifierBytes;
431   }
432 
433   protected void compileFamilies() {
434     blockMeta.setNumUniqueFamilies(familyDeduplicator.size());
435     familyDeduplicator.compile();
436     familyTokenizer.addAll(familyDeduplicator.getSortedRanges());
437     familyWriter.reconstruct(blockMeta, familyTokenizer, ColumnNodeType.FAMILY);
438     familyWriter.compile();
439     int numFamilyBytes = familyWriter.getNumBytes();
440     blockMeta.setNumFamilyBytes(numFamilyBytes);
441     totalBytes += numFamilyBytes;
442   }
443 
444   protected void compileTags() {
445     blockMeta.setNumUniqueTags(tagsDeduplicator.size());
446     tagsDeduplicator.compile();
447     tagsTokenizer.addAll(tagsDeduplicator.getSortedRanges());
448     tagsWriter.reconstruct(blockMeta, tagsTokenizer, ColumnNodeType.TAGS);
449     tagsWriter.compile();
450     int numTagBytes = tagsWriter.getNumBytes();
451     blockMeta.setNumTagsBytes(numTagBytes);
452     totalBytes += numTagBytes;
453   }
454 
455   protected void compileRows() {
456     rowWriter.reconstruct(this);
457     rowWriter.compile();
458     int numRowBytes = rowWriter.getNumBytes();
459     blockMeta.setNumRowBytes(numRowBytes);
460     blockMeta.setRowTreeDepth(rowTokenizer.getTreeDepth());
461     totalBytes += numRowBytes;
462   }
463 
464   /********************* convenience getters ********************************/
465 
466   public long getValueOffset(int index) {
467     return valueOffsets[index];
468   }
469 
470   public int getValueLength(int index) {
471     return (int) (valueOffsets[index + 1] - valueOffsets[index]);
472   }
473 
474   /************************* get/set *************************************/
475 
476   public PrefixTreeBlockMeta getBlockMeta() {
477     return blockMeta;
478   }
479 
480   public Tokenizer getRowTokenizer() {
481     return rowTokenizer;
482   }
483 
484   public LongEncoder getTimestampEncoder() {
485     return timestampEncoder;
486   }
487 
488   public int getTotalBytes() {
489     return totalBytes;
490   }
491 
492   public long[] getTimestamps() {
493     return timestamps;
494   }
495 
496   public long[] getMvccVersions() {
497     return mvccVersions;
498   }
499 
500   public byte[] getTypeBytes() {
501     return typeBytes;
502   }
503 
504   public LongEncoder getMvccVersionEncoder() {
505     return mvccVersionEncoder;
506   }
507 
508   public ByteRangeSet getFamilySorter() {
509     return familyDeduplicator;
510   }
511 
512   public ByteRangeSet getQualifierSorter() {
513     return qualifierDeduplicator;
514   }
515 
516   public ByteRangeSet getTagSorter() {
517     return tagsDeduplicator;
518   }
519 
520   public ColumnSectionWriter getFamilyWriter() {
521     return familyWriter;
522   }
523 
524   public ColumnSectionWriter getQualifierWriter() {
525     return qualifierWriter;
526   }
527 
528   public ColumnSectionWriter getTagWriter() {
529     return tagsWriter;
530   }
531 
532   public RowSectionWriter getRowWriter() {
533     return rowWriter;
534   }
535 
536   public ByteRange getValueByteRange() {
537     return new SimpleMutableByteRange(values, 0, totalValueBytes);
538   }
539 
540 }