/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adds methods required for writing a compound Bloom filter to the data section of an
 * {@link org.apache.hadoop.hbase.io.hfile.HFile} to the {@link CompoundBloomFilter} class.
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
  implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG = LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** The prev Cell that was processed */
  private Cell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new LinkedList<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
    new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint each chunk's size in bytes. The real chunk size might be different as
   *                          required by the fold factor.
   * @param errorRate         target false positive rate
   * @param hashType          hash function type to use
   * @param maxFold           maximum degree of folding allowed
   * @param bloomType         the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate, int hashType,
    int maxFold, boolean cacheOnWrite, CellComparator comparator, BloomType bloomType) {
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }

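  // Inline-block flow: shouldWriteBlock() enqueues the current chunk once it has reached its
  // maximum key count (or the file is closing) and reports whether anything is queued; queued
  // chunks are then serialized by writeInlineBlock() and indexed by blockWritten() below.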
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   * @param closing true if we are closing the file, so we do not expect new keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null || (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException(
        "Trying to enqueue a chunk, " + "but first key is null: closing=" + closing + ", keyCount="
          + chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from [" + prevMaxKeys
        + " max keys, " + prevByteSize + " bytes] to [" + chunk.getMaxKeys() + " max keys, "
        + chunk.getByteSize() + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  @Override
  public void append(Cell cell) throws IOException {
    if (cell == null) throw new NullPointerException();

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException(
          "First key in chunk already set: " + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk
      if (bloomType == BloomType.ROWCOL) {
        firstKeyInChunk = PrivateCellUtil
          .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      } else {
        firstKeyInChunk = CellUtil.copyRow(cell);
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

  @Override
  public void beforeShipped() throws IOException {
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate, hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount() + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }

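  // Each flushed chunk is written as one BLOOM_CHUNK inline block. blockWritten() records the
  // chunk's first key and on-disk offset in a single-level block index, which MetaWriter.write()
  // appends to the Bloom metadata so a reader can find the chunk that may contain a given key.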
  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

  private class MetaWriter implements Writable {
    protected MetaWriter() {
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Cant read with this class.");
    }

    /**
     * This is modeled after {@link CompoundBloomFilterWriter.MetaWriter} for simplicity, although
     * the two metadata formats do not have to be consistent. This does have to be consistent with
     * how
     * {@link CompoundBloomFilter#CompoundBloomFilter(DataInput, org.apache.hadoop.hbase.io.hfile.HFile.Reader, BloomFilterMetrics)}
     * reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public void compactBloom() {
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}