/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adds the methods required for writing a compound Bloom filter into the data
 * section of an {@link org.apache.hadoop.hbase.io.hfile.HFile}. The filter is
 * built as a sequence of fixed-size chunks, each emitted as an inline block
 * and indexed by its first key. {@link CompoundBloomFilter} is the read-side
 * counterpart of this format.
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
    implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG =
      LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** The previous cell that was processed */
  private Cell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new LinkedList<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
      new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint
   *          each chunk's size in bytes. The real chunk size might be different
   *          as required by the fold factor.
   * @param errorRate
   *          target false positive rate
   * @param hashType
   *          hash function type to use
   * @param maxFold
   *          maximum degree of folding allowed
   * @param cacheOnWrite
   *          whether to cache newly written Bloom filter chunks
   * @param comparator
   *          the comparator to use for cells
   * @param bloomType
   *          the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
      int hashType, int maxFold, boolean cacheOnWrite,
      CellComparator comparator, BloomType bloomType) {
    // Round the hinted size so that the resulting chunk can be folded
    // (halved) up to maxFold times.
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(
        chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }
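  /*
   * How this writer is driven, as a sketch only: the loop below illustrates
   * the InlineBlockWriter contract this class implements; it is not the
   * actual HFile.Writer code, and the variable names are placeholders.
   *
   *   CompoundBloomFilterWriter bloomWriter = ...;
   *   for (Cell cell : cellsInWriteOrder) {
   *     bloomWriter.append(cell);                  // may fill the current chunk
   *     if (bloomWriter.shouldWriteBlock(false)) { // a full chunk is queued
   *       bloomWriter.writeInlineBlock(out);       // serialize the queued chunk
   *       bloomWriter.blockWritten(offset, onDiskSize, uncompressedSize);
   *     }
   *   }
   *   // At close time the caller passes closing=true so that the final,
   *   // possibly partial, chunk is flushed as well.
   */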
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   *
   * @param closing true if we are closing the file, so we do not expect new
   *        keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null ||
        (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, " +
          "but first key is null: closing=" + closing + ", keyCount=" +
          chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    // Fold the chunk down (halving its bitmap) while it still meets the
    // target error rate; this shrinks sparsely populated chunks.
    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  @Override
  public void append(Cell cell) throws IOException {
    if (cell == null) {
      throw new NullPointerException();
    }

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException("First key in chunk already set: "
            + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk. For ROW blooms the chunk's
      // first key is just the row bytes; otherwise it is the serialized
      // KeyValue key of the first-on-row-col cell.
      if (bloomType == BloomType.ROW) {
        firstKeyInChunk = CellUtil.copyRow(cell);
      } else {
        firstKeyInChunk = PrivateCellUtil
            .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

  @Override
  public void beforeShipped() throws IOException {
    // Copy the last cell's key into fresh memory so we do not keep a
    // reference into a block that is about to be released.
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate,
          hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount()
          + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    // Now that the chunk's on-disk location is known, record it in the
    // Bloom block index under the chunk's first key.
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }
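  /*
   * On-disk layout of the Bloom filter metadata, as serialized by
   * MetaWriter#write(DataOutput) below; this must stay consistent with how
   * CompoundBloomFilter's constructor reads it back:
   *
   *   int    VERSION
   *   long   total byte size of all chunks
   *   int    hash function count
   *   int    hash type
   *   long   total key count
   *   long   total max keys
   *   int    number of chunks
   *   byte[] comparator class name, vint-length-prefixed (0 vint if null)
   *   ...    single-level block index over the Bloom chunks
   */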
  private class MetaWriter implements Writable {
    protected MetaWriter() {}

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Cannot read with this class.");
    }

    /**
     * This is modeled after the old ByteBloomFilter metadata format for
     * simplicity, although the two metadata formats do not have to be
     * consistent. This does have to be consistent with how {@link
     * CompoundBloomFilter#CompoundBloomFilter(DataInput,
     * org.apache.hadoop.hbase.io.hfile.HFile.Reader)} reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public void compactBloom() {
    // No-op: each chunk is compacted individually in enqueueReadyChunk().
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  @Override
  public Writable getDataWriter() {
    // Bloom data is written through the inline block mechanism, so there is
    // no separate data writer.
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}