001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
021import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
022
023import java.io.EOFException;
024import java.io.IOException;
025import java.net.URLEncoder;
026import java.nio.charset.StandardCharsets;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellUtil;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.log.HBaseMarkers;
038import org.apache.hadoop.hbase.util.Addressing;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.ipc.RemoteException;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
046
047@InterfaceAudience.Private
048abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
049  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
050  private final WALSplitter walSplitter;
051  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
052  private static final int MAX_RENAME_RETRY_COUNT = 5;
053
054  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
055    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
056    super(controller, entryBuffers, numWriters);
057    this.walSplitter = walSplitter;
058  }
059
060  /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */
061  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
062    long seqId) throws IOException {
063    // If multiple worker are splitting a WAL at a same time, both should use unique file name to
064    // avoid conflict
065    Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
066      walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
067      walSplitter.conf, getWorkerNameComponent());
068
069    if (walSplitter.walFS.exists(regionEditsPath)) {
070      LOG.warn("Found old edits file. It could be the "
071        + "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length="
072        + walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
073      if (!walSplitter.walFS.delete(regionEditsPath, false)) {
074        LOG.warn("Failed delete of old {}", regionEditsPath);
075      }
076    }
077    WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
078    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
079    LOG.info(msg);
080    updateStatusWithMsg(msg);
081    return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
082  }
083
084  private String getWorkerNameComponent() {
085    if (walSplitter.rsServices == null) {
086      return "";
087    }
088    return URLEncoder.encode(
089      walSplitter.rsServices.getServerName().toShortString()
090        .replace(Addressing.HOSTNAME_PORT_SEPARATOR, ServerName.SERVERNAME_SEPARATOR),
091      StandardCharsets.UTF_8);
092  }
093
094  /**
095   * abortRecoveredEditsWriter closes the editsWriter, but does not rename and finalize the
096   * recovered edits WAL files. Please see HBASE-28569.
097   */
098  protected void abortRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
099    List<IOException> thrown) {
100    closeRecoveredEditsWriter(editsWriter, thrown);
101    try {
102      removeRecoveredEditsFile(editsWriter);
103    } catch (IOException ioe) {
104      final String errorMsg = "Failed removing recovered edits file at " + editsWriter.path;
105      LOG.error(errorMsg);
106      updateStatusWithMsg(errorMsg);
107    }
108  }
109
110  protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter editsWriter,
111    List<IOException> thrown) throws IOException {
112    if (!closeRecoveredEditsWriter(editsWriter, thrown)) {
113      return null;
114    }
115    if (editsWriter.editsWritten == 0) {
116      // just remove the empty recovered.edits file
117      removeRecoveredEditsFile(editsWriter);
118      return null;
119    }
120
121    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
122      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
123    try {
124      // Skip the unit tests which create a splitter that reads and
125      // writes the data without touching disk.
126      // TestHLogSplit#testThreading is an example.
127      if (walSplitter.walFS.exists(editsWriter.path)) {
128        boolean retry;
129        int retryCount = 0;
130        do {
131          retry = false;
132          retryCount++;
133          // If rename is successful, it means recovered edits are successfully places at right
134          // place but if rename fails, there can be below reasons
135          // 1. dst already exist - in this case if dst have desired edits we will keep the dst,
136          // delete the editsWriter.path and consider this success else if dst have fewer edits, we
137          // will delete the dst and retry the rename
138          // 2. parent directory does not exit - in one edge case this is possible when this worker
139          // got stuck before rename and HMaster get another worker to split the wal, SCP will
140          // proceed and once region get opened on one RS, we delete the recovered.edits directory,
141          // in this case there is no harm in failing this procedure after retry exhausted.
142          if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
143            retry = deleteOneWithFewerEntriesToRetry(editsWriter, dst);
144          }
145        } while (retry && retryCount < MAX_RENAME_RETRY_COUNT);
146
147        // If we are out of loop with retry flag `true` it means we have exhausted the retries.
148        if (retry) {
149          final String errorMsg = "Failed renaming recovered edits " + editsWriter.path + " to "
150            + dst + " in " + MAX_RENAME_RETRY_COUNT + " retries";
151          updateStatusWithMsg(errorMsg);
152          throw new IOException(errorMsg);
153        } else {
154          final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
155          LOG.info(renameEditMsg);
156          updateStatusWithMsg(renameEditMsg);
157        }
158      }
159    } catch (IOException ioe) {
160      final String errorMsg = "Could not rename recovered edits " + editsWriter.path + " to " + dst;
161      LOG.error(errorMsg, ioe);
162      updateStatusWithMsg(errorMsg);
163      thrown.add(ioe);
164      return null;
165    }
166    return dst;
167  }
168
169  private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
170    List<IOException> thrown) {
171    try {
172      editsWriter.writer.close();
173    } catch (IOException ioe) {
174      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
175      LOG.error(errorMsg, ioe);
176      updateStatusWithMsg(errorMsg);
177      thrown.add(ioe);
178      return false;
179    }
180    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
181      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in "
182      + (editsWriter.nanosSpent / 1000 / 1000) + " ms)";
183    LOG.info(msg);
184    updateStatusWithMsg(msg);
185    return true;
186  }
187
188  private void removeRecoveredEditsFile(RecoveredEditsWriter editsWriter) throws IOException {
189    if (
190      walSplitter.walFS.exists(editsWriter.path)
191        && !walSplitter.walFS.delete(editsWriter.path, false)
192    ) {
193      final String errorMsg = "Failed deleting empty " + editsWriter.path;
194      LOG.warn(errorMsg);
195      updateStatusWithMsg(errorMsg);
196      throw new IOException("Failed deleting empty  " + editsWriter.path);
197    }
198  }
199
200  @Override
201  public boolean keepRegionEvent(WAL.Entry entry) {
202    ArrayList<Cell> cells = entry.getEdit().getCells();
203    for (Cell cell : cells) {
204      if (WALEdit.isCompactionMarker(cell)) {
205        return true;
206      }
207    }
208    return false;
209  }
210
211  /**
212   * Update region's maximum edit log SeqNum.
213   */
214  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
215    synchronized (regionMaximumEditLogSeqNum) {
216      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
217      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
218      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
219        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
220      }
221    }
222  }
223
224  // delete the one with fewer wal entries
225  private boolean deleteOneWithFewerEntriesToRetry(RecoveredEditsWriter editsWriter, Path dst)
226    throws IOException {
227    if (!walSplitter.walFS.exists(dst)) {
228      LOG.info("dst {} doesn't exist, need to retry ", dst);
229      return true;
230    }
231
232    if (isDstHasFewerEntries(editsWriter, dst)) {
233      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
234        + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
235        + walSplitter.walFS.getFileStatus(dst).getLen() + " and retry is needed");
236      if (!walSplitter.walFS.delete(dst, false)) {
237        LOG.warn("Failed deleting of old {}", dst);
238        throw new IOException("Failed deleting of old " + dst);
239      }
240      return true;
241    } else {
242      LOG
243        .warn("Found existing old edits file and we have less entries. Deleting " + editsWriter.path
244          + ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen()
245          + " and no retry needed as dst has all edits");
246      if (!walSplitter.walFS.delete(editsWriter.path, false)) {
247        LOG.warn("Failed deleting of {}", editsWriter.path);
248        throw new IOException("Failed deleting of " + editsWriter.path);
249      }
250      return false;
251    }
252  }
253
254  private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
255    throws IOException {
256    long dstMinLogSeqNum = -1L;
257    try (WALStreamReader reader =
258      walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
259      WAL.Entry entry = reader.next();
260      if (entry != null) {
261        dstMinLogSeqNum = entry.getKey().getSequenceId();
262      }
263    } catch (EOFException e) {
264      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
265        e);
266    }
267    return editsWriter.minLogSeqNum < dstMinLogSeqNum;
268  }
269
270  /**
271   * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
272   * statistics about the data written to this output.
273   */
274  final class RecoveredEditsWriter {
275    /* Count of edits written to this path */
276    long editsWritten = 0;
277    /* Count of edits skipped to this path */
278    long editsSkipped = 0;
279    /* Number of nanos spent writing to this log */
280    long nanosSpent = 0;
281
282    final byte[] encodedRegionName;
283    final Path path;
284    final WALProvider.Writer writer;
285    final long minLogSeqNum;
286
287    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
288      long minLogSeqNum) {
289      this.encodedRegionName = encodedRegionName;
290      this.path = path;
291      this.writer = writer;
292      this.minLogSeqNum = minLogSeqNum;
293    }
294
295    private void incrementEdits(int edits) {
296      editsWritten += edits;
297    }
298
299    private void incrementSkippedEdits(int skipped) {
300      editsSkipped += skipped;
301      totalSkippedEdits.addAndGet(skipped);
302    }
303
304    private void incrementNanoTime(long nanos) {
305      nanosSpent += nanos;
306    }
307
308    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
309      long startTime = System.nanoTime();
310      int editsCount = 0;
311      for (WAL.Entry logEntry : entries) {
312        filterCellByStore(logEntry);
313        if (!logEntry.getEdit().isEmpty()) {
314          try {
315            writer.append(logEntry);
316          } catch (IOException e) {
317            logAndThrowWriterAppendFailure(logEntry, e);
318          }
319          updateRegionMaximumEditLogSeqNum(logEntry);
320          editsCount++;
321        } else {
322          incrementSkippedEdits(1);
323        }
324      }
325      // Pass along summary statistics
326      incrementEdits(editsCount);
327      incrementNanoTime(System.nanoTime() - startTime);
328    }
329
330    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
331      throws IOException {
332      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
333      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
334      LOG.error(HBaseMarkers.FATAL, errorMsg, e);
335      updateStatusWithMsg(errorMsg);
336      throw e;
337    }
338
339    private void filterCellByStore(WAL.Entry logEntry) {
340      Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
341        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
342      if (MapUtils.isEmpty(maxSeqIdInStores)) {
343        return;
344      }
345      // Create the array list for the cells that aren't filtered.
346      // We make the assumption that most cells will be kept.
347      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
348      for (Cell cell : logEntry.getEdit().getCells()) {
349        if (WALEdit.isMetaEditFamily(cell)) {
350          keptCells.add(cell);
351        } else {
352          byte[] family = CellUtil.cloneFamily(cell);
353          Long maxSeqId = maxSeqIdInStores.get(family);
354          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
355          // or the master was crashed before and we can not get the information.
356          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
357            keptCells.add(cell);
358          }
359        }
360      }
361
362      // Anything in the keptCells array list is still live.
363      // So rather than removing the cells from the array list
364      // which would be an O(n^2) operation, we just replace the list
365      logEntry.getEdit().setCells(keptCells);
366    }
367  }
368}