001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeSet;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.atomic.AtomicLong;
029
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.CancelableProgressable;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
038
039/**
040 * The following class is an abstraction class to provide a common interface to support different
041 * ways of consuming recovered edits.
042 */
043@InterfaceAudience.Private
044public abstract class OutputSink {
045  private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
046
047  protected WALSplitter.PipelineController controller;
048  protected EntryBuffers entryBuffers;
049
050  protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
051  protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
052      new ConcurrentHashMap<>();
053
054  protected final List<WriterThread> writerThreads = Lists.newArrayList();
055
056  /* Set of regions which we've decided should not output edits */
057  protected final Set<byte[]> blacklistedRegions =
058      Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
059
060  protected boolean closeAndCleanCompleted = false;
061
062  protected boolean writersClosed = false;
063
064  protected final int numThreads;
065
066  protected CancelableProgressable reporter = null;
067
068  protected AtomicLong skippedEdits = new AtomicLong();
069
070  protected List<Path> splits = null;
071
072  public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
073      int numWriters) {
074    numThreads = numWriters;
075    this.controller = controller;
076    this.entryBuffers = entryBuffers;
077  }
078
079  void setReporter(CancelableProgressable reporter) {
080    this.reporter = reporter;
081  }
082
083  /**
084   * Start the threads that will pump data from the entryBuffers to the output files.
085   */
086  public synchronized void startWriterThreads() {
087    for (int i = 0; i < numThreads; i++) {
088      WriterThread t = new WriterThread(controller, entryBuffers, this, i);
089      t.start();
090      writerThreads.add(t);
091    }
092  }
093
094  /**
095   * Update region's maximum edit log SeqNum.
096   */
097  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
098    synchronized (regionMaximumEditLogSeqNum) {
099      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
100      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
101      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
102        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
103      }
104    }
105  }
106
107  /**
108   * @return the number of currently opened writers
109   */
110  int getNumOpenWriters() {
111    return this.writers.size();
112  }
113
114  long getSkippedEdits() {
115    return this.skippedEdits.get();
116  }
117
118  /**
119   * Wait for writer threads to dump all info to the sink
120   * @return true when there is no error
121   */
122  protected boolean finishWriting(boolean interrupt) throws IOException {
123    LOG.debug("Waiting for split writer threads to finish");
124    boolean progress_failed = false;
125    for (WriterThread t : writerThreads) {
126      t.finish();
127    }
128    if (interrupt) {
129      for (WriterThread t : writerThreads) {
130        t.interrupt(); // interrupt the writer threads. We are stopping now.
131      }
132    }
133
134    for (WriterThread t : writerThreads) {
135      if (!progress_failed && reporter != null && !reporter.progress()) {
136        progress_failed = true;
137      }
138      try {
139        t.join();
140      } catch (InterruptedException ie) {
141        IOException iie = new InterruptedIOException();
142        iie.initCause(ie);
143        throw iie;
144      }
145    }
146    controller.checkForErrors();
147    LOG.info("{} split writers finished; closing.", this.writerThreads.size());
148    return (!progress_failed);
149  }
150
151  public abstract List<Path> finishWritingAndClose() throws IOException;
152
153  /**
154   * @return a map from encoded region ID to the number of edits written out for that region.
155   */
156  public abstract Map<byte[], Long> getOutputCounts();
157
158  /**
159   * @return number of regions we've recovered
160   */
161  public abstract int getNumberOfRecoveredRegions();
162
163  /**
164   * @param buffer A WAL Edit Entry
165   */
166  public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
167
168  /**
169   * WriterThread call this function to help flush internal remaining edits in buffer before close
170   * @return true when underlying sink has something to flush
171   */
172  public boolean flush() throws IOException {
173    return false;
174  }
175
176  /**
177   * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
178   * want to get all of those edits.
179   * @return Return true if this sink wants to accept this region-level WALEdit.
180   */
181  public abstract boolean keepRegionEvent(WAL.Entry entry);
182
183  public static class WriterThread extends Thread {
184    private volatile boolean shouldStop = false;
185    private WALSplitter.PipelineController controller;
186    private EntryBuffers entryBuffers;
187    private OutputSink outputSink = null;
188
189    WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
190        OutputSink sink, int i) {
191      super(Thread.currentThread().getName() + "-Writer-" + i);
192      this.controller = controller;
193      this.entryBuffers = entryBuffers;
194      outputSink = sink;
195    }
196
197    @Override
198    public void run()  {
199      try {
200        doRun();
201      } catch (Throwable t) {
202        LOG.error("Exiting thread", t);
203        controller.writerThreadError(t);
204      }
205    }
206
207    private void doRun() throws IOException {
208      LOG.trace("Writer thread starting");
209      while (true) {
210        WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
211        if (buffer == null) {
212          // No data currently available, wait on some more to show up
213          synchronized (controller.dataAvailable) {
214            if (shouldStop && !this.outputSink.flush()) {
215              return;
216            }
217            try {
218              controller.dataAvailable.wait(500);
219            } catch (InterruptedException ie) {
220              if (!shouldStop) {
221                throw new RuntimeException(ie);
222              }
223            }
224          }
225          continue;
226        }
227
228        assert buffer != null;
229        try {
230          writeBuffer(buffer);
231        } finally {
232          entryBuffers.doneWriting(buffer);
233        }
234      }
235    }
236
237    private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
238      outputSink.append(buffer);
239    }
240
241    void setShouldStop(boolean shouldStop) {
242      this.shouldStop = shouldStop;
243    }
244
245    void finish() {
246      synchronized (controller.dataAvailable) {
247        shouldStop = true;
248        controller.dataAvailable.notifyAll();
249      }
250    }
251  }
252}