001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.CompletionService; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicLong; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.monitoring.MonitoredTask; 032import org.apache.hadoop.hbase.util.CancelableProgressable; 033import org.apache.hadoop.hbase.util.Threads; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 040 041/** 042 * The following class is an abstraction class to provide a common interface to support different 043 * ways of consuming recovered edits. 044 */ 045@InterfaceAudience.Private 046public abstract class OutputSink { 047 private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class); 048 049 private final WALSplitter.PipelineController controller; 050 protected final EntryBuffers entryBuffers; 051 052 private final List<WriterThread> writerThreads = Lists.newArrayList(); 053 054 protected final int numThreads; 055 056 protected CancelableProgressable reporter = null; 057 058 protected final AtomicLong totalSkippedEdits = new AtomicLong(); 059 060 /** 061 * List of all the files produced by this sink 062 */ 063 protected final List<Path> splits = new ArrayList<>(); 064 065 protected MonitoredTask status = null; 066 067 /** 068 * Used when close this output sink. 069 */ 070 protected final ThreadPoolExecutor closeThreadPool; 071 protected final CompletionService<Void> closeCompletionService; 072 073 public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 074 int numWriters) { 075 this.numThreads = numWriters; 076 this.controller = controller; 077 this.entryBuffers = entryBuffers; 078 this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, 079 new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true) 080 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 081 this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool); 082 } 083 084 void setReporter(CancelableProgressable reporter) { 085 this.reporter = reporter; 086 } 087 088 void setStatus(MonitoredTask status) { 089 this.status = status; 090 } 091 092 /** 093 * Start the threads that will pump data from the entryBuffers to the output files. 094 */ 095 public void startWriterThreads() throws IOException { 096 for (int i = 0; i < numThreads; i++) { 097 WriterThread t = new WriterThread(controller, entryBuffers, this, i); 098 t.start(); 099 writerThreads.add(t); 100 } 101 } 102 103 public synchronized void restartWriterThreadsIfNeeded() { 104 for (int i = 0; i < writerThreads.size(); i++) { 105 WriterThread t = writerThreads.get(i); 106 if (!t.isAlive()) { 107 String threadName = t.getName(); 108 LOG.debug("Replacing dead thread: " + threadName); 109 WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName); 110 newThread.start(); 111 writerThreads.set(i, newThread); 112 } 113 } 114 } 115 116 /** 117 * Wait for writer threads to dump all info to the sink 118 * @return true when there is no error 119 */ 120 protected boolean finishWriterThreads(boolean interrupt) throws IOException { 121 LOG.debug("Waiting for split writer threads to finish"); 122 boolean progressFailed = false; 123 for (WriterThread t : writerThreads) { 124 t.finish(); 125 } 126 if (interrupt) { 127 for (WriterThread t : writerThreads) { 128 t.interrupt(); // interrupt the writer threads. We are stopping now. 129 } 130 } 131 132 for (WriterThread t : writerThreads) { 133 if (!progressFailed && reporter != null && !reporter.progress()) { 134 progressFailed = true; 135 } 136 try { 137 t.join(); 138 } catch (InterruptedException ie) { 139 IOException iie = new InterruptedIOException(); 140 iie.initCause(ie); 141 throw iie; 142 } 143 } 144 controller.checkForErrors(); 145 final String msg = this.writerThreads.size() + " split writer threads finished"; 146 LOG.info(msg); 147 updateStatusWithMsg(msg); 148 return (!progressFailed); 149 } 150 151 long getTotalSkippedEdits() { 152 return this.totalSkippedEdits.get(); 153 } 154 155 /** Returns the number of currently opened writers */ 156 protected abstract int getNumOpenWriters(); 157 158 /** 159 * @param buffer A buffer of some number of edits for a given region. 160 * @throws IOException For any IO errors 161 */ 162 protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException; 163 164 protected abstract List<Path> close() throws IOException; 165 166 /** Returns a map from encoded region ID to the number of edits written out for that region. */ 167 protected abstract Map<String, Long> getOutputCounts(); 168 169 /** Returns number of regions we've recovered */ 170 protected abstract int getNumberOfRecoveredRegions(); 171 172 /** 173 * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will 174 * want to get all of those edits. 175 * @return Return true if this sink wants to accept this region-level WALEdit. 176 */ 177 protected abstract boolean keepRegionEvent(WAL.Entry entry); 178 179 /** 180 * Set status message in {@link MonitoredTask} instance that is set in this OutputSink 181 * @param msg message to update the status with 182 */ 183 protected final void updateStatusWithMsg(String msg) { 184 if (status != null) { 185 status.setStatus(msg); 186 } 187 } 188 189 public static class WriterThread extends Thread { 190 private volatile boolean shouldStop = false; 191 private WALSplitter.PipelineController controller; 192 private EntryBuffers entryBuffers; 193 private OutputSink outputSink = null; 194 195 WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 196 OutputSink sink, int i) { 197 this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i); 198 } 199 200 WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, 201 OutputSink sink, String threadName) { 202 super(threadName); 203 this.controller = controller; 204 this.entryBuffers = entryBuffers; 205 outputSink = sink; 206 } 207 208 @Override 209 public void run() { 210 try { 211 doRun(); 212 } catch (Throwable t) { 213 LOG.error("Exiting thread", t); 214 controller.writerThreadError(t); 215 } 216 } 217 218 private void doRun() throws IOException { 219 LOG.trace("Writer thread starting"); 220 while (true) { 221 EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); 222 if (buffer == null) { 223 // No data currently available, wait on some more to show up 224 synchronized (controller.dataAvailable) { 225 if (shouldStop) { 226 return; 227 } 228 try { 229 controller.dataAvailable.wait(500); 230 } catch (InterruptedException ie) { 231 if (!shouldStop) { 232 throw new RuntimeException(ie); 233 } 234 } 235 } 236 continue; 237 } 238 239 assert buffer != null; 240 try { 241 writeBuffer(buffer); 242 } finally { 243 entryBuffers.doneWriting(buffer); 244 } 245 } 246 } 247 248 private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 249 outputSink.append(buffer); 250 } 251 252 private void finish() { 253 synchronized (controller.dataAvailable) { 254 shouldStop = true; 255 controller.dataAvailable.notifyAll(); 256 } 257 } 258 } 259}