/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * An abstraction that provides a common interface for the different ways of consuming recovered
 * edits.
 * <p>
 * The sink owns a set of {@link WriterThread}s. Each one repeatedly takes a chunk from the shared
 * {@link EntryBuffers} and hands it to {@link #append(EntryBuffers.RegionEntryBuffer)}, which the
 * concrete subclass implements.
 */
@InterfaceAudience.Private
abstract class OutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);

  /** Receives writer-thread errors and owns the {@code dataAvailable} monitor the writers wait on. */
  private final WALSplitter.PipelineController controller;

  /** Source of the per-region buffers that the writer threads drain. */
  protected final EntryBuffers entryBuffers;

  /** The writer threads created by {@link #startWriterThreads()}; dead ones may be replaced. */
  private final List<WriterThread> writerThreads = Lists.newArrayList();

  /** Number of writer threads to start; also passed as the size argument of the close pool. */
  protected final int numThreads;

  /** Optional progress callback consulted in {@link #finishWriterThreads()}; may be null. */
  protected CancelableProgressable reporter = null;

  /** Counter of skipped edits, exposed through {@link #getTotalSkippedEdits()}. */
  protected final AtomicLong totalSkippedEdits = new AtomicLong();

  /**
   * List of all the files produced by this sink.
   * <p>
   * Must be a synchronized list to avoid concurrency issues. CopyOnWriteArrayList is not a good
   * choice because all we do is add to the list and then return the result.
   */
  protected final List<Path> splits = Collections.synchronizedList(new ArrayList<>());

  /** Optional task whose status message is updated by {@link #updateStatusWithMsg}; may be null. */
  protected MonitoredTask status = null;

  /**
   * Used when closing this output sink.
   */
  protected final ThreadPoolExecutor closeThreadPool;
  protected final CompletionService<Void> closeCompletionService;

  /**
   * Creates the sink and the thread pool that is used when closing it. Writer threads are not
   * started here; call {@link #startWriterThreads()} for that.
   * @param controller   pipeline controller shared with the writer threads
   * @param entryBuffers buffers the writer threads read from
   * @param numWriters   number of writer threads; also the size argument of the close thread pool
   */
  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
    int numWriters) {
    this.numThreads = numWriters;
    this.controller = controller;
    this.entryBuffers = entryBuffers;
    // Close-pool threads are daemon threads named "split-log-closeStream-pool-<n>".
    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
  }

  /**
   * Sets the progress callback that {@link #finishWriterThreads()} polls.
   * @param reporter the callback, or null for none
   */
  void setReporter(CancelableProgressable reporter) {
    this.reporter = reporter;
  }

  /**
   * Sets the task that receives status messages from this sink.
   * @param status the task, or null for none
   */
  void setStatus(MonitoredTask status) {
    this.status = status;
  }

  /**
   * Start the threads that will pump data from the entryBuffers to the output files.
   * @throws IOException declared for callers and overriders; this implementation does not throw it
   */
  void startWriterThreads() throws IOException {
    for (int i = 0; i < numThreads; i++) {
      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
      t.start();
      writerThreads.add(t);
    }
  }

  /**
   * Replaces every writer thread that is no longer alive with a freshly started thread that
   * carries the same name, keeping its position in the list.
   */
  public synchronized void restartWriterThreadsIfNeeded() {
    for (int i = 0; i < writerThreads.size(); i++) {
      WriterThread t = writerThreads.get(i);
      if (!t.isAlive()) {
        String threadName = t.getName();
        LOG.debug("Replacing dead thread: {}", threadName);
        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
        newThread.start();
        writerThreads.set(i, newThread);
      }
    }
  }

  /**
   * Wait for writer threads to dump all info to the sink.
   * <p>
   * All threads are asked to stop first and are then joined one by one. The reporter, if set, is
   * polled before each join until it reports a failure for the first time.
   * @return true when there is no error, i.e. the reporter never returned false
   * @throws InterruptedIOException if the calling thread is interrupted while joining
   * @throws IOException            if {@code controller.checkForErrors()} throws
   */
  boolean finishWriterThreads() throws IOException {
    LOG.debug("Waiting for split writer threads to finish");
    boolean progressFailed = false;
    // Signal every thread before joining any of them so that they can wind down in parallel.
    for (WriterThread t : writerThreads) {
      t.finish();
    }

    for (WriterThread t : writerThreads) {
      if (!progressFailed && reporter != null && !reporter.progress()) {
        progressFailed = true;
      }
      try {
        t.join();
      } catch (InterruptedException ie) {
        // Surface the interruption as an IOException while keeping the original cause.
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }
    controller.checkForErrors();
    final String msg = this.writerThreads.size() + " split writer threads finished";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return !progressFailed;
  }

  /** Returns the current value of the skipped-edits counter. */
  long getTotalSkippedEdits() {
    return this.totalSkippedEdits.get();
  }

  /** Returns the number of currently opened writers */
  abstract int getNumOpenWriters();

  /**
   * @param buffer A buffer of some number of edits for a given region.
   * @throws IOException For any IO errors
   */
  abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;

  abstract List<Path> close() throws IOException;

  /** Returns a map from encoded region ID to the number of edits written out for that region. */
  abstract Map<String, Long> getOutputCounts();

  /** Returns number of regions we've recovered */
  abstract int getNumberOfRecoveredRegions();

  /**
   * Some WALEdits contain only KVs that account for what happened to a region. Not all sinks will
   * want to get all of those edits.
   * @return Return true if this sink wants to accept this region-level WALEdit.
   */
  abstract boolean keepRegionEvent(WAL.Entry entry);

  /**
   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
   * @param msg message to update the status with
   */
  protected final void updateStatusWithMsg(String msg) {
    if (status != null) {
      status.setStatus(msg);
    }
  }

  /**
   * Thread that loops taking chunks from the {@link EntryBuffers} and appending them to the owning
   * {@link OutputSink} until {@code finish()} has been called and no chunk is available.
   */
  public static class WriterThread extends Thread {
    /** Set under the {@code controller.dataAvailable} monitor by {@link #finish()}. */
    private volatile boolean shouldStop = false;
    private final WALSplitter.PipelineController controller;
    private final EntryBuffers entryBuffers;
    private final OutputSink outputSink;

    /**
     * Creates a writer named after the creating thread: {@code <current thread name>-Writer-<i>}.
     */
    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      OutputSink sink, int i) {
      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
    }

    /** Creates a writer with an explicit thread name. */
    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      OutputSink sink, String threadName) {
      super(threadName);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      this.outputSink = sink;
    }

    /** Runs the write loop; any Throwable is logged and reported to the controller. */
    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    /**
     * The write loop. Exits only when no chunk is available and a stop has been requested.
     * @throws IOException if appending a buffer to the sink fails
     */
    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
      while (true) {
        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (controller.dataAvailable) {
            if (shouldStop) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              // An interrupt during shutdown is ignored; the next iteration sees shouldStop.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        try {
          writeBuffer(buffer);
        } finally {
          // Always hand the buffer back, even when the append failed.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    /** Appends one buffer to the owning sink. */
    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    /** Requests a stop and wakes every thread waiting on {@code controller.dataAvailable}. */
    private void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
}