001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.List;
023import java.util.Map;
024import java.util.TreeMap;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Future;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.monitoring.MonitoredTask;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.io.MultipleIOException;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
038
039/**
040 * Class that manages the output streams from the log splitting process.
041 * Every region only has one recovered edits file PER split WAL (if we split
042 * multiple WALs during a log-splitting session, on open, a Region may
043 * have multiple recovered.edits files to replay -- one per split WAL).
044 * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
045 *   the number of writers active at one time (makes for better throughput).
046 */
047@InterfaceAudience.Private
048class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
049  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
050  private ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>();
051
052  public RecoveredEditsOutputSink(WALSplitter walSplitter,
053      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
054    super(walSplitter, controller, entryBuffers, numWriters);
055  }
056
057  @Override
058  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
059    List<WAL.Entry> entries = buffer.entries;
060    if (entries.isEmpty()) {
061      LOG.warn("got an empty buffer, skipping");
062      return;
063    }
064    RecoveredEditsWriter writer =
065      getRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
066        entries.get(0).getKey().getSequenceId());
067    if (writer != null) {
068      writer.writeRegionEntries(entries);
069    }
070  }
071
072  /**
073   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
074   * long as multiple threads are always acting on different regions.
075   * @return null if this region shouldn't output any logs
076   */
077  private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
078      long seqId) throws IOException {
079    RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
080    if (ret != null) {
081      return ret;
082    }
083    ret = createRecoveredEditsWriter(tableName, region, seqId);
084    if (ret == null) {
085      return null;
086    }
087    LOG.trace("Created {}", ret.path);
088    writers.put(Bytes.toString(region), ret);
089    return ret;
090  }
091
092  @Override
093  public List<Path> close() throws IOException {
094    boolean isSuccessful = true;
095    try {
096      isSuccessful = finishWriterThreads(false);
097    } finally {
098      isSuccessful &= closeWriters();
099    }
100    return isSuccessful ? splits : null;
101  }
102
103  /**
104   * Close all of the output streams.
105   *
106   * @return true when there is no error.
107   */
108  private boolean closeWriters() throws IOException {
109    List<IOException> thrown = Lists.newArrayList();
110    for (RecoveredEditsWriter writer : writers.values()) {
111      closeCompletionService.submit(() -> {
112        Path dst = closeRecoveredEditsWriter(writer, thrown);
113        LOG.trace("Closed {}", dst);
114        splits.add(dst);
115        return null;
116      });
117    }
118    boolean progressFailed = false;
119    try {
120      for (int i = 0, n = this.writers.size(); i < n; i++) {
121        Future<Void> future = closeCompletionService.take();
122        future.get();
123        if (!progressFailed && reporter != null && !reporter.progress()) {
124          progressFailed = true;
125        }
126      }
127    } catch (InterruptedException e) {
128      IOException iie = new InterruptedIOException();
129      iie.initCause(e);
130      throw iie;
131    } catch (ExecutionException e) {
132      throw new IOException(e.getCause());
133    } finally {
134      closeThreadPool.shutdownNow();
135    }
136    if (!thrown.isEmpty()) {
137      throw MultipleIOException.createIOException(thrown);
138    }
139    return !progressFailed;
140  }
141
142  @Override
143  public Map<String, Long> getOutputCounts() {
144    TreeMap<String, Long> ret = new TreeMap<>();
145    for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
146      ret.put(entry.getKey(), entry.getValue().editsWritten);
147    }
148    return ret;
149  }
150
151  @Override
152  public int getNumberOfRecoveredRegions() {
153    return writers.size();
154  }
155
156  @Override
157  public int getNumOpenWriters() {
158    return writers.size();
159  }
160}