/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

/**
 * Compression in this class is modeled on Compressor/KeyValueCompression. That is a coincidence
 * of history: the two are independent and do not have to stay compatible.
 *
 * This codec is used on the server side for writing cells to the WAL as well as for sending
 * edits as part of the distributed splitting process.
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC,
  HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;
  protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
    @Override
    public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
      return WALCellCodec.uncompressByteString(data, dict);
    }
  };

  /**
   * <b>All subclasses must implement a no-argument constructor.</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * <b>All subclasses must implement a constructor with this signature</b> if they are to be
   * dynamically loaded from the {@link Configuration}.
   * @param conf configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *          compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and set up a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified.
   * Otherwise the cell codec class name is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
      CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
        { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
  }

  /**
   * Create and set up a {@link WALCellCodec} from the CompressionContext.
   * The cell codec class name is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf,
      CompressionContext compression) throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
        { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
  }
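
  // A minimal usage sketch of the factories above (assumptions: imports of HBaseConfiguration
  // and Codec.Encoder, an OutputStream 'os', and a Cell 'cell' in scope; a null
  // CompressionContext means no compression):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   WALCellCodec codec = WALCellCodec.create(conf, null);
  //   Encoder encoder = codec.getEncoder(os);
  //   encoder.write(cell);
  //   encoder.flush();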

  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Dictionary dict) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
  }

  // TODO: it is unfortunate that the compression context lives in WAL.Entry. It'd be nice if it
  //       was here. The Dictionary could be gotten by enum; initially, based on the enum, the
  //       context would create an array of dictionaries.
  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    public ByteString toByteString() {
      // We need this copy to create the ByteString because the byte[] 'buf' is not immutable; we
      // reuse it.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Dictionary dict) throws IOException {
      writeCompressed(data, dict);
      // We need this copy to create the ByteString because the byte[] 'buf' is not immutable; we
      // reuse it.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }
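
  // The wire format written above and read back by uncompressByteString() below: a single status
  // byte that is either Dictionary.NOT_IN_DICTIONARY (followed by a vint length and the raw
  // bytes) or the high-order byte of a two-byte dictionary index whose low-order byte follows.
  // A roundtrip sketch (assumption: as in the WAL, the writer and reader sides each maintain
  // their own Dictionary instance, kept in sync by the add-on-miss behavior of both paths):
  //
  //   BaosAndCompressor compressor = new BaosAndCompressor();
  //   ByteString compressed = compressor.compress(Bytes.toBytes("family"), writerDict);
  //   byte[] restored = uncompressByteString(compressed, readerDict);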

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte)in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) dict.addEntry(arr, 0, arr.length);
      return arr;
    } else {
      // The status byte is the high-order byte of the dictionary entry's index.
      short dictIdx = StreamUtils.toShort(status, (byte)in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }

  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
    }

    @Override
    public void write(Cell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // Write the tags length so tags can be read back.
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell, compression.rowDict);
      PrivateCellUtil.compressFamily(out, cell, compression.familyDict);
      PrivateCellUtil.compressQualifier(out, cell, compression.qualifierDict);
      // Write the timestamp, type and value uncompressed.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      if (tagsLength > 0) {
        if (compression.tagCompressionContext != null) {
          // Write tags using Dictionary compression.
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }
  }
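
  // For reference, the per-cell layout CompressedKvEncoder writes (and CompressedKvDecoder below
  // reads back) is:
  //   vint keyLength | vint valueLength | vint tagsLength
  //   | compressed row | compressed family | compressed qualifier
  //   | 8-byte timestamp | 1-byte type | value bytes | tags (dictionary-compressed or raw)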

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);

      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short)elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
      pos += elemLen;

      // timestamp, type and value
      int tsTypeValLen = length - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
      pos += tsTypeValLen;

      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (compression.tagCompressionContext != null) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte)in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // The data is not in the dictionary: read it inline and add it to the dictionary.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // The status byte is also the high-order byte of the dictionary entry's index.
        short dictIdx = StreamUtils.toShort(status, (byte)in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // Copy the dictionary entry into the destination array.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }
  }

  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }
    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into the WAL.
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }
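
  // Uncompressed layout: a 4-byte total length followed by the full KeyValue serialization, tags
  // included, which is what KeyValueCodecWithTags.KeyValueDecoder reads back in getDecoder()
  // below.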

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
        ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os
        : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    // TODO: ideally this should also encapsulate compressionContext
    return new BaosAndCompressor();
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    // TODO: ideally this should also encapsulate compressionContext
    return this.statelessUncompressor;
  }
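
  // A minimal read-side sketch (assumptions: 'codec' was created as shown earlier with the same
  // compression settings the writer used, and an InputStream 'is' positioned at the cells):
  //
  //   Decoder decoder = codec.getDecoder(is);
  //   while (decoder.advance()) {
  //     Cell cell = decoder.current();
  //     // ... process the cell
  //   }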
}