/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default implementation of StoreFlusher.
 */
@InterfaceAudience.Private
public class DefaultStoreFlusher extends StoreFlusher {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlusher.class);
  private final Object flushLock = new Object();

  public DefaultStoreFlusher(Configuration conf, HStore store) {
    super(conf, store);
  }

  @Override
  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
    MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,
    Consumer<Path> writerCreationTracker) throws IOException {
    ArrayList<Path> result = new ArrayList<>();
    int cellsCount = snapshot.getCellsCount();
    if (cellsCount == 0) return result; // don't flush if there are no entries

    // Use a store scanner to find which rows to flush.
    InternalScanner scanner = createScanner(snapshot.getScanners(), tracker);
    StoreFileWriter writer;
    try {
      // TODO: We can fail in the below block before we complete adding this flush to
      // list of store files. Add cleanup of anything put on filesystem if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + store + ": creating writer");
        // Write the map out to the disk
        writer = createWriter(snapshot, false, writerCreationTracker);
        IOException e = null;
        try {
          performFlush(scanner, writer, throughputController);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
      }
    } finally {
      scanner.close();
    }
    LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}), to={}",
      StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, writer.hasGeneralBloom(),
      writer.getPath());
    result.add(writer.getPath());
    return result;
  }
}