/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
 * Compression in this class is modeled on Compressor/KeyValueCompression, but the two are
 * independent and do not have to stay compatible. This codec is used on the server side for
 * writing cells to the WAL as well as for sending edits as part of the distributed splitting
 * process.
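 * <p>
 * A minimal usage sketch (assuming a {@code conf}, an output stream, and a cell are already in
 * hand; a {@code null} compression context means cells are written uncompressed):
 *
 * <pre>
 * WALCellCodec codec = WALCellCodec.create(conf, null);
 * Codec.Encoder encoder = codec.getEncoder(outputStream);
 * encoder.write(cell);
 * encoder.flush();
 * </pre>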
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG })
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no argument constructor</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * Default constructor - <b>all subclasses must implement a constructor with this signature</b>
   * if they are to be dynamically loaded from the {@link Configuration}.
   * @param conf        configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *                    compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

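  /**
   * Returns the codec class named by {@link #WAL_CELL_CODEC_CLASS_KEY} in {@code conf}, falling
   * back to {@link WALCellCodec} itself when the key is unset.
   */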
  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and set up a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified. Otherwise the cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf             {@link Configuration} to read for the user-specified codec. If none is
   *                         specified, uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression      compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
    CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }
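
  // Illustrative only: plugging in a custom codec. The class name below is hypothetical; any
  // replacement must subclass WALCellCodec and expose the (Configuration, CompressionContext)
  // constructor so the reflective instantiation above can find it.
  //
  //   conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, "com.example.MyWALCellCodec");
  //   WALCellCodec codec = WALCellCodec.create(conf, null, compressionContext);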

  /**
   * Create and set up a {@link WALCellCodec} from the CompressionContext. The cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf        {@link Configuration} to read for the user-specified codec. If none is
   *                    specified, uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, CompressionContext compression)
    throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }

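  /**
   * Compresses a byte array into a {@link ByteString} using the dictionary selected by
   * {@code dictIndex}.
   */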
  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

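  /**
   * Restores the original byte array from a {@link ByteString} produced by
   * {@link ByteStringCompressor}.
   */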
  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

  static class StatelessUncompressor implements ByteStringUncompressor {
    final CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }

  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private final CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

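  /**
   * Reads one dictionary-coded entry from {@code bs}: either a status byte of
   * {@link Dictionary#NOT_IN_DICTIONARY} followed by a vint length and the literal bytes (which
   * are then added to {@code dict}), or a two-byte dictionary index whose high-order byte is the
   * status byte itself.
   */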
  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte) in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of the index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, (byte) in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }

  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    public void write(Cell cell) throws IOException {
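      // Wire layout written below, in order (a sketch of what this method emits):
      //   vint key length, vint value length, vint tags length,
      //   dictionary-compressed row, family and qualifier,
      //   8-byte timestamp, 1 type byte,
      //   value bytes (optionally run through the value compressor),
      //   tag bytes (optionally dictionary-compressed), present only when tags length > 0.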
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // Write the tags length up front so the decoder can size the rebuilt KeyValue.
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      if (hasValueCompression) {
        writeCompressedValue(out, cell);
      } else {
        PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      }
      if (tagsLength > 0) {
        if (hasTagCompression) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }

    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength());
      StreamUtils.writeRawVInt32(out, compressed.length);
      out.write(compressed);
    }
  }

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);
      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp
      long ts = StreamUtils.readLong(in);
      pos = Bytes.putLong(backingArray, pos, ts);
      // type and value
      int typeValLen = length - pos;
      if (tagsLength > 0) {
        typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      pos = Bytes.putByte(backingArray, pos, (byte) in.read());
      int valLen = typeValLen - 1;
      if (hasValueCompression) {
        readCompressedValue(in, backingArray, pos, valLen);
        pos += valLen;
      } else {
        IOUtils.readFully(in, backingArray, pos, valLen);
        pos += valLen;
      }
      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (hasTagCompression) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte) in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // The status byte indicates the data is not in the dictionary: read it raw and add it to
        // the dictionary so later occurrences can be replaced by an index.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // The status byte also acts as the higher-order byte of the dictionary index.
        short dictIdx = StreamUtils.toShort(status, (byte) in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // Copy the uncompressed entry into the destination array.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }

    private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
      int expectedLength) throws IOException {
      int compressedLen = StreamUtils.readRawVarint32(in);
      int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
        expectedLength);
      if (read != expectedLength) {
        throw new IOException("ValueCompressor state error: short read");
      }
    }
  }

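  /**
   * Encoder used when WAL compression is off: writes each cell as a full serialized
   * {@link KeyValue}, tags included, preceded by its total length.
   */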
  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
      ? new KeyValueCodecWithTags.KeyValueDecoder(is)
      : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

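  /**
   * Returns a dictionary-backed compressor; only meaningful when this codec was created with a
   * non-null {@link CompressionContext}.
   */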
  public ByteStringCompressor getByteStringCompressor() {
    return new BaosAndCompressor(compression);
  }

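  /**
   * Returns a dictionary-backed uncompressor; only meaningful when this codec was created with a
   * non-null {@link CompressionContext}.
   */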
  public ByteStringUncompressor getByteStringUncompressor() {
    return new StatelessUncompressor(compression);
  }

  public static ByteStringCompressor getNoneCompressor() {
    return new NoneCompressor();
  }

  public static ByteStringUncompressor getNoneUncompressor() {
    return new NoneUncompressor();
  }
}