/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * Encapsulates a data block compressed using a particular encoding algorithm.
 * Useful for testing and benchmarking; used only in tests.
 */
@InterfaceAudience.Private
@VisibleForTesting
public class EncodedDataBlock {
  private byte[] rawKVs;
  private ByteBuffer rawBuffer;
  private DataBlockEncoder dataBlockEncoder;

  private byte[] cachedEncodedData;

  private final HFileBlockEncodingContext encodingCtx;
  private HFileContext meta;

  /**
   * Create a buffer which will be encoded using dataBlockEncoder.
   * @param dataBlockEncoder Algorithm used for compression.
   * @param encoding encoding type used
   * @param rawKVs raw KeyValues, serialized back to back
   * @param meta HFile context carrying the block's tags/mvcc settings
   */
  public EncodedDataBlock(DataBlockEncoder dataBlockEncoder, DataBlockEncoding encoding,
      byte[] rawKVs, HFileContext meta) {
    Preconditions.checkNotNull(encoding,
        "Cannot create encoded data block with null encoder");
    this.dataBlockEncoder = dataBlockEncoder;
    encodingCtx = dataBlockEncoder.newDataBlockEncodingContext(encoding,
        HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
    this.rawKVs = rawKVs;
    this.meta = meta;
  }
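
  /*
   * Illustrative usage sketch, not part of the original class. It assumes the
   * standard HBase APIs HFileContextBuilder and DataBlockEncoding.getEncoder();
   * rawKVs is expected to hold KeyValues serialized back to back, as the
   * parsing code below requires:
   *
   *   HFileContext ctx = new HFileContextBuilder()
   *       .withIncludesMvcc(true)
   *       .withIncludesTags(false)
   *       .build();
   *   DataBlockEncoding encoding = DataBlockEncoding.PREFIX;
   *   EncodedDataBlock block =
   *       new EncodedDataBlock(encoding.getEncoder(), encoding, rawKVs, ctx);
   */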

  /**
   * Provides access to the compressed value.
   * @param headerSize header size of the block.
   * @return Forward sequential iterator over the cells in this block.
   */
  public Iterator<Cell> getIterator(int headerSize) {
    final int rawSize = rawKVs.length;
    byte[] encodedDataWithHeader = getEncodedData();
    int bytesToSkip = headerSize + Bytes.SIZEOF_SHORT;
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedDataWithHeader,
        bytesToSkip, encodedDataWithHeader.length - bytesToSkip);
    final DataInputStream dis = new DataInputStream(bais);

    return new Iterator<Cell>() {
      private ByteBuffer decompressedData = null;

      @Override
      public boolean hasNext() {
        if (decompressedData == null) {
          return rawSize > 0;
        }
        return decompressedData.hasRemaining();
      }

      @Override
      public Cell next() {
        if (decompressedData == null) {
          try {
            decompressedData = dataBlockEncoder.decodeKeyValues(dis, dataBlockEncoder
                .newDataBlockDecodingContext(meta));
          } catch (IOException e) {
            throw new RuntimeException("Problem with data block encoder, " +
                "most likely it requested more bytes than are available.", e);
          }
          decompressedData.rewind();
        }
        int offset = decompressedData.position();
        int klen = decompressedData.getInt();
        int vlen = decompressedData.getInt();
        int tagsLen = 0;
        ByteBufferUtils.skip(decompressedData, klen + vlen);
        // Read the tags length in case the stream contains tags
        if (meta.isIncludesTags()) {
          tagsLen = ((decompressedData.get() & 0xff) << 8) ^ (decompressedData.get() & 0xff);
          ByteBufferUtils.skip(decompressedData, tagsLen);
        }
        KeyValue kv = new KeyValue(decompressedData.array(), offset,
            (int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
        if (meta.isIncludesMvcc()) {
          long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
          kv.setSequenceId(mvccVersion);
        }
        return kv;
      }

      @Override
      public void remove() {
        throw new NotImplementedException("remove() is not supported!");
      }

      @Override
      public String toString() {
        return "Iterator of: " + dataBlockEncoder.getClass().getName();
      }

    };
  }

  /**
   * Find the size of the minimal buffer that could store the encoded data.
   * @return Size in bytes of the encoded data, including the header.
   */
  public int getSize() {
    return getEncodedData().length;
  }
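
  /*
   * Illustrative sketch, an assumption rather than original code: benchmarking
   * callers can combine getSize() with getEncodedCompressedSize(...) below to
   * measure how well the encoded block compresses under e.g. GZ, borrowing and
   * returning the codec's pooled compressor:
   *
   *   Algorithm algo = Compression.Algorithm.GZ;
   *   Compressor compressor = algo.getCompressor();
   *   try {
   *     int encodedSize = block.getSize();
   *     int compressedSize = block.getEncodedCompressedSize(algo, compressor);
   *   } finally {
   *     algo.returnCompressor(compressor);
   *   }
   */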

  /**
   * Find the size of compressed data assuming that the buffer will be compressed
   * using the given algorithm.
   * @param algo compression algorithm
   * @param compressor compressor already requested from the codec
   * @param inputBuffer Array to be compressed.
   * @param offset Offset to beginning of the data.
   * @param length Length to be compressed.
   * @return Size of compressed data in bytes.
   * @throws IOException if compressing the data fails
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
    justification="compressingStream is null-checked in the finally block")
  public static int getCompressedSize(Algorithm algo, Compressor compressor,
      byte[] inputBuffer, int offset, int length) throws IOException {

    // Create streams
    // Storing them so we can close them
    final IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
    final DataOutputStream compressedStream = new DataOutputStream(nullOutputStream);
    OutputStream compressingStream = null;

    try {
      if (compressor != null) {
        compressor.reset();
      }

      compressingStream = algo.createCompressionStream(compressedStream, compressor, 0);

      compressingStream.write(inputBuffer, offset, length);
      compressingStream.flush();

      return compressedStream.size();
    } finally {
      nullOutputStream.close();
      compressedStream.close();
      // createCompressionStream may have thrown before assigning compressingStream
      if (compressingStream != null) {
        compressingStream.close();
      }
    }
  }

  /**
   * Estimate size after second stage of compression (e.g. LZO).
   * @param comprAlgo compression algorithm to be used for compression
   * @param compressor compressor corresponding to the given compression
   *          algorithm
   * @return Size after second stage of compression.
   */
  public int getEncodedCompressedSize(Algorithm comprAlgo,
      Compressor compressor) throws IOException {
    byte[] compressedBytes = getEncodedData();
    return getCompressedSize(comprAlgo, compressor, compressedBytes, 0,
        compressedBytes.length);
  }

  /** @return encoded data with header */
  private byte[] getEncodedData() {
    if (cachedEncodedData != null) {
      return cachedEncodedData;
    }
    cachedEncodedData = encodeData();
    return cachedEncodedData;
  }

  private ByteBuffer getUncompressedBuffer() {
    if (rawBuffer == null || rawBuffer.limit() < rawKVs.length) {
      rawBuffer = ByteBuffer.wrap(rawKVs);
    }
    return rawBuffer;
  }
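
  /*
   * Layout that encodeData() below assumes for each raw KeyValue in rawKVs,
   * derived from the parsing code itself:
   *
   *   int keyLen | int valueLen | key bytes | value bytes
   *   [ short tagsLen | tag bytes ]   -- only when meta.isIncludesTags()
   *   [ vlong mvcc / memstoreTS ]     -- only when meta.isIncludesMvcc()
   */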

  /**
   * Do the encoding, but do not cache the encoded data.
   * @return encoded data block with header and checksum
   */
  public byte[] encodeData() {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] baosBytes = null;
    try {
      baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream out = new DataOutputStream(baos);
      this.dataBlockEncoder.startBlockEncoding(encodingCtx, out);
      ByteBuffer in = getUncompressedBuffer();
      in.rewind();
      int klength, vlength;
      int tagsLength = 0;
      long memstoreTS = 0L;
      KeyValue kv = null;
      while (in.hasRemaining()) {
        int kvOffset = in.position();
        klength = in.getInt();
        vlength = in.getInt();
        ByteBufferUtils.skip(in, klength + vlength);
        if (this.meta.isIncludesTags()) {
          tagsLength = ((in.get() & 0xff) << 8) ^ (in.get() & 0xff);
          ByteBufferUtils.skip(in, tagsLength);
        }
        if (this.meta.isIncludesMvcc()) {
          memstoreTS = ByteBufferUtils.readVLong(in);
        }
        kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
            klength, vlength, tagsLength));
        kv.setSequenceId(memstoreTS);
        this.dataBlockEncoder.encode(kv, encodingCtx, out);
      }
      // Below depends on BAOS internal behavior: toByteArray makes a copy of the bytes so far.
      baos.flush();
      baosBytes = baos.toByteArray();
      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
    } catch (IOException e) {
      throw new RuntimeException(String.format(
          "Bug in encoding part of algorithm %s. " +
          "Probably it requested more bytes than are available.",
          toString()), e);
    }
    return baosBytes;
  }

  @Override
  public String toString() {
    return dataBlockEncoder.toString();
  }
}
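
/*
 * Round-trip sketch (illustrative assumption, not original code): decoding the
 * block back into cells via getIterator. The header size passed here is assumed
 * to be the dummy-header length used at encode time:
 *
 *   Iterator<Cell> it = block.getIterator(HConstants.HFILEBLOCK_DUMMY_HEADER.length);
 *   while (it.hasNext()) {
 *     Cell cell = it.next();
 *     // compare against the original KeyValues
 *   }
 */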