/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Compresses keys by storing the length of the common prefix shared with the previous KeyValue
 * and writing the rest of the key raw. Format:
 * <ul>
 * <li>1-5 bytes: compressed key length minus prefix (7-bit encoding)</li>
 * <li>1-5 bytes: compressed value length (7-bit encoding)</li>
 * <li>1-3 bytes: compressed length of common key prefix</li>
 * <li>... bytes: rest of key (including timestamp)</li>
 * <li>... bytes: value</li>
 * </ul>
 * In the worst case a compressed KeyValue is three bytes longer than the original.
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
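
  /*
   * Illustrative example (not taken from a real HFile): if two consecutive cells share row "r1"
   * and family "f" and have qualifiers "q1" and "q2" with identical timestamps and types, their
   * flat keys first differ at the last qualifier byte. The second entry is therefore written as
   *   compressedInt(keyLength - common)  -- key suffix length: '2', the 8-byte ts, the type byte
   *   compressedInt(valueLength)
   *   compressedInt(common)              -- bytes shared with the previous flat key
   * followed by the key suffix bytes and the value bytes (plus tags and memstoreTS when the
   * block includes them; see afterEncodingKeyValue).
   */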

  @Override
  public int internalEncode(ExtendedCell cell, HFileBlockDefaultEncodingContext encodingContext,
    DataOutputStream out) throws IOException {
    int klength = KeyValueUtil.keyLength(cell);
    int vlength = cell.getValueLength();
    EncodingState state = encodingContext.getEncodingState();
    if (state.prevCell == null) {
      // first cell in the block: there is no previous key, so write the whole key with a
      // zero-length common prefix
      ByteBufferUtils.putCompressedInt(out, klength);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, 0);
      PrivateCellUtil.writeFlatKey(cell, (DataOutput) out);
    } else {
      // find the common prefix with the previous key and skip it
      int common = PrivateCellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
      ByteBufferUtils.putCompressedInt(out, klength - common);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, common);
      writeKeyExcludingCommon(cell, common, out);
    }
    // Write the value part
    PrivateCellUtil.writeValue(out, cell, vlength);
    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

  /**
   * Writes the part of the cell's flat key that is not covered by the {@code commonPrefix} bytes
   * shared with the previous cell's key.
   */
  private void writeKeyExcludingCommon(ExtendedCell cell, int commonPrefix, DataOutputStream out)
    throws IOException {
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Need to write the differing part followed by
      // cf, q, ts and type
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      byte fLen = cell.getFamilyLength();
      out.writeByte(fLen);
      PrivateCellUtil.writeFamily(out, cell, fLen);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
    } else {
      // The full row key part is common. The CF part will be common for sure as we deal with
      // cells in the same family. Just need to write the differing part of q, ts and type.
      commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
        - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
      int qLen = cell.getQualifierLength();
      int commonQualPrefix = Math.min(commonPrefix, qLen);
      int qualPartLenToWrite = qLen - commonQualPrefix;
      if (qualPartLenToWrite > 0) {
        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, qLen, commonQualPrefix);
      }
      commonPrefix -= commonQualPrefix;
      // Common part in TS also?
      if (commonPrefix > 0) {
        int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
        if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
          byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
          out.write(curTsBuf, commonTimestampPrefix,
            KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
        }
        commonPrefix -= commonTimestampPrefix;
        if (commonPrefix == 0) {
          out.writeByte(cell.getTypeByte());
        }
      } else {
        out.writeLong(cell.getTimestamp());
        out.writeByte(cell.getTypeByte());
      }
    }
  }

  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
    int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    int prevKeyOffset = 0;

    while (source.available() > skipLastBytes) {
      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    buffer.limit(buffer.position());
    return buffer;
  }
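
  /**
   * Decodes one prefix-compressed entry from {@code source} into {@code buffer}, rebuilding the
   * full key by copying the common prefix from the previous key at {@code prevKeyOffset}. Returns
   * the offset of the reconstructed key inside {@code buffer} so the next entry can copy its
   * common prefix from it.
   */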
  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset)
    throws IOException, EncoderBufferTooSmallException {
    int keyLength = ByteBufferUtils.readCompressedInt(source);
    int valueLength = ByteBufferUtils.readCompressedInt(source);
    int commonLength = ByteBufferUtils.readCompressedInt(source);
    int keyOffset;
    keyLength += commonLength;

    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // copy the prefix
    if (commonLength > 0) {
      keyOffset = buffer.position();
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset, commonLength);
    } else {
      keyOffset = buffer.position();
    }

    // copy rest of the key and value
    int len = keyLength - commonLength + valueLength;
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
    return keyOffset;
  }

  @Override
  public ExtendedCell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT);
    int keyLength = ByteBuff.readCompressedInt(block);
    // TODO : See if we can avoid these reads as the read values are not getting used
    ByteBuff.readCompressedInt(block);
    int commonLength = ByteBuff.readCompressedInt(block);
    if (commonLength != 0) {
      throw new AssertionError("Nonzero common length in the first key in block: " + commonLength);
    }
    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
    block.reset();
    return createFirstKeyCell(key, keyLength);
  }

  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

  @Override
  public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
    return new SeekerStateBufferedEncodedSeeker(decodingCtx);
  }

  private static class SeekerStateBufferedEncodedSeeker extends BufferedEncodedSeeker<SeekerState> {

    private SeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
    }

    @Override
    protected void decodeNext() {
      current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
      current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
      current.keyLength += current.lastCommonPrefix;
      current.ensureSpaceForKey();
      currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
        current.keyLength - current.lastCommonPrefix);
      current.valueOffset = currentBuffer.position();
      currentBuffer.skip(current.valueLength);
      if (includesTags()) {
        decodeTags();
      }
      if (includesMvcc()) {
        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
      } else {
        current.memstoreTS = 0;
      }
      current.nextKvOffset = currentBuffer.position();
    }

    @Override
    protected void decodeFirst() {
      currentBuffer.skip(Bytes.SIZEOF_INT);
      decodeNext();
    }
  }
}
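
/*
 * Usage sketch (illustrative; the table and family names below are hypothetical, and an
 * org.apache.hadoop.hbase.client.Admin instance named "admin" is assumed): this encoder is
 * selected by enabling PREFIX data block encoding on a column family, e.g. via the client API:
 *
 *   ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f"))
 *     .setDataBlockEncoding(DataBlockEncoding.PREFIX).build();
 *   admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("demo"))
 *     .setColumnFamily(cf).build());
 */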