001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
059
/**
 * Output sink for the log splitting process that writes each region's edits to its own
 * recovered-edits file and manages the writers (output streams) for those files.
 */
063@InterfaceAudience.Private
064public class LogRecoveredEditsOutputSink extends OutputSink {
065  private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
066  private WALSplitter walSplitter;
067  private FileSystem walFS;
068  private Configuration conf;
069
070  public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
071      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
072    // More threads could potentially write faster at the expense
073    // of causing more disk seeks as the logs are split.
074    // 3. After a certain setting (probably around 3) the
075    // process will be bound on the reader in the current
076    // implementation anyway.
077    super(controller, entryBuffers, numWriters);
078    this.walSplitter = walSplitter;
079    this.walFS = walSplitter.walFS;
080    this.conf = walSplitter.conf;
081  }
082
083  /**
084   * @return null if failed to report progress
085   */
086  @Override
087  public List<Path> finishWritingAndClose() throws IOException {
088    boolean isSuccessful = false;
089    List<Path> result = null;
090    try {
091      isSuccessful = finishWriting(false);
092    } finally {
093      result = close();
094      List<IOException> thrown = closeLogWriters(null);
095      if (CollectionUtils.isNotEmpty(thrown)) {
096        throw MultipleIOException.createIOException(thrown);
097      }
098    }
099    if (isSuccessful) {
100      splits = result;
101    }
102    return splits;
103  }
104
105  // delete the one with fewer wal entries
106  private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
107      throws IOException {
108    long dstMinLogSeqNum = -1L;
109    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
110      WAL.Entry entry = reader.next();
111      if (entry != null) {
112        dstMinLogSeqNum = entry.getKey().getSequenceId();
113      }
114    } catch (EOFException e) {
115      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
116        e);
117    }
118    if (wap.minLogSeqNum < dstMinLogSeqNum) {
119      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
120          + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
121          + walFS.getFileStatus(dst).getLen());
122      if (!walFS.delete(dst, false)) {
123        LOG.warn("Failed deleting of old {}", dst);
124        throw new IOException("Failed deleting of old " + dst);
125      }
126    } else {
127      LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
128          + ", length=" + walFS.getFileStatus(wap.path).getLen());
129      if (!walFS.delete(wap.path, false)) {
130        LOG.warn("Failed deleting of {}", wap.path);
131        throw new IOException("Failed deleting of " + wap.path);
132      }
133    }
134  }
135
136  /**
137   * Close all of the output streams.
138   * @return the list of paths written.
139   */
140  List<Path> close() throws IOException {
141    Preconditions.checkState(!closeAndCleanCompleted);
142
143    final List<Path> paths = new ArrayList<>();
144    final List<IOException> thrown = Lists.newArrayList();
145    ThreadPoolExecutor closeThreadPool =
146        Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
147          private int count = 1;
148
149          @Override
150          public Thread newThread(Runnable r) {
151            Thread t = new Thread(r, "split-log-closeStream-" + count++);
152            return t;
153          }
154        });
155    CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
156    boolean progress_failed;
157    try {
158      progress_failed = executeCloseTask(completionService, thrown, paths);
159    } catch (InterruptedException e) {
160      IOException iie = new InterruptedIOException();
161      iie.initCause(e);
162      throw iie;
163    } catch (ExecutionException e) {
164      throw new IOException(e.getCause());
165    } finally {
166      closeThreadPool.shutdownNow();
167    }
168    if (!thrown.isEmpty()) {
169      throw MultipleIOException.createIOException(thrown);
170    }
171    writersClosed = true;
172    closeAndCleanCompleted = true;
173    if (progress_failed) {
174      return null;
175    }
176    return paths;
177  }
178
179  /**
180   * @param completionService threadPool to execute the closing tasks
181   * @param thrown store the exceptions
182   * @param paths arrayList to store the paths written
183   * @return if close tasks executed successful
184   */
185  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
186      List<Path> paths) throws InterruptedException, ExecutionException {
187    for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
188      if (LOG.isTraceEnabled()) {
189        LOG.trace(
190          "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
191      }
192      completionService.submit(new Callable<Void>() {
193        @Override
194        public Void call() throws Exception {
195          WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
196          Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
197          paths.add(dst);
198          return null;
199        }
200      });
201    }
202    boolean progress_failed = false;
203    for (int i = 0, n = this.writers.size(); i < n; i++) {
204      Future<Void> future = completionService.take();
205      future.get();
206      if (!progress_failed && reporter != null && !reporter.progress()) {
207        progress_failed = true;
208      }
209    }
210    return progress_failed;
211  }
212
213  Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
214      List<IOException> thrown) throws IOException {
215    LOG.trace("Closing {}", wap.path);
216    try {
217      wap.writer.close();
218    } catch (IOException ioe) {
219      LOG.error("Could not close log at {}", wap.path, ioe);
220      thrown.add(ioe);
221      return null;
222    }
223    if (LOG.isDebugEnabled()) {
224      LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
225          + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms");
226    }
227    if (wap.editsWritten == 0) {
228      // just remove the empty recovered.edits file
229      if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
230        LOG.warn("Failed deleting empty {}", wap.path);
231        throw new IOException("Failed deleting empty  " + wap.path);
232      }
233      return null;
234    }
235
236    Path dst = getCompletedRecoveredEditsFilePath(wap.path,
237      regionMaximumEditLogSeqNum.get(encodedRegionName));
238    try {
239      if (!dst.equals(wap.path) && walFS.exists(dst)) {
240        deleteOneWithFewerEntries(wap, dst);
241      }
242      // Skip the unit tests which create a splitter that reads and
243      // writes the data without touching disk.
244      // TestHLogSplit#testThreading is an example.
245      if (walFS.exists(wap.path)) {
246        if (!walFS.rename(wap.path, dst)) {
247          throw new IOException("Failed renaming " + wap.path + " to " + dst);
248        }
249        LOG.info("Rename {} to {}", wap.path, dst);
250      }
251    } catch (IOException ioe) {
252      LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
253      thrown.add(ioe);
254      return null;
255    }
256    return dst;
257  }
258
259  private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
260    if (writersClosed) {
261      return thrown;
262    }
263    if (thrown == null) {
264      thrown = Lists.newArrayList();
265    }
266    try {
267      for (WriterThread writerThread : writerThreads) {
268        while (writerThread.isAlive()) {
269          writerThread.setShouldStop(true);
270          writerThread.interrupt();
271          try {
272            writerThread.join(10);
273          } catch (InterruptedException e) {
274            IOException iie = new InterruptedIOException();
275            iie.initCause(e);
276            throw iie;
277          }
278        }
279      }
280    } finally {
281      WALSplitter.WriterAndPath wap = null;
282      for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
283        try {
284          wap = (WALSplitter.WriterAndPath) tmpWAP;
285          wap.writer.close();
286        } catch (IOException ioe) {
287          LOG.error("Couldn't close log at {}", wap.path, ioe);
288          thrown.add(ioe);
289          continue;
290        }
291        LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
292            + (wap.nanosSpent / 1000 / 1000) + "ms)");
293      }
294      writersClosed = true;
295    }
296
297    return thrown;
298  }
299
300  /**
301   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
302   * long as multiple threads are always acting on different regions.
303   * @return null if this region shouldn't output any logs
304   */
305  WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
306    byte[] region = entry.getKey().getEncodedRegionName();
307    String regionName = Bytes.toString(region);
308    WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
309    if (ret != null) {
310      return ret;
311    }
312    // If we already decided that this region doesn't get any output
313    // we don't need to check again.
314    if (blacklistedRegions.contains(region)) {
315      return null;
316    }
317    ret = createWAP(region, entry);
318    if (ret == null) {
319      blacklistedRegions.add(region);
320      return null;
321    }
322    if (reusable) {
323      writers.put(regionName, ret);
324    }
325    return ret;
326  }
327
328  /**
329   * @return a path with a write for that path. caller should close.
330   */
331  WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
332    String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
333      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
334    Path regionedits = getRegionSplitEditsPath(entry,
335      walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
336    if (regionedits == null) {
337      return null;
338    }
339    FileSystem walFs = FSUtils.getWALFileSystem(conf);
340    if (walFs.exists(regionedits)) {
341      LOG.warn("Found old edits file. It could be the "
342          + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
343          + walFs.getFileStatus(regionedits).getLen());
344      if (!walFs.delete(regionedits, false)) {
345        LOG.warn("Failed delete of old {}", regionedits);
346      }
347    }
348    WALProvider.Writer w = walSplitter.createWriter(regionedits);
349    LOG.debug("Creating writer path={}", regionedits);
350    return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
351  }
352
353
354
355  void filterCellByStore(WAL.Entry logEntry) {
356    Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
357        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
358    if (MapUtils.isEmpty(maxSeqIdInStores)) {
359      return;
360    }
361    // Create the array list for the cells that aren't filtered.
362    // We make the assumption that most cells will be kept.
363    ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
364    for (Cell cell : logEntry.getEdit().getCells()) {
365      if (WALEdit.isMetaEditFamily(cell)) {
366        keptCells.add(cell);
367      } else {
368        byte[] family = CellUtil.cloneFamily(cell);
369        Long maxSeqId = maxSeqIdInStores.get(family);
370        // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
371        // or the master was crashed before and we can not get the information.
372        if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
373          keptCells.add(cell);
374        }
375      }
376    }
377
378    // Anything in the keptCells array list is still live.
379    // So rather than removing the cells from the array list
380    // which would be an O(n^2) operation, we just replace the list
381    logEntry.getEdit().setCells(keptCells);
382  }
383
  /**
   * Appends the buffered entries of one region to that region's recovered-edits writer.
   * Delegates to {@link #appendBuffer} with {@code reusable} set to {@code true}, so a newly
   * created writer is registered in {@code writers} and reused for later buffers of the region.
   * @param buffer entries of a single region
   * @throws IOException if writing an entry fails
   */
  @Override
  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
    appendBuffer(buffer, true);
  }
