/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
 * Compression in this class is lifted off Compressor/KeyValueCompression.
 * This is a pure coincidence... they are independent and don't have to be compatible.
 *
 * This codec is used on the server side for writing cells to the WAL as well as for sending
 * edits as part of the distributed splitting process.
 */
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC,
    HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG})
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no argument constructor.</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * Default constructor - <b>all subclasses must implement a constructor with this signature</b>
   * if they are to be dynamically loaded from the {@link Configuration}.
   * @param conf configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *          compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }
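
  // Illustrative sketch (an assumption for clarity, not part of this file): a codec that should
  // be dynamically loadable via "hbase.regionserver.wal.codec" only needs to provide the
  // (Configuration, CompressionContext) constructor above. The subclass name is hypothetical.
  //
  //   public class MyWALCellCodec extends WALCellCodec {
  //     public MyWALCellCodec(Configuration conf, CompressionContext compression) {
  //       super(conf, compression);
  //     }
  //     // Override getEncoder()/getDecoder() to customize the on-disk cell format.
  //   }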

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified.
   * Otherwise the cell codec class name is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
      CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
        new Class[] { Configuration.class, CompressionContext.class },
        new Object[] { conf, compression });
  }

  /**
   * Create and setup a {@link WALCellCodec} from the CompressionContext.
   * The cell codec class name is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf,
      CompressionContext compression) throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
        new Class[] { Configuration.class, CompressionContext.class },
        new Object[] { conf, compression });
  }
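
  // Usage sketch (an illustration, not code from this file; assumes 'conf', 'out', 'in' and
  // 'cell' exist at the call site):
  //
  //   WALCellCodec codec = WALCellCodec.create(conf, null); // null CompressionContext
  //   Codec.Encoder encoder = codec.getEncoder(out);
  //   encoder.write(cell);
  //   encoder.flush();
  //   Codec.Decoder decoder = codec.getDecoder(in);
  //   while (decoder.advance()) {
  //     Cell c = decoder.current();
  //   }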

  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

  static class StatelessUncompressor implements ByteStringUncompressor {
    CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }
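
  // Layout shared by BaosAndCompressor.writeCompressed() and uncompressByteString() below:
  // a value already in the dictionary is written as its two-byte index, high-order byte first;
  // a value not yet in the dictionary is written as the single status byte
  // Dictionary.NOT_IN_DICTIONARY, followed by the length as a raw vint and the raw value bytes
  // (the reader then adds the value to its own dictionary).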
  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte) in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, (byte) in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }
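
  // Per-cell layout written by CompressedKvEncoder.write() and read back by
  // CompressedKvDecoder.parseCell(): key length, value length and tags length as raw vints;
  // row, family and qualifier dictionary-compressed; timestamp, type byte and value
  // uncompressed; then the tags, dictionary-compressed only when a tagCompressionContext is set.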
  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
    }

    @Override
    public void write(Cell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
          compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
          compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
          compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value as uncompressed.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      if (tagsLength > 0) {
        if (compression.tagCompressionContext != null) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }
  }

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);

      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
          compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
          compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
          compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp, type and value
      int tsTypeValLen = length - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
      pos += tsTypeValLen;

      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (compression.tagCompressionContext != null) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte) in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // status byte indicating that data to be read is not in dictionary.
        // if this isn't in the dictionary, we need to add to the dictionary.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // the status byte also acts as the higher order byte of the dictionary entry.
        short dictIdx = StreamUtils.toShort(status, (byte) in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // now we write the uncompressed value.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }
  }
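
  /**
   * Encoder used when WAL compression is off: each cell is written as a 4-byte length followed
   * by the full KeyValue serialization, tags included.
   */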
  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
        ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    return new BaosAndCompressor(compression);
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    return new StatelessUncompressor(compression);
  }

  public static ByteStringCompressor getNoneCompressor() {
    return new NoneCompressor();
  }

  public static ByteStringUncompressor getNoneUncompressor() {
    return new NoneUncompressor();
  }
}