/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
 * Compression in this class is lifted from Compressor/KeyValueCompression, but that is pure
 * coincidence: the two are independent and do not have to be compatible. This codec is used on
 * the server side for writing cells to the WAL as well as for sending edits as part of the
 * distributed splitting process.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG })
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no argument constructor</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * <b>All subclasses must implement a constructor with this signature</b> if they are to be
   * dynamically loaded from the {@link Configuration}.
   * @param conf        configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *                    compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }
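
  // A minimal sketch (hypothetical subclass name, not part of HBase) of how a custom codec can
  // satisfy the constructor contract above and be wired in via WAL_CELL_CODEC_CLASS_KEY:
  //
  //   public class MyWALCellCodec extends WALCellCodec {
  //     public MyWALCellCodec(Configuration conf, CompressionContext compression) {
  //       super(conf, compression);
  //     }
  //     // Override getEncoder()/getDecoder() to customize the cell serialization.
  //   }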

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified. Otherwise the cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf             {@link Configuration} to read for the user-specified codec. If none is
   *                         specified, uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression      compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
    CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }

  /**
   * Create and setup a {@link WALCellCodec} from the CompressionContext. The cell codec class name
   * is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf        {@link Configuration} to read for the user-specified codec. If none is
   *                    specified, uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, CompressionContext compression)
    throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }
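
  // Usage sketch ("conf" and OutputStream "out" are assumed to be supplied by the caller; a
  // CompressionContext is normally built by the WAL writer/reader, and null means no compression):
  //
  //   conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
  //   WALCellCodec codec = WALCellCodec.create(conf, null);
  //   Codec.Encoder encoder = codec.getEncoder(out);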

  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

  static class StatelessUncompressor implements ByteStringUncompressor {
    CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }

  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }
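
  // Wire format produced by writeCompressed() above and consumed by uncompressByteString() below:
  // a dictionary hit is written as the two-byte dictionary index (its high byte doubles as the
  // status byte and is never Dictionary.NOT_IN_DICTIONARY for a valid index); a miss is written
  // as the status byte Dictionary.NOT_IN_DICTIONARY, a raw vint length, and the raw bytes. On a
  // miss, both sides add the entry to their dictionaries (the writer via findEntry(), the reader
  // via addEntry()), so later occurrences compress to two bytes.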

  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = StreamUtils.readByte(in);
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }
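
  // Round-trip sketch (assumes a CompressionContext "ctx"; in practice the writer and the reader
  // each hold their own context, kept in sync by replaying the same stream of entries):
  //
  //   ByteStringCompressor comp = new BaosAndCompressor(ctx);
  //   ByteString bs = comp.compress(Bytes.toBytes("f"), CompressionContext.DictionaryIndex.FAMILY);
  //   byte[] plain =
  //     new StatelessUncompressor(ctx).uncompress(bs, CompressionContext.DictionaryIndex.FAMILY);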

  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    public void write(ExtendedCell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      if (hasValueCompression) {
        writeCompressedValue(out, cell);
      } else {
        PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      }
      if (tagsLength > 0) {
        if (hasTagCompression) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // is.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }

    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength());
      StreamUtils.writeRawVInt32(out, compressed.length);
      out.write(compressed);
    }

  }
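
  // Per-cell layout written by CompressedKvEncoder.write() and parsed back by
  // CompressedKvDecoder.parseCell() below:
  //
  //   vint keyLength | vint valueLength | vint tagsLength
  //   | dictionary-compressed row, family, qualifier
  //   | long timestamp | 1-byte type
  //   | value (raw, or vint compressed-length + compressed bytes when value compression is on)
  //   | tags (raw, or tag-dictionary compressed), present only when tagsLength > 0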

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    protected ExtendedCell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);
      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp
      long ts = StreamUtils.readLong(in);
      pos = Bytes.putLong(backingArray, pos, ts);
      // type and value
      int typeValLen = length - pos;
      if (tagsLength > 0) {
        typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      pos = Bytes.putByte(backingArray, pos, (byte) in.read());
      int valLen = typeValLen - 1;
      if (hasValueCompression) {
        readCompressedValue(in, backingArray, pos, valLen);
        pos += valLen;
      } else {
        IOUtils.readFully(in, backingArray, pos, valLen);
        pos += valLen;
      }
      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (hasTagCompression) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = StreamUtils.readByte(in);
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // status byte indicating that data to be read is not in dictionary.
        // if this isn't in the dictionary, we need to add to the dictionary.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // the status byte also acts as the higher order byte of the dictionary entry.
        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // now we write the uncompressed value.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }

    private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
      int expectedLength) throws IOException {
      int compressedLen = StreamUtils.readRawVarint32(in);
      compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
        expectedLength);
    }
  }

  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(ExtendedCell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, cell.getSerializedSize(true));
      cell.write(out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
      ? new KeyValueCodecWithTags.KeyValueDecoder(is)
      : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    return new BaosAndCompressor(compression);
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    return new StatelessUncompressor(compression);
  }

  public static ByteStringCompressor getNoneCompressor() {
    return new NoneCompressor();
  }

  public static ByteStringUncompressor getNoneUncompressor() {
    return new NoneUncompressor();
  }
}