001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.Collections; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.TreeSet; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.atomic.AtomicLong; 029 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.hbase.util.CancelableProgressable; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 038 039/** 040 * The following class is an abstraction class to provide a common interface to support different 041 * ways of consuming recovered edits. 042 */ 043@InterfaceAudience.Private 044public abstract class OutputSink { 045 private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class); 046 047 protected WALSplitter.PipelineController controller; 048 protected EntryBuffers entryBuffers; 049 050 protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>(); 051 protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum = 052 new ConcurrentHashMap<>(); 053 054 protected final List<WriterThread> writerThreads = Lists.newArrayList(); 055 056 /* Set of regions which we've decided should not output edits */ 057 protected final Set<byte[]> blacklistedRegions = 058 Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR)); 059 060 protected boolean closeAndCleanCompleted = false; 061 062 protected boolean writersClosed = false; 063 064 protected final int numThreads; 065 066 protected CancelableProgressable reporter = null; 067 068 protected AtomicLong skippedEdits = new AtomicLong(); 069 070 protected List<Path> splits = null; 071 072 public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 073 int numWriters) { 074 numThreads = numWriters; 075 this.controller = controller; 076 this.entryBuffers = entryBuffers; 077 } 078 079 void setReporter(CancelableProgressable reporter) { 080 this.reporter = reporter; 081 } 082 083 /** 084 * Start the threads that will pump data from the entryBuffers to the output files. 085 */ 086 public synchronized void startWriterThreads() { 087 for (int i = 0; i < numThreads; i++) { 088 WriterThread t = new WriterThread(controller, entryBuffers, this, i); 089 t.start(); 090 writerThreads.add(t); 091 } 092 } 093 094 /** 095 * Update region's maximum edit log SeqNum. 096 */ 097 void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { 098 synchronized (regionMaximumEditLogSeqNum) { 099 String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); 100 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); 101 if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { 102 regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); 103 } 104 } 105 } 106 107 /** 108 * @return the number of currently opened writers 109 */ 110 int getNumOpenWriters() { 111 return this.writers.size(); 112 } 113 114 long getSkippedEdits() { 115 return this.skippedEdits.get(); 116 } 117 118 /** 119 * Wait for writer threads to dump all info to the sink 120 * @return true when there is no error 121 */ 122 protected boolean finishWriting(boolean interrupt) throws IOException { 123 LOG.debug("Waiting for split writer threads to finish"); 124 boolean progress_failed = false; 125 for (WriterThread t : writerThreads) { 126 t.finish(); 127 } 128 if (interrupt) { 129 for (WriterThread t : writerThreads) { 130 t.interrupt(); // interrupt the writer threads. We are stopping now. 131 } 132 } 133 134 for (WriterThread t : writerThreads) { 135 if (!progress_failed && reporter != null && !reporter.progress()) { 136 progress_failed = true; 137 } 138 try { 139 t.join(); 140 } catch (InterruptedException ie) { 141 IOException iie = new InterruptedIOException(); 142 iie.initCause(ie); 143 throw iie; 144 } 145 } 146 controller.checkForErrors(); 147 LOG.info("{} split writers finished; closing.", this.writerThreads.size()); 148 return (!progress_failed); 149 } 150 151 public abstract List<Path> finishWritingAndClose() throws IOException; 152 153 /** 154 * @return a map from encoded region ID to the number of edits written out for that region. 155 */ 156 public abstract Map<byte[], Long> getOutputCounts(); 157 158 /** 159 * @return number of regions we've recovered 160 */ 161 public abstract int getNumberOfRecoveredRegions(); 162 163 /** 164 * @param buffer A WAL Edit Entry 165 */ 166 public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException; 167 168 /** 169 * WriterThread call this function to help flush internal remaining edits in buffer before close 170 * @return true when underlying sink has something to flush 171 */ 172 public boolean flush() throws IOException { 173 return false; 174 } 175 176 /** 177 * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will 178 * want to get all of those edits. 179 * @return Return true if this sink wants to accept this region-level WALEdit. 180 */ 181 public abstract boolean keepRegionEvent(WAL.Entry entry); 182 183 public static class WriterThread extends Thread { 184 private volatile boolean shouldStop = false; 185 private WALSplitter.PipelineController controller; 186 private EntryBuffers entryBuffers; 187 private OutputSink outputSink = null; 188 189 WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 190 OutputSink sink, int i) { 191 super(Thread.currentThread().getName() + "-Writer-" + i); 192 this.controller = controller; 193 this.entryBuffers = entryBuffers; 194 outputSink = sink; 195 } 196 197 @Override 198 public void run() { 199 try { 200 doRun(); 201 } catch (Throwable t) { 202 LOG.error("Exiting thread", t); 203 controller.writerThreadError(t); 204 } 205 } 206 207 private void doRun() throws IOException { 208 LOG.trace("Writer thread starting"); 209 while (true) { 210 WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); 211 if (buffer == null) { 212 // No data currently available, wait on some more to show up 213 synchronized (controller.dataAvailable) { 214 if (shouldStop && !this.outputSink.flush()) { 215 return; 216 } 217 try { 218 controller.dataAvailable.wait(500); 219 } catch (InterruptedException ie) { 220 if (!shouldStop) { 221 throw new RuntimeException(ie); 222 } 223 } 224 } 225 continue; 226 } 227 228 assert buffer != null; 229 try { 230 writeBuffer(buffer); 231 } finally { 232 entryBuffers.doneWriting(buffer); 233 } 234 } 235 } 236 237 private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException { 238 outputSink.append(buffer); 239 } 240 241 void setShouldStop(boolean shouldStop) { 242 this.shouldStop = shouldStop; 243 } 244 245 void finish() { 246 synchronized (controller.dataAvailable) { 247 shouldStop = true; 248 controller.dataAvailable.notifyAll(); 249 } 250 } 251 } 252}