/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Encapsulates a data block compressed using a particular encoding algorithm. Useful for testing
 * and benchmarking; only used in tests.
 */
@InterfaceAudience.Private
public class EncodedDataBlock {
  private byte[] rawKVs;
  private ByteBuffer rawBuffer;
  private DataBlockEncoder dataBlockEncoder;

  private byte[] cachedEncodedData;

  private final HFileBlockEncodingContext encodingCtx;
  private HFileContext meta;

  private final DataBlockEncoding encoding;
  private final Configuration conf;

  // This handles the situation where some cells include tags and others do not. isTagsLenZero
  // records, per cell, whether the tags length was zero before encoding, because we need to know
  // whether the tags length is zero after decoding as well. The ROW_INDEX_V1 encoder drops the
  // tags segment when the tags length is 0 after decoding cells to a byte array; other encoders
  // do not, so we have to track the zero tags length for the decoded byte array separately.
  private List<Boolean> isTagsLenZero = new ArrayList<>();

  /**
   * Create a buffer which will be encoded using dataBlockEncoder.
   * @param conf             store configuration
   * @param dataBlockEncoder Algorithm used for compression.
   * @param encoding         encoding type used
   * @param rawKVs           raw KVs
   * @param meta             hfile context
   */
  public EncodedDataBlock(Configuration conf, DataBlockEncoder dataBlockEncoder,
    DataBlockEncoding encoding, byte[] rawKVs, HFileContext meta) {
    Preconditions.checkNotNull(encoding, "Cannot create encoded data block with null encoding");
    this.dataBlockEncoder = dataBlockEncoder;
    this.encoding = encoding;
    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(conf, encoding,
      HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
    this.rawKVs = rawKVs;
    this.meta = meta;
    this.conf = conf;
  }

  /**
   * Provides access to the encoded data as a cell iterator.
   * @param headerSize header size of the block.
   * @return Forward sequential iterator.
   */
  public Iterator<ExtendedCell> getIterator(int headerSize) {
    final int rawSize = rawKVs.length;
    byte[] encodedDataWithHeader = getEncodedData();
    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader, bytesToSkip,
      encodedDataWithHeader.length - bytesToSkip);
    final DataInputStream dis = new DataInputStream(bais);

    return new Iterator<ExtendedCell>() {
      private ByteBuffer decompressedData = null;
      private Iterator<Boolean> it = isTagsLenZero.iterator();

      @Override
      public boolean hasNext() {
        if (decompressedData == null) {
          return rawSize > 0;
        }
        return decompressedData.hasRemaining();
      }

      @Override
      public ExtendedCell next() {
        if (decompressedData == null) {
          try {
            decompressedData = dataBlockEncoder.decodeKeyValues(dis,
              dataBlockEncoder.newDataBlockDecodingContext(conf, meta));
          } catch (IOException e) {
            throw new RuntimeException("Problem with data block encoder, "
              + "most likely it requested more bytes than are available.", e);
          }
          decompressedData.rewind();
        }
        int offset = decompressedData.position();
        int klen = decompressedData.getInt();
        int vlen = decompressedData.getInt();
        int tagsLen = 0;
        ByteBufferUtils.skip(decompressedData, klen + vlen);
        // Read the tags length when the stream contains tags
        if (meta.isIncludesTags()) {
          boolean noTags = true;
          if (it.hasNext()) {
            noTags = it.next();
          }
          // ROW_INDEX_V1 will not put tagsLen back in the cell if it is zero, so there is no need
          // to read the short here.
          if (!(encoding.equals(DataBlockEncoding.ROW_INDEX_V1) && noTags)) {
            tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
            ByteBufferUtils.skip(decompressedData, tagsLen);
          }
        }
        KeyValue kv =
          new KeyValue(decompressedData.array(), decompressedData.arrayOffset() + offset,
            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
        if (meta.isIncludesMvcc()) {
          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
          kv.setSequenceId(mvccVersion);
        }
        return kv;
      }

      @Override
      public void remove() {
        throw new NotImplementedException("remove() is not supported!");
      }

      @Override
      public String toString() {
        return "Iterator of: " + dataBlockEncoder.getClass().getName();
      }

    };
  }

  /**
   * Find the size of the minimal buffer that could store the compressed data.
   * @return Size in bytes of compressed data.
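   *         (this includes the dummy block header written by {@link #encodeData()})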
   */
  public int getSize() {
    return getEncodedData().length;
  }

  /**
   * Find the size of compressed data assuming that the buffer will be compressed using the given
   * algorithm.
   * @param algo        compression algorithm
   * @param compressor  compressor already requested from codec
   * @param inputBuffer Array to be compressed.
   * @param offset      Offset to beginning of the data.
   * @param length      Length to be compressed.
   * @return Size of compressed data in bytes.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
      justification = "Not sure what findbugs wants but looks to me like no NPE")
  public static int getCompressedSize(Algorithm algo, Compressor compressor, byte[] inputBuffer,
    int offset, int length) throws IOException {

    // Create streams
    // Storing them so we can close them
    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
    OutputStream compressingStream = null;

    try {
      if (compressor != null) {
        compressor.reset();
      }

      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);

      compressingStream.write(inputBuffer, offset, length);
      compressingStream.flush();

      return compressedStream.size();
    } finally {
      nullOutputStream.close();
      compressedStream.close();
      if (compressingStream != null) {
        compressingStream.close();
      }
    }
  }

  /**
   * Estimate size after second stage of compression (e.g. LZO).
   * @param comprAlgo  compression algorithm to be used for compression
   * @param compressor compressor corresponding to the given compression algorithm
   * @return Size after second stage of compression.
   */
  public int getEncodedCompressedSize(Algorithm comprAlgo, Compressor compressor)
    throws IOException {
    byte[] compressedBytes = getEncodedData();
    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0, compressedBytes.length);
  }

  /** Returns encoded data with header */
  private byte[] getEncodedData() {
    if (cachedEncodedData != null) {
      return cachedEncodedData;
    }
    cachedEncodedData = encodeData();
    return cachedEncodedData;
  }

  private ByteBuffer getUncompressedBuffer() {
    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
      rawBuffer = ByteBuffer.wrap(rawKVs);
    }
    return rawBuffer;
  }

  /**
   * Do the encoding, but do not cache the encoded data.
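   * <p>
   * The raw buffer is expected to hold serialized KeyValues laid out as: a 4-byte key length, a
   * 4-byte value length, the key and value bytes, then (when the context includes tags) a 2-byte
   * tags length followed by the tags, and finally (when the context includes MVCC) a vlong
   * memstore timestamp.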
   * @return encoded data block with header and checksum
   */
  public byte[] encodeData() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] baosBytes = null;
    try {
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream out = new DataOutputStream(baos);
      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
      ByteBuffer in = getUncompressedBuffer();
      in.rewind();
      int klength, vlength;
      int tagsLength = 0;
      long memstoreTS = 0L;
      KeyValue kv = null;
      while (in.hasRemaining()) {
        int kvOffset = in.position();
        klength = in.getInt();
        vlength = in.getInt();
        ByteBufferUtils.skip(in, klength + vlength);
        if (this.meta.isIncludesTags()) {
          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
          ByteBufferUtils.skip(in, tagsLength);
          this.isTagsLenZero.add(tagsLength == 0);
        }
        if (this.meta.isIncludesMvcc()) {
          memstoreTS = ByteBufferUtils.readVLong(in);
        }
        kv = new KeyValue(in.array(), in.arrayOffset() + kvOffset,
          (int) KeyValue.getKeyValueDataStructureSize(klength, vlength, tagsLength));
        kv.setSequenceId(memstoreTS);
        this.dataBlockEncoder.encode(kv, encodingCtx, out);
      }
      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
      baos.flush();
      baosBytes = baos.toByteArray();
      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
      // In endBlockEncoding(encodingCtx, out, baosBytes), the ROW_INDEX_V1 encoder writes an
      // integer to the output stream, while the other encoders write it directly into baosBytes
      // (the byte array). For ROW_INDEX_V1 we therefore call baos.toByteArray() again after
      // endBlockEncoding so that the integer written to the output stream also ends up in
      // baosBytes. The if branch is necessary because the other encoders write into baosBytes
      // directly; calling toByteArray() again for them would return a copy that does not contain
      // the integer written in endBlockEncoding.
      if (this.encoding.equals(DataBlockEncoding.ROW_INDEX_V1)) {
        baosBytes = baos.toByteArray();
      }
    } catch (IOException e) {
      throw new RuntimeException(String.format("Bug in encoding part of algorithm %s. "
        + "Probably it requested more bytes than are available.", toString()), e);
    }
    return baosBytes;
  }

  @Override
  public String toString() {
    return encoding.name();
  }
}
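
// A minimal usage sketch (illustration only, not part of the class): assuming a Configuration
// ("conf"), a byte[] of serialized KeyValues ("rawKVs") and a matching HFileContext ("context")
// are already available, the block can be encoded and walked roughly like this:
//
//   DataBlockEncoding encoding = DataBlockEncoding.FAST_DIFF;
//   EncodedDataBlock block =
//     new EncodedDataBlock(conf, encoding.getEncoder(), encoding, rawKVs, context);
//   int encodedSize = block.getSize();
//   Iterator<ExtendedCell> cells = block.getIterator(HConstants.HFILEBLOCK_HEADER_SIZE);
//   while (cells.hasNext()) {
//     ExtendedCell cell = cells.next();
//     // compare the round-tripped cell against the original input ...
//   }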