001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import java.io.ByteArrayOutputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.KeyValue;
028import org.apache.hadoop.hbase.KeyValueUtil;
029import org.apache.hadoop.hbase.PrivateCellUtil;
030import org.apache.hadoop.hbase.codec.BaseDecoder;
031import org.apache.hadoop.hbase.codec.BaseEncoder;
032import org.apache.hadoop.hbase.codec.Codec;
033import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
034import org.apache.hadoop.hbase.io.ByteBuffInputStream;
035import org.apache.hadoop.hbase.io.ByteBufferWriter;
036import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
037import org.apache.hadoop.hbase.io.util.Dictionary;
038import org.apache.hadoop.hbase.io.util.StreamUtils;
039import org.apache.hadoop.hbase.nio.ByteBuff;
040import org.apache.hadoop.hbase.util.ByteBufferUtils;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hadoop.hbase.util.ReflectionUtils;
043import org.apache.hadoop.io.IOUtils;
044import org.apache.yetus.audience.InterfaceAudience;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
047import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
048
/**
 * The compression logic in this class was adapted from Compressor/KeyValueCompression. The
 * resemblance is incidental: the two implementations are independent and do not have to stay
 * compatible with each other. This codec is used on the server side for writing cells to the WAL
 * as well as for sending edits as part of the distributed splitting process.
 */
