001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CompletionService;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.monitoring.MonitoredTask;
032import org.apache.hadoop.hbase.util.CancelableProgressable;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * The following class is an abstraction class to provide a common interface to support different
043 * ways of consuming recovered edits.
044 */
045@InterfaceAudience.Private
046public abstract class OutputSink {
047  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
048
049  private final WALSplitter.PipelineController controller;
050  protected final EntryBuffers entryBuffers;
051
052  private final List<WriterThread> writerThreads = Lists.newArrayList();
053
054  protected final int numThreads;
055
056  protected CancelableProgressable reporter = null;
057
058  protected final AtomicLong totalSkippedEdits = new AtomicLong();
059
060  /**
061   * List of all the files produced by this sink
062   */
063  protected final List<Path> splits = new ArrayList<>();
064
065  protected MonitoredTask status = null;
066
067  /**
068   * Used when close this output sink.
069   */
070  protected final ThreadPoolExecutor closeThreadPool;
071  protected final CompletionService<Void> closeCompletionService;
072
073  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
074    int numWriters) {
075    this.numThreads = numWriters;
076    this.controller = controller;
077    this.entryBuffers = entryBuffers;
078    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
079      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
080        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
081    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
082  }
083
084  void setReporter(CancelableProgressable reporter) {
085    this.reporter = reporter;
086  }
087
088  void setStatus(MonitoredTask status) {
089    this.status = status;
090  }
091
092  /**
093   * Start the threads that will pump data from the entryBuffers to the output files.
094   */
095  public void startWriterThreads() throws IOException {
096    for (int i = 0; i < numThreads; i++) {
097      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
098      t.start();
099      writerThreads.add(t);
100    }
101  }
102
103  public synchronized void restartWriterThreadsIfNeeded() {
104    for (int i = 0; i < writerThreads.size(); i++) {
105      WriterThread t = writerThreads.get(i);
106      if (!t.isAlive()) {
107        String threadName = t.getName();
108        LOG.debug("Replacing dead thread: " + threadName);
109        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
110        newThread.start();
111        writerThreads.set(i, newThread);
112      }
113    }
114  }
115
116  /**
117   * Wait for writer threads to dump all info to the sink
118   * @return true when there is no error
119   */
120  protected boolean finishWriterThreads(boolean interrupt) throws IOException {
121    LOG.debug("Waiting for split writer threads to finish");
122    boolean progressFailed = false;
123    for (WriterThread t : writerThreads) {
124      t.finish();
125    }
126    if (interrupt) {
127      for (WriterThread t : writerThreads) {
128        t.interrupt(); // interrupt the writer threads. We are stopping now.
129      }
130    }
131
132    for (WriterThread t : writerThreads) {
133      if (!progressFailed && reporter != null && !reporter.progress()) {
134        progressFailed = true;
135      }
136      try {
137        t.join();
138      } catch (InterruptedException ie) {
139        IOException iie = new InterruptedIOException();
140        iie.initCause(ie);
141        throw iie;
142      }
143    }
144    controller.checkForErrors();
145    final String msg = this.writerThreads.size() + " split writer threads finished";
146    LOG.info(msg);
147    updateStatusWithMsg(msg);
148    return (!progressFailed);
149  }
150
151  long getTotalSkippedEdits() {
152    return this.totalSkippedEdits.get();
153  }
154
155  /** Returns the number of currently opened writers */
156  protected abstract int getNumOpenWriters();
157
158  /**
159   * @param buffer A buffer of some number of edits for a given region.
160   * @throws IOException For any IO errors
161   */
162  protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
163
164  protected abstract List<Path> close() throws IOException;
165
166  /** Returns a map from encoded region ID to the number of edits written out for that region. */
167  protected abstract Map<String, Long> getOutputCounts();
168
169  /** Returns number of regions we've recovered */
170  protected abstract int getNumberOfRecoveredRegions();
171
172  /**
173   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
174   * want to get all of those edits.
175   * @return Return true if this sink wants to accept this region-level WALEdit.
176   */
177  protected abstract boolean keepRegionEvent(WAL.Entry entry);
178
179  /**
180   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
181   * @param msg message to update the status with
182   */
183  protected final void updateStatusWithMsg(String msg) {
184    if (status != null) {
185      status.setStatus(msg);
186    }
187  }
188
189  public static class WriterThread extends Thread {
190    private volatile boolean shouldStop = false;
191    private WALSplitter.PipelineController controller;
192    private EntryBuffers entryBuffers;
193    private OutputSink outputSink = null;
194
195    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
196      OutputSink sink, int i) {
197      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
198    }
199
200    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
201      OutputSink sink, String threadName) {
202      super(threadName);
203      this.controller = controller;
204      this.entryBuffers = entryBuffers;
205      outputSink = sink;
206    }
207
208    @Override
209    public void run() {
210      try {
211        doRun();
212      } catch (Throwable t) {
213        LOG.error("Exiting thread", t);
214        controller.writerThreadError(t);
215      }
216    }
217
218    private void doRun() throws IOException {
219      LOG.trace("Writer thread starting");
220      while (true) {
221        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
222        if (buffer == null) {
223          // No data currently available, wait on some more to show up
224          synchronized (controller.dataAvailable) {
225            if (shouldStop) {
226              return;
227            }
228            try {
229              controller.dataAvailable.wait(500);
230            } catch (InterruptedException ie) {
231              if (!shouldStop) {
232                throw new RuntimeException(ie);
233              }
234            }
235          }
236          continue;
237        }
238
239        assert buffer != null;
240        try {
241          writeBuffer(buffer);
242        } finally {
243          entryBuffers.doneWriting(buffer);
244        }
245      }
246    }
247
248    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
249      outputSink.append(buffer);
250    }
251
252    private void finish() {
253      synchronized (controller.dataAvailable) {
254        shouldStop = true;
255        controller.dataAvailable.notifyAll();
256      }
257    }
258  }
259}