001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.KeyValueUtil;
031import org.apache.hadoop.hbase.monitoring.MonitoredTask;
032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
033import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
038 * Custom implementation can be provided.
039 */
040@InterfaceAudience.Private
041abstract class StoreFlusher {
042  protected Configuration conf;
043  protected HStore store;
044
045  public StoreFlusher(Configuration conf, HStore store) {
046    this.conf = conf;
047    this.store = store;
048  }
049
050  /**
051   * Turns a snapshot of memstore into a set of store files.
052   * @param snapshot Memstore snapshot.
053   * @param cacheFlushSeqNum Log cache flush sequence number.
054   * @param status Task that represents the flush operation and may be updated with status.
055   * @param throughputController A controller to avoid flush too fast
056   * @return List of files written. Can be empty; must not be null.
057   */
058  public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
059      MonitoredTask status, ThroughputController throughputController,
060      FlushLifeCycleTracker tracker) throws IOException;
061
062  protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
063      MonitoredTask status) throws IOException {
064    // Write out the log sequence number that corresponds to this output
065    // hfile. Also write current time in metadata as minFlushTime.
066    // The hfile is current up to and including cacheFlushSeqNum.
067    status.setStatus("Flushing " + store + ": appending metadata");
068    writer.appendMetadata(cacheFlushSeqNum, false);
069    status.setStatus("Flushing " + store + ": closing flushed file");
070    writer.close();
071  }
072
073
074  /**
075   * Creates the scanner for flushing snapshot. Also calls coprocessors.
076   * @param snapshotScanners
077   * @param smallestReadPoint
078   * @return The scanner; null if coprocessor is canceling the flush.
079   */
080  protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
081      long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
082    ScanInfo scanInfo;
083    if (store.getCoprocessorHost() != null) {
084      scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
085    } else {
086      scanInfo = store.getScanInfo();
087    }
088    InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
089        ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
090    assert scanner != null;
091    if (store.getCoprocessorHost() != null) {
092      try {
093        return store.getCoprocessorHost().preFlush(store, scanner, tracker);
094      } catch (IOException ioe) {
095        scanner.close();
096        throw ioe;
097      }
098    }
099    return scanner;
100  }
101
102  /**
103   * Performs memstore flush, writing data from scanner into sink.
104   * @param scanner Scanner to get data from.
105   * @param sink Sink to write data to. Could be StoreFile.Writer.
106   * @param smallestReadPoint Smallest read point used for the flush.
107   * @param throughputController A controller to avoid flush too fast
108   */
109  protected void performFlush(InternalScanner scanner, CellSink sink,
110      long smallestReadPoint, ThroughputController throughputController) throws IOException {
111    int compactionKVMax =
112      conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
113
114    ScannerContext scannerContext =
115        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
116
117    List<Cell> kvs = new ArrayList<>();
118    boolean hasMore;
119    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
120    // no control on system table (such as meta, namespace, etc) flush
121    boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
122    if (control) {
123      throughputController.start(flushName);
124    }
125    try {
126      do {
127        hasMore = scanner.next(kvs, scannerContext);
128        if (!kvs.isEmpty()) {
129          for (Cell c : kvs) {
130            // If we know that this KV is going to be included always, then let us
131            // set its memstoreTS to 0. This will help us save space when writing to
132            // disk.
133            sink.append(c);
134            int len = KeyValueUtil.length(c);
135            if (control) {
136              throughputController.control(flushName, len);
137            }
138          }
139          kvs.clear();
140        }
141      } while (hasMore);
142    } catch (InterruptedException e) {
143      throw new InterruptedIOException("Interrupted while control throughput of flushing "
144          + flushName);
145    } finally {
146      if (control) {
147        throughputController.finish(flushName);
148      }
149    }
150  }
151}