/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Abstract base class providing a common interface for the different ways of consuming recovered
 * edits.
 */
@InterfaceAudience.Private
abstract class OutputSink {
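  // A rough lifecycle sketch of how a concrete sink is typically driven by the splitting code.
  // The caller-side names below ("progressReporter", "monitoredTask") are illustrative only and
  // are not defined in this file:
  //
  //   OutputSink sink = ...;                            // some concrete subclass
  //   sink.setReporter(progressReporter);               // optional progress callback
  //   sink.setStatus(monitoredTask);                    // optional status reporting
  //   sink.startWriterThreads();                        // spins up numThreads WriterThreads
  //   // ... the reader side feeds edits into the shared EntryBuffers ...
  //   boolean progressOk = sink.finishWriterThreads();  // drain writers and check for errors
  //   List<Path> producedFiles = sink.close();          // close writers, collect output paths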
  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);

  private final WALSplitter.PipelineController controller;
  protected final EntryBuffers entryBuffers;

  private final List<WriterThread> writerThreads = Lists.newArrayList();

  protected final int numThreads;

  protected CancelableProgressable reporter = null;

  protected final AtomicLong totalSkippedEdits = new AtomicLong();

  /**
   * List of all the files produced by this sink.
   * <p>
   * Must be a synchronized list to avoid concurrency issues. CopyOnWriteArrayList is not a good
   * choice because all we do is add to the list and then return the result.
   */
  protected final List<Path> splits = Collections.synchronizedList(new ArrayList<>());

  protected MonitoredTask status = null;

  /**
   * Thread pool and completion service used when closing this output sink.
   */
  protected final ThreadPoolExecutor closeThreadPool;
  protected final CompletionService<Void> closeCompletionService;
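  // Typically a concrete sink submits one close task per open writer to closeCompletionService
  // from its close() implementation and then drains the completed futures, so writers are closed
  // in parallel; this base class only creates the pool and the completion service (an assumption
  // about subclass usage, not enforced here).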

  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
    int numWriters) {
    this.numThreads = numWriters;
    this.controller = controller;
    this.entryBuffers = entryBuffers;
    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
  }

  void setReporter(CancelableProgressable reporter) {
    this.reporter = reporter;
  }

  void setStatus(MonitoredTask status) {
    this.status = status;
  }

  /**
   * Start the threads that will pump data from the entryBuffers to the output files.
   */
  void startWriterThreads() throws IOException {
    for (int i = 0; i < numThreads; i++) {
      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
      t.start();
      writerThreads.add(t);
    }
  }

  public synchronized void restartWriterThreadsIfNeeded() {
    for (int i = 0; i < writerThreads.size(); i++) {
      WriterThread t = writerThreads.get(i);
      if (!t.isAlive()) {
        String threadName = t.getName();
        LOG.debug("Replacing dead thread: {}", threadName);
        WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName);
        newThread.start();
        writerThreads.set(i, newThread);
      }
    }
  }

  /**
   * Wait for writer threads to finish writing all buffered edits to the sink.
   * @return true when there is no error; false if the progress reporter failed while waiting
   */
  boolean finishWriterThreads() throws IOException {
    LOG.debug("Waiting for split writer threads to finish");
    boolean progressFailed = false;
    for (WriterThread t : writerThreads) {
      t.finish();
    }

    for (WriterThread t : writerThreads) {
      if (!progressFailed && reporter != null && !reporter.progress()) {
        progressFailed = true;
      }
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }
    controller.checkForErrors();
    final String msg = this.writerThreads.size() + " split writer threads finished";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return (!progressFailed);
  }

  long getTotalSkippedEdits() {
    return this.totalSkippedEdits.get();
  }

  /** Returns the number of currently open writers */
  abstract int getNumOpenWriters();

  /**
   * Append the given buffer of edits to this sink.
   * @param buffer A buffer of some number of edits for a given region.
   * @throws IOException For any IO errors
   */
  abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;

  /**
   * Close this sink and return the list of files it produced.
   */
  abstract List<Path> close() throws IOException;

  /** Returns a map from encoded region ID to the number of edits written out for that region. */
  abstract Map<String, Long> getOutputCounts();

  /** Returns the number of regions we've recovered */
  abstract int getNumberOfRecoveredRegions();

  /**
   * Some WALEdits contain only KVs that record what happened to a region (region-level events).
   * Not all sinks want those edits.
   * @return true if this sink wants to accept this region-level WALEdit.
   */
  abstract boolean keepRegionEvent(WAL.Entry entry);

  /**
   * Set the status message in the {@link MonitoredTask} instance that is set on this OutputSink.
   * @param msg message to update the status with
   */
  protected final void updateStatusWithMsg(String msg) {
    if (status != null) {
      status.setStatus(msg);
    }
  }

  public static class WriterThread extends Thread {
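    // Consumer side of the split pipeline: each WriterThread repeatedly pulls a chunk of edits
    // (an EntryBuffers.RegionEntryBuffer) from the shared EntryBuffers and hands it to the
    // OutputSink via append(), until finish() sets shouldStop and wakes the thread.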
    private volatile boolean shouldStop = false;
    private final WALSplitter.PipelineController controller;
    private final EntryBuffers entryBuffers;
    private final OutputSink outputSink;

    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      OutputSink sink, int i) {
      this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
    }

    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
      OutputSink sink, String threadName) {
      super(threadName);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      outputSink = sink;
    }


    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
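      // Main consume loop: take a chunk of buffered edits for one region and write it to the
      // sink. When no chunk is ready, park briefly on controller.dataAvailable and re-check
      // shouldStop so that finish() can stop this thread promptly.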
      while (true) {
        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait for more to show up
          synchronized (controller.dataAvailable) {
            if (shouldStop) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    private void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
}