001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.function.Consumer; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.monitoring.MonitoredTask; 030import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 032import org.apache.yetus.audience.InterfaceAudience; 033 034/** 035 * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). 036 * Custom implementation can be provided. 037 */ 038@InterfaceAudience.Private 039abstract class StoreFlusher { 040 protected Configuration conf; 041 protected HStore store; 042 043 public StoreFlusher(Configuration conf, HStore store) { 044 this.conf = conf; 045 this.store = store; 046 } 047 048 /** 049 * Turns a snapshot of memstore into a set of store files. 050 * @param snapshot Memstore snapshot. 051 * @param cacheFlushSeqNum Log cache flush sequence number. 052 * @param status Task that represents the flush operation and may be updated with 053 * status. 054 * @param throughputController A controller to avoid flush too fast 055 * @return List of files written. Can be empty; must not be null. 056 */ 057 public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, 058 MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker, 059 Consumer<Path> writerCreationTracker) throws IOException; 060 061 protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status) 062 throws IOException { 063 // Write out the log sequence number that corresponds to this output 064 // hfile. Also write current time in metadata as minFlushTime. 065 // The hfile is current up to and including cacheFlushSeqNum. 066 status.setStatus("Flushing " + store + ": appending metadata"); 067 writer.appendMetadata(cacheFlushSeqNum, false); 068 status.setStatus("Flushing " + store + ": closing flushed file"); 069 writer.close(); 070 } 071 072 protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag, 073 Consumer<Path> writerCreationTracker) throws IOException { 074 return store.getStoreEngine() 075 .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount()) 076 .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false) 077 .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent()) 078 .shouldDropBehind(false).writerCreationTracker(writerCreationTracker)); 079 } 080 081 /** 082 * Creates the scanner for flushing snapshot. Also calls coprocessors. 083 * @return The scanner; null if coprocessor is canceling the flush. 084 */ 085 protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners, 086 FlushLifeCycleTracker tracker) throws IOException { 087 ScanInfo scanInfo; 088 if (store.getCoprocessorHost() != null) { 089 scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker); 090 } else { 091 scanInfo = store.getScanInfo(); 092 } 093 final long smallestReadPoint = store.getSmallestReadPoint(); 094 InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners, 095 ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); 096 097 if (store.getCoprocessorHost() != null) { 098 try { 099 return store.getCoprocessorHost().preFlush(store, scanner, tracker); 100 } catch (IOException ioe) { 101 scanner.close(); 102 throw ioe; 103 } 104 } 105 return scanner; 106 } 107 108 /** 109 * Performs memstore flush, writing data from scanner into sink. 110 * @param scanner Scanner to get data from. 111 * @param sink Sink to write data to. Could be StoreFile.Writer. 112 * @param throughputController A controller to avoid flush too fast 113 */ 114 protected void performFlush(InternalScanner scanner, CellSink sink, 115 ThroughputController throughputController) throws IOException { 116 int compactionKVMax = 117 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 118 119 ScannerContext scannerContext = 120 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 121 122 List<Cell> kvs = new ArrayList<>(); 123 boolean hasMore; 124 String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); 125 // no control on system table (such as meta, namespace, etc) flush 126 boolean control = 127 throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); 128 if (control) { 129 throughputController.start(flushName); 130 } 131 try { 132 do { 133 hasMore = scanner.next(kvs, scannerContext); 134 if (!kvs.isEmpty()) { 135 for (Cell c : kvs) { 136 // If we know that this KV is going to be included always, then let us 137 // set its memstoreTS to 0. This will help us save space when writing to 138 // disk. 139 sink.append(c); 140 if (control) { 141 throughputController.control(flushName, c.getSerializedSize()); 142 } 143 } 144 kvs.clear(); 145 } 146 } while (hasMore); 147 } catch (InterruptedException e) { 148 throw new InterruptedIOException( 149 "Interrupted while control throughput of flushing " + flushName); 150 } finally { 151 if (control) { 152 throughputController.finish(flushName); 153 } 154 } 155 } 156}