001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.ArrayList; 024import java.util.List; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.KeyValueUtil; 031import org.apache.hadoop.hbase.monitoring.MonitoredTask; 032import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; 033import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; 034import org.apache.yetus.audience.InterfaceAudience; 035 036/** 037 * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). 038 * Custom implementation can be provided. 039 */ 040@InterfaceAudience.Private 041abstract class StoreFlusher { 042 protected Configuration conf; 043 protected HStore store; 044 045 public StoreFlusher(Configuration conf, HStore store) { 046 this.conf = conf; 047 this.store = store; 048 } 049 050 /** 051 * Turns a snapshot of memstore into a set of store files. 052 * @param snapshot Memstore snapshot. 053 * @param cacheFlushSeqNum Log cache flush sequence number. 054 * @param status Task that represents the flush operation and may be updated with status. 055 * @param throughputController A controller to avoid flush too fast 056 * @return List of files written. Can be empty; must not be null. 057 */ 058 public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, 059 MonitoredTask status, ThroughputController throughputController, 060 FlushLifeCycleTracker tracker) throws IOException; 061 062 protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, 063 MonitoredTask status) throws IOException { 064 // Write out the log sequence number that corresponds to this output 065 // hfile. Also write current time in metadata as minFlushTime. 066 // The hfile is current up to and including cacheFlushSeqNum. 067 status.setStatus("Flushing " + store + ": appending metadata"); 068 writer.appendMetadata(cacheFlushSeqNum, false); 069 status.setStatus("Flushing " + store + ": closing flushed file"); 070 writer.close(); 071 } 072 073 074 /** 075 * Creates the scanner for flushing snapshot. Also calls coprocessors. 076 * @param snapshotScanners 077 * @param smallestReadPoint 078 * @return The scanner; null if coprocessor is canceling the flush. 079 */ 080 protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners, 081 long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException { 082 ScanInfo scanInfo; 083 if (store.getCoprocessorHost() != null) { 084 scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker); 085 } else { 086 scanInfo = store.getScanInfo(); 087 } 088 InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners, 089 ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); 090 assert scanner != null; 091 if (store.getCoprocessorHost() != null) { 092 try { 093 return store.getCoprocessorHost().preFlush(store, scanner, tracker); 094 } catch (IOException ioe) { 095 scanner.close(); 096 throw ioe; 097 } 098 } 099 return scanner; 100 } 101 102 /** 103 * Performs memstore flush, writing data from scanner into sink. 104 * @param scanner Scanner to get data from. 105 * @param sink Sink to write data to. Could be StoreFile.Writer. 106 * @param smallestReadPoint Smallest read point used for the flush. 107 * @param throughputController A controller to avoid flush too fast 108 */ 109 protected void performFlush(InternalScanner scanner, CellSink sink, 110 long smallestReadPoint, ThroughputController throughputController) throws IOException { 111 int compactionKVMax = 112 conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); 113 114 ScannerContext scannerContext = 115 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); 116 117 List<Cell> kvs = new ArrayList<>(); 118 boolean hasMore; 119 String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); 120 // no control on system table (such as meta, namespace, etc) flush 121 boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); 122 if (control) { 123 throughputController.start(flushName); 124 } 125 try { 126 do { 127 hasMore = scanner.next(kvs, scannerContext); 128 if (!kvs.isEmpty()) { 129 for (Cell c : kvs) { 130 // If we know that this KV is going to be included always, then let us 131 // set its memstoreTS to 0. This will help us save space when writing to 132 // disk. 133 sink.append(c); 134 int len = KeyValueUtil.length(c); 135 if (control) { 136 throughputController.control(flushName, len); 137 } 138 } 139 kvs.clear(); 140 } 141 } while (hasMore); 142 } catch (InterruptedException e) { 143 throw new InterruptedIOException("Interrupted while control throughput of flushing " 144 + flushName); 145 } finally { 146 if (control) { 147 throughputController.finish(flushName); 148 } 149 } 150 } 151}