/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
 * Compression in this class is modeled on Compressor/KeyValueCompression, but only by
 * coincidence: the two are independent and do not have to stay compatible.
 *
 * <p>This codec is used on the server side for writing cells to the WAL as well as for sending
 * edits as part of the distributed splitting process.
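 *
 * <p>A minimal usage sketch (illustration only, not taken from this file); {@code out} and
 * {@code cell} are hypothetical placeholders:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * WALCellCodec codec = WALCellCodec.create(conf, null); // null CompressionContext = no compression
 * Codec.Encoder encoder = codec.getEncoder(out);
 * encoder.write(cell);
 * encoder.flush();
 * }</pre>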
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC,
  HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no-argument constructor.</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * Default constructor - <b>all subclasses must implement a constructor with this signature</b>
   * if they are to be dynamically loaded from the {@link Configuration}.
   * @param conf configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *          compression
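   *
   * <p>A hedged sketch of the shape a dynamically loaded subclass must take;
   * {@code MyWALCellCodec} is a hypothetical name:
   * <pre>{@code
   * public class MyWALCellCodec extends WALCellCodec {
   *   public MyWALCellCodec(Configuration conf, CompressionContext compression) {
   *     super(conf, compression);
   *   }
   * }
   * }</pre>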
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified.
   * Otherwise the cell codec class name is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
      CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
        { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
  }

  /**
   * Create and setup a {@link WALCellCodec} from the CompressionContext.
   * The cell codec class name is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf,
      CompressionContext compression) throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
        { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
  }

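  /**
   * Compresses a byte array into a protobuf {@link ByteString}, using the dictionary selected
   * by {@code dictIndex}.
   */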
  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

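  /**
   * Inverse of {@link ByteStringCompressor}: expands a {@link ByteString} back into the
   * original byte array, using the dictionary selected by {@code dictIndex}.
   */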
  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

  static class StatelessUncompressor implements ByteStringUncompressor {
    CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }

  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // the buffer between calls.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // the buffer between calls.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

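    /**
     * Wire format written here: if {@code data} is not yet in the dictionary, a
     * {@link Dictionary#NOT_IN_DICTIONARY} marker byte, a vint length, and the raw bytes;
     * otherwise the two-byte index of the existing dictionary entry.
     */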
    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

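  /**
   * Reverse of {@link BaosAndCompressor#compress(byte[], Enum)}: reads either a raw,
   * length-prefixed entry (which is then added to the dictionary) or a two-byte dictionary
   * index whose higher-order byte is the status byte.
   */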
  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte)in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) dict.addEntry(arr, 0, arr.length);
      return arr;
    } else {
      // Status here is the higher-order byte of the dictionary entry's index.
      short dictIdx = StreamUtils.toShort(status, (byte)in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }

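  /**
   * Encoder that dictionary-compresses the row, family, and qualifier of each cell, writes the
   * timestamp, type, and value uncompressed, and writes tags either dictionary-compressed or
   * raw depending on whether a tag compression context is configured.
   */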
  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
    }

    @Override
    public void write(Cell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value as uncompressed.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      if (tagsLength > 0) {
        if (compression.tagCompressionContext != null) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }
  }

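  /**
   * Decoder counterpart of {@link CompressedKvEncoder}: rebuilds the backing array of a
   * {@link KeyValue} from the compressed stream, expanding dictionary references as it goes.
   */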
  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);

      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short)elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp, type and value
      int tsTypeValLen = length - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
      pos += tsTypeValLen;

      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (compression.tagCompressionContext != null) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte)in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // The data is not in the dictionary: read it raw and add it as a new dictionary entry.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // The status byte also acts as the higher-order byte of the dictionary entry's index.
        short dictIdx = StreamUtils.toShort(status, (byte)in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // Copy the uncompressed dictionary entry into the destination array.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }
  }

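  /**
   * Encoder used when WAL compression is off: writes each cell as a length-prefixed KeyValue,
   * tags included.
   */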
  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
        ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    // The encoders write through the ByteBufferWriter interface; wrap streams that lack it.
    os = (os instanceof ByteBufferWriter) ? os
        : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    return new BaosAndCompressor(compression);
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    return new StatelessUncompressor(compression);
  }

  public static ByteStringCompressor getNoneCompressor() {
    return new NoneCompressor();
  }

  public static ByteStringUncompressor getNoneUncompressor() {
    return new NoneUncompressor();
  }
}