001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.monitoring.MonitoredTask; 031import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 033import org.apache.yetus.audience.InterfaceAudience; 034 035/** 036 * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). 037 * Custom implementation can be provided. 038 */ 039@InterfaceAudience.Private 040abstract class StoreFlusher { 041 protected Configuration conf; 042 protected HStore store; 043 044 public StoreFlusher(Configuration conf, HStore store) { 045 this.conf = conf; 046 this.store = store; 047 } 048 049 /** 050 * Turns a snapshot of memstore into a set of store files. 051 * @param snapshot Memstore snapshot. 052 * @param cacheFlushSeqNum Log cache flush sequence number. 053 * @param status Task that represents the flush operation and may be updated with status. 054 * @param throughputController A controller to avoid flush too fast 055 * @return List of files written. Can be empty; must not be null. 056 */ 057 public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, 058 MonitoredTask status, ThroughputController throughputController, 059 FlushLifeCycleTracker tracker) throws IOException; 060 061 protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, 062 MonitoredTask status) throws IOException { 063 // Write out the log sequence number that corresponds to this output 064 // hfile. Also write current time in metadata as minFlushTime. 065 // The hfile is current up to and including cacheFlushSeqNum. 066 status.setStatus("Flushing " + store + ": appending metadata"); 067 writer.appendMetadata(cacheFlushSeqNum, false); 068 status.setStatus("Flushing " + store + ": closing flushed file"); 069 writer.close(); 070 } 071 072 073 /** 074 * Creates the scanner for flushing snapshot. Also calls coprocessors. 075 * @param snapshotScanners 076 * @param smallestReadPoint 077 * @return The scanner; null if coprocessor is canceling the flush. 078 */ 079 protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners, 080 long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException { 081 ScanInfo scanInfo; 082 if (store.getCoprocessorHost() != null) { 083 scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker); 084 } else { 085 scanInfo = store.getScanInfo(); 086 } 087 InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners, 088 ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); 089 assert scanner != null; 090 if (store.getCoprocessorHost() != null) { 091 try { 092 return store.getCoprocessorHost().preFlush(store, scanner, tracker); 093 } catch (IOException ioe) { 094 scanner.close(); 095 throw ioe; 096 } 097 } 098 return scanner; 099 } 100 101 /** 102 * Performs memstore flush, writing data from scanner into sink. 103 * @param scanner Scanner to get data from. 104 * @param sink Sink to write data to. Could be StoreFile.Writer. 105 * @param smallestReadPoint Smallest read point used for the flush. 106 * @param throughputController A controller to avoid flush too fast 107 */ 108 protected void performFlush(InternalScanner scanner, CellSink sink, 109 long smallestReadPoint, ThroughputController throughputController) throws IOException { 110 int compactionKVMax = 111 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 112 113 ScannerContext scannerContext = 114 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 115 116 List<Cell> kvs = new ArrayList<>(); 117 boolean hasMore; 118 String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); 119 // no control on system table (such as meta, namespace, etc) flush 120 boolean control = 121 throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); 122 if (control) { 123 throughputController.start(flushName); 124 } 125 try { 126 do { 127 hasMore = scanner.next(kvs, scannerContext); 128 if (!kvs.isEmpty()) { 129 for (Cell c : kvs) { 130 // If we know that this KV is going to be included always, then let us 131 // set its memstoreTS to 0. This will help us save space when writing to 132 // disk. 133 sink.append(c); 134 if (control) { 135 throughputController.control(flushName, c.getSerializedSize()); 136 } 137 } 138 kvs.clear(); 139 } 140 } while (hasMore); 141 } catch (InterruptedException e) { 142 throw new InterruptedIOException( 143 "Interrupted while control throughput of flushing " + flushName); 144 } finally { 145 if (control) { 146 throughputController.finish(flushName); 147 } 148 } 149 } 150}