1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.LinkedList;
26 import java.util.Queue;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.KeyValue.KVComparator;
32 import org.apache.hadoop.hbase.io.hfile.BlockType;
33 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
34 import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;
35 import org.apache.hadoop.io.Writable;
36
37
38
39
40
41
/**
 * Writes a series of fixed-size Bloom filter "chunks" that are emitted as
 * inline blocks while an HFile is being written, together with a single-level
 * block index over those chunks (one index entry per chunk, keyed by the first
 * key added to the chunk) and a meta section describing the whole compound
 * filter.
 *
 * <p>Expected caller protocol (inferred from the {@link InlineBlockWriter}
 * methods below — confirm against the HFile writer): keys are fed in via
 * {@link #add}; the caller periodically asks {@link #shouldWriteBlock}, and for
 * each ready chunk calls {@link #writeInlineBlock} followed by
 * {@link #blockWritten}. {@link #writeInlineBlock} only peeks at the queue, so
 * {@link #blockWritten} must follow each write to dequeue the chunk.</p>
 *
 * <p>Counters such as {@code numChunks}, {@code totalKeyCount},
 * {@code totalMaxKeys}, {@code totalByteSize} and fields {@code errorRate},
 * {@code hashType}, {@code comparator}, plus {@code VERSION} and the
 * {@code getByteSize()}/{@code getKeyCount()}/{@code getMaxKeys()} accessors,
 * are inherited from {@link CompoundBloomFilterBase} (not visible here).</p>
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
    implements BloomFilterWriter, InlineBlockWriter {

  protected static final Log LOG =
      LogFactory.getLog(CompoundBloomFilterWriter.class);

  /** The Bloom filter chunk currently being filled with keys; null between chunks. */
  private ByteBloomFilter chunk;

  /**
   * The most recently completed chunk. Used as a template to size the next
   * chunk (see {@link ByteBloomFilter#createAnother} usage in {@link #add})
   * and as the source of hash count/type in {@link MetaWriter#write}.
   */
  private ByteBloomFilter prevChunk;

  /** Maximum number of folds applied when compacting a chunk. */
  private int maxFold;

  /** Target byte size of each chunk, rounded so it can be folded maxFold times. */
  private int chunkByteSize;

  /** A finished chunk queued for writing, with its id and the first key it holds. */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    ByteBloomFilter chunk;
  }

  /** Chunks that are complete and waiting to be written as inline blocks. */
  private Queue<ReadyChunk> readyChunks = new LinkedList<ReadyChunk>();

  /**
   * First key added to the current chunk; becomes that chunk's entry key in
   * the Bloom block index. Reset to null each time a chunk is enqueued.
   */
  private byte[] firstKeyInChunk = null;

  /** Single-level index over the Bloom chunks, written out in the meta section. */
  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
      new HFileBlockIndex.BlockIndexWriter();

  /** Whether Bloom blocks should be cached on write; exposed via {@link #getCacheOnWrite()}. */
  private boolean cacheOnWrite;

  /**
   * Creates a compound Bloom filter writer.
   *
   * @param chunkByteSizeHint desired chunk size in bytes; rounded via
   *          {@link ByteBloomFilter#computeFoldableByteSize} so each chunk can
   *          be folded {@code maxFold} times (hint is converted to bits with
   *          {@code * 8L})
   * @param errorRate target false-positive rate for each chunk
   * @param hashType hash function type code (stored in the base class)
   * @param maxFold maximum fold factor applied when a chunk is compacted
   * @param cacheOnWrite whether Bloom blocks should be cached as they are written
   * @param comparator comparator whose class name is recorded in the meta section
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
      int hashType, int maxFold, boolean cacheOnWrite,
      KVComparator comparator) {
    chunkByteSize = ByteBloomFilter.computeFoldableByteSize(
        chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
  }

  /**
   * Returns true if at least one chunk is ready to be written as an inline
   * block. When {@code closing} is true, the current partially-filled chunk
   * (if any) is also enqueued so nothing is lost at file close.
   */
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Moves the current chunk onto the ready queue if it is full, or
   * unconditionally if {@code closing} is true and a chunk exists. Compacts
   * (folds) the chunk first, updates the running totals, and remembers it as
   * {@code prevChunk} so the next chunk can be sized the same way.
   *
   * @param closing true when the file is being closed and a partial chunk
   *          must be flushed
   * @throws NullPointerException if a chunk exists but no first key was
   *           recorded — indicates the add/enqueue invariant was broken
   */
  private void enqueueReadyChunk(boolean closing) {
    // Nothing to do if there is no chunk, or it still has room and we are
    // not closing the file.
    if (chunk == null ||
        (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, " +
          "but first key is null: closing=" + closing + ", keyCount=" +
          chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    // numChunks was already incremented when this chunk was created in add(),
    // so the id of the chunk being enqueued is numChunks - 1.
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    // Fold the chunk down to its smallest size that still meets the error
    // rate; may shrink byte size and max keys.
    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }

    // Totals are accumulated from the post-compaction sizes.
    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  /**
   * Adds a Bloom key. If the current chunk is full it is enqueued first, and
   * a fresh chunk is started lazily — sized from scratch for the very first
   * chunk, or cloned from the previous chunk's parameters thereafter. The
   * first key of each new chunk is copied and retained for the block index.
   *
   * @param bloomKey buffer containing the key; must not be null
   * @param keyOffset offset of the key within {@code bloomKey}
   * @param keyLength length of the key in bytes
   */
  @Override
  public void add(byte[] bloomKey, int keyOffset, int keyLength) {
    if (bloomKey == null)
      throw new NullPointerException();

    // Roll the current chunk over to the ready queue if it is full.
    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException("First key in chunk already set: "
            + Bytes.toStringBinary(firstKeyInChunk));
      }
      // Defensive copy: the caller's buffer may be reused.
      firstKeyInChunk = Arrays.copyOfRange(bloomKey, keyOffset, keyOffset
          + keyLength);

      if (prevChunk == null) {
        // First chunk ever: size it from the configured parameters.
        chunk = ByteBloomFilter.createBySize(chunkByteSize, errorRate,
            hashType, maxFold);
      } else {
        // Subsequent chunks: reuse the previous chunk's parameters so all
        // chunks are uniform.
        chunk = prevChunk.createAnother();
      }

      if (chunk.getKeyCount() != 0) {
        throw new IllegalStateException("keyCount=" + chunk.getKeyCount()
            + " > 0");
      }

      chunk.allocBloom();
      ++numChunks;
    }

    chunk.add(bloomKey, keyOffset, keyLength);
    ++totalKeyCount;
  }

  /**
   * Writes the chunk at the head of the ready queue to {@code out}. Only
   * peeks — the chunk stays queued until {@link #blockWritten} is called, so
   * callers must pair the two. Assumes {@link #shouldWriteBlock} returned
   * true (an empty queue here would NPE) — TODO confirm against the HFile
   * writer's call sequence.
   */
  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // Peek, don't remove: blockWritten() dequeues after the block's on-disk
    // position is known.
    ReadyChunk readyChunk = readyChunks.peek();

    ByteBloomFilter readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.getDataWriter().write(out);
  }

  /**
   * Called after the chunk written by {@link #writeInlineBlock} has been
   * placed on disk; dequeues it and records its first key, offset, and
   * on-disk size in the Bloom block index.
   */
  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

  /**
   * Write-only {@link Writable} that serializes the compound filter's meta
   * section: version, aggregate sizes/counts, hash parameters (taken from the
   * last finished chunk), chunk count, comparator class name, and the
   * single-level Bloom block index. Non-static by design: it reads the
   * enclosing writer's state at write time.
   */
  private class MetaWriter implements Writable {
    protected MetaWriter() {}

    /** Unsupported: this class only writes meta data, never reads it. */
    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Cant read with this class.");
    }

    /**
     * Serializes the meta section. Reads hash count/type from
     * {@code prevChunk}, so at least one chunk must have been completed
     * before this is called — presumably guaranteed by the caller only
     * requesting the meta writer when keys were added; TODO confirm.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      out.writeInt(numChunks);
      Bytes.writeByteArray(out,
          Bytes.toBytes(comparator.getClass().getName()));

      // Single-level index over all Bloom chunks.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  /**
   * No-op: compaction happens per chunk inside {@link #enqueueReadyChunk},
   * not on the compound filter as a whole.
   */
  @Override
  public void compactBloom() {
  }

  /**
   * No-op: allocation happens per chunk inside {@link #add}.
   */
  @Override
  public void allocBloom() {

  }

  /**
   * Always null: chunk data is written via {@link #writeInlineBlock}, not
   * through a single data writer.
   */
  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}