/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Base abstraction that gives the different ways of consuming recovered edits a common interface.
 * <p>
 * The sink owns a set of {@link WriterThread}s. Each of them repeatedly takes a chunk from the
 * shared {@link EntryBuffers} and hands it to {@link #append(EntryBuffers.RegionEntryBuffer)},
 * which concrete subclasses implement.
 */
@InterfaceAudience.Private
abstract class OutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);

  private final WALSplitter.PipelineController controller;
  protected final EntryBuffers entryBuffers;

  /** Writer threads created by {@link #startWriterThreads()}. */
  private final List<WriterThread> writerThreads = Lists.newArrayList();

  /** Number of writer threads to start; also passed as the size argument of the close pool. */
  protected final int numThreads;

  /** Optional progress callback; consulted in {@link #finishWriterThreads()} when non-null. */
  protected CancelableProgressable reporter = null;

  protected final AtomicLong totalSkippedEdits = new AtomicLong();

  /**
   * List of all the files produced by this sink
   */
  protected final List<Path> splits = new ArrayList<>();

  /** Optional task that receives status messages; may be null. */
  protected MonitoredTask status = null;

  /**
   * Thread pool used when closing this output sink.
   */
  protected final ThreadPoolExecutor closeThreadPool;

  /** Completion service layered on top of {@link #closeThreadPool}. */
  protected final CompletionService<Void> closeCompletionService;

  /**
   * @param controller   pipeline controller shared with the writer threads
   * @param entryBuffers source of the buffers the writer threads consume
   * @param numWriters   number of writer threads to start
   */
  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
    int numWriters) {
    this.numThreads = numWriters;
    this.controller = controller;
    this.entryBuffers = entryBuffers;

    // Close-pool threads are daemons, carry a recognizable name and log uncaught exceptions.
    ThreadFactoryBuilder closeThreadFactory =
      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
      closeThreadFactory.build());
    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
  }

  void setReporter(CancelableProgressable reporter) {
    this.reporter = reporter;
  }

  void setStatus(MonitoredTask status) {
    this.status = status;
  }

  /**
   * Creates and starts {@link #numThreads} writer threads that pump data from the entryBuffers to
   * the sink.
   */
  void startWriterThreads() throws IOException {
    for (int index = 0; index < numThreads; index++) {
      WriterThread writer = new WriterThread(controller, entryBuffers, this, index);
      writer.start();
      writerThreads.add(writer);
    }
  }

  /**
   * Asks every writer thread to stop, then waits for each of them to terminate. Afterwards the
   * controller is checked for errors and a completion message is logged and published as status.
   * @return false if the reporter indicated that progress failed, true otherwise
   * @throws IOException an {@link InterruptedIOException} if the calling thread is interrupted
   *                     while joining a writer; anything raised by the controller's error check is
   *                     propagated as well
   */
  boolean finishWriterThreads() throws IOException {
    LOG.debug("Waiting for split writer threads to finish");
    boolean progressOk = true;

    // Signal all writers first so that they wind down concurrently rather than one by one.
    for (WriterThread writer : writerThreads) {
      writer.finish();
    }

    for (WriterThread writer : writerThreads) {
      // Once progress has been reported as failed, the reporter is not asked again.
      if (progressOk && reporter != null && !reporter.progress()) {
        progressOk = false;
      }
      try {
        writer.join();
      } catch (InterruptedException ie) {
        throw (InterruptedIOException) new InterruptedIOException().initCause(ie);
      }
    }

    controller.checkForErrors();
    final String msg = this.writerThreads.size() + " split writer threads finished";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return progressOk;
  }

  long getTotalSkippedEdits() {
    return totalSkippedEdits.get();
  }

  /** Returns the number of currently opened writers */
  abstract int getNumOpenWriters();

  /**
   * @param buffer A buffer of some number of edits for a given region.
   * @throws IOException For any IO errors
   */
  abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;

  abstract List<Path> close() throws IOException;

  /** Returns a map from encoded region ID to the number of edits written out for that region. */
  abstract Map<String, Long> getOutputCounts();

  /** Returns number of regions we've recovered */
  abstract int getNumberOfRecoveredRegions();

  /**
   * Some WALEdits contain only KVs that account for what happened to a region. Not all sinks will
   * want to get all of those edits.
   * @return Return true if this sink wants to accept this region-level WALEdit.
   */
  abstract boolean keepRegionEvent(WAL.Entry entry);

  /**
   * Publishes a status message to the {@link MonitoredTask} set on this OutputSink, if there is
   * one.
   * @param msg message to update the status with
   */
  protected final void updateStatusWithMsg(String msg) {
    if (status == null) {
      return;
    }
    status.setStatus(msg);
  }

  /**
   * Thread that keeps pulling chunks from the {@link EntryBuffers} and appends them to the owning
   * {@link OutputSink} until it is told to finish and no chunk is available.
   */
  public static class WriterThread extends Thread {
    /** Set by {@link #finish()} while holding the controller's dataAvailable monitor. */
    private volatile boolean shouldStop = false;
    private WALSplitter.PipelineController controller;
    private EntryBuffers entryBuffers;
    private OutputSink outputSink = null;

    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      OutputSink sink, int i) {
      super(Thread.currentThread().getName() + "-Writer-" + i);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      this.outputSink = sink;
    }

    /** Runs the write loop; any failure is logged and reported to the controller. */
    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    /**
     * Write loop: append a chunk when one is available, otherwise wait (up to 500 ms at a time) on
     * the controller's dataAvailable monitor. Returns once a stop was requested and no chunk was
     * available.
     */
    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
      for (;;) {
        EntryBuffers.RegionEntryBuffer chunk = entryBuffers.getChunkToWrite();
        if (chunk != null) {
          try {
            writeBuffer(chunk);
          } finally {
            // Always tell the buffers we are done with this chunk, even if the append failed.
            entryBuffers.doneWriting(chunk);
          }
          continue;
        }

        // No data currently available, wait on some more to show up
        synchronized (controller.dataAvailable) {
          if (shouldStop) {
            return;
          }
          try {
            controller.dataAvailable.wait(500);
          } catch (InterruptedException ie) {
            // An interrupt is only tolerated when a stop has already been requested.
            if (!shouldStop) {
              throw new RuntimeException(ie);
            }
          }
        }
      }
    }

    /** Hands the given buffer to the owning sink. */
    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    /** Requests this thread to stop and wakes up all threads waiting for data. */
    private void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
}