/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Encapsulates a data block compressed using a particular encoding algorithm.
 * Useful for testing and benchmarking; used only in tests.
 */
@InterfaceAudience.Private
@VisibleForTesting
public class EncodedDataBlock {
  private byte[] rawKVs;
  private ByteBuffer rawBuffer;
  private DataBlockEncoder dataBlockEncoder;

  private byte[] cachedEncodedData;

  private final HFileBlockEncodingContext encodingCtx;
  private HFileContext meta;

  private final DataBlockEncoding encoding;

  // This handles the case where some cells include tags and others do not.
  // isTagsLenZero records, per cell, whether the tags length was zero before encoding,
  // because we need to check whether the tags length is zero again after decoding.
  // The ROW_INDEX_V1 encoder drops the tags segment entirely when the tags length is
  // zero after decoding cells to a byte array; the other encoders do not. So we need a
  // way to account for the zero tags length in the decoded byte array.
  private List<Boolean> isTagsLenZero = new ArrayList<>();

  /**
   * Create a buffer which will be encoded using dataBlockEncoder.
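   * <p>
   * A minimal construction sketch. This is illustrative only: the FAST_DIFF encoding and
   * the context flags are arbitrary choices, and {@code rawKVs} is assumed to be a byte
   * array of serialized KeyValues matching those flags.
   * <pre>{@code
   * HFileContext context = new HFileContextBuilder()
   *     .withIncludesMvcc(false)
   *     .withIncludesTags(false)
   *     .build();
   * DataBlockEncoding encoding = DataBlockEncoding.FAST_DIFF;
   * EncodedDataBlock block =
   *     new EncodedDataBlock(encoding.getEncoder(), encoding, rawKVs, context);
   * int encodedSize = block.getSize();
   * }</pre>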
   * @param dataBlockEncoder Algorithm used for compression.
   * @param encoding encoding type used
   * @param rawKVs raw KeyValues, serialized one after another
   * @param meta HFile context for this block
   */
  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
      byte[] rawKVs, HFileContext meta) {
    Preconditions.checkNotNull(encoding,
        "Cannot create encoded data block with null encoding");
    this.dataBlockEncoder = dataBlockEncoder;
    this.encoding = encoding;
    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
    this.rawKVs = rawKVs;
    this.meta = meta;
  }

  /**
   * Provides an iterator over the Cells decoded from this block.
   * @param headerSize header size of the block.
   * @return Forward-only, sequential iterator.
   */
  public Iterator<Cell> getIterator(int headerSize) {
    final int rawSize = rawKVs.length;
    byte[] encodedDataWithHeader = getEncodedData();
    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
        bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
    final DataInputStream dis = new DataInputStream(bais);

    return new Iterator<Cell>() {
      private ByteBuffer decompressedData = null;
      private Iterator<Boolean> it = isTagsLenZero.iterator();

      @Override
      public boolean hasNext() {
        if (decompressedData == null) {
          return rawSize > 0;
        }
        return decompressedData.hasRemaining();
      }

      @Override
      public Cell next() {
        if (decompressedData == null) {
          try {
            decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
                .newDataBlockDecodingContext(meta));
          } catch (IOException e) {
            throw new RuntimeException("Problem with data block encoder, " +
                "most likely it requested more bytes than are available.", e);
          }
          decompressedData.rewind();
        }
        int offset = decompressedData.position();
        int klen = decompressedData.getInt();
        int vlen = decompressedData.getInt();
        int tagsLen = 0;
        ByteBufferUtils.skip(decompressedData, klen + vlen);
        // Read the tags length when the stream contains tags
        if (meta.isIncludesTags()) {
          boolean noTags = true;
          if (it.hasNext()) {
            noTags = it.next();
          }
          // ROW_INDEX_V1 does not write tagsLen back into the cell when it is zero, so
          // there is no short to read here.
          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
            ByteBufferUtils.skip(decompressedData, tagsLen);
          }
        }
        KeyValue kv = new KeyValue(decompressedData.array(), offset,
            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
        if (meta.isIncludesMvcc()) {
          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
          kv.setSequenceId(mvccVersion);
        }
        return kv;
      }

      @Override
      public void remove() {
        throw new NotImplementedException("remove() is not supported!");
      }

      @Override
      public String toString() {
        return "Iterator of: " + dataBlockEncoder.getClass().getName();
      }

    };
  }

  /**
   * Find the size of the minimal buffer that could store the encoded data.
   * @return Size in bytes of the encoded data.
   */
  public int getSize() {
    return getEncodedData().length;
  }

  /**
   * Find the size of the compressed data, assuming the buffer is compressed
   * using the given algorithm.
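   * <p>
   * A hedged usage sketch, given some {@code byte[] data}. GZ is just one available
   * {@code Compression.Algorithm}; the compressor may also be passed as null, in which
   * case the codec creates one internally.
   * <pre>{@code
   * Algorithm algo = Compression.Algorithm.GZ;
   * Compressor compressor = algo.getCompressor();
   * try {
   *   int size = EncodedDataBlock.getCompressedSize(algo, compressor, data, 0, data.length);
   * } finally {
   *   algo.returnCompressor(compressor);
   * }
   * }</pre>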
   * @param algo compression algorithm
   * @param compressor compressor already requested from codec
   * @param inputBuffer Array to be compressed.
   * @param offset Offset to beginning of the data.
   * @param length Length to be compressed.
   * @return Size of compressed data in bytes.
   * @throws IOException if a compression stream cannot be created or written to
   */
  public static int getCompressedSize(Algorithm algo, Compressor compressor,
      byte[] inputBuffer, int offset, int length) throws IOException {

    // Create streams
    // Storing them so we can close them
    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
    OutputStream compressingStream = null;

    try {
      if (compressor != null) {
        compressor.reset();
      }

      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);

      compressingStream.write(inputBuffer, offset, length);
      compressingStream.flush();

      return compressedStream.size();
    } finally {
      nullOutputStream.close();
      compressedStream.close();
      // Guard against an NPE (and masking the original exception) if
      // createCompressionStream threw before compressingStream was assigned.
      if (compressingStream != null) {
        compressingStream.close();
      }
    }
  }

  /**
   * Estimate the size after a second stage of compression (e.g. LZO).
   * @param comprAlgo compression algorithm to be used for compression
   * @param compressor compressor corresponding to the given compression
   *          algorithm
   * @return Size after second stage of compression.
   */
  public int getEncodedCompressedSize(Algorithm comprAlgo,
      Compressor compressor) throws IOException {
    byte[] encodedBytes = getEncodedData();
    return getCompressedSize(comprAlgo, compressor, encodedBytes, 0,
        encodedBytes.length);
  }

  /** @return encoded data with header */
  private byte[] getEncodedData() {
    if (cachedEncodedData != null) {
      return cachedEncodedData;
    }
    cachedEncodedData = encodeData();
    return cachedEncodedData;
  }

  private ByteBuffer getUncompressedBuffer() {
    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
      rawBuffer = ByteBuffer.wrap(rawKVs);
    }
    return rawBuffer;
  }

  /**
   * Do the encoding, but do not cache the encoded data.
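   * <p>
   * The raw buffer is consumed as a sequence of serialized KeyValues; a sketch of the
   * per-cell layout parsed below (the tags and mvcc fields are present only when the
   * corresponding {@link HFileContext} flags are set):
   * <pre>
   * int keyLength | int valueLength | key bytes | value bytes
   *   [ short tagsLength | tag bytes ]   (if includesTags)
   *   [ vlong memstoreTS ]               (if includesMvcc)
   * </pre>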
   * @return encoded data block with header and checksum
   */
  public byte[] encodeData() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] baosBytes = null;
    try {
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream out = new DataOutputStream(baos);
      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
      ByteBuffer in = getUncompressedBuffer();
      in.rewind();
      int klength, vlength;
      int tagsLength = 0;
      long memstoreTS = 0L;
      KeyValue kv = null;
      while (in.hasRemaining()) {
        int kvOffset = in.position();
        klength = in.getInt();
        vlength = in.getInt();
        ByteBufferUtils.skip(in, klength + vlength);
        if (this.meta.isIncludesTags()) {
          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
          ByteBufferUtils.skip(in, tagsLength);
          this.isTagsLenZero.add(tagsLength == 0);
        }
        if (this.meta.isIncludesMvcc()) {
          memstoreTS = ByteBufferUtils.readVLong(in);
        }
        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
            klength, vlength, tagsLength));
        kv.setSequenceId(memstoreTS);
        this.dataBlockEncoder.encode(kv, encodingCtx, out);
      }
      // Below depends on BAOS internal behavior. toByteArray makes a copy of the bytes
      // written so far.
      baos.flush();
      baosBytes = baos.toByteArray();
      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
      // In endBlockEncoding(encodingCtx, out, baosBytes), the ROW_INDEX_V1 encoder writes
      // an integer to the output stream, while the other encoders write it directly into
      // baosBytes (the byte array). For ROW_INDEX_V1 we must call baos.toByteArray()
      // again after endBlockEncoding so that the integer written to the stream ends up
      // in baosBytes. The if branch is necessary because the other encoders wrote into
      // baosBytes directly; calling toByteArray() again for them would yield a copy
      // that lacks the integer written in endBlockEncoding.
      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
        baosBytes = baos.toByteArray();
      }
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Bug in encoding part of algorithm %s. " +
              "It probably requested more bytes than are available.",
          toString()), e);
    }
    return baosBytes;
  }

  @Override
  public String toString() {
    return encoding.name();
  }
}