/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Abstraction that provides a common interface for the different ways of consuming recovered
 * edits.
 * <p>
 * The sink owns a set of {@link WriterThread}s. Each writer thread repeatedly takes a chunk from
 * the shared {@link EntryBuffers} and hands it to {@link #append(EntryBuffers.RegionEntryBuffer)},
 * which concrete subclasses implement.
 */
@InterfaceAudience.Private
public abstract class OutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);

  private final WALSplitter.PipelineController controller;
  protected final EntryBuffers entryBuffers;

  /** Writer threads created by {@link #startWriterThreads()}, one slot per writer. */
  private final List<WriterThread> writerThreads = Lists.newArrayList();

  /** Number of writer threads to start; taken from the {@code numWriters} constructor argument. */
  protected final int numThreads;

  /** Optional progress reporter; {@code null} until {@link #setReporter} is called. */
  protected CancelableProgressable reporter = null;

  protected final AtomicLong totalSkippedEdits = new AtomicLong();

  /**
   * List of all the files produced by this sink
   */
  protected final List<Path> splits = new ArrayList<>();

  /** Optional task whose status is updated by {@link #updateStatusWithMsg(String)}. */
  protected MonitoredTask status = null;

  /**
   * Used when closing this output sink.
   */
  protected final ThreadPoolExecutor closeThreadPool;
  protected final CompletionService<Void> closeCompletionService;

  /**
   * Creates the sink and its close thread pool. No writer thread is started here; call
   * {@link #startWriterThreads()} for that.
   *
   * @param controller   pipeline controller shared with the writer threads
   * @param entryBuffers buffers the writer threads take their chunks from
   * @param numWriters   number of writer threads to start, also passed as the size argument of
   *                     the close thread pool
   */
  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      int numWriters) {
    this.numThreads = numWriters;
    this.controller = controller;
    this.entryBuffers = entryBuffers;
    // Daemon threads, so a lingering close pool cannot keep the JVM alive.
    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
  }

  /**
   * @param reporter progress reporter consulted by {@link #finishWriterThreads(boolean)}; may be
   *                 {@code null}
   */
  void setReporter(CancelableProgressable reporter) {
    this.reporter = reporter;
  }

  /**
   * @param status task that receives status messages from this sink; may be {@code null}
   */
  void setStatus(MonitoredTask status) {
    this.status = status;
  }

  /**
   * Start the threads that will pump data from the entryBuffers to the output files.
   */
  public void startWriterThreads() throws IOException {
    for (int i = 0; i < numThreads; i++) {
      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
      t.start();
      writerThreads.add(t);
    }
  }

  /**
   * Replaces every writer thread that is no longer alive with a freshly started one. The
   * replacement reuses the name of the thread it replaces and takes over its slot in the list.
   */
  public synchronized void restartWriterThreadsIfNeeded() {
    for (int i = 0; i < writerThreads.size(); i++) {
      WriterThread t = writerThreads.get(i);
      if (!t.isAlive()) {
        String threadName = t.getName();
        LOG.debug("Replacing dead thread: {}", threadName);
        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
        newThread.start();
        writerThreads.set(i, newThread);
      }
    }
  }

  /**
   * Wait for writer threads to dump all info to the sink.
   * <p>
   * Every writer thread is first asked to stop, optionally interrupted, and then joined. Before
   * each join the reporter (if any) is asked for progress until it reports failure once. After
   * all threads have been joined, {@code controller.checkForErrors()} is invoked.
   *
   * @param interrupt whether to interrupt the writer threads after asking them to stop
   * @return true when there is no error, i.e. the reporter never reported failed progress
   * @throws IOException an {@link InterruptedIOException} if the calling thread is interrupted
   *                     while joining a writer thread, or whatever
   *                     {@code controller.checkForErrors()} throws
   */
  protected boolean finishWriterThreads(boolean interrupt) throws IOException {
    LOG.debug("Waiting for split writer threads to finish");
    boolean progressFailed = false;
    for (WriterThread t : writerThreads) {
      t.finish();
    }
    if (interrupt) {
      for (WriterThread t : writerThreads) {
        t.interrupt(); // interrupt the writer threads. We are stopping now.
      }
    }

    for (WriterThread t : writerThreads) {
      // Once progress has failed there is no need to ask the reporter again.
      if (!progressFailed && reporter != null && !reporter.progress()) {
        progressFailed = true;
      }
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }
    controller.checkForErrors();
    final String msg = this.writerThreads.size() + " split writer threads finished";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return (!progressFailed);
  }

  /**
   * @return the current value of the skipped-edits counter
   */
  long getTotalSkippedEdits() {
    return this.totalSkippedEdits.get();
  }

  /**
   * @return the number of currently opened writers
   */
  protected abstract int getNumOpenWriters();

  /**
   * @param buffer A buffer of some number of edits for a given region.
   * @throws IOException For any IO errors
   */
  protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;

  /**
   * Closes this sink.
   *
   * @return a list of paths; presumably the files produced by this sink (see {@link #splits}),
   *         to be confirmed against the concrete implementations
   * @throws IOException For any IO errors
   */
  protected abstract List<Path> close() throws IOException;

  /**
   * @return a map from encoded region ID to the number of edits written out for that region.
   */
  protected abstract Map<String, Long> getOutputCounts();

  /**
   * @return number of regions we've recovered
   */
  protected abstract int getNumberOfRecoveredRegions();

  /**
   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
   * want to get all of those edits.
   * @return Return true if this sink wants to accept this region-level WALEdit.
   */
  protected abstract boolean keepRegionEvent(WAL.Entry entry);

  /**
   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
   * @param msg message to update the status with
   */
  protected final void updateStatusWithMsg(String msg) {
    if (status != null) {
      status.setStatus(msg);
    }
  }

  /**
   * Thread that takes chunks from the {@link EntryBuffers} and appends them to the owning
   * {@link OutputSink} until it is asked to stop and no chunk is available.
   */
  public static class WriterThread extends Thread {
    /** Set by {@link #finish()}; read under the {@code controller.dataAvailable} monitor. */
    private volatile boolean shouldStop = false;
    private final WALSplitter.PipelineController controller;
    private final EntryBuffers entryBuffers;
    private final OutputSink outputSink;

    /**
     * Creates a writer whose name is the creating thread's name followed by {@code -Writer-}
     * and the given index.
     */
    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
        OutputSink sink, int i) {
      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
    }

    /**
     * Creates a writer with an explicit thread name.
     */
    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
        OutputSink sink, String threadName) {
      super(threadName);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      this.outputSink = sink;
    }

    /**
     * Runs the write loop. Any {@link Throwable} escaping the loop is logged and handed to
     * {@code controller.writerThreadError}, after which the thread exits.
     */
    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    /**
     * Write loop: take a chunk, append it to the sink, mark it as done, repeat. When no chunk is
     * available the thread exits if it was asked to stop, otherwise it waits for up to 500 ms on
     * {@code controller.dataAvailable} before trying again.
     *
     * @throws IOException if appending a buffer to the sink fails
     */
    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
      while (true) {
        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (controller.dataAvailable) {
            if (shouldStop) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              // An interrupt that accompanies a stop request is expected; the next iteration
              // exits through the shouldStop check. Any other interrupt is an error.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        try {
          writeBuffer(buffer);
        } finally {
          // Always report the buffer as done, even when the append failed.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    /**
     * Appends the given buffer to the owning sink.
     */
    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    /**
     * Asks this thread to stop and wakes up every thread waiting on
     * {@code controller.dataAvailable}.
     */
    private void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
}