001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.monitoring.MonitoredTask;
031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
032import org.apache.hadoop.util.StringUtils;
033
034/**
035 * Default implementation of StoreFlusher.
036 */
037@InterfaceAudience.Private
038public class DefaultStoreFlusher extends StoreFlusher {
039  private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlusher.class);
040  private final Object flushLock = new Object();
041
042  public DefaultStoreFlusher(Configuration conf, HStore store) {
043    super(conf, store);
044  }
045
046  @Override
047  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
048      MonitoredTask status, ThroughputController throughputController,
049      FlushLifeCycleTracker tracker) throws IOException {
050    ArrayList<Path> result = new ArrayList<>();
051    int cellsCount = snapshot.getCellsCount();
052    if (cellsCount == 0) return result; // don't flush if there are no entries
053
054    // Use a store scanner to find which rows to flush.
055    long smallestReadPoint = store.getSmallestReadPoint();
056    InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
057    StoreFileWriter writer;
058    try {
059      // TODO:  We can fail in the below block before we complete adding this flush to
060      //        list of store files.  Add cleanup of anything put on filesystem if we fail.
061      synchronized (flushLock) {
062        status.setStatus("Flushing " + store + ": creating writer");
063        // Write the map out to the disk
064        writer = store.createWriterInTmp(cellsCount,
065            store.getColumnFamilyDescriptor().getCompressionType(), false, true,
066            snapshot.isTagsPresent(), false);
067        IOException e = null;
068        try {
069          performFlush(scanner, writer, smallestReadPoint, throughputController);
070        } catch (IOException ioe) {
071          e = ioe;
072          // throw the exception out
073          throw ioe;
074        } finally {
075          if (e != null) {
076            writer.close();
077          } else {
078            finalizeWriter(writer, cacheFlushId, status);
079          }
080        }
081      }
082    } finally {
083      scanner.close();
084    }
085    LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}), to={}",
086        StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, writer.hasGeneralBloom(),
087        writer.getPath());
088    result.add(writer.getPath());
089    return result;
090  }
091}