/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Compress key by storing size of common prefix with previous KeyValue
 * and storing raw size of rest.
 *
 * <p>Format of one encoded entry, in the order the fields are written:
 * <pre>
 * 1-5 bytes: compressed key length minus prefix (7-bit encoding)
 * 1-5 bytes: compressed value length (7-bit encoding)
 * 1-3 bytes: compressed length of common key prefix
 * ... bytes: rest of key (including timestamp)
 * ... bytes: value
 * </pre>
 *
 * <p>In a worst case compressed KeyValue will be three bytes longer than original.
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {

  /**
   * Encodes one cell: three compressed ints (key length minus common prefix, value length,
   * common prefix length), then the part of the key not shared with the previously encoded
   * cell, then the value. The first cell of a block (no previous cell in the encoding state)
   * is written with a common prefix of 0 and its full flat key.
   *
   * @param cell the cell to encode
   * @param encodingContext supplies the {@link EncodingState} holding the previous cell; its
   *          {@code prevCell} is set to {@code cell} before returning
   * @param out destination of the encoded bytes
   * @return key length + value length + {@link KeyValue#KEYVALUE_INFRASTRUCTURE_SIZE} plus
   *         whatever {@code afterEncodingKeyValue} reports
   * @throws IOException if writing to {@code out} fails
   */
  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
      DataOutputStream out) throws IOException {
    int klength = KeyValueUtil.keyLength(cell);
    int vlength = cell.getValueLength();
    EncodingState state = encodingContext.getEncodingState();
    if (state.prevCell == null) {
      // copy the key, there is no common prefix with none
      ByteBufferUtils.putCompressedInt(out, klength);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, 0);
      // NOTE(review): the explicit cast is kept from the original; presumably it selects the
      // DataOutput overload of writeFlatKey - confirm before removing.
      PrivateCellUtil.writeFlatKey(cell, (DataOutput)out);
    } else {
      // find a common prefix and skip it
      int common = PrivateCellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
      ByteBufferUtils.putCompressedInt(out, klength - common);
      ByteBufferUtils.putCompressedInt(out, vlength);
      ByteBufferUtils.putCompressedInt(out, common);
      writeKeyExcludingCommon(cell, common, out);
    }
    // Write the value part
    PrivateCellUtil.writeValue(out, cell, vlength);
    int size = klength + vlength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
    size += afterEncodingKeyValue(cell, out, encodingContext);
    state.prevCell = cell;
    return size;
  }

  /**
   * Writes the part of the cell's flat key that follows the first {@code commonPrefix} bytes.
   *
   * @param cell the cell whose key is being written
   * @param commonPrefix number of leading flat-key bytes shared with the previous cell
   * @param out destination of the key remainder
   * @throws IOException if writing to {@code out} fails
   */
  private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
      throws IOException {
    short rLen = cell.getRowLength();
    if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
      // Previous and current rows are different. Need to write the differing part followed by
      // cf,q,ts and type
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
      byte fLen = cell.getFamilyLength();
      out.writeByte(fLen);
      PrivateCellUtil.writeFamily(out, cell, fLen);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
    } else {
      // The full row key part is common. CF part will be common for sure as we deal with Cells in
      // same family. Just need write the differing part in q, ts and type
      commonPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
      int qLen = cell.getQualifierLength();
      int commonQualPrefix = Math.min(commonPrefix, qLen);
      int qualPartLenToWrite = qLen - commonQualPrefix;
      if (qualPartLenToWrite > 0) {
        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, qLen, commonQualPrefix);
      }
      commonPrefix -= commonQualPrefix;
      // Common part in TS also?
      if (commonPrefix > 0) {
        int commonTimestampPrefix = Math.min(commonPrefix, KeyValue.TIMESTAMP_SIZE);
        if (commonTimestampPrefix < KeyValue.TIMESTAMP_SIZE) {
          byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
          out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE
              - commonTimestampPrefix);
        }
        commonPrefix -= commonTimestampPrefix;
        // Anything still left over after the timestamp means the type byte is shared as well,
        // so it is only written when nothing remains.
        if (commonPrefix == 0) {
          out.writeByte(cell.getTypeByte());
        }
      } else {
        out.writeLong(cell.getTimestamp());
        out.writeByte(cell.getTypeByte());
      }
    }
  }

  /**
   * Decodes entries from {@code source} into a newly allocated buffer until only
   * {@code skipLastBytes} bytes remain available in the stream.
   *
   * @param source encoded stream; its first int is read as the decompressed size
   * @param allocateHeaderLength number of bytes left untouched at the start of the returned
   *          buffer; decoded data starts at this position
   * @param skipLastBytes number of trailing bytes of {@code source} that must stay unread
   * @param decodingCtx passed through to {@code afterDecodingKeyValue}
   * @return buffer whose limit is set to the end of the decoded data
   * @throws IOException if reading from {@code source} fails
   * @throws IllegalStateException if decoding stops with a number of available bytes other
   *           than {@code skipLastBytes}
   */
  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    int decompressedSize = source.readInt();
    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize +
        allocateHeaderLength);
    buffer.position(allocateHeaderLength);
    int prevKeyOffset = 0;

    while (source.available() > skipLastBytes) {
      prevKeyOffset = decodeKeyValue(source, buffer, prevKeyOffset);
      afterDecodingKeyValue(source, buffer, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    buffer.limit(buffer.position());
    return buffer;
  }

  /**
   * Decodes a single entry from {@code source} and appends it to {@code buffer} as
   * key length (int), value length (int), full key, value.
   *
   * @param source encoded stream positioned at the start of an entry
   * @param buffer destination; written at its current position
   * @param prevKeyOffset offset in {@code buffer} of the previously decoded key, from which
   *          the shared prefix is copied
   * @return offset in {@code buffer} at which this entry's key starts
   * @throws IOException if reading from {@code source} fails
   * @throws EncoderBufferTooSmallException declared by the original signature; presumably
   *           raised by {@code ensureSpace} when {@code buffer} cannot hold the entry
   */
  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer,
      int prevKeyOffset)
          throws IOException, EncoderBufferTooSmallException {
    int keyLength = ByteBufferUtils.readCompressedInt(source);
    int valueLength = ByteBufferUtils.readCompressedInt(source);
    int commonLength = ByteBufferUtils.readCompressedInt(source);
    // The stream stores the key length without the shared prefix; restore the full length.
    keyLength += commonLength;

    ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(keyLength);
    buffer.putInt(valueLength);

    // The key begins right after the two length ints, whether or not a prefix is copied.
    int keyOffset = buffer.position();

    // copy the prefix
    if (commonLength > 0) {
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset,
          commonLength);
    }

    // copy rest of the key and value
    int len = keyLength - commonLength + valueLength;
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, len);
    return keyOffset;
  }

  /**
   * Builds a cell for the first key of an encoded block. The block's position is marked on
   * entry and restored before returning, including when an error is thrown.
   *
   * @param block encoded block; reading starts {@link Bytes#SIZEOF_INT} bytes in
   * @return the cell produced by {@code createFirstKeyCell} for the first key
   * @throws AssertionError if the first entry declares a non-zero common prefix length
   */
  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    int keyLength;
    ByteBuffer key;
    try {
      block.position(Bytes.SIZEOF_INT);
      keyLength = ByteBuff.readCompressedInt(block);
      // TODO : See if we can avoid these reads as the read values are not getting used
      ByteBuff.readCompressedInt(block);
      int commonLength = ByteBuff.readCompressedInt(block);
      if (commonLength != 0) {
        throw new AssertionError("Nonzero common length in the first key in "
            + "block: " + commonLength);
      }
      key = block.asSubByteBuffer(keyLength).duplicate();
    } finally {
      // Restore the caller's position even on the failure paths above.
      block.reset();
    }
    return createFirstKeyCell(key, keyLength);
  }

  /** Returns the simple class name of this encoder. */
  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

  /**
   * Creates a seeker that decodes this encoder's format.
   *
   * @param decodingCtx decoding context handed to the seeker
   * @return a new seeker instance
   */
  @Override
  public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
    return new SeekerStateBufferedEncodedSeeker(decodingCtx);
  }

  /** Seeker that decodes prefix-encoded entries one at a time from {@code currentBuffer}. */
  private static class SeekerStateBufferedEncodedSeeker
      extends BufferedEncodedSeeker<SeekerState> {

    private SeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
      super(decodingCtx);
    }

    /**
     * Decodes the entry at the current buffer position into {@code current}: lengths, the
     * non-shared tail of the key, the value offset, optional tags and optional memstore
     * timestamp, then records where the next entry starts.
     */
    @Override
    protected void decodeNext() {
      current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
      current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
      current.keyLength += current.lastCommonPrefix;
      current.ensureSpaceForKey();
      // Only the bytes after the shared prefix are read; the first lastCommonPrefix bytes of
      // keyBuffer are left as they were (presumably still holding the previous key's prefix).
      currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
          current.keyLength - current.lastCommonPrefix);
      current.valueOffset = currentBuffer.position();
      currentBuffer.skip(current.valueLength);
      if (includesTags()) {
        decodeTags();
      }
      if (includesMvcc()) {
        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
      } else {
        current.memstoreTS = 0;
      }
      current.nextKvOffset = currentBuffer.position();
    }

    /** Skips the leading int of the block, then decodes the first entry. */
    @Override
    protected void decodeFirst() {
      currentBuffer.skip(Bytes.SIZEOF_INT);
      decodeNext();
    }
  }
}