/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is an abstraction that provides a common interface to support the different ways of
 * consuming recovered edits.
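 * <p>
 * Rough lifecycle sketch (the surrounding wiring is illustrative, not prescribed by this class):
 * the WAL splitting code starts the writer threads, lets them drain the shared
 * {@link EntryBuffers}, and finally waits for the writers before closing the sink.
 * <pre>
 *   sink.startWriterThreads();
 *   // a reader thread fills entryBuffers while the writer threads drain it
 *   sink.finishWriterThreads(false);
 *   List&lt;Path&gt; producedFiles = sink.close();
 * </pre>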
 */
@InterfaceAudience.Private
public abstract class OutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);

  private final WALSplitter.PipelineController controller;
  protected final EntryBuffers entryBuffers;

  private final List<WriterThread> writerThreads = Lists.newArrayList();

  protected final int numThreads;

  protected CancelableProgressable reporter = null;

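  /**
   * Number of edits skipped (not written out) by this sink; updated by concrete implementations.
   */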
  protected final AtomicLong totalSkippedEdits = new AtomicLong();

  /**
   * List of all the files produced by this sink
   */
  protected final List<Path> splits = new ArrayList<>();

  protected MonitoredTask status = null;

  /**
   * Used when closing this output sink.
   */
  protected final ThreadPoolExecutor closeThreadPool;
  protected final CompletionService<Void> closeCompletionService;

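  /**
   * @param controller used to report writer-thread errors back to the split pipeline
   * @param entryBuffers the shared buffers that the writer threads drain
   * @param numWriters the number of writer threads, also used to bound the close thread pool
   */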
  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      int numWriters) {
    this.numThreads = numWriters;
    this.controller = controller;
    this.entryBuffers = entryBuffers;
    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
  }

  void setReporter(CancelableProgressable reporter) {
    this.reporter = reporter;
  }

  void setStatus(MonitoredTask status) {
    this.status = status;
  }

  /**
   * Start the threads that will pump data from the entryBuffers to the output files.
   */
  public void startWriterThreads() throws IOException {
    for (int i = 0; i < numThreads; i++) {
      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
      t.start();
      writerThreads.add(t);
    }
  }

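  /**
   * Restart any writer threads that have died, replacing each dead thread with a new one that
   * reuses the old thread's name.
   */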
  public synchronized void restartWriterThreadsIfNeeded() {
    for (int i = 0; i < writerThreads.size(); i++) {
      WriterThread t = writerThreads.get(i);
      if (!t.isAlive()) {
        String threadName = t.getName();
        LOG.debug("Replacing dead thread: {}", threadName);
        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
        newThread.start();
        writerThreads.set(i, newThread);
      }
    }
  }


  /**
   * Wait for the writer threads to finish writing all pending entries to the sink.
   * @param interrupt whether to interrupt the writer threads after asking them to finish
   * @return true when there is no error
   */
  protected boolean finishWriterThreads(boolean interrupt) throws IOException {
    LOG.debug("Waiting for split writer threads to finish");
    boolean progressFailed = false;
    for (WriterThread t : writerThreads) {
      t.finish();
    }
    if (interrupt) {
      for (WriterThread t : writerThreads) {
        t.interrupt(); // interrupt the writer threads. We are stopping now.
      }
    }

    for (WriterThread t : writerThreads) {
      if (!progressFailed && reporter != null && !reporter.progress()) {
        progressFailed = true;
      }
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }
    controller.checkForErrors();
    final String msg = this.writerThreads.size() + " split writer threads finished";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return (!progressFailed);
  }

  long getTotalSkippedEdits() {
    return this.totalSkippedEdits.get();
  }

  /**
   * @return the number of currently open writers
   */
  protected abstract int getNumOpenWriters();

  /**
   * Append the given region's buffered edits to this output sink.
   * @param buffer A buffer of some number of edits for a given region.
   * @throws IOException For any IO errors
   */
  protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;

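  /**
   * Close this output sink, releasing any underlying writers.
   * @return the list of files produced by this sink
   */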
  protected abstract List<Path> close() throws IOException;

  /**
   * @return a map from encoded region ID to the number of edits written out for that region.
   */
  protected abstract Map<String, Long> getOutputCounts();

  /**
   * @return number of regions we've recovered
   */
  protected abstract int getNumberOfRecoveredRegions();

  /**
   * Some WALEdits contain only KVs that record region-level events (i.e. what happened to the
   * region). Not all sinks will want to receive those edits.
   * @return true if this sink wants to accept this region-level WALEdit.
   */
  protected abstract boolean keepRegionEvent(WAL.Entry entry);

  /**
   * Set the status message on the {@link MonitoredTask} instance that is set on this OutputSink,
   * if one has been set.
   * @param msg message to update the status with
   */
  protected final void updateStatusWithMsg(String msg) {
    if (status != null) {
      status.setStatus(msg);
    }
  }

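  /**
   * A worker thread that pulls region-grouped edit buffers from {@link EntryBuffers} and appends
   * them to the enclosing {@link OutputSink} until it is asked to finish and no more work remains.
   */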
  public static class WriterThread extends Thread {
    private volatile boolean shouldStop = false;
    private final WALSplitter.PipelineController controller;
    private final EntryBuffers entryBuffers;
    private final OutputSink outputSink;

    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
        OutputSink sink, int i) {
      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
    }

    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
        OutputSink sink, String threadName) {
      super(threadName);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      this.outputSink = sink;
    }

    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

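    /**
     * Main loop: repeatedly pull a chunk of region edits from {@link EntryBuffers} and append it
     * to the sink; when no data is available, wait briefly and exit once asked to stop.
     */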
    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
      while (true) {
        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (controller.dataAvailable) {
            if (shouldStop) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

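    /**
     * Ask this thread to stop. The thread keeps draining {@link EntryBuffers} and only exits once
     * no more chunks are available.
     */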
    private void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
}