/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
 * Compression in this class is lifted off Compressor/KeyValueCompression. This is a pure
 * coincidence... they are independent and don't have to be compatible. This codec is used at server
 * side for writing cells to WAL as well as for sending edits as part of the distributed splitting
 * process.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG })
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no argument constructor</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }
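  // Illustrative sketch (not part of the original class): how a deployment would plug in a
  // custom codec via configuration. "MyWALCellCodec" is a hypothetical subclass; it must
  // expose the (Configuration, CompressionContext) constructor below so it can be
  // instantiated reflectively by create().
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, MyWALCellCodec.class.getName());
  //   WALCellCodec codec = WALCellCodec.create(conf, null); // null = no WAL compression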
  /**
   * Default constructor - <b>all subclasses must implement a constructor with this signature</b>
   * if they are to be dynamically loaded from the {@link Configuration}.
   * @param conf        configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *                    compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified. Otherwise the cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf             {@link Configuration} to read for the user-specified codec. If none is
   *                         specified, uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression      compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
    CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }
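  // Illustrative sketch: callers that already know the codec class can bypass the
  // configuration lookup above. SecureWALCellCodec is an existing HBase codec, used here
  // only as an example argument; 'conf' and 'compressionContext' stand in for
  // caller-supplied values.
  //
  //   WALCellCodec codec =
  //     WALCellCodec.create(conf, SecureWALCellCodec.class.getName(), compressionContext);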
  /**
   * Create and setup a {@link WALCellCodec} from the CompressionContext. The cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf        {@link Configuration} to read for the user-specified codec. If none is
   *                    specified, uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, CompressionContext compression)
    throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }

  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

  static class StatelessUncompressor implements ByteStringUncompressor {
    CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }
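  // Informal sketch of the bytes produced by BaosAndCompressor.writeCompressed below and
  // consumed by uncompressByteString. An entry either misses the dictionary (first form,
  // which also adds it on the read side) or hits it (second form):
  //
  //   [NOT_IN_DICTIONARY marker byte][vint length][raw bytes]
  //   [2-byte dictionary index, higher-order byte first]
  //
  // The reader distinguishes the two forms by the first byte alone: it is either the
  // NOT_IN_DICTIONARY marker or the high byte of a dictionary index.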
  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte) in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of the index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, (byte) in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }
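  // Informal sketch of the per-cell layout written by CompressedKvEncoder below and read
  // back by CompressedKvDecoder:
  //
  //   vint keyLength | vint valueLength | vint tagsLength
  //   | dictionary-compressed row, family and qualifier
  //   | 8-byte timestamp | 1-byte type
  //   | value (raw, or vint compressed length + compressed bytes if value compression is on)
  //   | tags (dictionary-compressed or raw), present only when tagsLength > 0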
  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    public void write(Cell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      if (hasValueCompression) {
        writeCompressedValue(out, cell);
      } else {
        PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      }
      if (tagsLength > 0) {
        if (hasTagCompression) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }

    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength());
      StreamUtils.writeRawVInt32(out, compressed.length);
      out.write(compressed);
    }
  }

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);
      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp
      long ts = StreamUtils.readLong(in);
      pos = Bytes.putLong(backingArray, pos, ts);
      // type and value
      int typeValLen = length - pos;
      if (tagsLength > 0) {
        typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      pos = Bytes.putByte(backingArray, pos, (byte) in.read());
      int valLen = typeValLen - 1;
      if (hasValueCompression) {
        readCompressedValue(in, backingArray, pos, valLen);
        pos += valLen;
      } else {
        IOUtils.readFully(in, backingArray, pos, valLen);
        pos += valLen;
      }
      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (hasTagCompression) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }
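    // parseCell above reassembles the standard KeyValue serialized layout in backingArray:
    //
    //   4-byte keyLength | 4-byte valueLength
    //   | 2-byte rowLength | row | 1-byte familyLength | family | qualifier
    //   | 8-byte timestamp | 1-byte type | value | [2-byte tagsLength | tags]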
    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte) in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // The status byte indicates that the data to be read is not in the dictionary,
        // so we need to add it to the dictionary after reading it.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // the status byte also acts as the higher-order byte of the dictionary entry.
        short dictIdx = StreamUtils.toShort(status, (byte) in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // now we write the uncompressed value.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }

    private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
      int expectedLength) throws IOException {
      int compressedLen = StreamUtils.readRawVarint32(in);
      int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
        expectedLength);
      if (read != expectedLength) {
        throw new IOException("ValueCompressor state error: short read");
      }
    }
  }

  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
      ? new KeyValueCodecWithTags.KeyValueDecoder(is)
      : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    return new BaosAndCompressor(compression);
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    return new StatelessUncompressor(compression);
  }

  public static ByteStringCompressor getNoneCompressor() {
    return new NoneCompressor();
  }

  public static ByteStringUncompressor getNoneUncompressor() {
    return new NoneUncompressor();
  }
}
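// Illustrative round trip through the codec without WAL compression; a sketch only, with
// error handling omitted and 'cell' standing in for any org.apache.hadoop.hbase.Cell:
//
//   WALCellCodec codec = WALCellCodec.create(conf, null);
//   ByteArrayOutputStream baos = new ByteArrayOutputStream();
//   Codec.Encoder encoder = codec.getEncoder(baos);
//   encoder.write(cell);
//   encoder.flush();
//   Codec.Decoder decoder = codec.getDecoder(new ByteArrayInputStream(baos.toByteArray()));
//   while (decoder.advance()) {
//     Cell roundTripped = decoder.current();
//   }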