001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.monitoring.MonitoredTask; 031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 032import org.apache.hadoop.util.StringUtils; 033 034/** 035 * Default implementation of StoreFlusher. 036 */ 037@InterfaceAudience.Private 038public class DefaultStoreFlusher extends StoreFlusher { 039 private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlusher.class); 040 private final Object flushLock = new Object(); 041 042 public DefaultStoreFlusher(Configuration conf, HStore store) { 043 super(conf, store); 044 } 045 046 @Override 047 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 048 MonitoredTask status, ThroughputController throughputController, 049 FlushLifeCycleTracker tracker) throws IOException { 050 ArrayList<Path> result = new ArrayList<>(); 051 int cellsCount = snapshot.getCellsCount(); 052 if (cellsCount == 0) return result; // don't flush if there are no entries 053 054 // Use a store scanner to find which rows to flush. 055 InternalScanner scanner = createScanner(snapshot.getScanners(), tracker); 056 StoreFileWriter writer; 057 try { 058 // TODO: We can fail in the below block before we complete adding this flush to 059 // list of store files. Add cleanup of anything put on filesystem if we fail. 060 synchronized (flushLock) { 061 status.setStatus("Flushing " + store + ": creating writer"); 062 // Write the map out to the disk 063 writer = store.createWriterInTmp(cellsCount, 064 store.getColumnFamilyDescriptor().getCompressionType(), false, true, 065 snapshot.isTagsPresent(), false); 066 IOException e = null; 067 try { 068 performFlush(scanner, writer, throughputController); 069 } catch (IOException ioe) { 070 e = ioe; 071 // throw the exception out 072 throw ioe; 073 } finally { 074 if (e != null) { 075 writer.close(); 076 } else { 077 finalizeWriter(writer, cacheFlushId, status); 078 } 079 } 080 } 081 } finally { 082 scanner.close(); 083 } 084 LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}), to={}", 085 StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, writer.hasGeneralBloom(), 086 writer.getPath()); 087 result.add(writer.getPath()); 088 return result; 089 } 090}