/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adds the methods required for writing a compound Bloom filter, one chunk at a time, to the
 * data section of an {@link org.apache.hadoop.hbase.io.hfile.HFile}. The written chunks are
 * later read back by {@link CompoundBloomFilter}.
 *
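 * <p>A rough usage sketch (simplified; in practice these calls are driven by the HFile writer
 * through the {@link InlineBlockWriter} contract, and the constants below are illustrative
 * only, not recommended values):
 * <pre>{@code
 * CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
 *     128 * 1024,                    // chunk size hint, in bytes
 *     0.01f,                         // target false positive rate
 *     Hash.MURMUR_HASH3,             // hash function type
 *     7,                             // maximum fold factor
 *     false,                         // cache-on-write
 *     CellComparator.getInstance(),
 *     BloomType.ROW);
 * for (Cell cell : cells) {
 *   bloomWriter.append(cell);        // keys accumulate in the current chunk
 * }
 * while (bloomWriter.shouldWriteBlock(true)) { // 'true' also flushes the final, partial chunk
 *   bloomWriter.writeInlineBlock(out);         // write one ready chunk
 *   bloomWriter.blockWritten(offset, onDiskSize, uncompressedSize);
 * }
 * bloomWriter.getMetaWriter().write(metaOut);  // Bloom metadata plus the chunk index
 * }</pre>
 */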
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
    implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG =
    LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;
  /** The previous cell that was processed */
  private Cell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new ArrayDeque<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
      new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint
   *          each chunk's size in bytes. The real chunk size might be different
   *          as required by the fold factor.
   * @param errorRate
   *          target false positive rate
   * @param hashType
   *          hash function type to use
   * @param maxFold
   *          maximum degree of folding allowed
   * @param cacheOnWrite
   *          whether to cache compound Bloom filter chunks on write
   * @param comparator
   *          the cell comparator, recorded in the Bloom filter metadata
   * @param bloomType
   *          the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
      int hashType, int maxFold, boolean cacheOnWrite,
      CellComparator comparator, BloomType bloomType) {
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(
        chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }

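  /**
   * Returns whether at least one Bloom filter chunk is ready to be written as an inline block.
   * As a side effect, the current chunk is enqueued if it has reached its target key count, or
   * if {@code closing} is true and the file is being finished.
   */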
  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   *
   * @param closing true if we are closing the file, so we do not expect new
   *        keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null ||
        (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, " +
          "but first key is null: closing=" + closing + ", keyCount=" +
          chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

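  /**
   * Adds the key of the given cell to the current Bloom filter chunk, starting a new chunk (and
   * enqueueing the full one) whenever the current chunk has reached its maximum key count.
   */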
  @Override
  public void append(Cell cell) throws IOException {
    Objects.requireNonNull(cell);

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException("First key in chunk already set: "
            + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk
      if (bloomType == BloomType.ROWCOL) {
        firstKeyInChunk =
            PrivateCellUtil
                .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      } else {
        firstKeyInChunk = CellUtil.copyRow(cell);
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

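  /**
   * Copies the key of the most recently appended cell into a new, self-contained cell so that it
   * does not keep referencing a buffer that may be recycled after the current batch is shipped.
   */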
  @Override
  public void beforeShipped() throws IOException {
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

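  /**
   * Allocates a new Bloom filter chunk, sized from the configured parameters for the first chunk
   * and cloned from the previous chunk's parameters afterwards.
   */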
  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate,
          hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount()
          + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }
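
  /**
   * Writes the Bloom data of the oldest ready chunk to the given stream. The chunk is left on
   * the queue, because {@link #blockWritten(long, int, int)} still needs it (and cache-on-write
   * may need the data again) once the block has actually been written.
   */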
  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

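  /**
   * Called after the inline block has been written out; removes the chunk from the queue and
   * records its first key and on-disk location in the Bloom block index.
   */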
  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

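  /**
   * Write-only {@link Writable} that serializes the compound Bloom filter metadata: version,
   * total size, hash configuration, key counts, number of chunks, comparator class name and the
   * single-level chunk index. The matching read side is {@link CompoundBloomFilter}.
   */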
  private class MetaWriter implements Writable {
    protected MetaWriter() {}

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Can't read with this class.");
    }

    /**
     * This is modeled after the single-chunk {@code ByteBloomFilter} meta format for
     * simplicity, although the two metadata formats do not have to be consistent. This
     * does have to be consistent with how {@link
     * CompoundBloomFilter#CompoundBloomFilter(DataInput,
     * org.apache.hadoop.hbase.io.hfile.HFile.Reader)} reads fields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

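  /**
   * No-op for the compound filter: each chunk is folded individually in
   * {@link #enqueueReadyChunk(boolean)} before it is queued for writing.
   */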
  @Override
  public void compactBloom() {
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

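  /**
   * Returns null: the Bloom data of a compound filter is written chunk by chunk through the
   * inline block mechanism, so there is no separate monolithic data writer.
   */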
  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}