001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.function.Consumer; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.ExtendedCell; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.PrivateConstants; 030import org.apache.hadoop.hbase.monitoring.MonitoredTask; 031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 033import org.apache.yetus.audience.InterfaceAudience; 034 035/** 036 * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). 037 * Custom implementation can be provided. 038 */ 039@InterfaceAudience.Private 040abstract class StoreFlusher { 041 protected Configuration conf; 042 protected HStore store; 043 044 public StoreFlusher(Configuration conf, HStore store) { 045 this.conf = conf; 046 this.store = store; 047 } 048 049 /** 050 * Turns a snapshot of memstore into a set of store files. 051 * @param snapshot Memstore snapshot. 052 * @param cacheFlushSeqNum Log cache flush sequence number. 053 * @param status Task that represents the flush operation and may be updated with 054 * status. 055 * @param throughputController A controller to avoid flush too fast 056 * @return List of files written. Can be empty; must not be null. 057 */ 058 public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, 059 MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, 060 Consumer<Path> writerCreationTracker) throws IOException; 061 062 protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status) 063 throws IOException { 064 // Write out the log sequence number that corresponds to this output 065 // hfile. Also write current time in metadata as minFlushTime. 066 // The hfile is current up to and including cacheFlushSeqNum. 067 status.setStatus("Flushing " + store + ": appending metadata"); 068 writer.appendMetadata(cacheFlushSeqNum, false); 069 status.setStatus("Flushing " + store + ": closing flushed file"); 070 writer.close(); 071 } 072 073 protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag, 074 Consumer<Path> writerCreationTracker) throws IOException { 075 return store.getStoreEngine() 076 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount()) 077 .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false) 078 .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent()) 079 .shouldDropBehind(false).writerCreationTracker(writerCreationTracker)); 080 } 081 082 /** 083 * Creates the scanner for flushing snapshot. Also calls coprocessors. 084 * @return The scanner; null if coprocessor is canceling the flush. 085 */ 086 protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners, 087 FlushLifeCycleTracker tracker) throws IOException { 088 ScanInfo scanInfo; 089 if (store.getCoprocessorHost() != null) { 090 scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker); 091 } else { 092 scanInfo = store.getScanInfo(); 093 } 094 final long smallestReadPoint = store.getSmallestReadPoint(); 095 InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners, 096 ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, PrivateConstants.OLDEST_TIMESTAMP); 097 098 if (store.getCoprocessorHost() != null) { 099 try { 100 return store.getCoprocessorHost().preFlush(store, scanner, tracker); 101 } catch (IOException ioe) { 102 scanner.close(); 103 throw ioe; 104 } 105 } 106 return scanner; 107 } 108 109 /** 110 * Performs memstore flush, writing data from scanner into sink. 111 * @param scanner Scanner to get data from. 112 * @param sink Sink to write data to. Could be StoreFile.Writer. 113 * @param throughputController A controller to avoid flush too fast 114 */ 115 protected void performFlush(InternalScanner scanner, CellSink sink, 116 ThroughputController throughputController) throws IOException { 117 int compactionKVMax = 118 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 119 120 ScannerContext scannerContext = 121 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 122 123 List<ExtendedCell> kvs = new ArrayList<>(); 124 boolean hasMore; 125 String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); 126 // no control on system table (such as meta, namespace, etc) flush 127 boolean control = 128 throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); 129 if (control) { 130 throughputController.start(flushName); 131 } 132 try { 133 do { 134 // InternalScanner is for CPs so we do not want to leak ExtendedCell to the interface, but 135 // all the server side implementation should only add ExtendedCell to the List, otherwise it 136 // will cause serious assertions in our code 137 hasMore = scanner.next(kvs, scannerContext); 138 if (!kvs.isEmpty()) { 139 for (ExtendedCell c : kvs) { 140 sink.append(c); 141 if (control) { 142 throughputController.control(flushName, c.getSerializedSize()); 143 } 144 } 145 kvs.clear(); 146 } 147 } while (hasMore); 148 } catch (InterruptedException e) { 149 throw new InterruptedIOException( 150 "Interrupted while control throughput of flushing " + flushName); 151 } finally { 152 if (control) { 153 throughputController.finish(flushName); 154 } 155 } 156 } 157}