001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.Callable;
026import java.util.concurrent.CompletionService;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Future;
030
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.io.MultipleIOException;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Class that manages the output streams from the log splitting process.
040 * Bounded means the output streams will be no more than the size of threadpool
041 */
042@InterfaceAudience.Private
043public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
044  private static final Logger LOG =
045      LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);
046
047  private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
048
049  public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
050      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
051    super(walSplitter, controller, entryBuffers, numWriters);
052  }
053
054  @Override
055  public List<Path> finishWritingAndClose() throws IOException {
056    boolean isSuccessful;
057    List<Path> result;
058    try {
059      isSuccessful = finishWriting(false);
060    } finally {
061      result = close();
062    }
063    if (isSuccessful) {
064      splits = result;
065    }
066    return splits;
067  }
068
069  @Override
070  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
071      List<Path> paths) throws InterruptedException, ExecutionException {
072    for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
073        .entrySet()) {
074      LOG.info("Submitting writeThenClose of {}",
075          Bytes.toString(buffer.getValue().encodedRegionName));
076      completionService.submit(new Callable<Void>() {
077        @Override
078        public Void call() throws Exception {
079          Path dst = writeThenClose(buffer.getValue());
080          paths.add(dst);
081          return null;
082        }
083      });
084    }
085    boolean progress_failed = false;
086    for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
087      Future<Void> future = completionService.take();
088      future.get();
089      if (!progress_failed && reporter != null && !reporter.progress()) {
090        progress_failed = true;
091      }
092    }
093
094    return progress_failed;
095  }
096
097  /**
098   * since the splitting process may create multiple output files, we need a map
099   * regionRecoverStatMap to track the output count of each region.
100   * @return a map from encoded region ID to the number of edits written out for that region.
101   */
102  @Override
103  public Map<byte[], Long> getOutputCounts() {
104    Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
105    for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
106      regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
107    }
108    return regionRecoverStatMapResult;
109  }
110
111  /**
112   * @return the number of recovered regions
113   */
114  @Override
115  public int getNumberOfRecoveredRegions() {
116    return regionRecoverStatMap.size();
117  }
118
119  /**
120   * Append the buffer to a new recovered edits file, then close it after all done
121   * @param buffer contain all entries of a certain region
122   * @throws IOException when closeWriter failed
123   */
124  @Override
125  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
126    writeThenClose(buffer);
127  }
128
129  private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
130    WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
131    if (wap != null) {
132      String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
133      Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
134      if (value != null) {
135        Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
136        regionRecoverStatMap.put(encodedRegionName, newValue);
137      }
138    }
139
140    Path dst = null;
141    List<IOException> thrown = new ArrayList<>();
142    if (wap != null) {
143      dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
144    }
145    if (!thrown.isEmpty()) {
146      throw MultipleIOException.createIOException(thrown);
147    }
148    return dst;
149  }
150}