001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.monitoring.MonitoredTask; 031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 032import org.apache.hadoop.util.StringUtils; 033 034/** 035 * Default implementation of StoreFlusher. 036 */ 037@InterfaceAudience.Private 038public class DefaultStoreFlusher extends StoreFlusher { 039 private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlusher.class); 040 private final Object flushLock = new Object(); 041 042 public DefaultStoreFlusher(Configuration conf, HStore store) { 043 super(conf, store); 044 } 045 046 @Override 047 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, 048 MonitoredTask status, ThroughputController throughputController, 049 FlushLifeCycleTracker tracker) throws IOException { 050 ArrayList<Path> result = new ArrayList<>(); 051 int cellsCount = snapshot.getCellsCount(); 052 if (cellsCount == 0) return result; // don't flush if there are no entries 053 054 // Use a store scanner to find which rows to flush. 055 long smallestReadPoint = store.getSmallestReadPoint(); 056 InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); 057 StoreFileWriter writer; 058 try { 059 // TODO: We can fail in the below block before we complete adding this flush to 060 // list of store files. Add cleanup of anything put on filesystem if we fail. 061 synchronized (flushLock) { 062 status.setStatus("Flushing " + store + ": creating writer"); 063 // Write the map out to the disk 064 writer = store.createWriterInTmp(cellsCount, 065 store.getColumnFamilyDescriptor().getCompressionType(), false, true, 066 snapshot.isTagsPresent(), false); 067 IOException e = null; 068 try { 069 performFlush(scanner, writer, smallestReadPoint, throughputController); 070 } catch (IOException ioe) { 071 e = ioe; 072 // throw the exception out 073 throw ioe; 074 } finally { 075 if (e != null) { 076 writer.close(); 077 } else { 078 finalizeWriter(writer, cacheFlushId, status); 079 } 080 } 081 } 082 } finally { 083 scanner.close(); 084 } 085 LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}), to={}", 086 StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, writer.hasGeneralBloom(), 087 writer.getPath()); 088 result.add(writer.getPath()); 089 return result; 090 } 091}