001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.monitoring.MonitoredTask;
031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
032import org.apache.hadoop.util.StringUtils;
033
034/**
035 * Default implementation of StoreFlusher.
036 */
037@InterfaceAudience.Private
038public class DefaultStoreFlusher extends StoreFlusher {
039  private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlusher.class);
040  private final Object flushLock = new Object();
041
042  public DefaultStoreFlusher(Configuration conf, HStore store) {
043    super(conf, store);
044  }
045
046  @Override
047  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
048      MonitoredTask status, ThroughputController throughputController,
049      FlushLifeCycleTracker tracker) throws IOException {
050    ArrayList<Path> result = new ArrayList<>();
051    int cellsCount = snapshot.getCellsCount();
052    if (cellsCount == 0) return result; // don't flush if there are no entries
053
054    // Use a store scanner to find which rows to flush.
055    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
056    StoreFileWriter writer;
057    try {
058      // TODO:  We can fail in the below block before we complete adding this flush to
059      //        list of store files.  Add cleanup of anything put on filesystem if we fail.
060      synchronized (flushLock) {
061        status.setStatus("Flushing " + store + ": creating writer");
062        // Write the map out to the disk
063        writer = store.createWriterInTmp(cellsCount,
064            store.getColumnFamilyDescriptor().getCompressionType(), false, true,
065            snapshot.isTagsPresent(), false);
066        IOException e = null;
067        try {
068          performFlush(scanner, writer, throughputController);
069        } catch (IOException ioe) {
070          e = ioe;
071          // throw the exception out
072          throw ioe;
073        } finally {
074          if (e != null) {
075            writer.close();
076          } else {
077            finalizeWriter(writer, cacheFlushId, status);
078          }
079        }
080      }
081    } finally {
082      scanner.close();
083    }
084    LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}), to={}",
085        StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, writer.hasGeneralBloom(),
086        writer.getPath());
087    result.add(writer.getPath());
088    return result;
089  }
090}