001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CompletionService;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.monitoring.MonitoredTask;
033import org.apache.hadoop.hbase.util.CancelableProgressable;
034import org.apache.hadoop.hbase.util.Threads;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
040
041/**
042 * The following class is an abstraction class to provide a common interface to support different
043 * ways of consuming recovered edits.
044 */
045@InterfaceAudience.Private
046public abstract class OutputSink {
047  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
048
049  private final WALSplitter.PipelineController controller;
050  protected final EntryBuffers entryBuffers;
051
052  private final List<WriterThread> writerThreads = Lists.newArrayList();
053
054  protected final int numThreads;
055
056  protected CancelableProgressable reporter = null;
057
058  protected final AtomicLong totalSkippedEdits = new AtomicLong();
059
060  /**
061   * List of all the files produced by this sink
062   */
063  protected final List<Path> splits = new ArrayList<>();
064
065  protected MonitoredTask status = null;
066
067  /**
068   * Used when close this output sink.
069   */
070  protected final ThreadPoolExecutor closeThreadPool;
071  protected final CompletionService<Void> closeCompletionService;
072
073  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
074      int numWriters) {
075    this.numThreads = numWriters;
076    this.controller = controller;
077    this.entryBuffers = entryBuffers;
078    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
079        Threads.newDaemonThreadFactory("split-log-closeStream-"));
080    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
081  }
082
083  void setReporter(CancelableProgressable reporter) {
084    this.reporter = reporter;
085  }
086
087  void setStatus(MonitoredTask status) {
088    this.status = status;
089  }
090
091  /**
092   * Start the threads that will pump data from the entryBuffers to the output files.
093   */
094  public void startWriterThreads() throws IOException {
095    for (int i = 0; i < numThreads; i++) {
096      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
097      t.start();
098      writerThreads.add(t);
099    }
100  }
101
102  public synchronized void restartWriterThreadsIfNeeded() {
103    for(int i = 0; i< writerThreads.size(); i++){
104      WriterThread t = writerThreads.get(i);
105      if (!t.isAlive()){
106        String threadName = t.getName();
107        LOG.debug("Replacing dead thread: " + threadName);
108        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
109        newThread.start();
110        writerThreads.set(i, newThread);
111      }
112    }
113  }
114
115  /**
116   * Wait for writer threads to dump all info to the sink
117   *
118   * @return true when there is no error
119   */
120  protected boolean finishWriterThreads(boolean interrupt) throws IOException {
121    LOG.debug("Waiting for split writer threads to finish");
122    boolean progressFailed = false;
123    for (WriterThread t : writerThreads) {
124      t.finish();
125    }
126    if (interrupt) {
127      for (WriterThread t : writerThreads) {
128        t.interrupt(); // interrupt the writer threads. We are stopping now.
129      }
130    }
131
132    for (WriterThread t : writerThreads) {
133      if (!progressFailed && reporter != null && !reporter.progress()) {
134        progressFailed = true;
135      }
136      try {
137        t.join();
138      } catch (InterruptedException ie) {
139        IOException iie = new InterruptedIOException();
140        iie.initCause(ie);
141        throw iie;
142      }
143    }
144    controller.checkForErrors();
145    final String msg = this.writerThreads.size() + " split writer threads finished";
146    LOG.info(msg);
147    updateStatusWithMsg(msg);
148    return (!progressFailed);
149  }
150
151  long getTotalSkippedEdits() {
152    return this.totalSkippedEdits.get();
153  }
154
155  /**
156   * @return the number of currently opened writers
157   */
158  protected abstract int getNumOpenWriters();
159
160  /**
161   * @param buffer A buffer of some number of edits for a given region.
162   * @throws IOException For any IO errors
163   */
164  protected abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
165
166  protected abstract List<Path> close() throws IOException;
167
168  /**
169   * @return a map from encoded region ID to the number of edits written out for that region.
170   */
171  protected abstract Map<String, Long> getOutputCounts();
172
173  /**
174   * @return number of regions we've recovered
175   */
176  protected abstract int getNumberOfRecoveredRegions();
177
178  /**
179   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
180   * want to get all of those edits.
181   * @return Return true if this sink wants to accept this region-level WALEdit.
182   */
183  protected abstract boolean keepRegionEvent(WAL.Entry entry);
184
185  /**
186   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
187   * @param msg message to update the status with
188   */
189  protected final void updateStatusWithMsg(String msg) {
190    if (status != null) {
191      status.setStatus(msg);
192    }
193  }
194
195  public static class WriterThread extends Thread {
196    private volatile boolean shouldStop = false;
197    private WALSplitter.PipelineController controller;
198    private EntryBuffers entryBuffers;
199    private OutputSink outputSink = null;
200
201    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
202        OutputSink sink, int i) {
203      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
204    }
205
206    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
207        OutputSink sink, String threadName) {
208      super(threadName);
209      this.controller = controller;
210      this.entryBuffers = entryBuffers;
211      outputSink = sink;
212    }
213
214    @Override
215    public void run()  {
216      try {
217        doRun();
218      } catch (Throwable t) {
219        LOG.error("Exiting thread", t);
220        controller.writerThreadError(t);
221      }
222    }
223
224    private void doRun() throws IOException {
225      LOG.trace("Writer thread starting");
226      while (true) {
227        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
228        if (buffer == null) {
229          // No data currently available, wait on some more to show up
230          synchronized (controller.dataAvailable) {
231            if (shouldStop) {
232              return;
233            }
234            try {
235              controller.dataAvailable.wait(500);
236            } catch (InterruptedException ie) {
237              if (!shouldStop) {
238                throw new RuntimeException(ie);
239              }
240            }
241          }
242          continue;
243        }
244
245        assert buffer != null;
246        try {
247          writeBuffer(buffer);
248        } finally {
249          entryBuffers.doneWriting(buffer);
250        }
251      }
252    }
253
254    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
255      outputSink.append(buffer);
256    }
257
258    private void finish() {
259      synchronized (controller.dataAvailable) {
260        shouldStop = true;
261        controller.dataAvailable.notifyAll();
262      }
263    }
264  }
265}