001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.CompletionService;
026import java.util.concurrent.ExecutorCompletionService;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicLong;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.monitoring.MonitoredTask;
032import org.apache.hadoop.hbase.util.CancelableProgressable;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * The following class is an abstraction class to provide a common interface to support different
043 * ways of consuming recovered edits.
044 */
045@InterfaceAudience.Private
046abstract class OutputSink {
047  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
048
049  private final WALSplitter.PipelineController controller;
050  protected final EntryBuffers entryBuffers;
051
052  private final List<WriterThread> writerThreads = Lists.newArrayList();
053
054  protected final int numThreads;
055
056  protected CancelableProgressable reporter = null;
057
058  protected final AtomicLong totalSkippedEdits = new AtomicLong();
059
060  /**
061   * List of all the files produced by this sink
062   */
063  protected final List<Path> splits = new ArrayList<>();
064
065  protected MonitoredTask status = null;
066
067  /**
068   * Used when close this output sink.
069   */
070  protected final ThreadPoolExecutor closeThreadPool;
071  protected final CompletionService<Void> closeCompletionService;
072
073  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
074    int numWriters) {
075    this.numThreads = numWriters;
076    this.controller = controller;
077    this.entryBuffers = entryBuffers;
078    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
079      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
080        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
081    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
082  }
083
084  void setReporter(CancelableProgressable reporter) {
085    this.reporter = reporter;
086  }
087
088  void setStatus(MonitoredTask status) {
089    this.status = status;
090  }
091
092  /**
093   * Start the threads that will pump data from the entryBuffers to the output files.
094   */
095  void startWriterThreads() throws IOException {
096    for (int i = 0; i < numThreads; i++) {
097      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
098      t.start();
099      writerThreads.add(t);
100    }
101  }
102
103  /**
104   * Wait for writer threads to dump all info to the sink
105   * @return true when there is no error
106   */
107  boolean finishWriterThreads() throws IOException {
108    LOG.debug("Waiting for split writer threads to finish");
109    boolean progressFailed = false;
110    for (WriterThread t : writerThreads) {
111      t.finish();
112    }
113
114    for (WriterThread t : writerThreads) {
115      if (!progressFailed && reporter != null && !reporter.progress()) {
116        progressFailed = true;
117      }
118      try {
119        t.join();
120      } catch (InterruptedException ie) {
121        IOException iie = new InterruptedIOException();
122        iie.initCause(ie);
123        throw iie;
124      }
125    }
126    controller.checkForErrors();
127    final String msg = this.writerThreads.size() + " split writer threads finished";
128    LOG.info(msg);
129    updateStatusWithMsg(msg);
130    return (!progressFailed);
131  }
132
133  long getTotalSkippedEdits() {
134    return this.totalSkippedEdits.get();
135  }
136
137  /** Returns the number of currently opened writers */
138  abstract int getNumOpenWriters();
139
140  /**
141   * @param buffer A buffer of some number of edits for a given region.
142   * @throws IOException For any IO errors
143   */
144  abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
145
146  abstract List<Path> close() throws IOException;
147
148  /** Returns a map from encoded region ID to the number of edits written out for that region. */
149  abstract Map<String, Long> getOutputCounts();
150
151  /** Returns number of regions we've recovered */
152  abstract int getNumberOfRecoveredRegions();
153
154  /**
155   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
156   * want to get all of those edits.
157   * @return Return true if this sink wants to accept this region-level WALEdit.
158   */
159  abstract boolean keepRegionEvent(WAL.Entry entry);
160
161  /**
162   * Set status message in {@link MonitoredTask} instance that is set in this OutputSink
163   * @param msg message to update the status with
164   */
165  protected final void updateStatusWithMsg(String msg) {
166    if (status != null) {
167      status.setStatus(msg);
168    }
169  }
170
171  public static class WriterThread extends Thread {
172    private volatile boolean shouldStop = false;
173    private WALSplitter.PipelineController controller;
174    private EntryBuffers entryBuffers;
175    private OutputSink outputSink = null;
176
177    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
178      OutputSink sink, int i) {
179      super(Thread.currentThread().getName() + "-Writer-" + i);
180      this.controller = controller;
181      this.entryBuffers = entryBuffers;
182      outputSink = sink;
183    }
184
185    @Override
186    public void run() {
187      try {
188        doRun();
189      } catch (Throwable t) {
190        LOG.error("Exiting thread", t);
191        controller.writerThreadError(t);
192      }
193    }
194
195    private void doRun() throws IOException {
196      LOG.trace("Writer thread starting");
197      while (true) {
198        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
199        if (buffer == null) {
200          // No data currently available, wait on some more to show up
201          synchronized (controller.dataAvailable) {
202            if (shouldStop) {
203              return;
204            }
205            try {
206              controller.dataAvailable.wait(500);
207            } catch (InterruptedException ie) {
208              if (!shouldStop) {
209                throw new RuntimeException(ie);
210              }
211            }
212          }
213          continue;
214        }
215
216        assert buffer != null;
217        try {
218          writeBuffer(buffer);
219        } finally {
220          entryBuffers.doneWriting(buffer);
221        }
222      }
223    }
224
225    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
226      outputSink.append(buffer);
227    }
228
229    private void finish() {
230      synchronized (controller.dataAvailable) {
231        shouldStop = true;
232        controller.dataAvailable.notifyAll();
233      }
234    }
235  }
236}