001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.CompletionService; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicLong; 030 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.monitoring.MonitoredTask; 033import org.apache.hadoop.hbase.util.CancelableProgressable; 034import org.apache.hadoop.hbase.util.Threads; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 040 041/** 042 * The following class is an abstraction class to provide a common interface to support different 043 * ways of consuming recovered edits. 044 */ 045@InterfaceAudience.Private 046public abstract class OutputSink { 047 private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class); 048 049 private final WALSplitter.PipelineController controller; 050 protected final EntryBuffers entryBuffers; 051 052 private final List<WriterThread> writerThreads = Lists.newArrayList(); 053 054 protected final int numThreads; 055 056 protected CancelableProgressable reporter = null; 057 058 protected final AtomicLong totalSkippedEdits = new AtomicLong(); 059 060 /** 061 * List of all the files produced by this sink 062 */ 063 protected final List<Path> splits = new ArrayList<>(); 064 065 protected MonitoredTask status = null; 066 067 /** 068 * Used when close this output sink. 069 */ 070 protected final ThreadPoolExecutor closeThreadPool; 071 protected final CompletionService<Void> closeCompletionService; 072 073 public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 074 int numWriters) { 075 this.numThreads = numWriters; 076 this.controller = controller; 077 this.entryBuffers = entryBuffers; 078 this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, 079 Threads.newDaemonThreadFactory("split-log-closeStream-")); 080 this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool); 081 } 082 083 void setReporter(CancelableProgressable reporter) { 084 this.reporter = reporter; 085 } 086 087 void setStatus(MonitoredTask status) { 088 this.status = status; 089 } 090 091 /** 092 * Start the threads that will pump data from the entryBuffers to the output files. 093 */ 094 public void startWriterThreads() throws IOException { 095 for (int i = 0; i < numThreads; i++) { 096 WriterThread t = new WriterThread(controller, entryBuffers, this, i); 097 t.start(); 098 writerThreads.add(t); 099 } 100 } 101 102 public synchronized void restartWriterThreadsIfNeeded() { 103 for(int i = 0; i< writerThreads.size(); i++){ 104 WriterThread t = writerThreads.get(i); 105 if (!t.isAlive()){ 106 String threadName = t.getName(); 107 LOG.debug("Replacing dead thread: " + threadName); 108 WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName); 109 newThread.start(); 110 writerThreads.set(i, newThread); 111 } 112 } 113 } 114 115 /** 116 * Wait for writer threads to dump all info to the sink 117 * 118 * @return true when there is no error 119 */ 120 protected boolean finishWriterThreads(boolean interrupt) throws IOException { 121 LOG.debug("Waiting for split writer threads to finish"); 122 boolean progressFailed = false; 123 for (WriterThread t : writerThreads) { 124 t.finish(); 125 } 126 if (interrupt) { 127 for (WriterThread t : writerThreads) { 128 t.interrupt(); // interrupt the writer threads. We are stopping now. 129 } 130 } 131 132 for (WriterThread t : writerThreads) { 133 if (!progressFailed && reporter != null && !reporter.progress()) { 134 progressFailed = true; 135 } 136 try { 137 t.join(); 138 } catch (InterruptedException ie) { 139 IOException iie = new InterruptedIOException(); 140 iie.initCause(ie); 141 throw iie; 142 } 143 } 144 controller.checkForErrors(); 145 final String msg = this.writerThreads.size() + " split writer threads finished"; 146 LOG.info(msg); 147 updateStatusWithMsg(msg); 148 return (!progressFailed); 149 } 150 151 long getTotalSkippedEdits() { 152 return this.totalSkippedEdits.get(); 153 } 154 155 /** 156 * @return the number of currently opened writers 157 */ 158 protected abstract int getNumOpenWriters(); 159 160 /** 161 * @param buffer A buffer of some number of edits for a given region. 162 * @throws IOException For any IO errors 163 */ 164 protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException; 165 166 protected abstract List<Path> close() throws IOException; 167 168 /** 169 * @return a map from encoded region ID to the number of edits written out for that region. 170 */ 171 protected abstract Map<String, Long> getOutputCounts(); 172 173 /** 174 * @return number of regions we've recovered 175 */ 176 protected abstract int getNumberOfRecoveredRegions(); 177 178 /** 179 * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will 180 * want to get all of those edits. 181 * @return Return true if this sink wants to accept this region-level WALEdit. 182 */ 183 protected abstract boolean keepRegionEvent(WAL.Entry entry); 184 185 /** 186 * Set status message in {@link MonitoredTask} instance that is set in this OutputSink 187 * @param msg message to update the status with 188 */ 189 protected final void updateStatusWithMsg(String msg) { 190 if (status != null) { 191 status.setStatus(msg); 192 } 193 } 194 195 public static class WriterThread extends Thread { 196 private volatile boolean shouldStop = false; 197 private WALSplitter.PipelineController controller; 198 private EntryBuffers entryBuffers; 199 private OutputSink outputSink = null; 200 201 WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 202 OutputSink sink, int i) { 203 this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i); 204 } 205 206 WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 207 OutputSink sink, String threadName) { 208 super(threadName); 209 this.controller = controller; 210 this.entryBuffers = entryBuffers; 211 outputSink = sink; 212 } 213 214 @Override 215 public void run() { 216 try { 217 doRun(); 218 } catch (Throwable t) { 219 LOG.error("Exiting thread", t); 220 controller.writerThreadError(t); 221 } 222 } 223 224 private void doRun() throws IOException { 225 LOG.trace("Writer thread starting"); 226 while (true) { 227 EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); 228 if (buffer == null) { 229 // No data currently available, wait on some more to show up 230 synchronized (controller.dataAvailable) { 231 if (shouldStop) { 232 return; 233 } 234 try { 235 controller.dataAvailable.wait(500); 236 } catch (InterruptedException ie) { 237 if (!shouldStop) { 238 throw new RuntimeException(ie); 239 } 240 } 241 } 242 continue; 243 } 244 245 assert buffer != null; 246 try { 247 writeBuffer(buffer); 248 } finally { 249 entryBuffers.doneWriting(buffer); 250 } 251 } 252 } 253 254 private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 255 outputSink.append(buffer); 256 } 257 258 private void finish() { 259 synchronized (controller.dataAvailable) { 260 shouldStop = true; 261 controller.dataAvailable.notifyAll(); 262 } 263 } 264 } 265}