/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes a compound Bloom filter, made up of a series of Bloom filter chunks stored as inline
 * blocks in the data section of an {@link org.apache.hadoop.hbase.io.hfile.HFile}, in the format
 * read back by {@link CompoundBloomFilter}.
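 * <p>
 * A minimal usage sketch; parameter values are illustrative only, and in practice instances are
 * normally created through {@code BloomFilterFactory} rather than directly:
 *
 * <pre>
 * CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(128 * 1024, 0.01f,
 *   Hash.MURMUR_HASH3, 7, true, CellComparator.getInstance(), BloomType.ROW);
 * hfileWriter.addInlineBlockWriter(bloomWriter); // hfileWriter is an open HFile.Writer
 * bloomWriter.append(cell); // once for every cell written to the HFile
 * // on close, the HFile writer flushes the remaining chunks and persists getMetaWriter()
 * </pre>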
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
  implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG = LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** The prev Cell that was processed */
  private Cell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new ArrayDeque<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
    new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint each chunk's size in bytes. The real chunk size might be different as
   *                          required by the fold factor.
   * @param errorRate         target false positive rate
   * @param hashType          hash function type to use
   * @param maxFold           maximum degree of folding allowed
   * @param cacheOnWrite      whether to cache-on-write compound Bloom filter chunks
   * @param comparator        the cell comparator
   * @param bloomType         the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate, int hashType,
    int maxFold, boolean cacheOnWrite, CellComparator comparator, BloomType bloomType) {
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }

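  /**
   * Returns true if a Bloom filter chunk is queued and ready to be written out as an inline block,
   * i.e. the current chunk has reached its target key count, or the file is closing and a
   * partially filled chunk remains.
   */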
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   * @param closing true if we are closing the file, so we do not expect new keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null || (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, but first key is null: closing="
        + closing + ", keyCount=" + chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from [" + prevMaxKeys
        + " max keys, " + prevByteSize + " bytes] to [" + chunk.getMaxKeys() + " max keys, "
        + chunk.getByteSize() + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

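  /**
   * Adds the cell to the current Bloom filter chunk, first enqueueing the previous chunk for
   * writing and allocating a fresh one if that chunk has reached its target key count.
   */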
  @Override
  public void append(Cell cell) throws IOException {
    Objects.requireNonNull(cell);

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException(
          "First key in chunk already set: " + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk
      if (bloomType == BloomType.ROWCOL) {
        firstKeyInChunk = PrivateCellUtil
          .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      } else {
        firstKeyInChunk = CellUtil.copyRow(cell);
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

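  /**
   * Called before the current batch of cells is shipped and their backing buffers may be released;
   * copies the key of the last appended cell into a standalone cell so the reference stays valid
   * afterwards.
   */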
  @Override
  public void beforeShipped() throws IOException {
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

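  /**
   * Allocates a fresh Bloom filter chunk, sized from the configured parameters for the first
   * chunk, or cloned from the previous chunk's parameters thereafter.
   */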
  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate, hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount() + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

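  /**
   * Called once the chunk at the head of the queue has been written to the output; records its
   * first key and on-disk location in the single-level Bloom block index and drops it from the
   * queue.
   */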
  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

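  /**
   * Writes out the compound Bloom filter metadata: version, aggregate byte size and key counts,
   * hash configuration, number of chunks, comparator class name, and the Bloom block index.
   */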
  private class MetaWriter implements Writable {
    protected MetaWriter() {
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Can't read with this class.");
    }

    /**
     * This is modeled after the metadata of the single-chunk {@code ByteBloomFilter} for
     * simplicity, although the two metadata formats do not have to be consistent. It does have to
     * be consistent with how
     * {@link CompoundBloomFilter#CompoundBloomFilter(DataInput, org.apache.hadoop.hbase.io.hfile.HFile.Reader, BloomFilterMetrics)}
     * reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public void compactBloom() {
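    // No-op: each chunk is folded (compacted) individually in enqueueReadyChunk() before it is
    // queued for writing.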
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

  @Override
  public Writable getDataWriter() {
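    // Bloom chunk data is emitted through writeInlineBlock() as inline HFile blocks, so there is
    // no separate monolithic data section to write here.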
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}