/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adds methods required for writing a compound Bloom filter to the data section of an
 * {@link org.apache.hadoop.hbase.io.hfile.HFile} to the {@link CompoundBloomFilter} class.
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
  implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG = LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** The prev Cell that was processed */
  private ExtendedCell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new ArrayDeque<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
    new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint each chunk's size in bytes. The real chunk size might be different as
   *                          required by the fold factor.
   * @param errorRate         target false positive rate
   * @param hashType          hash function type to use
   * @param maxFold           maximum degree of folding allowed
   * @param bloomType         the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate, int hashType,
    int maxFold, boolean cacheOnWrite, CellComparator comparator, BloomType bloomType) {
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }

  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   * @param closing true if we are closing the file, so we do not expect new keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null || (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException(
        "Trying to enqueue a chunk, but first key is null: closing=" + closing + ", keyCount="
          + chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from [" + prevMaxKeys
        + " max keys, " + prevByteSize + " bytes] to [" + chunk.getMaxKeys() + " max keys, "
        + chunk.getByteSize() + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  @Override
  public void append(ExtendedCell cell) throws IOException {
    Objects.requireNonNull(cell);

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException(
          "First key in chunk already set: " + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk
      if (bloomType == BloomType.ROWCOL) {
        firstKeyInChunk = PrivateCellUtil
          .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      } else {
        firstKeyInChunk = CellUtil.copyRow(cell);
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

  @Override
  public void beforeShipped() throws IOException {
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate, hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount() + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

  private class MetaWriter implements Writable {
    protected MetaWriter() {
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Can't read with this class.");
    }

    /**
     * This is modeled after {@link CompoundBloomFilterWriter.MetaWriter} for simplicity, although
     * the two metadata formats do not have to be consistent. This does have to be consistent with
     * how
     * {@link CompoundBloomFilter#CompoundBloomFilter(DataInput, org.apache.hadoop.hbase.io.hfile.HFile.Reader, BloomFilterMetrics)}
     * reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public void compactBloom() {
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}
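
/*
 * A minimal usage sketch, illustrative only and not part of this class. In production the HFile
 * writer drives this class through the InlineBlockWriter contract; here "cells", "out", "metaOut",
 * "offset", "onDiskSize" and "uncompressedSize" are hypothetical placeholders, and the constructor
 * argument values are arbitrary examples rather than recommended settings.
 *
 *   CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
 *     128 * 1024,                   // chunkByteSizeHint: target chunk size in bytes
 *     0.01f,                        // errorRate: target false positive rate
 *     Hash.MURMUR_HASH,             // hashType
 *     7,                            // maxFold
 *     true,                         // cacheOnWrite
 *     CellComparator.getInstance(), // comparator
 *     BloomType.ROW);               // bloomType
 *
 *   for (ExtendedCell cell : cells) {
 *     bloomWriter.append(cell);                    // add the cell's key to the current chunk
 *     if (bloomWriter.shouldWriteBlock(false)) {   // a chunk has filled up and is ready
 *       bloomWriter.writeInlineBlock(out);         // serialize the ready chunk
 *       bloomWriter.blockWritten(offset, onDiskSize, uncompressedSize);
 *     }
 *   }
 *
 *   // At close time: flush the final, possibly partial chunk, then persist the Bloom metadata,
 *   // which includes the single-level index over all written chunks.
 *   while (bloomWriter.shouldWriteBlock(true)) {
 *     bloomWriter.writeInlineBlock(out);
 *     bloomWriter.blockWritten(offset, onDiskSize, uncompressedSize);
 *   }
 *   bloomWriter.getMetaWriter().write(metaOut);
 */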