/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterChunk;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adds methods required for writing a compound Bloom filter to the data
 * section of an {@link org.apache.hadoop.hbase.io.hfile.HFile} to the
 * {@link CompoundBloomFilter} class.
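 * <p>
 * A rough sketch of how an HFile writer typically drives this class; the surrounding
 * variables ({@code out}, {@code offset}, {@code onDiskSize}, {@code uncompressedSize},
 * {@code cellsInWriteOrder}) are illustrative only, not part of this class's API:
 * <pre>{@code
 * CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(
 *     128 * 1024,                   // chunk size hint in bytes
 *     0.01f,                        // target false positive rate
 *     Hash.MURMUR_HASH3,            // hash type
 *     7,                            // maximum fold factor
 *     true,                         // cache chunks on write
 *     CellComparator.getInstance(),
 *     BloomType.ROW);
 *
 * for (Cell cell : cellsInWriteOrder) {
 *   bloomWriter.append(cell);
 *   if (bloomWriter.shouldWriteBlock(false)) {
 *     // The enclosing HFile writer opens an inline block, then:
 *     bloomWriter.writeInlineBlock(out);
 *     bloomWriter.blockWritten(offset, onDiskSize, uncompressedSize);
 *   }
 * }
 * // On close, flush any remaining chunk the same way with closing = true, and
 * // persist the output of getMetaWriter() with the file's metadata.
 * }</pre>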
 */
@InterfaceAudience.Private
public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
    implements BloomFilterWriter, InlineBlockWriter {

  private static final Logger LOG =
    LoggerFactory.getLogger(CompoundBloomFilterWriter.class);

  /** The current chunk being written to */
  private BloomFilterChunk chunk;

  /** Previous chunk, so that we can create another similar chunk */
  private BloomFilterChunk prevChunk;

  /** Maximum fold factor */
  private int maxFold;

  /** The size of individual Bloom filter chunks to create */
  private int chunkByteSize;

  /** The previous Cell that was processed */
  private Cell prevCell;

  /** A Bloom filter chunk enqueued for writing */
  private static class ReadyChunk {
    int chunkId;
    byte[] firstKey;
    BloomFilterChunk chunk;
  }

  private Queue<ReadyChunk> readyChunks = new LinkedList<>();

  /** The first key in the current Bloom filter chunk. */
  private byte[] firstKeyInChunk = null;

  private HFileBlockIndex.BlockIndexWriter bloomBlockIndexWriter =
      new HFileBlockIndex.BlockIndexWriter();

  /** Whether to cache-on-write compound Bloom filter chunks */
  private boolean cacheOnWrite;

  private BloomType bloomType;

  /**
   * @param chunkByteSizeHint each chunk's size in bytes. The real chunk size might be
   *          different as required by the fold factor.
   * @param errorRate target false positive rate
   * @param hashType hash function type to use
   * @param maxFold maximum degree of folding allowed
   * @param cacheOnWrite whether to cache-on-write the Bloom filter chunks
   * @param comparator the cell comparator; its class name is recorded in the Bloom metadata
   * @param bloomType the bloom type
   */
  public CompoundBloomFilterWriter(int chunkByteSizeHint, float errorRate,
      int hashType, int maxFold, boolean cacheOnWrite,
      CellComparator comparator, BloomType bloomType) {
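    // Round the chunk size hint so that the resulting bit array can be folded (halved)
    // up to maxFold times while staying byte-aligned; the actual chunk size may
    // therefore be slightly larger than the hint.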
    chunkByteSize = BloomFilterUtil.computeFoldableByteSize(
        chunkByteSizeHint * 8L, maxFold);

    this.errorRate = errorRate;
    this.hashType = hashType;
    this.maxFold = maxFold;
    this.cacheOnWrite = cacheOnWrite;
    this.comparator = comparator;
    this.bloomType = bloomType;
  }

  @Override
  public boolean shouldWriteBlock(boolean closing) {
    enqueueReadyChunk(closing);
    return !readyChunks.isEmpty();
  }

  /**
   * Enqueue the current chunk if it is ready to be written out.
   *
   * @param closing true if we are closing the file, so we do not expect new
   *        keys to show up
   */
  private void enqueueReadyChunk(boolean closing) {
    if (chunk == null ||
        (chunk.getKeyCount() < chunk.getMaxKeys() && !closing)) {
      return;
    }

    if (firstKeyInChunk == null) {
      throw new NullPointerException("Trying to enqueue a chunk, " +
          "but first key is null: closing=" + closing + ", keyCount=" +
          chunk.getKeyCount() + ", maxKeys=" + chunk.getMaxKeys());
    }

    ReadyChunk readyChunk = new ReadyChunk();
    readyChunk.chunkId = numChunks - 1;
    readyChunk.chunk = chunk;
    readyChunk.firstKey = firstKeyInChunk;
    readyChunks.add(readyChunk);

    long prevMaxKeys = chunk.getMaxKeys();
    long prevByteSize = chunk.getByteSize();

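    // Folding: if the chunk ended up with far fewer keys than it was sized for,
    // compactBloom() can repeatedly halve the bit array (up to maxFold times) so the
    // chunk written to disk is no larger than the target error rate requires.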
    chunk.compactBloom();

    if (LOG.isTraceEnabled() && prevByteSize != chunk.getByteSize()) {
      LOG.trace("Compacted Bloom chunk #" + readyChunk.chunkId + " from ["
          + prevMaxKeys + " max keys, " + prevByteSize + " bytes] to ["
          + chunk.getMaxKeys() + " max keys, " + chunk.getByteSize()
          + " bytes]");
    }

    totalMaxKeys += chunk.getMaxKeys();
    totalByteSize += chunk.getByteSize();

    firstKeyInChunk = null;
    prevChunk = chunk;
    chunk = null;
  }

  @Override
  public void append(Cell cell) throws IOException {
    if (cell == null) {
      throw new NullPointerException();
    }

    enqueueReadyChunk(false);

    if (chunk == null) {
      if (firstKeyInChunk != null) {
        throw new IllegalStateException("First key in chunk already set: "
            + Bytes.toStringBinary(firstKeyInChunk));
      }
      // This will be done only once per chunk
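      // For a ROW Bloom the chunk's first key is just the row bytes; otherwise the
      // key is serialized in KeyValue key format (first-on-row-col), so Bloom lookups
      // can locate the right chunk through the Bloom block index using the same layout.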
      if (bloomType == BloomType.ROW) {
        firstKeyInChunk = CellUtil.copyRow(cell);
      } else {
        firstKeyInChunk =
            PrivateCellUtil
                .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
      }
      allocateNewChunk();
    }

    chunk.add(cell);
    this.prevCell = cell;
    ++totalKeyCount;
  }

  @Override
  public void beforeShipped() throws IOException {
    if (this.prevCell != null) {
      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
    }
  }

  @Override
  public Cell getPrevCell() {
    return this.prevCell;
  }

  private void allocateNewChunk() {
    if (prevChunk == null) {
      // First chunk
      chunk = BloomFilterUtil.createBySize(chunkByteSize, errorRate,
          hashType, maxFold, bloomType);
    } else {
      // Use the same parameters as the last chunk, but a new array and
      // a zero key count.
      chunk = prevChunk.createAnother();
    }

    if (chunk.getKeyCount() != 0) {
      throw new IllegalStateException("keyCount=" + chunk.getKeyCount()
          + " > 0");
    }

    chunk.allocBloom();
    ++numChunks;
  }

  @Override
  public void writeInlineBlock(DataOutput out) throws IOException {
    // We don't remove the chunk from the queue here, because we might need it
    // again for cache-on-write.
    ReadyChunk readyChunk = readyChunks.peek();

    BloomFilterChunk readyChunkBloom = readyChunk.chunk;
    readyChunkBloom.writeBloom(out);
  }

  @Override
  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
    ReadyChunk readyChunk = readyChunks.remove();
    bloomBlockIndexWriter.addEntry(readyChunk.firstKey, offset, onDiskSize);
  }

  @Override
  public BlockType getInlineBlockType() {
    return BlockType.BLOOM_CHUNK;
  }

  private class MetaWriter implements Writable {
    protected MetaWriter() {}

    @Override
    public void readFields(DataInput in) throws IOException {
      throw new IOException("Cannot read with this class.");
    }

    /**
     * This is modeled after the metadata written by the old single-chunk Bloom filter
     * (ByteBloomFilter) for simplicity, although the two metadata formats do not have
     * to be consistent. This does have to be consistent with how {@link
     * CompoundBloomFilter#CompoundBloomFilter(DataInput,
     * org.apache.hadoop.hbase.io.hfile.HFile.Reader)} reads fields.
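     * <p>
     * The layout written below is, in order: version, total byte size, hash count,
     * hash type, total key count, total max keys, number of chunks, the comparator
     * class name (as a vint-prefixed byte array, possibly empty), and finally the
     * single-level Bloom block index.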
     */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(VERSION);

      out.writeLong(getByteSize());
      out.writeInt(prevChunk.getHashCount());
      out.writeInt(prevChunk.getHashType());
      out.writeLong(getKeyCount());
      out.writeLong(getMaxKeys());

      // Fields that don't have equivalents in ByteBloomFilter.
      out.writeInt(numChunks);
      if (comparator != null) {
        Bytes.writeByteArray(out, Bytes.toBytes(comparator.getClass().getName()));
      } else {
        // Internally writes a 0 vint if the byte[] is null
        Bytes.writeByteArray(out, null);
      }

      // Write a single-level index without compression or block header.
      bloomBlockIndexWriter.writeSingleLevelIndex(out, "Bloom filter");
    }
  }

  @Override
  public void compactBloom() {
  }

  @Override
  public Writable getMetaWriter() {
    return new MetaWriter();
  }

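  // Compound Bloom filter chunks are written as inline blocks, not as one monolithic
  // Bloom data block, so there is no separate data writer.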
  @Override
  public Writable getDataWriter() {
    return null;
  }

  @Override
  public boolean getCacheOnWrite() {
    return cacheOnWrite;
  }
}