/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Encapsulates a data block compressed using a particular encoding algorithm.
 * Used only for testing and benchmarking.
 */
@InterfaceAudience.Private
public class EncodedDataBlock {
  private byte[] rawKVs;
  private ByteBuffer rawBuffer;
  private DataBlockEncoder dataBlockEncoder;

  private byte[] cachedEncodedData;

  private final HFileBlockEncodingContext encodingCtx;
  private HFileContext meta;

  private final DataBlockEncoding encoding;

  // Handles the case where some cells include tags and others do not.
  // isTagsLenZero records, before encoding, whether each cell's tags length is
  // zero, because we need that information again after decoding: the
  // ROW_INDEX_V1 encoder drops the tags segment entirely when the tags length
  // is zero, while the other encoders keep it. Without this list we could not
  // tell, while walking the decoded byte array, whether a zero tags length was
  // omitted or simply empty.
  private List<Boolean> isTagsLenZero = new ArrayList<>();
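  /*
   * Illustrative usage sketch: how a test or benchmark might exercise this
   * class. The variables `rawKVs` and `meta` are assumed to be prepared by the
   * caller, and the choice of FAST_DIFF is arbitrary.
   *
   *   DataBlockEncoding encoding = DataBlockEncoding.FAST_DIFF;
   *   EncodedDataBlock block =
   *       new EncodedDataBlock(encoding.getEncoder(), encoding, rawKVs, meta);
   *   int encodedSize = block.getSize();  // encoded bytes, header included
   *   Iterator<Cell> cells = block.getIterator(HConstants.HFILEBLOCK_HEADER_SIZE);
   *   while (cells.hasNext()) {
   *     Cell cell = cells.next();
   *     // ... verify the cell round-tripped through the encoder ...
   *   }
   */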
  /**
   * Create a buffer which will be encoded using dataBlockEncoder.
   * @param dataBlockEncoder Algorithm used for compression.
   * @param encoding encoding type used
   * @param rawKVs raw KeyValue bytes to encode
   * @param meta HFile context for the block (tags, mvcc, compression, ...)
   */
  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
      byte[] rawKVs, HFileContext meta) {
    Preconditions.checkNotNull(encoding,
        "Cannot create encoded data block with null encoding");
    this.dataBlockEncoder = dataBlockEncoder;
    this.encoding = encoding;
    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
    this.rawKVs = rawKVs;
    this.meta = meta;
  }

  /**
   * Provides access to the decoded cells.
   * @param headerSize header size of the block.
   * @return Forward sequential iterator over the cells in this block.
   */
  public Iterator<Cell> getIterator(int headerSize) {
    final int rawSize = rawKVs.length;
    byte[] encodedDataWithHeader = getEncodedData();
    // Skip the block header plus the 2-byte block-encoding id that precede the payload.
    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
        bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
    final DataInputStream dis = new DataInputStream(bais);

    return new Iterator<Cell>() {
      private ByteBuffer decompressedData = null;
      private Iterator<Boolean> it = isTagsLenZero.iterator();

      @Override
      public boolean hasNext() {
        if (decompressedData == null) {
          return rawSize > 0;
        }
        return decompressedData.hasRemaining();
      }

      @Override
      public Cell next() {
        if (decompressedData == null) {
          try {
            decompressedData = dataBlockEncoder.decodeKeyValues(dis,
                dataBlockEncoder.newDataBlockDecodingContext(meta));
          } catch (IOException e) {
            throw new RuntimeException("Problem with data block encoder, " +
                "most likely it requested more bytes than are available.", e);
          }
          decompressedData.rewind();
        }
        int offset = decompressedData.position();
        int klen = decompressedData.getInt();
        int vlen = decompressedData.getInt();
        int tagsLen = 0;
        ByteBufferUtils.skip(decompressedData, klen + vlen);
        // Read the tags length when the stream contains tags.
        if (meta.isIncludesTags()) {
          boolean noTags = true;
          if (it.hasNext()) {
            noTags = it.next();
          }
          // ROW_INDEX_V1 does not put the tags length back in the cell when it
          // is zero, so there is no short to read in that case.
          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
            ByteBufferUtils.skip(decompressedData, tagsLen);
          }
        }
        KeyValue kv = new KeyValue(decompressedData.array(), offset,
            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
        if (meta.isIncludesMvcc()) {
          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
          kv.setSequenceId(mvccVersion);
        }
        return kv;
      }

      @Override
      public void remove() {
        throw new NotImplementedException("remove() is not supported!");
      }

      @Override
      public String toString() {
        return "Iterator of: " + dataBlockEncoder.getClass().getName();
      }

    };
  }
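  /*
   * For orientation, a sketch of the flat KeyValue layout that next() above
   * (and encodeData() below) walks; `buf` stands for a ByteBuffer positioned
   * at the start of a cell and is hypothetical here.
   *
   *   int klen = buf.getInt();                   // 4-byte key length
   *   int vlen = buf.getInt();                   // 4-byte value length
   *   ByteBufferUtils.skip(buf, klen + vlen);    // key bytes, then value bytes
   *   if (meta.isIncludesTags()) {
   *     // 2-byte big-endian unsigned short
   *     int tagsLen = ((buf.get() & 0xff) << 8) ^ (buf.get() & 0xff);
   *     ByteBufferUtils.skip(buf, tagsLen);      // tag bytes
   *   }
   *   if (meta.isIncludesMvcc()) {
   *     long mvcc = ByteBufferUtils.readVLong(buf); // trailing vlong sequence id
   *   }
   */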
  /**
   * Find the size of the minimal buffer that could store the encoded data.
   * @return Size in bytes of the encoded data.
   */
  public int getSize() {
    return getEncodedData().length;
  }

  /**
   * Find the size of compressed data assuming that the buffer will be
   * compressed using the given algorithm.
   * @param algo compression algorithm
   * @param compressor compressor already requested from codec
   * @param inputBuffer Array to be compressed.
   * @param offset Offset to beginning of the data.
   * @param length Length to be compressed.
   * @return Size of compressed data in bytes.
   * @throws IOException on compression error
   */
  public static int getCompressedSize(Algorithm algo, Compressor compressor,
      byte[] inputBuffer, int offset, int length) throws IOException {
    // Create streams.
    // Storing them so we can close them.
    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
    OutputStream compressingStream = null;

    try {
      if (compressor != null) {
        compressor.reset();
      }

      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);

      compressingStream.write(inputBuffer, offset, length);
      compressingStream.flush();

      return compressedStream.size();
    } finally {
      nullOutputStream.close();
      compressedStream.close();
      // createCompressionStream may throw before the stream is assigned;
      // guard against closing null.
      if (compressingStream != null) {
        compressingStream.close();
      }
    }
  }

  /**
   * Estimate the size after a second stage of compression (e.g. LZO).
   * @param comprAlgo compression algorithm to be used for compression
   * @param compressor compressor corresponding to the given compression
   *          algorithm
   * @return Size after second stage of compression.
   */
  public int getEncodedCompressedSize(Algorithm comprAlgo,
      Compressor compressor) throws IOException {
    byte[] compressedBytes = getEncodedData();
    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
        compressedBytes.length);
  }

  /** @return encoded data with header */
  private byte[] getEncodedData() {
    if (cachedEncodedData != null) {
      return cachedEncodedData;
    }
    cachedEncodedData = encodeData();
    return cachedEncodedData;
  }

  private ByteBuffer getUncompressedBuffer() {
    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
      rawBuffer = ByteBuffer.wrap(rawKVs);
    }
    return rawBuffer;
  }
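  /*
   * Illustrative sketch: combining the size helpers above to compare raw,
   * encoded, and encoded-then-compressed sizes. `block` and `rawKVs` are as in
   * the class-level sketch; GZ is an arbitrary second-stage codec.
   *
   *   Algorithm gz = Algorithm.GZ;
   *   Compressor compressor = gz.getCompressor();
   *   try {
   *     System.out.printf("raw=%d encoded=%d encoded+gz=%d%n",
   *         rawKVs.length, block.getSize(),
   *         block.getEncodedCompressedSize(gz, compressor));
   *   } finally {
   *     gz.returnCompressor(compressor);
   *   }
   */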
  /**
   * Do the encoding, but do not cache the encoded data.
   * @return encoded data block with header and checksum
   */
  public byte[] encodeData() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] baosBytes = null;
    try {
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream out = new DataOutputStream(baos);
      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
      ByteBuffer in = getUncompressedBuffer();
      in.rewind();
      int klength, vlength;
      int tagsLength = 0;
      long memstoreTS = 0L;
      KeyValue kv = null;
      while (in.hasRemaining()) {
        int kvOffset = in.position();
        klength = in.getInt();
        vlength = in.getInt();
        ByteBufferUtils.skip(in, klength + vlength);
        if (this.meta.isIncludesTags()) {
          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
          ByteBufferUtils.skip(in, tagsLength);
          this.isTagsLenZero.add(tagsLength == 0);
        }
        if (this.meta.isIncludesMvcc()) {
          memstoreTS = ByteBufferUtils.readVLong(in);
        }
        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
            klength, vlength, tagsLength));
        kv.setSequenceId(memstoreTS);
        this.dataBlockEncoder.encode(kv, encodingCtx, out);
      }
      // The below depends on BAOS internals: toByteArray copies the bytes written so far.
      baos.flush();
      baosBytes = baos.toByteArray();
      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
      // In endBlockEncoding(encodingCtx, out, baosBytes), the ROW_INDEX_V1 encoder
      // writes its trailing integer to the output stream, while the other encoders
      // write it into baosBytes (the array snapshot taken above). So for
      // ROW_INDEX_V1 we must call baos.toByteArray() again after endBlockEncoding
      // to capture what was written to the stream; without this branch, baosBytes
      // would not contain the integer written in endBlockEncoding.
      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
        baosBytes = baos.toByteArray();
      }
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Bug in encoding part of algorithm %s. " +
          "Probably it requested more bytes than are available.",
          toString()), e);
    }
    return baosBytes;
  }

  @Override
  public String toString() {
    return encoding.name();
  }
}
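/*
 * Layout note: judging from the writes in encodeData() and the skip in
 * getIterator(), the returned array is laid out as
 *
 *   HConstants.HFILEBLOCK_DUMMY_HEADER  placeholder block header
 *   2-byte block-encoding id            the Bytes.SIZEOF_SHORT skipped by getIterator()
 *   encoder payload                     per-cell output plus the endBlockEncoding trailer
 */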