/*
 * Copyright 2011 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;

/**
 * Writer counterpart of {@link CompoundBloomFilter}: adds the methods
 * required for writing a compound Bloom filter to the data section of an
 * {@link org.apache.hadoop.hbase.io.hfile.HFile}.
 */
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
    implements BloomFilterWriter, InlineBlockWriter {
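
  // Write path implemented below: keys are added to a fixed-size
  // ByteBloomFilter chunk; once the chunk reaches its key capacity (or the
  // file is being closed), it is compacted (folded), queued in readyChunks,
  // and handed to the HFile writer through the InlineBlockWriter callbacks
  // shouldWriteBlock(), writeInlineBlock() and blockWritten(). The first key
  // of each chunk feeds a single-level block index that MetaWriter
  // serializes together with the rest of the Bloom filter metadata.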

  protected static final Log LOG =
      LogFactory.getLog(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private ByteBloomFilter chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private ByteBloomFilter prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    ByteBloomFilter chunk;
  }

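  /** Chunks that are full (or final, at file close) and waiting to be written out */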
  private Queue<ReadyChunk> readyChunks = new LinkedList<ReadyChunk>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
      new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  /**
   * @param chunkByteSizeHint
   *          each chunk's size in bytes. The real chunk size might be different
   *          as required by the fold factor.
   * @param errorRate
   *          target false positive rate
   * @param hashType
   *          hash function type to use
   * @param maxFold
   *          maximum degree of folding allowed
   * @param cacheOnWrite
   *          whether compound Bloom filter chunks should be cached on write
   * @param comparator
   *          the comparator defining the ordering of Bloom filter keys
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
      int hashType, int maxFold, boolean cacheOnWrite,
      RawComparator<byte[]> comparator) {
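    // The size hint is in bytes, but computeFoldableByteSize appears to take
    // a size in bits (hence the factor of 8), rounding it so the chunk can
    // later be folded up to maxFold times.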
    chunkByteSize = ByteBloomFilter.computeFoldableByteSize(
        chunkByteSizeHint * 8, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
  }

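  /**
   * Part of the {@link InlineBlockWriter} contract: enqueues the current
   * chunk if it has filled up (or when the file is being closed) and reports
   * whether any chunk is now ready to be written out as an inline block.
   */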
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   *
   * @param closing true if we are closing the file, so we do not expect new
   *        keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null ||
        (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, " +
          "but first key is null: closing=" + closing + ", keyCount=" +
          chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

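    // Fold the chunk down if possible; this mainly matters for the last,
    // partially filled chunk that is enqueued when the file is closed.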
    chunk.compactBloom();

    if (LOG.isDebugEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.debug("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  /**
   * Adds a Bloom filter key. This key must be greater than the previous key,
   * as defined by the comparator this compound Bloom filter is configured
   * with. For efficiency, key monotonicity is not checked here. See
   * {@link org.apache.hadoop.hbase.regionserver.StoreFile.Writer#append(
   * org.apache.hadoop.hbase.KeyValue)} for the details of deduplication.
   */
  @Override
  public void add(byte[] bloomKey, int keyOffset, int keyLength) {
    if (bloomKey == null)
      throw new NullPointerException();

    enqueueReadyChunk(false);

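    // Lazily start a new chunk: remember its first key, which later becomes
    // the chunk's entry in the Bloom block index, and create the filter
    // either from scratch or with the previous chunk's parameters.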
    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException("First key in chunk already set: "
            + Bytes.toStringBinary(firstKeyInChunk));
      }
      firstKeyInChunk = Arrays.copyOfRange(bloomKey, keyOffset, keyOffset
          + keyLength);

      if (prevChunk == null) {
        // First chunk
        chunk = ByteBloomFilter.createBySize(chunkByteSize, errorRate,
            hashType, maxFold);
      } else {
        // Use the same parameters as the last chunk, but a new array and
        // a zero key count.
        chunk = prevChunk.createAnother();
      }

      if (chunk.getKeyCount() != 0) {
        throw new IllegalStateException("Expected an empty chunk, but keyCount="
            + chunk.getKeyCount());
      }

      chunk.allocBloom();
      ++numChunks;
    }

    chunk.add(bloomKey, keyOffset, keyLength);
    ++totalKeyCount;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    ByteBloomFilter readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.getDataWriter().write(out);
  }

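  /**
   * Called after the head chunk's inline block has been written to the file:
   * records the chunk's first key and on-disk location in the Bloom block
   * index and removes the chunk from the queue.
   */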
  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

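  /**
   * Writes the compound Bloom filter metadata: version, aggregate byte size
   * and key counts, hashing parameters, number of chunks, the comparator
   * class name, and a single-level index of the Bloom chunk blocks.
   */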
  private class MetaWriter implements Writable {
    protected MetaWriter() {}

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Cannot read with this class.");
    }

    /**
     * This is modeled after {@link ByteBloomFilter.MetaWriter} for simplicity,
     * although the two metadata formats do not have to be consistent. This
     * does have to be consistent with how {@link
     * CompoundBloomFilter#CompoundBloomFilter(DataInput,
     * org.apache.hadoop.hbase.io.hfile.HFile.Reader)} reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      Bytes.writeByteArray(out,
          Bytes.toBytes(comparator.getClass().getName()));

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }


  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  @Override
  public void compactBloom() {
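    // Nothing to do here: each chunk is folded individually in
    // enqueueReadyChunk() before it is queued for writing.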
  }

  @Override
  public void allocBloom() {
    // Nothing happens here. All allocation happens on demand.
  }

  @Override
  public Writable getDataWriter() {
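    // The chunk data is written through writeInlineBlock(), one block per
    // chunk, so there is no single data writer for the compound filter.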
    return null;
  }

  @Override
  public boolean cacheOnWrite() {
    return cacheOnWrite;
  }

}