/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;

import java.io.EOFException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Shared plumbing for WAL-split output sinks that write per-region recovered-edits files: creating
 * a writer on a temporary path, closing it, and either discarding the file or renaming it to its
 * completed path.
 */
@InterfaceAudience.Private
abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
  // NOTE(review): the logger is registered under RecoveredEditsOutputSink, not this class. Left
  // unchanged so existing log categories keep working -- confirm whether that is intentional.
  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
  private final WALSplitter walSplitter;
  /** Highest sequence id appended so far, keyed by encoded region name. */
  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
  /** Upper bound on rename attempts when finalizing a recovered-edits file. */
  private static final int MAX_RENAME_RETRY_COUNT = 5;

  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }

  /**
   * Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
   * <p>
   * If a file already exists at the computed path it is deleted first; a failed delete is only
   * logged.
   * @param tableName table the region belongs to
   * @param region    encoded region name
   * @param seqId     sequence id used to build the file name and recorded as the writer's minimum
   *                  sequence id
   * @throws IOException if a filesystem call or the writer creation fails
   */
  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
    long seqId) throws IOException {
    // If multiple workers are splitting a WAL at the same time, each should use a unique file name
    // to avoid conflicts.
    Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
      walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
      walSplitter.conf, getWorkerNameComponent());

    if (walSplitter.walFS.exists(regionEditsPath)) {
      LOG.warn(
        "Found old edits file. It could be the result of a previous failed split attempt."
          + " Deleting {}, length={}",
        regionEditsPath, walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
      if (!walSplitter.walFS.delete(regionEditsPath, false)) {
        LOG.warn("Failed delete of old {}", regionEditsPath);
      }
    }
    WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
  }

  /**
   * Builds the worker-specific part of the recovered-edits file name.
   * @return the URL-encoded short server name with the host/port separator replaced by the server
   *         name separator, or an empty string when no region server services are available
   */
  private String getWorkerNameComponent() {
    if (walSplitter.rsServices == null) {
      return "";
    }
    return URLEncoder.encode(
      walSplitter.rsServices.getServerName().toShortString()
        .replace(Addressing.HOSTNAME_PORT_SEPARATOR, ServerName.SERVERNAME_SEPARATOR),
      StandardCharsets.UTF_8);
  }

  /**
   * abortRecoveredEditsWriter closes the editsWriter, but does not rename and finalize the
   * recovered edits WAL files. Please see HBASE-28569.
   * <p>
   * A failure to remove the file is logged and reported as status but not added to
   * {@code thrown}.
   * @param editsWriter writer to close and whose file is to be removed
   * @param thrown      collects the exception raised by a failed close
   */
  protected void abortRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
    List<IOException> thrown) {
    closeRecoveredEditsWriter(editsWriter, thrown);
    try {
      removeRecoveredEditsFile(editsWriter);
    } catch (IOException ioe) {
      final String errorMsg = "Failed removing recovered edits file at " + editsWriter.path;
      // Pass the exception along so the cause is not lost from the log.
      LOG.error(errorMsg, ioe);
      updateStatusWithMsg(errorMsg);
    }
  }

  /**
   * Closes the writer and moves its file to the completed recovered-edits path.
   * @param editsWriter writer to close and finalize
   * @param thrown      collects exceptions raised while closing or renaming
   * @return the completed path, or {@code null} when the close failed, when no edits were written
   *         (the file is removed instead), or when the rename phase failed
   * @throws IOException if removing a file that received no edits fails
   */
  protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter editsWriter,
    List<IOException> thrown) throws IOException {
    if (!closeRecoveredEditsWriter(editsWriter, thrown)) {
      return null;
    }
    if (editsWriter.editsWritten == 0) {
      // just remove the empty recovered.edits file
      removeRecoveredEditsFile(editsWriter);
      return null;
    }

    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
    try {
      // Skip the unit tests which create a splitter that reads and
      // writes the data without touching disk.
      // TestHLogSplit#testThreading is an example.
      if (walSplitter.walFS.exists(editsWriter.path)) {
        boolean retry;
        int retryCount = 0;
        do {
          retry = false;
          retryCount++;
          // If the rename succeeds, the recovered edits are in their final place. If it fails,
          // the possible reasons are:
          // 1. dst already exists - if dst has the desired edits we keep dst, delete
          // editsWriter.path and consider this a success; if dst has fewer edits we delete dst
          // and retry the rename.
          // 2. the parent directory does not exist - possible in one edge case where this worker
          // got stuck before the rename and HMaster got another worker to split the wal; SCP
          // proceeds and once the region is opened on an RS the recovered.edits directory is
          // deleted. There is no harm in failing this procedure after retries are exhausted.
          if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
            retry = deleteOneWithFewerEntriesToRetry(editsWriter, dst);
          }
        } while (retry && retryCount < MAX_RENAME_RETRY_COUNT);

        // If we are out of loop with retry flag `true` it means we have exhausted the retries.
        if (retry) {
          final String errorMsg = "Failed renaming recovered edits " + editsWriter.path + " to "
            + dst + " in " + MAX_RENAME_RETRY_COUNT + " retries";
          updateStatusWithMsg(errorMsg);
          // Caught by the handler below, which logs it and records it in `thrown`.
          throw new IOException(errorMsg);
        } else {
          final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
          LOG.info(renameEditMsg);
          updateStatusWithMsg(renameEditMsg);
        }
      }
    } catch (IOException ioe) {
      final String errorMsg = "Could not rename recovered edits " + editsWriter.path + " to " + dst;
      LOG.error(errorMsg, ioe);
      updateStatusWithMsg(errorMsg);
      thrown.add(ioe);
      return null;
    }
    return dst;
  }

  /**
   * Closes the underlying writer and reports the write statistics.
   * @param editsWriter writer to close
   * @param thrown      receives the exception if the close fails
   * @return {@code true} if the writer closed cleanly, {@code false} otherwise
   */
  private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
    List<IOException> thrown) {
    try {
      editsWriter.writer.close();
    } catch (IOException ioe) {
      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
      LOG.error(errorMsg, ioe);
      updateStatusWithMsg(errorMsg);
      thrown.add(ioe);
      return false;
    }
    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in "
      + (editsWriter.nanosSpent / 1000 / 1000) + " ms)";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return true;
  }

  /**
   * Deletes the writer's file if it exists.
   * @param editsWriter writer whose file is to be removed
   * @throws IOException if the file exists but the delete reports failure, or a filesystem call
   *                     fails
   */
  private void removeRecoveredEditsFile(RecoveredEditsWriter editsWriter) throws IOException {
    if (
      walSplitter.walFS.exists(editsWriter.path)
        && !walSplitter.walFS.delete(editsWriter.path, false)
    ) {
      final String errorMsg = "Failed deleting empty " + editsWriter.path;
      LOG.warn(errorMsg);
      updateStatusWithMsg(errorMsg);
      throw new IOException(errorMsg);
    }
  }

  /**
   * Returns {@code true} if any cell of the entry's edit is a compaction marker.
   */
  @Override
  public boolean keepRegionEvent(WAL.Entry entry) {
    for (Cell cell : entry.getEdit().getCells()) {
      if (WALEdit.isCompactionMarker(cell)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Update region's maximum edit log SeqNum.
   */
  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
    // merge() on the concurrent map performs the "store if absent or larger" update atomically.
    regionMaximumEditLogSeqNum.merge(Bytes.toString(entry.getKey().getEncodedRegionName()),
      entry.getKey().getSequenceId(), Long::max);
  }

  /**
   * Resolves a failed rename by deleting whichever of the two files holds fewer WAL entries.
   * @param editsWriter writer whose file could not be renamed
   * @param dst         target path of the rename
   * @return {@code true} if the rename should be retried (dst is missing or was deleted),
   *         {@code false} if dst was kept and the writer's own file was deleted
   * @throws IOException if a delete reports failure or a filesystem call fails
   */
  private boolean deleteOneWithFewerEntriesToRetry(RecoveredEditsWriter editsWriter, Path dst)
    throws IOException {
    if (!walSplitter.walFS.exists(dst)) {
      LOG.info("dst {} doesn't exist, need to retry ", dst);
      return true;
    }

    if (isDstHasFewerEntries(editsWriter, dst)) {
      LOG.warn(
        "Found existing old edits file. It could be the result of a previous failed"
          + " split attempt or we have duplicated wal entries. Deleting {}, length={}"
          + " and retry is needed",
        dst, walSplitter.walFS.getFileStatus(dst).getLen());
      if (!walSplitter.walFS.delete(dst, false)) {
        LOG.warn("Failed deleting of old {}", dst);
        throw new IOException("Failed deleting of old " + dst);
      }
      return true;
    } else {
      LOG.warn(
        "Found existing old edits file and we have less entries. Deleting {}, length={}"
          + " and no retry needed as dst has all edits",
        editsWriter.path, walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
      if (!walSplitter.walFS.delete(editsWriter.path, false)) {
        LOG.warn("Failed deleting of {}", editsWriter.path);
        throw new IOException("Failed deleting of " + editsWriter.path);
      }
      return false;
    }
  }

  /**
   * Compares the first sequence id found in {@code dst} with the writer's minimum sequence id.
   * @param editsWriter writer whose file competes with dst
   * @param dst         existing file at the rename target
   * @return {@code true} if dst yields no readable first entry, or if its first sequence id is
   *         greater than the writer's minimum sequence id
   * @throws IOException if reading dst fails with anything other than an {@link EOFException}
   */
  private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
    throws IOException {
    long dstMinLogSeqNum = -1L;
    boolean dstHasEntry = false;
    try (WALStreamReader reader =
      walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
      WAL.Entry entry = reader.next();
      if (entry != null) {
        dstMinLogSeqNum = entry.getKey().getSequenceId();
        dstHasEntry = true;
      }
    } catch (EOFException e) {
      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
        e);
    }
    if (!dstHasEntry) {
      // This method is only reached for a writer that wrote at least one edit, so a dst with no
      // readable entry holds fewer. Comparing against the -1 sentinel would report the opposite
      // and make the caller delete the file that actually has the edits.
      return true;
    }
    return editsWriter.minLogSeqNum < dstMinLogSeqNum;
  }

  /**
   * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
   * statistics about the data written to this output.
   */
  final class RecoveredEditsWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Count of edits skipped to this path */
    long editsSkipped = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    final byte[] encodedRegionName;
    final Path path;
    final WALProvider.Writer writer;
    final long minLogSeqNum;

    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
      long minLogSeqNum) {
      this.encodedRegionName = encodedRegionName;
      this.path = path;
      this.writer = writer;
      this.minLogSeqNum = minLogSeqNum;
    }

    private void incrementEdits(int edits) {
      editsWritten += edits;
    }

    /** Counts skipped edits both for this writer and in the sink-wide total. */
    private void incrementSkippedEdits(int skipped) {
      editsSkipped += skipped;
      totalSkippedEdits.addAndGet(skipped);
    }

    private void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }

    /**
     * Filters each entry's cells by store, appends the entries that still have cells, and counts
     * the rest as skipped.
     * @param entries entries to write
     * @throws IOException if an append fails; the written-edits and time counters are not updated
     *                     in that case
     */
    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
      long startTime = System.nanoTime();
      int editsCount = 0;
      for (WAL.Entry logEntry : entries) {
        filterCellByStore(logEntry);
        if (!logEntry.getEdit().isEmpty()) {
          try {
            writer.append(logEntry);
          } catch (IOException e) {
            logAndThrowWriterAppendFailure(logEntry, e);
          }
          updateRegionMaximumEditLogSeqNum(logEntry);
          editsCount++;
        } else {
          incrementSkippedEdits(1);
        }
      }
      // Pass along summary statistics
      incrementEdits(editsCount);
      incrementNanoTime(System.nanoTime() - startTime);
    }

    /**
     * Logs an append failure and rethrows it, unwrapping a {@link RemoteException} first.
     * @throws IOException always
     */
    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
      throws IOException {
      final IOException cause =
        e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
      LOG.error(HBaseMarkers.FATAL, errorMsg, cause);
      updateStatusWithMsg(errorMsg);
      throw cause;
    }

    /**
     * Replaces the entry's cell list with the cells that are still needed: meta-edit-family cells,
     * and cells whose family has no recorded max sequence id or one lower than the entry's
     * sequence id. Does nothing when no per-store max sequence ids are known for the region.
     */
    private void filterCellByStore(WAL.Entry logEntry) {
      Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
      if (MapUtils.isEmpty(maxSeqIdInStores)) {
        return;
      }
      // Create the array list for the cells that aren't filtered.
      // We make the assumption that most cells will be kept.
      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
      for (Cell cell : logEntry.getEdit().getCells()) {
        if (WALEdit.isMetaEditFamily(cell)) {
          keptCells.add(cell);
        } else {
          byte[] family = CellUtil.cloneFamily(cell);
          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
          // or the master was crashed before and we can not get the information.
          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
            keptCells.add(cell);
          }
        }
      }

      // Anything in the keptCells array list is still live.
      // So rather than removing the cells from the array list
      // which would be an O(n^2) operation, we just replace the list
      logEntry.getEdit().setCells(keptCells);
    }
  }
}