/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
 * A StoreFile writer. Use this to write HBase Store Files. Although the class is
 * public, it is an implementation detail of the HBase regionserver.
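 *
 * <p>A rough usage sketch (the directory, bloom type, and key count here are
 * hypothetical, not prescribed by this class):
 * <pre>{@code
 * StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
 *     .withOutputDir(columnFamilyDir)   // or .withFilePath(exactPath)
 *     .withBloomType(BloomType.ROW)
 *     .withMaxKeyCount(expectedKeys)
 *     .withFileContext(hFileContext)
 *     .build();
 * try {
 *   for (Cell cell : cells) {
 *     writer.append(cell);              // also feeds Bloom filters and the time range
 *   }
 *   // Metadata must be appended BEFORE close().
 *   writer.appendMetadata(maxSequenceId, majorCompaction);
 * } finally {
 *   writer.close();
 * }
 * }</pre>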
 */
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
  private static final Pattern dash = Pattern.compile("-");
  private final BloomFilterWriter generalBloomFilterWriter;
  private final BloomFilterWriter deleteFamilyBloomFilterWriter;
  private final BloomType bloomType;
  private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
  private long deleteFamilyCnt = 0;
  private BloomContext bloomContext = null;
  private BloomContext deleteFamilyBloomContext = null;
  private final TimeRangeTracker timeRangeTracker;

  protected HFile.Writer writer;

  /**
   * Creates an HFile.Writer that also writes helpful meta data.
   * @param fs file system to write to
   * @param path file name to create
   * @param conf user configuration
   * @param cacheConf cache configuration
   * @param comparator key comparator
   * @param bloomType bloom filter setting
   * @param maxKeys the expected maximum number of keys to be added. Was used
   *        for Bloom filter size in {@link HFile} format version 1.
   * @param favoredNodes an array of favored nodes or possibly null
   * @param fileContext the HFile context
   * @param shouldDropCacheBehind drop pages written to page cache after writing the store file
   * @throws IOException problem writing to FS
   */
  private StoreFileWriter(FileSystem fs, Path path, final Configuration conf,
      CacheConfig cacheConf, final CellComparator comparator, BloomType bloomType, long maxKeys,
      InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind)
      throws IOException {
    this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
    // TODO : Change all writers to be specifically created for compaction context
    writer = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .withComparator(comparator)
        .withFavoredNodes(favoredNodes)
        .withFileContext(fileContext)
        .withShouldDropCacheBehind(shouldDropCacheBehind)
        .create();

    generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
        conf, cacheConf, bloomType,
        (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);

    if (generalBloomFilterWriter != null) {
      this.bloomType = bloomType;
      if (LOG.isTraceEnabled()) {
        LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", " +
            generalBloomFilterWriter.getClass().getSimpleName());
      }
      // init bloom context
      switch (bloomType) {
      case ROW:
        bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator);
        break;
      case ROWCOL:
        bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator);
        break;
      default:
        throw new IOException(
            "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL expected)");
      }
    } else {
      // Not using Bloom filters.
      this.bloomType = BloomType.NONE;
    }

    // initialize delete family Bloom filter when there is NO RowCol Bloom filter
    if (this.bloomType != BloomType.ROWCOL) {
      this.deleteFamilyBloomFilterWriter = BloomFilterFactory
          .createDeleteBloomAtWrite(conf, cacheConf,
              (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
      deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter, comparator);
    } else {
      deleteFamilyBloomFilterWriter = null;
    }
    if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
      LOG.trace("Delete Family Bloom filter type for " + path + ": " +
          deleteFamilyBloomFilterWriter.getClass().getSimpleName());
    }
  }

  /**
   * Writes meta data.
   * Call before {@link #close()} since it is written as meta data to this file.
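   * <p>For example (a sketch; the variable names are hypothetical):
   * <pre>{@code
   * writer.appendMetadata(maxSequenceId, true); // file produced by a major compaction
   * writer.close();
   * }</pre>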
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
      throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Writes meta data.
   * Call before {@link #close()} since it is written as meta data to this file.
   * @param maxSequenceId Maximum sequence id.
   * @param majorCompaction True if this file is product of a major compaction
   * @param mobCellsCount The number of mob cells.
   * @throws IOException problem writing to FS
   */
  public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
      final long mobCellsCount) throws IOException {
    writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
    writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
    writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount));
    appendTrackedTimestampsToMetadata();
  }

  /**
   * Add TimestampRange and earliest put timestamp to metadata.
   */
  public void appendTrackedTimestampsToMetadata() throws IOException {
    // TODO: The StoreFileReader always converts the byte[] to TimeRange
    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
    appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
    appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
  }

  /**
   * Record the earliest Put timestamp.
   *
   * If the cell is a Put, update the earliest put timestamp; in all cases,
   * update the TimeRangeTracker to include the timestamp of this cell.
   */
  public void trackTimestamps(final Cell cell) {
    if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
      earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
    }
    timeRangeTracker.includeTimestamp(cell);
  }

  private void appendGeneralBloomfilter(final Cell cell) throws IOException {
    if (this.generalBloomFilterWriter != null) {
      /*
       * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
       * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
       *
       * 2 Types of Filtering:
       *  1. Row = Row
       *  2. RowCol = Row + Qualifier
       */
      bloomContext.writeBloom(cell);
    }
  }

  private void appendDeleteFamilyBloomFilter(final Cell cell)
      throws IOException {
    if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) {
      return;
    }

    // increase the count of delete family cells in the store file
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }

  @Override
  public void append(final Cell cell) throws IOException {
    appendGeneralBloomfilter(cell);
    appendDeleteFamilyBloomFilter(cell);
    writer.append(cell);
    trackTimestamps(cell);
  }

  @Override
  public void beforeShipped() throws IOException {
    // For now these writers will always be ShipperListeners.
    // TODO : Change all writers to be specifically created for compaction context
    writer.beforeShipped();
    if (generalBloomFilterWriter != null) {
      generalBloomFilterWriter.beforeShipped();
    }
    if (deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomFilterWriter.beforeShipped();
    }
  }

  public Path getPath() {
    return this.writer.getPath();
  }

  public boolean hasGeneralBloom() {
    return this.generalBloomFilterWriter != null;
  }

  /**
   * For unit testing only.
   *
   * @return the Bloom filter used by this writer.
   */
  BloomFilterWriter getGeneralBloomWriter() {
    return generalBloomFilterWriter;
  }

  private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
    boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0);
    if (haveBloom) {
      bfw.compactBloom();
    }
    return haveBloom;
  }

  private boolean closeGeneralBloomFilter() throws IOException {
    boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter);

    // add the general Bloom filter writer and append file info
    if (hasGeneralBloom) {
      writer.addGeneralBloomFilter(generalBloomFilterWriter);
      writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
      bloomContext.addLastBloomKey(writer);
    }
    return hasGeneralBloom;
  }

  private boolean closeDeleteFamilyBloomFilter() throws IOException {
    boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter);

    // add the delete family Bloom filter writer
    if (hasDeleteFamilyBloom) {
      writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
    }

    // append file info about the number of delete family kvs
    // even if there is no delete family Bloom.
    writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt));

    return hasDeleteFamilyBloom;
  }

  public void close() throws IOException {
    boolean hasGeneralBloom = this.closeGeneralBloomFilter();
    boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();

    writer.close();

    // Log final Bloom filter statistics. This needs to be done after close()
    // because compound Bloom filters might be finalized as part of closing.
    if (LOG.isTraceEnabled()) {
      LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " +
        (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily were added to HFile " +
        getPath());
    }
  }

  public void appendFileInfo(byte[] key, byte[] value) throws IOException {
    writer.appendFileInfo(key, value);
  }

  /** For use in testing. */
  HFile.Writer getHFileWriter() {
    return writer;
  }

  /**
   * @param fs the file system to use
   * @param dir Directory to create file in.
   * @return a random filename (a UUID string with dashes removed) inside the
   *         passed <code>dir</code>
   */
  static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IOException("Expecting " + dir.toString() + " to be a directory");
    }
    return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll(""));
  }

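  /**
   * Builder for {@link StoreFileWriter}. Configure exactly one of
   * {@link #withOutputDir(Path)} (a unique file name is generated inside the
   * directory) or {@link #withFilePath(Path)}, then call {@link #build()}.
   * See the usage sketch in the class comment above.
   */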
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
      justification="Will not overflow")
  public static class Builder {
    private final Configuration conf;
    private final CacheConfig cacheConf;
    private final FileSystem fs;

    private CellComparator comparator = CellComparator.getInstance();
    private BloomType bloomType = BloomType.NONE;
    private long maxKeyCount = 0;
    private Path dir;
    private Path filePath;
    private InetSocketAddress[] favoredNodes;
    private HFileContext fileContext;
    private boolean shouldDropCacheBehind;

    public Builder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs) {
      this.conf = conf;
      this.cacheConf = cacheConf;
      this.fs = fs;
    }

    /**
     * Creates a Builder with cache configuration disabled.
     */
    public Builder(Configuration conf, FileSystem fs) {
      this.conf = conf;
      this.cacheConf = CacheConfig.DISABLED;
      this.fs = fs;
    }

    /**
     * Use either this method or {@link #withFilePath}, but not both.
     * @param dir Path to column family directory. The directory is created if
     *          it does not exist. The file is given a unique name within this
     *          directory.
     * @return this (for chained invocation)
     */
    public Builder withOutputDir(Path dir) {
      Preconditions.checkNotNull(dir);
      this.dir = dir;
      return this;
    }

    /**
     * Use either this method or {@link #withOutputDir}, but not both.
     * @param filePath the StoreFile path to write
     * @return this (for chained invocation)
     */
    public Builder withFilePath(Path filePath) {
      Preconditions.checkNotNull(filePath);
      this.filePath = filePath;
      return this;
    }

    /**
     * @param favoredNodes an array of favored nodes or possibly null
     * @return this (for chained invocation)
     */
    public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
      this.favoredNodes = favoredNodes;
      return this;
    }

    public Builder withComparator(CellComparator comparator) {
      Preconditions.checkNotNull(comparator);
      this.comparator = comparator;
      return this;
    }

    public Builder withBloomType(BloomType bloomType) {
      Preconditions.checkNotNull(bloomType);
      this.bloomType = bloomType;
      return this;
    }

    /**
     * @param maxKeyCount estimated maximum number of keys we expect to add
     * @return this (for chained invocation)
     */
    public Builder withMaxKeyCount(long maxKeyCount) {
      this.maxKeyCount = maxKeyCount;
      return this;
    }

    public Builder withFileContext(HFileContext fileContext) {
      this.fileContext = fileContext;
      return this;
    }

    public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
      this.shouldDropCacheBehind = shouldDropCacheBehind;
      return this;
    }

    /**
     * Create a store file writer. The caller is responsible for closing the file
     * when done. If metadata is to be added, append it BEFORE closing, using
     * {@link StoreFileWriter#appendMetadata}.
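     *
     * <p>A minimal sketch (the path below is hypothetical):
     * <pre>{@code
     * StoreFileWriter w = new StoreFileWriter.Builder(conf, fs) // cache config disabled
     *     .withFilePath(new Path("/hbase/data/ns/tbl/region/.tmp/abc123"))
     *     .withFileContext(fileContext)
     *     .build();
     * }</pre>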
     */
    public StoreFileWriter build() throws IOException {
      if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
        throw new IllegalArgumentException("Either specify parent directory " +
            "or file path");
      }

      if (dir == null) {
        dir = filePath.getParent();
      }

      if (!fs.exists(dir)) {
        // Handle permission for non-HDFS filesystem properly
        // See HBASE-17710
        HRegionFileSystem.mkdirs(fs, conf, dir);
      }

      // set block storage policy for temp path
      String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
      if (null == policyName) {
        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
      }
      FSUtils.setStoragePolicy(this.fs, dir, policyName);

      if (filePath == null) {
        filePath = getUniqueFile(fs, dir);
        if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
          bloomType = BloomType.NONE;
        }
      }

      if (comparator == null) {
        comparator = CellComparator.getInstance();
      }
      return new StoreFileWriter(fs, filePath,
          conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
          shouldDropCacheBehind);
    }
  }
}