/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Store flusher base class. Turns a snapshot of memstore into a set of store files (usually one).
 * Custom implementation can be provided.
 */
@InterfaceAudience.Private
abstract class StoreFlusher {
  protected Configuration conf;
  protected HStore store;

  /**
   * @param conf Configuration; read by {@link #performFlush} for the scan batch limit.
   * @param store The store whose memstore snapshots this flusher writes out.
   */
  public StoreFlusher(Configuration conf, HStore store) {
    this.conf = conf;
    this.store = store;
  }

  /**
   * Turns a snapshot of memstore into a set of store files.
   * @param snapshot Memstore snapshot.
   * @param cacheFlushSeqNum Log cache flush sequence number.
   * @param status Task that represents the flush operation and may be updated with status.
   * @param throughputController A controller to avoid flush too fast
   * @param tracker Tracker for the lifecycle of this flush (in this class it is handed to the
   *          coprocessor hooks by {@link #createScanner}).
   * @return List of files written. Can be empty; must not be null.
   * @throws IOException if the flush fails.
   */
  public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
      MonitoredTask status, ThroughputController throughputController,
      FlushLifeCycleTracker tracker) throws IOException;

  /**
   * Appends the flush metadata to the writer and closes it, reporting both steps on
   * {@code status}.
   * @param writer Writer of the flushed file; closed by this method.
   * @param cacheFlushSeqNum Log cache flush sequence number recorded in the file metadata.
   * @param status Task that represents the flush operation; updated with progress messages.
   * @throws IOException if appending the metadata or closing the writer fails.
   */
  protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
      MonitoredTask status) throws IOException {
    // Write out the log sequence number that corresponds to this output
    // hfile. Also write current time in metadata as minFlushTime.
    // The hfile is current up to and including cacheFlushSeqNum.
    status.setStatus("Flushing " + store + ": appending metadata");
    writer.appendMetadata(cacheFlushSeqNum, false);
    status.setStatus("Flushing " + store + ": closing flushed file");
    writer.close();
  }

  /**
   * Creates the scanner for flushing snapshot. Also calls coprocessors.
   * @param snapshotScanners Scanners over the snapshot; they are handed to the
   *          {@link StoreScanner} that is created here.
   * @param tracker Flush lifecycle tracker; passed to the coprocessor hooks.
   * @return The scanner; null if coprocessor is canceling the flush.
   * @throws IOException if creating the scanner or a coprocessor hook fails. If the
   *           {@code preFlush} hook throws, the newly created scanner is closed first.
   */
  protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
      FlushLifeCycleTracker tracker) throws IOException {
    // Let the coprocessor host (if any) supply the ScanInfo; otherwise use the store's own.
    ScanInfo scanInfo;
    if (store.getCoprocessorHost() != null) {
      scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
    } else {
      scanInfo = store.getScanInfo();
    }
    final long smallestReadPoint = store.getSmallestReadPoint();
    InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
        ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);

    if (store.getCoprocessorHost() != null) {
      try {
        // The hook's return value is what the caller gets back.
        return store.getCoprocessorHost().preFlush(store, scanner, tracker);
      } catch (IOException ioe) {
        // Do not leak the scanner we just opened when the hook fails.
        scanner.close();
        throw ioe;
      }
    }
    return scanner;
  }

  /**
   * Performs memstore flush, writing data from scanner into sink.
   * @param scanner Scanner to get data from.
   * @param sink Sink to write data to. Could be StoreFile.Writer.
   * @param throughputController A controller to avoid flush too fast. May be null, in which case
   *          no throttling is applied.
   * @throws InterruptedIOException if the thread is interrupted while being throttled; the
   *           original {@link InterruptedException} is attached as the cause.
   * @throws IOException if reading from the scanner or writing to the sink fails.
   */
  protected void performFlush(InternalScanner scanner, CellSink sink,
      ThroughputController throughputController) throws IOException {
    int compactionKVMax =
        conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    List<Cell> kvs = new ArrayList<>();
    boolean hasMore;
    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
    // no control on system table (such as meta, namespace, etc) flush
    boolean control =
        throughputController != null && !store.getRegionInfo().getTable().isSystemTable();
    if (control) {
      throughputController.start(flushName);
    }
    try {
      do {
        hasMore = scanner.next(kvs, scannerContext);
        if (!kvs.isEmpty()) {
          for (Cell c : kvs) {
            // Cells are written to the sink unchanged; when throttling is enabled each
            // cell's serialized size is reported to the controller.
            sink.append(c);
            if (control) {
              throughputController.control(flushName, c.getSerializedSize());
            }
          }
          // Reuse the same list for the next batch.
          kvs.clear();
        }
      } while (hasMore);
    } catch (InterruptedException e) {
      // InterruptedIOException has no cause-taking constructor, so chain the original
      // exception explicitly to keep its stack trace available to callers.
      InterruptedIOException iioe = new InterruptedIOException(
          "Interrupted while control throughput of flushing " + flushName);
      iioe.initCause(e);
      throw iioe;
    } finally {
      // Always pair start() with finish(), including on failure.
      if (control) {
        throughputController.finish(flushName);
      }
    }
  }
}