/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;
import org.apache.hadoop.io.Writable;

/**
 * Adds to the {@link CompoundBloomFilter} class the methods required for
 * writing a compound Bloom filter to the data section of an
 * {@link org.apache.hadoop.hbase.io.hfile.HFile}.
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
    implements BloomFilterWriter, InlineBlockWriter {

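  /*
   * Lifecycle note, summarizing the InlineBlockWriter contract as used here:
   * the enclosing HFile writer (the expected caller) repeatedly invokes add()
   * with Bloom keys, polls shouldWriteBlock() to see whether a finished chunk
   * is queued, writes the chunk out via writeInlineBlock(), and then reports
   * the on-disk location through blockWritten(), which feeds the Bloom block
   * index. When the file is closed, getMetaWriter() serializes the filter's
   * metadata together with that index.
   */
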
  private static final Log LOG =
    LogFactory.getLog(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private ByteBloomFilter chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private ByteBloomFilter prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    ByteBloomFilter chunk;
  }

  private Queue<ReadyChunk> readyChunks = new LinkedList<ReadyChunk>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
      new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  /**
   * @param chunkByteSizeHint
   *          each chunk's size in bytes. The real chunk size might be different
   *          as required by the fold factor.
   * @param errorRate
   *          target false positive rate
   * @param hashType
   *          hash function type to use
   * @param maxFold
   *          maximum degree of folding allowed
   * @param cacheOnWrite
   *          whether to cache the Bloom filter chunks as they are written
   * @param comparator
   *          the comparator defining the ordering of Bloom filter keys
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
      int hashType, int maxFold, boolean cacheOnWrite,
      KVComparator comparator) {
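    // Adjust the requested chunk size so that the resulting Bloom bit array
    // stays foldable up to maxFold times (see
    // ByteBloomFilter.computeFoldableByteSize); this is why the real chunk
    // size may differ from the hint.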
    chunkByteSize = ByteBloomFilter.computeFoldableByteSize(
        chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
  }

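  /**
   * Checks whether a Bloom chunk is ready to be written as an inline block:
   * enqueues the current chunk if it has reached capacity (or if the file is
   * closing) and reports whether any chunk is queued.
   */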
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   *
   * @param closing true if we are closing the file, so we do not expect new
   *        keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null ||
        (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, " +
          "but first key is null: closing=" + closing + ", keyCount=" +
          chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

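    // Fold the finished chunk down to a smaller size if its actual key count
    // allows it; keep the pre-fold stats so the trace message below can
    // report the reduction.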
    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  /**
   * Adds a Bloom filter key. This key must be greater than the previous key,
   * as defined by the comparator this compound Bloom filter is configured
   * with. For efficiency, key monotonicity is not checked here. See
   * {@link org.apache.hadoop.hbase.regionserver.StoreFile.Writer#append(
   * org.apache.hadoop.hbase.Cell)} for the details of deduplication.
   */
  @Override
  public void add(byte[] bloomKey, int keyOffset, int keyLength) {
    if (bloomKey == null)
      throw new NullPointerException();

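    // If the current chunk has reached its key capacity, queue it for writing
    // before (lazily) starting a new chunk for this key.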
    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException("First key in chunk already set: "
            + Bytes.toStringBinary(firstKeyInChunk));
      }
      firstKeyInChunk = Arrays.copyOfRange(bloomKey, keyOffset, keyOffset
          + keyLength);

      if (prevChunk == null) {
        // First chunk
        chunk = ByteBloomFilter.createBySize(chunkByteSize, errorRate,
            hashType, maxFold);
      } else {
        // Use the same parameters as the last chunk, but a new array and
        // a zero key count.
        chunk = prevChunk.createAnother();
      }

      if (chunk.getKeyCount() != 0) {
        throw new IllegalStateException("keyCount=" + chunk.getKeyCount()
            + " > 0");
      }

      chunk.allocBloom();
      ++numChunks;
    }

    chunk.add(bloomKey, keyOffset, keyLength);
    ++totalKeyCount;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    ByteBloomFilter readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.getDataWriter().write(out);
  }

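  /**
   * Called after the chunk emitted by {@link #writeInlineBlock(DataOutput)}
   * has been written to the output stream. Dequeues the chunk and records its
   * first key and on-disk location in the Bloom block index.
   */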
  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

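  /**
   * Serializes the compound Bloom filter's metadata: version, total size,
   * hash configuration, key counts, number of chunks, the comparator class
   * name, and a single-level index of the Bloom chunks.
   */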
  private class MetaWriter implements Writable {
    protected MetaWriter() {}

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Can't read with this class.");
    }

    /**
     * This is modeled after {@link ByteBloomFilter.MetaWriter} for simplicity,
     * although the two metadata formats do not have to be consistent. This
     * does have to be consistent with how {@link
     * CompoundBloomFilter#CompoundBloomFilter(DataInput,
     * org.apache.hadoop.hbase.io.hfile.HFile.Reader)} reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      Bytes.writeByteArray(out,
          Bytes.toBytes(comparator.getClass().getName()));

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

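  // The remaining BloomFilterWriter methods are no-ops or trivial here:
  // folding and allocation happen per chunk as chunks are created and
  // enqueued, and chunk data is emitted through the inline-block path
  // rather than through a single monolithic data writer.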
  @Override
  public void compactBloom() {
  }

  @Override
  public void allocBloom() {
    // Nothing happens here. All allocation happens on demand.
  }

  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}