001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
021import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
022
023import java.io.EOFException;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.CellUtil;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.log.HBaseMarkers;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.ipc.RemoteException;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
043
044@InterfaceAudience.Private
045abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
046  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
047  private final WALSplitter walSplitter;
048  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
049
050  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
051      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
052    super(controller, entryBuffers, numWriters);
053    this.walSplitter = walSplitter;
054  }
055
056  /**
057   * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
058   */
059  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
060      long seqId) throws IOException {
061    Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
062      walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
063      walSplitter.conf);
064    if (walSplitter.walFS.exists(regionEditsPath)) {
065      LOG.warn("Found old edits file. It could be the " +
066        "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length=" +
067        walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
068      if (!walSplitter.walFS.delete(regionEditsPath, false)) {
069        LOG.warn("Failed delete of old {}", regionEditsPath);
070      }
071    }
072    WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
073    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
074    LOG.info(msg);
075    updateStatusWithMsg(msg);
076    return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
077  }
078
079  protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
080      List<IOException> thrown) throws IOException {
081    try {
082      editsWriter.writer.close();
083    } catch (IOException ioe) {
084      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
085      LOG.error(errorMsg, ioe);
086      updateStatusWithMsg(errorMsg);
087      thrown.add(ioe);
088      return null;
089    }
090    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
091      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " + (
092      editsWriter.nanosSpent / 1000 / 1000) + " ms)";
093    LOG.info(msg);
094    updateStatusWithMsg(msg);
095    if (editsWriter.editsWritten == 0) {
096      // just remove the empty recovered.edits file
097      if (walSplitter.walFS.exists(editsWriter.path)
098          && !walSplitter.walFS.delete(editsWriter.path, false)) {
099        final String errorMsg = "Failed deleting empty " + editsWriter.path;
100        LOG.warn(errorMsg);
101        updateStatusWithMsg(errorMsg);
102        throw new IOException("Failed deleting empty  " + editsWriter.path);
103      }
104      return null;
105    }
106
107    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
108      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
109    try {
110      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
111        deleteOneWithFewerEntries(editsWriter, dst);
112      }
113      // Skip the unit tests which create a splitter that reads and
114      // writes the data without touching disk.
115      // TestHLogSplit#testThreading is an example.
116      if (walSplitter.walFS.exists(editsWriter.path)) {
117        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
118          final String errorMsg =
119            "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
120          updateStatusWithMsg(errorMsg);
121          throw new IOException(errorMsg);
122        }
123        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
124        LOG.info(renameEditMsg);
125        updateStatusWithMsg(renameEditMsg);
126      }
127    } catch (IOException ioe) {
128      final String errorMsg = "Could not rename recovered edits " + editsWriter.path
129        + " to " + dst;
130      LOG.error(errorMsg, ioe);
131      updateStatusWithMsg(errorMsg);
132      thrown.add(ioe);
133      return null;
134    }
135    return dst;
136  }
137
138  @Override
139  public boolean keepRegionEvent(WAL.Entry entry) {
140    ArrayList<Cell> cells = entry.getEdit().getCells();
141    for (Cell cell : cells) {
142      if (WALEdit.isCompactionMarker(cell)) {
143        return true;
144      }
145    }
146    return false;
147  }
148
149  /**
150   * Update region's maximum edit log SeqNum.
151   */
152  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
153    synchronized (regionMaximumEditLogSeqNum) {
154      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
155      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
156      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
157        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
158      }
159    }
160  }
161
162  // delete the one with fewer wal entries
163  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
164    throws IOException {
165    long dstMinLogSeqNum = -1L;
166    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
167      WAL.Entry entry = reader.next();
168      if (entry != null) {
169        dstMinLogSeqNum = entry.getKey().getSequenceId();
170      }
171    } catch (EOFException e) {
172      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
173        e);
174    }
175    if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
176      LOG.warn("Found existing old edits file. It could be the result of a previous failed" +
177        " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" +
178        walSplitter.walFS.getFileStatus(dst).getLen());
179      if (!walSplitter.walFS.delete(dst, false)) {
180        LOG.warn("Failed deleting of old {}", dst);
181        throw new IOException("Failed deleting of old " + dst);
182      }
183    } else {
184      LOG.warn(
185        "Found existing old edits file and we have less entries. Deleting " + editsWriter.path +
186          ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
187      if (!walSplitter.walFS.delete(editsWriter.path, false)) {
188        LOG.warn("Failed deleting of {}", editsWriter.path);
189        throw new IOException("Failed deleting of " + editsWriter.path);
190      }
191    }
192  }
193
194  /**
195   * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
196   * statistics about the data written to this output.
197   */
198  final class RecoveredEditsWriter {
199    /* Count of edits written to this path */
200    long editsWritten = 0;
201    /* Count of edits skipped to this path */
202    long editsSkipped = 0;
203    /* Number of nanos spent writing to this log */
204    long nanosSpent = 0;
205
206    final byte[] encodedRegionName;
207    final Path path;
208    final WALProvider.Writer writer;
209    final long minLogSeqNum;
210
211    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
212      long minLogSeqNum) {
213      this.encodedRegionName = encodedRegionName;
214      this.path = path;
215      this.writer = writer;
216      this.minLogSeqNum = minLogSeqNum;
217    }
218
219    private void incrementEdits(int edits) {
220      editsWritten += edits;
221    }
222
223    private void incrementSkippedEdits(int skipped) {
224      editsSkipped += skipped;
225      totalSkippedEdits.addAndGet(skipped);
226    }
227
228    private void incrementNanoTime(long nanos) {
229      nanosSpent += nanos;
230    }
231
232    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
233      long startTime = System.nanoTime();
234      int editsCount = 0;
235      for (WAL.Entry logEntry : entries) {
236        filterCellByStore(logEntry);
237        if (!logEntry.getEdit().isEmpty()) {
238          try {
239            writer.append(logEntry);
240          } catch (IOException e) {
241            logAndThrowWriterAppendFailure(logEntry, e);
242          }
243          updateRegionMaximumEditLogSeqNum(logEntry);
244          editsCount++;
245        } else {
246          incrementSkippedEdits(1);
247        }
248      }
249      // Pass along summary statistics
250      incrementEdits(editsCount);
251      incrementNanoTime(System.nanoTime() - startTime);
252    }
253
254    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
255        throws IOException {
256      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
257      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
258      LOG.error(HBaseMarkers.FATAL, errorMsg, e);
259      updateStatusWithMsg(errorMsg);
260      throw e;
261    }
262
263    private void filterCellByStore(WAL.Entry logEntry) {
264      Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
265          .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
266      if (MapUtils.isEmpty(maxSeqIdInStores)) {
267        return;
268      }
269      // Create the array list for the cells that aren't filtered.
270      // We make the assumption that most cells will be kept.
271      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
272      for (Cell cell : logEntry.getEdit().getCells()) {
273        if (WALEdit.isMetaEditFamily(cell)) {
274          keptCells.add(cell);
275        } else {
276          byte[] family = CellUtil.cloneFamily(cell);
277          Long maxSeqId = maxSeqIdInStores.get(family);
278          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
279          // or the master was crashed before and we can not get the information.
280          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
281            keptCells.add(cell);
282          }
283        }
284      }
285
286      // Anything in the keptCells array list is still live.
287      // So rather than removing the cells from the array list
288      // which would be an O(n^2) operation, we just replace the list
289      logEntry.getEdit().setCells(keptCells);
290    }
291  }
292}