View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.io.ByteArrayOutputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.Cell;
27  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.KeyValueUtil;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.codec.BaseDecoder;
32  import org.apache.hadoop.hbase.codec.BaseEncoder;
33  import org.apache.hadoop.hbase.codec.Codec;
34  import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
35  import org.apache.hadoop.hbase.io.util.Dictionary;
36  import org.apache.hadoop.hbase.io.util.StreamUtils;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.ReflectionUtils;
39  import org.apache.hadoop.io.IOUtils;
40  
41  import com.google.protobuf.ByteString;
42  
43  
44  /**
45   * Compression in this class is lifted off Compressor/KeyValueCompression.
46   * This is a pure coincidence... they are independent and don't have to be compatible.
47   *
48   * This codec is used at server side for writing cells to WAL as well as for sending edits
49   * as part of the distributed splitting process.
50   */
51  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
52  public class WALCellCodec implements Codec {
53    /** Configuration key for the class to use when encoding cells in the WAL */
54    public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
55  
56    protected final CompressionContext compression;
57    protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
58      @Override
59      public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
60        return WALCellCodec.uncompressByteString(data, dict);
61      }
62    };
63  
64    /**
65     * <b>All subclasses must implement a no argument constructor</b>
66     */
67    public WALCellCodec() {
68      this.compression = null;
69    }
70  
71    /**
72     * Default constructor - <b>all subclasses must implement a constructor with this signature </b>
73     * if they are to be dynamically loaded from the {@link Configuration}.
74     * @param conf configuration to configure <tt>this</tt>
75     * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
76     *          compression
77     */
78    public WALCellCodec(Configuration conf, CompressionContext compression) {
79      this.compression = compression;
80    }
81  
82    public static Class<?> getWALCellCodecClass(Configuration conf) {
83      return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
84    }
85  
86    /**
87     * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
88     * CompressionContext, if {@code cellCodecClsName} is specified.
89     * Otherwise Cell Codec classname is read from {@link Configuration}.
90     * Fully prepares the codec for use.
91     * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
92     *          uses a {@link WALCellCodec}.
93     * @param compression compression the codec should use
94     * @return a {@link WALCellCodec} ready for use.
95     * @throws UnsupportedOperationException if the codec cannot be instantiated
96     */
97  
98    public static WALCellCodec create(Configuration conf, String cellCodecClsName,
99        CompressionContext compression) throws UnsupportedOperationException {
100     if (cellCodecClsName == null) {
101       cellCodecClsName = getWALCellCodecClass(conf).getName();
102     }
103     return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
104         { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
105   }
106 
107   /**
108    * Create and setup a {@link WALCellCodec} from the
109    * CompressionContext.
110    * Cell Codec classname is read from {@link Configuration}.
111    * Fully prepares the codec for use.
112    * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
113    *          uses a {@link WALCellCodec}.
114    * @param compression compression the codec should use
115    * @return a {@link WALCellCodec} ready for use.
116    * @throws UnsupportedOperationException if the codec cannot be instantiated
117    */
118   public static WALCellCodec create(Configuration conf,
119       CompressionContext compression) throws UnsupportedOperationException {
120     String cellCodecClsName = getWALCellCodecClass(conf).getName();
121     return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
122         { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
123   }
124 
125   public interface ByteStringCompressor {
126     ByteString compress(byte[] data, Dictionary dict) throws IOException;
127   }
128 
129   public interface ByteStringUncompressor {
130     byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
131   }
132 
133   // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here.
134   //       Dictionary could be gotten by enum; initially, based on enum, context would create
135   //       an array of dictionaries.
136   static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
137     public ByteString toByteString() {
138       return ByteString.copyFrom(this.buf, 0, this.count);
139     }
140 
141     @Override
142     public ByteString compress(byte[] data, Dictionary dict) throws IOException {
143       writeCompressed(data, dict);
144       ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
145       reset(); // Only resets the count - we reuse the byte array.
146       return result;
147     }
148 
149     private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
150       assert dict != null;
151       short dictIdx = dict.findEntry(data, 0, data.length);
152       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
153         write(Dictionary.NOT_IN_DICTIONARY);
154         StreamUtils.writeRawVInt32(this, data.length);
155         write(data, 0, data.length);
156       } else {
157         StreamUtils.writeShort(this, dictIdx);
158       }
159     }
160   }
161 
162   private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
163     InputStream in = bs.newInput();
164     byte status = (byte)in.read();
165     if (status == Dictionary.NOT_IN_DICTIONARY) {
166       byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
167       int bytesRead = in.read(arr);
168       if (bytesRead != arr.length) {
169         throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
170       }
171       if (dict != null) dict.addEntry(arr, 0, arr.length);
172       return arr;
173     } else {
174       // Status here is the higher-order byte of index of the dictionary entry.
175       short dictIdx = StreamUtils.toShort(status, (byte)in.read());
176       byte[] entry = dict.getEntry(dictIdx);
177       if (entry == null) {
178         throw new IOException("Missing dictionary entry for index " + dictIdx);
179       }
180       return entry;
181     }
182   }
183 
184   static class CompressedKvEncoder extends BaseEncoder {
185     private final CompressionContext compression;
186     public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
187       super(out);
188       this.compression = compression;
189     }
190 
191     @Override
192     public void write(Cell cell) throws IOException {
193       // We first write the KeyValue infrastructure as VInts.
194       StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
195       StreamUtils.writeRawVInt32(out, cell.getValueLength());
196       // To support tags
197       int tagsLength = cell.getTagsLength();
198       StreamUtils.writeRawVInt32(out, tagsLength);
199 
200       // Write row, qualifier, and family; use dictionary
201       // compression as they're likely to have duplicates.
202       write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), compression.rowDict);
203       write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
204           compression.familyDict);
205       write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
206           compression.qualifierDict);
207 
208       // Write timestamp, type and value as uncompressed.
209       StreamUtils.writeLong(out, cell.getTimestamp());
210       out.write(cell.getTypeByte());
211       out.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
212       if (tagsLength > 0) {
213         if (compression.tagCompressionContext != null) {
214           // Write tags using Dictionary compression
215           compression.tagCompressionContext.compressTags(out, cell.getTagsArray(),
216               cell.getTagsOffset(), tagsLength);
217         } else {
218           // Tag compression is disabled within the WAL compression. Just write the tags bytes as
219           // it is.
220           out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
221         }
222       }
223     }
224 
225     private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
226       short dictIdx = Dictionary.NOT_IN_DICTIONARY;
227       if (dict != null) {
228         dictIdx = dict.findEntry(data, offset, length);
229       }
230       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
231         out.write(Dictionary.NOT_IN_DICTIONARY);
232         StreamUtils.writeRawVInt32(out, length);
233         out.write(data, offset, length);
234       } else {
235         StreamUtils.writeShort(out, dictIdx);
236       }
237     }
238   }
239 
240   static class CompressedKvDecoder extends BaseDecoder {
241     private final CompressionContext compression;
242     public CompressedKvDecoder(InputStream in, CompressionContext compression) {
243       super(in);
244       this.compression = compression;
245     }
246 
247     @Override
248     protected Cell parseCell() throws IOException {
249       int keylength = StreamUtils.readRawVarint32(in);
250       int vlength = StreamUtils.readRawVarint32(in);
251 
252       int tagsLength = StreamUtils.readRawVarint32(in);
253       int length = 0;
254       if(tagsLength == 0) {
255         length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
256       } else {
257         length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
258       }
259 
260       byte[] backingArray = new byte[length];
261       int pos = 0;
262       pos = Bytes.putInt(backingArray, pos, keylength);
263       pos = Bytes.putInt(backingArray, pos, vlength);
264 
265       // the row
266       int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
267       checkLength(elemLen, Short.MAX_VALUE);
268       pos = Bytes.putShort(backingArray, pos, (short)elemLen);
269       pos += elemLen;
270 
271       // family
272       elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
273       checkLength(elemLen, Byte.MAX_VALUE);
274       pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
275       pos += elemLen;
276 
277       // qualifier
278       elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
279       pos += elemLen;
280 
281       // timestamp, type and value
282       int tsTypeValLen = length - pos;
283       if (tagsLength > 0) {
284         tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
285       }
286       IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
287       pos += tsTypeValLen;
288 
289       // tags
290       if (tagsLength > 0) {
291         pos = Bytes.putAsShort(backingArray, pos, tagsLength);
292         if (compression.tagCompressionContext != null) {
293           compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
294         } else {
295           IOUtils.readFully(in, backingArray, pos, tagsLength);
296         }
297       }
298       return new KeyValue(backingArray, 0, length);
299     }
300 
301     private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
302       byte status = (byte)in.read();
303       if (status == Dictionary.NOT_IN_DICTIONARY) {
304         // status byte indicating that data to be read is not in dictionary.
305         // if this isn't in the dictionary, we need to add to the dictionary.
306         int length = StreamUtils.readRawVarint32(in);
307         IOUtils.readFully(in, to, offset, length);
308         dict.addEntry(to, offset, length);
309         return length;
310       } else {
311         // the status byte also acts as the higher order byte of the dictionary entry.
312         short dictIdx = StreamUtils.toShort(status, (byte)in.read());
313         byte[] entry = dict.getEntry(dictIdx);
314         if (entry == null) {
315           throw new IOException("Missing dictionary entry for index " + dictIdx);
316         }
317         // now we write the uncompressed value.
318         Bytes.putBytes(to, offset, entry, 0, entry.length);
319         return entry.length;
320       }
321     }
322 
323     private static void checkLength(int len, int max) throws IOException {
324       if (len < 0 || len > max) {
325         throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
326       }
327     }
328   }
329 
330   public static class EnsureKvEncoder extends BaseEncoder {
331     public EnsureKvEncoder(OutputStream out) {
332       super(out);
333     }
334     @Override
335     public void write(Cell cell) throws IOException {
336       checkFlushed();
337       // Make sure to write tags into WAL
338       KeyValueUtil.oswrite(cell, this.out, true);
339     }
340   }
341 
342   @Override
343   public Decoder getDecoder(InputStream is) {
344     return (compression == null)
345         ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
346   }
347 
348   @Override
349   public Encoder getEncoder(OutputStream os) {
350     return (compression == null)
351         ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
352   }
353 
354   public ByteStringCompressor getByteStringCompressor() {
355     // TODO: ideally this should also encapsulate compressionContext
356     return new BaosAndCompressor();
357   }
358 
359   public ByteStringUncompressor getByteStringUncompressor() {
360     // TODO: ideally this should also encapsulate compressionContext
361     return this.statelessUncompressor;
362   }
363 }