055@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
056  HBaseInterfaceAudience.CONFIG })
057public class WALCellCodec implements Codec {
058  /** Configuration key for the class to use when encoding cells in the WAL */
059  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
060
061  protected final CompressionContext compression;
062
063  /**
064   * <b>All subclasses must implement a no argument constructor</b>
065   */
066  public WALCellCodec() {
067    this.compression = null;
068  }
069
070  /**
071   * Default constructor - <b>all subclasses must implement a constructor with this signature </b>
072   * if they are to be dynamically loaded from the {@link Configuration}.
073   * @param conf        configuration to configure <tt>this</tt>
074   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
075   *                    compression
076   */
077  public WALCellCodec(Configuration conf, CompressionContext compression) {
078    this.compression = compression;
079  }
080
081  public static Class<?> getWALCellCodecClass(Configuration conf) {
082    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
083  }
084
085  /**
086   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
087   * CompressionContext, if {@code cellCodecClsName} is specified. Otherwise Cell Codec classname is
088   * read from {@link Configuration}. Fully prepares the codec for use.
089   * @param conf             {@link Configuration} to read for the user-specified codec. If none is
090   *                         specified, uses a {@link WALCellCodec}.
091   * @param cellCodecClsName name of codec
092   * @param compression      compression the codec should use
093   * @return a {@link WALCellCodec} ready for use.
094   * @throws UnsupportedOperationException if the codec cannot be instantiated
095   */
096
097  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
098    CompressionContext compression) throws UnsupportedOperationException {
099    if (cellCodecClsName == null) {
100      cellCodecClsName = getWALCellCodecClass(conf).getName();
101    }
102    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
103      new Class[] { Configuration.class, CompressionContext.class },
104      new Object[] { conf, compression });
105  }
106
107  /**
108   * Create and setup a {@link WALCellCodec} from the CompressionContext. Cell Codec classname is
109   * read from {@link Configuration}. Fully prepares the codec for use.
110   * @param conf        {@link Configuration} to read for the user-specified codec. If none is
111   *                    specified, uses a {@link WALCellCodec}.
112   * @param compression compression the codec should use
113   * @return a {@link WALCellCodec} ready for use.
114   * @throws UnsupportedOperationException if the codec cannot be instantiated
115   */
116  public static WALCellCodec create(Configuration conf, CompressionContext compression)
117    throws UnsupportedOperationException {
118    String cellCodecClsName = getWALCellCodecClass(conf).getName();
119    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
120      new Class[] { Configuration.class, CompressionContext.class },
121      new Object[] { conf, compression });
122  }
123
124  public interface ByteStringCompressor {
125    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
126  }
127
128  public interface ByteStringUncompressor {
129    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
130  }
131
132  static class StatelessUncompressor implements ByteStringUncompressor {
133    CompressionContext compressionContext;
134
135    public StatelessUncompressor(CompressionContext compressionContext) {
136      this.compressionContext = compressionContext;
137    }
138
139    @Override
140    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
141      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
142    }
143  }
144
145  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
146    private CompressionContext compressionContext;
147
148    public BaosAndCompressor(CompressionContext compressionContext) {
149      this.compressionContext = compressionContext;
150    }
151
152    public ByteString toByteString() {
153      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
154      // them.
155      return ByteString.copyFrom(this.buf, 0, this.count);
156    }
157
158    @Override
159    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
160      writeCompressed(data, dictIndex);
161      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
162      // them.
163      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
164      reset(); // Only resets the count - we reuse the byte array.
165      return result;
166    }
167
168    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
169      Dictionary dict = compressionContext.getDictionary(dictIndex);
170      assert dict != null;
171      short dictIdx = dict.findEntry(data, 0, data.length);
172      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
173        write(Dictionary.NOT_IN_DICTIONARY);
174        StreamUtils.writeRawVInt32(this, data.length);
175        write(data, 0, data.length);
176      } else {
177        StreamUtils.writeShort(this, dictIdx);
178      }
179    }
180  }
181
182  static class NoneCompressor implements ByteStringCompressor {
183
184    @Override
185    public ByteString compress(byte[] data, Enum dictIndex) {
186      return UnsafeByteOperations.unsafeWrap(data);
187    }
188  }
189
190  static class NoneUncompressor implements ByteStringUncompressor {
191
192    @Override
193    public byte[] uncompress(ByteString data, Enum dictIndex) {
194      return data.toByteArray();
195    }
196  }
197
198  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
199    InputStream in = bs.newInput();
200    byte status = StreamUtils.readByte(in);
201    if (status == Dictionary.NOT_IN_DICTIONARY) {
202      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
203      int bytesRead = in.read(arr);
204      if (bytesRead != arr.length) {
205        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
206      }
207      if (dict != null) {
208        dict.addEntry(arr, 0, arr.length);
209      }
210      return arr;
211    } else {
212      // Status here is the higher-order byte of index of the dictionary entry.
213      short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
214      byte[] entry = dict.getEntry(dictIdx);
215      if (entry == null) {
216        throw new IOException("Missing dictionary entry for index " + dictIdx);
217      }
218      return entry;
219    }
220  }
221
222  static class CompressedKvEncoder extends BaseEncoder {
223    private final CompressionContext compression;
224    private final boolean hasValueCompression;
225    private final boolean hasTagCompression;
226
227    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
228      super(out);
229      this.compression = compression;
230      this.hasValueCompression = compression.hasValueCompression();
231      this.hasTagCompression = compression.hasTagCompression();
232    }
233
234    @Override
235    public void write(Cell cell) throws IOException {
236      // We first write the KeyValue infrastructure as VInts.
237      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
238      StreamUtils.writeRawVInt32(out, cell.getValueLength());
239      // To support tags
240      int tagsLength = cell.getTagsLength();
241      StreamUtils.writeRawVInt32(out, tagsLength);
242      PrivateCellUtil.compressRow(out, cell,
243        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
244      PrivateCellUtil.compressFamily(out, cell,
245        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
246      PrivateCellUtil.compressQualifier(out, cell,
247        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
248      // Write timestamp, type and value.
249      StreamUtils.writeLong(out, cell.getTimestamp());
250      out.write(cell.getTypeByte());
251      if (hasValueCompression) {
252        writeCompressedValue(out, cell);
253      } else {
254        PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
255      }
256      if (tagsLength > 0) {
257        if (hasTagCompression) {
258          // Write tags using Dictionary compression
259          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
260        } else {
261          // Tag compression is disabled within the WAL compression. Just write the tags bytes as
262          // it is.
263          PrivateCellUtil.writeTags(out, cell, tagsLength);
264        }
265      }
266    }
267
268    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
269      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
270        cell.getValueOffset(), cell.getValueLength());
271      StreamUtils.writeRawVInt32(out, compressed.length);
272      out.write(compressed);
273    }
274
275  }
276
277  static class CompressedKvDecoder extends BaseDecoder {
278    private final CompressionContext compression;
279    private final boolean hasValueCompression;
280    private final boolean hasTagCompression;
281
282    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
283      super(in);
284      this.compression = compression;
285      this.hasValueCompression = compression.hasValueCompression();
286      this.hasTagCompression = compression.hasTagCompression();
287    }
288
289    @Override
290    protected Cell parseCell() throws IOException {
291      int keylength = StreamUtils.readRawVarint32(in);
292      int vlength = StreamUtils.readRawVarint32(in);
293      int tagsLength = StreamUtils.readRawVarint32(in);
294      int length = 0;
295      if (tagsLength == 0) {
296        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
297      } else {
298        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
299      }
300
301      byte[] backingArray = new byte[length];
302      int pos = 0;
303      pos = Bytes.putInt(backingArray, pos, keylength);
304      pos = Bytes.putInt(backingArray, pos, vlength);
305
306      // the row
307      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
308        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
309      checkLength(elemLen, Short.MAX_VALUE);
310      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
311      pos += elemLen;
312
313      // family
314      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
315        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
316      checkLength(elemLen, Byte.MAX_VALUE);
317      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
318      pos += elemLen;
319
320      // qualifier
321      elemLen = readIntoArray(backingArray, pos,
322        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
323      pos += elemLen;
324
325      // timestamp
326      long ts = StreamUtils.readLong(in);
327      pos = Bytes.putLong(backingArray, pos, ts);
328      // type and value
329      int typeValLen = length - pos;
330      if (tagsLength > 0) {
331        typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
332      }
333      pos = Bytes.putByte(backingArray, pos, (byte) in.read());
334      int valLen = typeValLen - 1;
335      if (hasValueCompression) {
336        readCompressedValue(in, backingArray, pos, valLen);
337        pos += valLen;
338      } else {
339        IOUtils.readFully(in, backingArray, pos, valLen);
340        pos += valLen;
341      }
342      // tags
343      if (tagsLength > 0) {
344        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
345        if (hasTagCompression) {
346          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
347        } else {
348          IOUtils.readFully(in, backingArray, pos, tagsLength);
349        }
350      }
351      return new KeyValue(backingArray, 0, length);
352    }
353
354    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
355      byte status = StreamUtils.readByte(in);
356      if (status == Dictionary.NOT_IN_DICTIONARY) {
357        // status byte indicating that data to be read is not in dictionary.
358        // if this isn't in the dictionary, we need to add to the dictionary.
359        int length = StreamUtils.readRawVarint32(in);
360        IOUtils.readFully(in, to, offset, length);
361        dict.addEntry(to, offset, length);
362        return length;
363      } else {
364        // the status byte also acts as the higher order byte of the dictionary entry.
365        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
366        byte[] entry = dict.getEntry(dictIdx);
367        if (entry == null) {
368          throw new IOException("Missing dictionary entry for index " + dictIdx);
369        }
370        // now we write the uncompressed value.
371        Bytes.putBytes(to, offset, entry, 0, entry.length);
372        return entry.length;
373      }
374    }
375
376    private static void checkLength(int len, int max) throws IOException {
377      if (len < 0 || len > max) {
378        throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
379      }
380    }
381
382    private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
383      int expectedLength) throws IOException {
384      int compressedLen = StreamUtils.readRawVarint32(in);
385      compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
386        expectedLength);
387    }
388  }
389
390  public static class EnsureKvEncoder extends BaseEncoder {
391    public EnsureKvEncoder(OutputStream out) {
392      super(out);
393    }
394
395    @Override
396    public void write(Cell cell) throws IOException {
397      checkFlushed();
398      // Make sure to write tags into WAL
399      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
400      KeyValueUtil.oswrite(cell, this.out, true);
401    }
402  }
403
404  @Override
405  public Decoder getDecoder(InputStream is) {
406    return (compression == null)
407      ? new KeyValueCodecWithTags.KeyValueDecoder(is)
408      : new CompressedKvDecoder(is, compression);
409  }
410
411  @Override
412  public Decoder getDecoder(ByteBuff buf) {
413    return getDecoder(new ByteBuffInputStream(buf));
414  }
415
416  @Override
417  public Encoder getEncoder(OutputStream os) {
418    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
419    if (compression == null) {
420      return new EnsureKvEncoder(os);
421    }
422    return new CompressedKvEncoder(os, compression);
423  }
424
425  public ByteStringCompressor getByteStringCompressor() {
426    return new BaosAndCompressor(compression);
427  }
428
429  public ByteStringUncompressor getByteStringUncompressor() {
430    return new StatelessUncompressor(compression);
431  }
432
433  public static ByteStringCompressor getNoneCompressor() {
434    return new NoneCompressor();
435  }
436
437  public static ByteStringUncompressor getNoneUncompressor() {
438    return new NoneUncompressor();
439  }
440}