001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.function.Consumer;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.monitoring.MonitoredTask;
030import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
036 * Custom implementation can be provided.
037 */
038@InterfaceAudience.Private
039abstract class StoreFlusher {
040  protected Configuration conf;
041  protected HStore store;
042
043  public StoreFlusher(Configuration conf, HStore store) {
044    this.conf = conf;
045    this.store = store;
046  }
047
048  /**
049   * Turns a snapshot of memstore into a set of store files.
050   * @param snapshot             Memstore snapshot.
051   * @param cacheFlushSeqNum     Log cache flush sequence number.
052   * @param status               Task that represents the flush operation and may be updated with
053   *                             status.
054   * @param throughputController A controller to avoid flush too fast
055   * @return List of files written. Can be empty; must not be null.
056   */
057  public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
058    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
059    Consumer<Path> writerCreationTracker) throws IOException;
060
061  protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status)
062    throws IOException {
063    // Write out the log sequence number that corresponds to this output
064    // hfile. Also write current time in metadata as minFlushTime.
065    // The hfile is current up to and including cacheFlushSeqNum.
066    status.setStatus("Flushing " + store + ": appending metadata");
067    writer.appendMetadata(cacheFlushSeqNum, false);
068    status.setStatus("Flushing " + store + ": closing flushed file");
069    writer.close();
070  }
071
072  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag,
073    Consumer<Path> writerCreationTracker) throws IOException {
074    return store.getStoreEngine()
075      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
076        .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
077        .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent())
078        .shouldDropBehind(false).writerCreationTracker(writerCreationTracker));
079  }
080
081  /**
082   * Creates the scanner for flushing snapshot. Also calls coprocessors.
083   * @return The scanner; null if coprocessor is canceling the flush.
084   */
085  protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
086    FlushLifeCycleTracker tracker) throws IOException {
087    ScanInfo scanInfo;
088    if (store.getCoprocessorHost() != null) {
089      scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
090    } else {
091      scanInfo = store.getScanInfo();
092    }
093    final long smallestReadPoint = store.getSmallestReadPoint();
094    InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
095      ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
096
097    if (store.getCoprocessorHost() != null) {
098      try {
099        return store.getCoprocessorHost().preFlush(store, scanner, tracker);
100      } catch (IOException ioe) {
101        scanner.close();
102        throw ioe;
103      }
104    }
105    return scanner;
106  }
107
108  /**
109   * Performs memstore flush, writing data from scanner into sink.
110   * @param scanner              Scanner to get data from.
111   * @param sink                 Sink to write data to. Could be StoreFile.Writer.
112   * @param throughputController A controller to avoid flush too fast
113   */
114  protected void performFlush(InternalScanner scanner, CellSink sink,
115    ThroughputController throughputController) throws IOException {
116    int compactionKVMax =
117      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
118
119    ScannerContext scannerContext =
120      ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
121
122    List<Cell> kvs = new ArrayList<>();
123    boolean hasMore;
124    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
125    // no control on system table (such as meta, namespace, etc) flush
126    boolean control =
127      throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
128    if (control) {
129      throughputController.start(flushName);
130    }
131    try {
132      do {
133        hasMore = scanner.next(kvs, scannerContext);
134        if (!kvs.isEmpty()) {
135          for (Cell c : kvs) {
136            // If we know that this KV is going to be included always, then let us
137            // set its memstoreTS to 0. This will help us save space when writing to
138            // disk.
139            sink.append(c);
140            if (control) {
141              throughputController.control(flushName, c.getSerializedSize());
142            }
143          }
144          kvs.clear();
145        }
146      } while (hasMore);
147    } catch (InterruptedException e) {
148      throw new InterruptedIOException(
149        "Interrupted while control throughput of flushing " + flushName);
150    } finally {
151      if (control) {
152        throughputController.finish(flushName);
153      }
154    }
155  }
156}