/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adds methods required for writing a compound Bloom filter to the data section of an
 * {@link org.apache.hadoop.hbase.io.hfile.HFile} to the {@link CompoundBloomFilter} class.
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
  implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG = LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** The previous cell that was processed */
  private Cell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new ArrayDeque<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
    new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint each chunk's size in bytes. The real chunk size might be different as
   *                          required by the fold factor.
   * @param errorRate         target false positive rate
   * @param hashType          hash function type to use
   * @param maxFold           maximum degree of folding allowed
   * @param cacheOnWrite      whether to cache Bloom chunks as they are written
   * @param comparator        the cell comparator
   * @param bloomType         the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate, int hashType,
    int maxFold, boolean cacheOnWrite, CellComparator comparator, BloomType bloomType) {
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }
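
  // Worked example (illustrative only, assuming the usual HBase defaults of a 128 KiB chunk
  // size hint and maxFold = 7; not part of the original logic): computeFoldableByteSize rounds
  // the chunk's byte size up to a multiple of 2^maxFold so that compactBloom() can later halve
  // ("fold") the bit array up to maxFold times. 128 KiB = 131072 bytes is already a multiple of
  // 2^7 = 128, so the chunk is allocated at 128 KiB, and a sparsely populated chunk can fold all
  // the way down to 131072 / 2^7 = 1024 bytes = 1 KiB before being written out.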

  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   * @param closing true if we are closing the file, so we do not expect new keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null || (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException(
        "Trying to enqueue a chunk, but first key is null: closing=" + closing + ", keyCount="
          + chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from [" + prevMaxKeys
        + " max keys, " + prevByteSize + " bytes] to [" + chunk.getMaxKeys() + " max keys, "
        + chunk.getByteSize() + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  @Override
  public void append(Cell cell) throws IOException {
    Objects.requireNonNull(cell);

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException(
          "First key in chunk already set: " + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk
      if (bloomType == BloomType.ROWCOL) {
        firstKeyInChunk = PrivateCellUtil
          .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      } else {
        firstKeyInChunk = CellUtil.copyRow(cell);
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

  @Override
  public void beforeShipped() throws IOException {
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate, hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount() + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }
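
  // Lifecycle sketch (illustrative; the driving loop lives in the HFile writer, not in this
  // class): while cells are appended, the HFile writer periodically polls shouldWriteBlock(false).
  // Once a chunk reaches its key capacity it is folded and queued, so the poll returns true; the
  // writer then calls writeInlineBlock(out) to serialize the queued chunk as an inline block and,
  // once the block's position on disk is known, blockWritten(offset, onDiskSize, uncompressedSize),
  // which dequeues the chunk and records its first key and offset in the Bloom block index. At
  // close time, shouldWriteBlock(true) flushes the final, possibly under-filled chunk the same way.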

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

  private class MetaWriter implements Writable {
    protected MetaWriter() {
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Can't read with this class.");
    }

    /**
     * The layout written here does not have to match any other Bloom filter metadata format, but
     * it does have to be consistent with how
     * {@link CompoundBloomFilter#CompoundBloomFilter(DataInput, org.apache.hadoop.hbase.io.hfile.HFile.Reader, BloomFilterMetrics)}
     * reads these fields back.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public void compactBloom() {
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}
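
// A rough usage sketch (hedged: the values below are illustrative defaults, and the real wiring
// lives in BloomFilterFactory.createGeneralBloomAtWrite and StoreFileWriter, not in this file):
//
//   CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
//       128 * 1024,                           // chunkByteSizeHint (io.storefile.bloom.block.size)
//       0.01f,                                // errorRate (io.storefile.bloom.error.rate)
//       Hash.MURMUR_HASH,                     // hashType
//       7,                                    // maxFold (io.storefile.bloom.max.fold)
//       cacheConf.shouldCacheBloomsOnWrite(), // cacheOnWrite
//       CellComparator.getInstance(),         // comparator
//       BloomType.ROW);                       // bloomType
//   hfileWriter.addInlineBlockWriter(bloomWriter);  // chunks flush as inline BLOOM_CHUNK blocks
//   hfileWriter.addGeneralBloomFilter(bloomWriter); // meta block written at close via getMetaWriter()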