001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CompletionService;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.monitoring.MonitoredTask;
033import org.apache.hadoop.hbase.util.CancelableProgressable;
034import org.apache.hadoop.hbase.util.Threads;
035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
041
042/**
043 * The following class is an abstraction class to provide a common interface to support different
044 * ways of consuming recovered edits.
045 */
046@InterfaceAudience.Private
047abstract class OutputSink {
048  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
049
050  private final WALSplitter.PipelineController controller;
051  protected final EntryBuffers entryBuffers;
052
053  private final List<WriterThread> writerThreads = Lists.newArrayList();
054
055  protected final int numThreads;
056
057  protected CancelableProgressable reporter = null;
058
059  protected final AtomicLong totalSkippedEdits = new AtomicLong();
060
061  /**
062   * List of all the files produced by this sink
063   */
064  protected final List<Path> splits = new ArrayList<>();
065
066  protected MonitoredTask status = null;
067
068  /**
069   * Used when close this output sink.
070   */
071  protected final ThreadPoolExecutor closeThreadPool;
072  protected final CompletionService<Void> closeCompletionService;
073
074  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
075      int numWriters) {
076    this.numThreads = numWriters;
077    this.controller = controller;
078    this.entryBuffers = entryBuffers;
079    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
080      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
081        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
082    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
083  }
084
085  void setReporter(CancelableProgressable reporter) {
086    this.reporter = reporter;
087  }
088
089  void setStatus(MonitoredTask status) {
090    this.status = status;
091  }
092
093  /**
094   * Start the threads that will pump data from the entryBuffers to the output files.
095   */
096  void startWriterThreads() throws IOException {
097    for (int i = 0; i < numThreads; i++) {
098      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
099      t.start();
100      writerThreads.add(t);
101    }
102  }
103
104  /**
105   * Wait for writer threads to dump all info to the sink
106   *
107   * @return true when there is no error
108   */
109  boolean finishWriterThreads() throws IOException {
110    LOG.debug("Waiting for split writer threads to finish");
111    boolean progressFailed = false;
112    for (WriterThread t : writerThreads) {
113      t.finish();
114    }
115
116    for (WriterThread t : writerThreads) {
117      if (!progressFailed && reporter != null && !reporter.progress()) {
118        progressFailed = true;
119      }
120      try {
121        t.join();
122      } catch (InterruptedException ie) {
123        IOException iie = new InterruptedIOException();
124        iie.initCause(ie);
125        throw iie;
126      }
127    }
128    controller.checkForErrors();
129    final String msg = this.writerThreads.size() + " split writer threads finished";
130    LOG.info(msg);
131    updateStatusWithMsg(msg);
132    return (!progressFailed);
133  }
134
135  long getTotalSkippedEdits() {
136    return this.totalSkippedEdits.get();
137  }
138
139  /**
140   * @return the number of currently opened writers
141   */
142  abstract int getNumOpenWriters();
143
144  /**
145   * @param buffer A buffer of some number of edits for a given region.
146   * @throws IOException For any IO errors
147   */
148  abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
149
150  abstract List<Path> close() throws IOException;
151
152  /**
153   * @return a map from encoded region ID to the number of edits written out for that region.
154   */
155  abstract Map<String, Long> getOutputCounts();
156
157  /**
158   * @return number of regions we've recovered
159   */
160  abstract int getNumberOfRecoveredRegions();
161
162  /**
163   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
164   * want to get all of those edits.
165   * @return Return true if this sink wants to accept this region-level WALEdit.
166   */
167  abstract boolean keepRegionEvent(WAL.Entry entry);
168
169  /**
170   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
171   * @param msg message to update the status with
172   */
173  protected final void updateStatusWithMsg(String msg) {
174    if (status != null) {
175      status.setStatus(msg);
176    }
177  }
178
179  public static class WriterThread extends Thread {
180    private volatile boolean shouldStop = false;
181    private WALSplitter.PipelineController controller;
182    private EntryBuffers entryBuffers;
183    private OutputSink outputSink = null;
184
185    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
186        OutputSink sink, int i) {
187      super(Thread.currentThread().getName() + "-Writer-" + i);
188      this.controller = controller;
189      this.entryBuffers = entryBuffers;
190      outputSink = sink;
191    }
192
193    @Override
194    public void run()  {
195      try {
196        doRun();
197      } catch (Throwable t) {
198        LOG.error("Exiting thread", t);
199        controller.writerThreadError(t);
200      }
201    }
202
203    private void doRun() throws IOException {
204      LOG.trace("Writer thread starting");
205      while (true) {
206        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
207        if (buffer == null) {
208          // No data currently available, wait on some more to show up
209          synchronized (controller.dataAvailable) {
210            if (shouldStop) {
211              return;
212            }
213            try {
214              controller.dataAvailable.wait(500);
215            } catch (InterruptedException ie) {
216              if (!shouldStop) {
217                throw new RuntimeException(ie);
218              }
219            }
220          }
221          continue;
222        }
223
224        assert buffer != null;
225        try {
226          writeBuffer(buffer);
227        } finally {
228          entryBuffers.doneWriting(buffer);
229        }
230      }
231    }
232
233    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
234      outputSink.append(buffer);
235    }
236
237    private void finish() {
238      synchronized (controller.dataAvailable) {
239        shouldStop = true;
240        controller.dataAvailable.notifyAll();
241      }
242    }
243  }
244}