/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

/**
 * Compression in this class is lifted off Compressor/KeyValueCompression.
 * This is a pure coincidence... they are independent and don't have to be compatible.
 *
 * This codec is used at server side for writing cells to WAL as well as for sending edits
 * as part of the distributed splitting process.
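 * <p>
 * A different codec implementation can be plugged in through the
 * {@link #WAL_CELL_CODEC_CLASS_KEY} configuration key. A minimal sketch, assuming a
 * hypothetical subclass <tt>MyWALCellCodec</tt> that provides the required
 * <tt>(Configuration, CompressionContext)</tt> constructor:
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, MyWALCellCodec.class.getName());
 * // create() resolves the configured class name and instantiates it reflectively;
 * // a null CompressionContext means no dictionary compression is applied.
 * WALCellCodec codec = WALCellCodec.create(conf, null);
 * </pre>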
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC,
    HBaseInterfaceAudience.PHOENIX, HBaseInterfaceAudience.CONFIG })
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;
  protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
    @Override
    public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
      return WALCellCodec.uncompressByteString(data, dict);
    }
  };

  /**
   * <b>All subclasses must implement a no argument constructor</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * Default constructor - <b>all subclasses must implement a constructor with this signature</b>
   * if they are to be dynamically loaded from the {@link Configuration}.
   * @param conf configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *          compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and set up a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified.
   * Otherwise the cell codec classname is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
      CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
        new Class[] { Configuration.class, CompressionContext.class },
        new Object[] { conf, compression });
  }

  /**
   * Create and set up a {@link WALCellCodec} from the CompressionContext.
   * The cell codec classname is read from the {@link Configuration}.
   * Fully prepares the codec for use.
   * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
   *          uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf,
      CompressionContext compression) throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
        new Class[] { Configuration.class, CompressionContext.class },
        new Object[] { conf, compression });
  }

  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Dictionary dict) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
  }

  // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here.
  //       Dictionary could be gotten by enum; initially, based on enum, context would create
  //       an array of dictionaries.
  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Dictionary dict) throws IOException {
      writeCompressed(data, dict);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = (byte) in.read();
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, (byte) in.read());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }

  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
    }

    @Override
    public void write(Cell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
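      // Layout per cell, matching what CompressedKvDecoder#parseCell reads back:
      //   vint keyLength | vint valueLength | vint tagsLength |
      //   dictionary-compressed row, family and qualifier |
      //   8-byte timestamp | 1-byte type | raw value bytes | tags, if any.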
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell, compression.rowDict);
      PrivateCellUtil.compressFamily(out, cell, compression.familyDict);
      PrivateCellUtil.compressQualifier(out, cell, compression.qualifierDict);
      // Write timestamp, type and value as uncompressed.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      if (tagsLength > 0) {
        if (compression.tagCompressionContext != null) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tags bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }
  }

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);

      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength
            + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
      pos += elemLen;

      // timestamp, type and value
      int tsTypeValLen = length - pos;
      if (tagsLength > 0) {
        tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      IOUtils.readFully(in, backingArray, pos, tsTypeValLen);
      pos += tsTypeValLen;

      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (compression.tagCompressionContext != null) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = (byte) in.read();
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // status byte indicating that data to be read is not in dictionary.
        // if this isn't in the dictionary, we need to add to the dictionary.
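        // On a miss the stream contains the NOT_IN_DICTIONARY status byte, a vint length and
        // then the raw bytes; on a hit it contains a two-byte dictionary index instead (see the
        // else branch below).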
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // the status byte also acts as the higher-order byte of the dictionary index.
        short dictIdx = StreamUtils.toShort(status, (byte) in.read());
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // now we write the uncompressed value.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }
  }

  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
        ? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    // TODO: ideally this should also encapsulate compressionContext
    return new BaosAndCompressor();
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    // TODO: ideally this should also encapsulate compressionContext
    return this.statelessUncompressor;
  }
}