388
389  WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
390      throws IOException {
391    List<WAL.Entry> entries = buffer.entryBuffer;
392    if (entries.isEmpty()) {
393      LOG.warn("got an empty buffer, skipping");
394      return null;
395    }
396
397    WALSplitter.WriterAndPath wap = null;
398
399    long startTime = System.nanoTime();
400    try {
401      int editsCount = 0;
402
403      for (WAL.Entry logEntry : entries) {
404        if (wap == null) {
405          wap = getWriterAndPath(logEntry, reusable);
406          if (wap == null) {
407            // This log spews the full edit. Can be massive in the log. Enable only debugging
408            // WAL lost edit issues.
409            LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
410            return null;
411          }
412        }
413        filterCellByStore(logEntry);
414        if (!logEntry.getEdit().isEmpty()) {
415          wap.writer.append(logEntry);
416          this.updateRegionMaximumEditLogSeqNum(logEntry);
417          editsCount++;
418        } else {
419          wap.incrementSkippedEdits(1);
420        }
421      }
422      // Pass along summary statistics
423      wap.incrementEdits(editsCount);
424      wap.incrementNanoTime(System.nanoTime() - startTime);
425    } catch (IOException e) {
426      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
427      LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
428      throw e;
429    }
430    return wap;
431  }
432
433  @Override
434  public boolean keepRegionEvent(WAL.Entry entry) {
435    ArrayList<Cell> cells = entry.getEdit().getCells();
436    for (Cell cell : cells) {
437      if (WALEdit.isCompactionMarker(cell)) {
438        return true;
439      }
440    }
441    return false;
442  }
443
444  /**
445   * @return a map from encoded region ID to the number of edits written out for that region.
446   */
447  @Override
448  public Map<byte[], Long> getOutputCounts() {
449    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
450    for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
451      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
452    }
453    return ret;
454  }
455
  /**
   * @return the number of entries in {@code writers}, i.e. the number of regions that currently
   *         have a registered writer
   */
  @Override
  public int getNumberOfRecoveredRegions() {
    return writers.size();
  }
460}