001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.LinkedList;
024import java.util.Queue;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.KeyValueUtil;
029import org.apache.hadoop.hbase.PrivateCellUtil;
030import org.apache.hadoop.hbase.regionserver.BloomType;
031import org.apache.hadoop.hbase.util.BloomFilterChunk;
032import org.apache.hadoop.hbase.util.BloomFilterUtil;
033import org.apache.hadoop.hbase.util.BloomFilterWriter;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.io.Writable;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * Adds methods required for writing a compound Bloom filter to the data section of an
042 * {@link org.apache.hadoop.hbase.io.hfile.HFile} to the {@link CompoundBloomFilter} class.
043 */
044@InterfaceAudience.Private
045public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
046  implements BloomFilterWriter, InlineBlockWriter {
047
048  private static final Logger LOG = LoggerFactory.getLogger(CompoundBloomFilterWriter.class);
049
050  /** The current chunk being written to */
051  private BloomFilterChunk chunk;
052
053  /** Previous chunk, so that we can create another similar chunk */
054  private BloomFilterChunk prevChunk;
055
056  /** Maximum fold factor */
057  private int maxFold;
058
059  /** The size of individual Bloom filter chunks to create */
060  private int chunkByteSize;
061  /** The prev Cell that was processed */
062  private Cell prevCell;
063
064  /** A Bloom filter chunk enqueued for writing */
065  private static class ReadyChunk {
066    int chunkId;
067    byte[] firstKey;
068    BloomFilterChunk chunk;
069  }
070
071  private Queue<ReadyChunk> readyChunks = new LinkedList<>();
072
073  /** The first key in the current Bloom filter chunk. */
074  private byte[] firstKeyInChunk = null;
075
076  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
077    new HFileBlockIndex.BlockIndexWriter();
078
079  /** Whether to cache-on-write compound Bloom filter chunks */
080  private boolean cacheOnWrite;
081
082  private BloomType bloomType;
083
084  /**
085   * each chunk's size in bytes. The real chunk size might be different as required by the fold
086   * factor. target false positive rate hash function type to use maximum degree of folding allowed
087   * the bloom type
088   */
089  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate, int hashType,
090    int maxFold, boolean cacheOnWrite, CellComparator comparator, BloomType bloomType) {
091    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(chunkByteSizeHint * 8L, maxFold);
092
093    this.errorRate = errorRate;
094    this.hashType = hashType;
095    this.maxFold = maxFold;
096    this.cacheOnWrite = cacheOnWrite;
097    this.comparator = comparator;
098    this.bloomType = bloomType;
099  }
100
101  @Override
102  public boolean shouldWriteBlock(boolean closing) {
103    enqueueReadyChunk(closing);
104    return !readyChunks.isEmpty();
105  }
106
107  /**
108   * Enqueue the current chunk if it is ready to be written out.
109   * @param closing true if we are closing the file, so we do not expect new keys to show up
110   */
111  private void enqueueReadyChunk(boolean closing) {
112    if (chunk == null || (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
113      return;
114    }
115
116    if (firstKeyInChunk == null) {
117      throw new NullPointerException(
118        "Trying to enqueue a chunk, " + "but first key is null: closing=" + closing + ", keyCount="
119          + chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
120    }
121
122    ReadyChunk readyChunk = new ReadyChunk();
123    readyChunk.chunkId = numChunks - 1;
124    readyChunk.chunk = chunk;
125    readyChunk.firstKey = firstKeyInChunk;
126    readyChunks.add(readyChunk);
127
128    long prevMaxKeys = chunk.getMaxKeys();
129    long prevByteSize = chunk.getByteSize();
130
131    chunk.compactBloom();
132
133    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
134      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from [" + prevMaxKeys
135        + " max keys, " + prevByteSize + " bytes] to [" + chunk.getMaxKeys() + " max keys, "
136        + chunk.getByteSize() + " bytes]");
137    }
138
139    totalMaxKeys += chunk.getMaxKeys();
140    totalByteSize += chunk.getByteSize();
141
142    firstKeyInChunk = null;
143    prevChunk = chunk;
144    chunk = null;
145  }
146
147  @Override
148  public void append(Cell cell) throws IOException {
149    if (cell == null) throw new NullPointerException();
150
151    enqueueReadyChunk(false);
152
153    if (chunk == null) {
154      if (firstKeyInChunk != null) {
155        throw new IllegalStateException(
156          "First key in chunk already set: " + Bytes.toStringBinary(firstKeyInChunk));
157      }
158      // This will be done only once per chunk
159      if (bloomType == BloomType.ROWCOL) {
160        firstKeyInChunk = PrivateCellUtil
161          .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
162      } else {
163        firstKeyInChunk = CellUtil.copyRow(cell);
164      }
165      allocateNewChunk();
166    }
167
168    chunk.add(cell);
169    this.prevCell = cell;
170    ++totalKeyCount;
171  }
172
173  @Override
174  public void beforeShipped() throws IOException {
175    if (this.prevCell != null) {
176      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
177    }
178  }
179
180  @Override
181  public Cell getPrevCell() {
182    return this.prevCell;
183  }
184
185  private void allocateNewChunk() {
186    if (prevChunk == null) {
187      // First chunk
188      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate, hashType, maxFold, bloomType);
189    } else {
190      // Use the same parameters as the last chunk, but a new array and
191      // a zero key count.
192      chunk = prevChunk.createAnother();
193    }
194
195    if (chunk.getKeyCount() != 0) {
196      throw new IllegalStateException("keyCount=" + chunk.getKeyCount() + " > 0");
197    }
198
199    chunk.allocBloom();
200    ++numChunks;
201  }
202
203  @Override
204  public void writeInlineBlock(DataOutput out) throws IOException {
205    // We don't remove the chunk from the queue here, because we might need it
206    // again for cache-on-write.
207    ReadyChunk readyChunk = readyChunks.peek();
208
209    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
210    readyChunkBloom.writeBloom(out);
211  }
212
213  @Override
214  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
215    ReadyChunk readyChunk = readyChunks.remove();
216    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
217  }
218
219  @Override
220  public BlockType getInlineBlockType() {
221    return BlockType.BLOOM_CHUNK;
222  }
223
224  private class MetaWriter implements Writable {
225    protected MetaWriter() {
226    }
227
228    @Override
229    public void readFields(DataInput in) throws IOException {
230      throw new IOException("Cant read with this class.");
231    }
232
233    /**
234     * This is modeled after {@link CompoundBloomFilterWriter.MetaWriter} for simplicity, although
235     * the two metadata formats do not have to be consistent. This does have to be consistent with
236     * how
237     * {@link CompoundBloomFilter#CompoundBloomFilter(DataInput, org.apache.hadoop.hbase.io.hfile.HFile.Reader, BloomFilterMetrics)}
238     * reads fields.
239     */
240    @Override
241    public void write(DataOutput out) throws IOException {
242      out.writeInt(VERSION);
243
244      out.writeLong(getByteSize());
245      out.writeInt(prevChunk.getHashCount());
246      out.writeInt(prevChunk.getHashType());
247      out.writeLong(getKeyCount());
248      out.writeLong(getMaxKeys());
249
250      // Fields that don't have equivalents in ByteBloomFilter.
251      out.writeInt(numChunks);
252      if (comparator != null) {
253        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
254      } else {
255        // Internally writes a 0 vint if the byte[] is null
256        Bytes.writeByteArray(out, null);
257      }
258
259      // Write a single-level index without compression or block header.
260      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
261    }
262  }
263
264  @Override
265  public void compactBloom() {
266  }
267
268  @Override
269  public Writable getMetaWriter() {
270    return new MetaWriter();
271  }
272
273  @Override
274  public Writable getDataWriter() {
275    return null;
276  }
277
278  @Override
279  public boolean getCacheOnWrite() {
280    return cacheOnWrite;
281  }
282}