/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Key compression that records, for every KeyValue, how many leading key bytes it shares with
 * the previous KeyValue and then stores only the remaining key bytes verbatim.
 *
 * Layout of one encoded entry:
 * 1-5 bytes: length of the key with the shared prefix removed (7-bit encoding)
 * 1-5 bytes: value length (7-bit encoding)
 * 1-3 bytes: length of the key prefix shared with the previous key (7-bit encoding)
 * ... bytes: the non-shared tail of the key (timestamp included)
 * ... bytes: value
 *
 * In the worst case an encoded KeyValue is three bytes longer than the original.
 */
@InterfaceAudience.Private
public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {

  /**
   * Encodes one cell: the three compressed-int header fields, the part of the key that is not
   * shared with the previously encoded cell, and finally the value.
   *
   * @param cell cell to encode
   * @param encodingContext context holding the encoding state (in particular the previous cell)
   * @param out stream receiving the encoded bytes
   * @return key length + value length + {@link KeyValue#KEYVALUE_INFRASTRUCTURE_SIZE} plus
   *         whatever {@code afterEncodingKeyValue} reports
   * @throws IOException if writing to {@code out} fails
   */
  @Override
  public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
      DataOutputStream out) throws IOException {
    final int keyLen = KeyValueUtil.keyLength(cell);
    final int valueLen = cell.getValueLength();
    final EncodingState encodingState = encodingContext.getEncodingState();

    // Without a previous cell there is nothing to share, so the prefix length is zero.
    final boolean firstCell = encodingState.prevCell == null;
    final int sharedPrefix = firstCell ? 0
        : PrivateCellUtil.findCommonPrefixInFlatKey(cell, encodingState.prevCell, true, true);

    // Header: key tail length, value length, shared prefix length.
    ByteBufferUtils.putCompressedInt(out, keyLen - sharedPrefix);
    ByteBufferUtils.putCompressedInt(out, valueLen);
    ByteBufferUtils.putCompressedInt(out, sharedPrefix);

    if (firstCell) {
      // The whole key is copied as is.
      PrivateCellUtil.writeFlatKey(cell, (DataOutput) out);
    } else {
      // Only the bytes after the shared prefix are written.
      writeKeyExcludingCommon(cell, sharedPrefix, out);
    }

    // Value bytes follow the key.
    PrivateCellUtil.writeValue(out, cell, valueLen);

    final int unencodedSize = keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
    final int trailerSize = afterEncodingKeyValue(cell, out, encodingContext);
    encodingState.prevCell = cell;
    return unencodedSize + trailerSize;
  }

  /**
   * Writes the flat key of {@code cell} with its first {@code commonPrefix} bytes left out.
   *
   * @param cell cell whose key is being written
   * @param commonPrefix number of leading flat-key bytes shared with the previous cell
   * @param out stream receiving the key tail
   * @throws IOException if writing to {@code out} fails
   */
  private void writeKeyExcludingCommon(Cell cell, int commonPrefix, DataOutputStream out)
      throws IOException {
    final short rowLength = cell.getRowLength();
    final int rowSectionLength = rowLength + KeyValue.ROW_LENGTH_SIZE;

    if (commonPrefix < rowSectionLength) {
      // The rows differ: emit the differing row tail, then family, qualifier, timestamp and
      // type in full.
      PrivateCellUtil.writeRowKeyExcludingCommon(cell, rowLength, commonPrefix, out);
      final byte familyLength = cell.getFamilyLength();
      out.writeByte(familyLength);
      PrivateCellUtil.writeFamily(out, cell, familyLength);
      PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
      return;
    }

    // The complete row section is shared. The family is treated as shared too, since cells
    // handled here belong to the same family; only qualifier, timestamp and type may differ.
    int remaining = commonPrefix - rowSectionLength
        - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);

    final int qualifierLength = cell.getQualifierLength();
    final int sharedQualifier = Math.min(remaining, qualifierLength);
    if (qualifierLength - sharedQualifier > 0) {
      PrivateCellUtil.writeQualifierSkippingBytes(out, cell, qualifierLength, sharedQualifier);
    }
    remaining -= sharedQualifier;

    if (remaining <= 0) {
      // Nothing of the timestamp is shared: write timestamp and type completely.
      out.writeLong(cell.getTimestamp());
      out.writeByte(cell.getTypeByte());
      return;
    }

    // Part (or all) of the timestamp is shared; write only its non-shared tail.
    final int sharedTimestamp = Math.min(remaining, KeyValue.TIMESTAMP_SIZE);
    final int timestampTail = KeyValue.TIMESTAMP_SIZE - sharedTimestamp;
    if (timestampTail > 0) {
      out.write(Bytes.toBytes(cell.getTimestamp()), sharedTimestamp, timestampTail);
    }
    // The type byte is written unless the shared prefix extends beyond the timestamp.
    if (remaining == sharedTimestamp) {
      out.writeByte(cell.getTypeByte());
    }
  }

  /**
   * Decodes every entry available in {@code source} into a newly allocated buffer.
   *
   * @param source stream positioned at the int holding the decompressed size
   * @param allocateHeaderLength number of bytes left untouched at the start of the result
   * @param skipLastBytes number of trailing bytes of {@code source} that must stay unread
   * @param decodingCtx context handed to {@code afterDecodingKeyValue}
   * @return buffer whose limit is set to the end of the decoded data
   * @throws IOException if reading from {@code source} fails
   * @throws IllegalStateException if the bytes left in {@code source} after decoding do not
   *         equal {@code skipLastBytes}
   */
  @Override
  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
      int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
    final int decompressedSize = source.readInt();
    final ByteBuffer decoded = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
    decoded.position(allocateHeaderLength);

    // Offset of the most recently decoded key; the next entry copies its prefix from there.
    for (int lastKeyOffset = 0; source.available() > skipLastBytes;) {
      lastKeyOffset = decodeKeyValue(source, decoded, lastKeyOffset);
      afterDecodingKeyValue(source, decoded, decodingCtx);
    }

    if (source.available() != skipLastBytes) {
      throw new IllegalStateException("Read too many bytes.");
    }

    decoded.limit(decoded.position());
    return decoded;
  }

  /**
   * Decodes a single entry from {@code source} and appends it to {@code buffer} as key length,
   * value length, key bytes and value bytes.
   *
   * @param source stream positioned at the start of an encoded entry
   * @param buffer destination; also the source of the shared key prefix
   * @param prevKeyOffset offset in {@code buffer} where the previous key starts
   * @return offset in {@code buffer} where the key written by this call starts
   * @throws IOException if reading from {@code source} fails
   * @throws EncoderBufferTooSmallException if {@code buffer} cannot hold the entry
   */
  private int decodeKeyValue(DataInputStream source, ByteBuffer buffer, int prevKeyOffset)
      throws IOException, EncoderBufferTooSmallException {
    final int keyTailLength = ByteBufferUtils.readCompressedInt(source);
    final int valueLength = ByteBufferUtils.readCompressedInt(source);
    final int sharedPrefixLength = ByteBufferUtils.readCompressedInt(source);
    final int fullKeyLength = keyTailLength + sharedPrefixLength;

    ensureSpace(buffer, fullKeyLength + valueLength + KeyValue.ROW_OFFSET);

    buffer.putInt(fullKeyLength);
    buffer.putInt(valueLength);

    // The key starts right after the two length ints, whether or not a prefix is shared.
    final int keyOffset = buffer.position();
    if (sharedPrefixLength > 0) {
      // Re-use the leading bytes of the previous key.
      ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, prevKeyOffset, sharedPrefixLength);
    }

    // The remainder of the key and the value come straight from the stream.
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyTailLength + valueLength);
    return keyOffset;
  }

  /**
   * Builds a cell for the first key stored in an encoded block. The position of {@code block}
   * is marked on entry and reset before the cell is created.
   *
   * @param block encoded block
   * @return cell created from the first key
   * @throws AssertionError if the first entry declares a nonzero shared prefix length
   */
  @Override
  public Cell getFirstKeyCellInBlock(ByteBuff block) {
    block.mark();
    block.position(Bytes.SIZEOF_INT);
    final int firstKeyLength = ByteBuff.readCompressedInt(block);
    // TODO : See if we can avoid these reads as the read values are not getting used
    ByteBuff.readCompressedInt(block);
    final int sharedPrefixLength = ByteBuff.readCompressedInt(block);
    if (sharedPrefixLength != 0) {
      throw new AssertionError(
          "Nonzero common length in the first key in block: " + sharedPrefixLength);
    }
    final ByteBuffer firstKey = block.asSubByteBuffer(firstKeyLength).duplicate();
    block.reset();
    return createFirstKeyCell(firstKey, firstKeyLength);
  }

  /**
   * @return the simple class name of this encoder
   */
  @Override
  public String toString() {
    return PrefixKeyDeltaEncoder.class.getSimpleName();
  }

  /**
   * Creates a seeker that decodes entries of this encoding one at a time.
   *
   * @param comparator comparator passed on to the seeker
   * @param decodingCtx decoding context passed on to the seeker
   * @return a new seeker
   */
  @Override
  public EncodedSeeker createSeeker(CellComparator comparator,
      final HFileBlockDecodingContext decodingCtx) {
    return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {

      /** Skips the leading int of the block and then decodes the first entry. */
      @Override
      protected void decodeFirst() {
        currentBuffer.skip(Bytes.SIZEOF_INT);
        decodeNext();
      }

      /** Decodes the entry at the current buffer position into {@code current}. */
      @Override
      protected void decodeNext() {
        // Header: key tail length, value length, shared prefix length.
        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
        current.keyLength += current.lastCommonPrefix;
        current.ensureSpaceForKey();

        // Only the non-shared tail is read; it is placed after the shared prefix bytes.
        final int tailStart = current.lastCommonPrefix;
        final int tailLength = current.keyLength - tailStart;
        currentBuffer.get(current.keyBuffer, tailStart, tailLength);

        current.valueOffset = currentBuffer.position();
        currentBuffer.skip(current.valueLength);
        if (includesTags()) {
          decodeTags();
        }
        current.memstoreTS = includesMvcc() ? ByteBuff.readVLong(currentBuffer) : 0;
        current.nextKvOffset = currentBuffer.position();
      }
    };
  }
}