001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath; 021import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath; 022 023import java.io.EOFException; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.log.HBaseMarkers; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; 042 043@InterfaceAudience.Private 044abstract class AbstractRecoveredEditsOutputSink extends OutputSink { 045 private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); 046 private final WALSplitter walSplitter; 047 private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>(); 048 049 public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter, 050 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 051 super(controller, entryBuffers, numWriters); 052 this.walSplitter = walSplitter; 053 } 054 055 /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */ 056 protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region, 057 long seqId) throws IOException { 058 Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId, 059 walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(), 060 walSplitter.conf); 061 if (walSplitter.walFS.exists(regionEditsPath)) { 062 LOG.warn("Found old edits file. It could be the " 063 + "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length=" 064 + walSplitter.walFS.getFileStatus(regionEditsPath).getLen()); 065 if (!walSplitter.walFS.delete(regionEditsPath, false)) { 066 LOG.warn("Failed delete of old {}", regionEditsPath); 067 } 068 } 069 WALProvider.Writer w = walSplitter.createWriter(regionEditsPath); 070 final String msg = "Creating recovered edits writer path=" + regionEditsPath; 071 LOG.info(msg); 072 updateStatusWithMsg(msg); 073 return new RecoveredEditsWriter(region, regionEditsPath, w, seqId); 074 } 075 076 protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, 077 List<IOException> thrown) throws IOException { 078 try { 079 editsWriter.writer.close(); 080 } catch (IOException ioe) { 081 final String errorMsg = "Could not close recovered edits at " + editsWriter.path; 082 LOG.error(errorMsg, ioe); 083 updateStatusWithMsg(errorMsg); 084 thrown.add(ioe); 085 return null; 086 } 087 final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote " 088 + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " 089 + (editsWriter.nanosSpent / 1000 / 1000) + " ms)"; 090 LOG.info(msg); 091 updateStatusWithMsg(msg); 092 if (editsWriter.editsWritten == 0) { 093 // just remove the empty recovered.edits file 094 if ( 095 walSplitter.walFS.exists(editsWriter.path) 096 && !walSplitter.walFS.delete(editsWriter.path, false) 097 ) { 098 final String errorMsg = "Failed deleting empty " + editsWriter.path; 099 LOG.warn(errorMsg); 100 updateStatusWithMsg(errorMsg); 101 throw new IOException("Failed deleting empty " + editsWriter.path); 102 } 103 return null; 104 } 105 106 Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path, 107 regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName))); 108 try { 109 if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) { 110 deleteOneWithFewerEntries(editsWriter, dst); 111 } 112 // Skip the unit tests which create a splitter that reads and 113 // writes the data without touching disk. 114 // TestHLogSplit#testThreading is an example. 115 if (walSplitter.walFS.exists(editsWriter.path)) { 116 if (!walSplitter.walFS.rename(editsWriter.path, dst)) { 117 final String errorMsg = 118 "Failed renaming recovered edits " + editsWriter.path + " to " + dst; 119 updateStatusWithMsg(errorMsg); 120 throw new IOException(errorMsg); 121 } 122 final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst; 123 LOG.info(renameEditMsg); 124 updateStatusWithMsg(renameEditMsg); 125 } 126 } catch (IOException ioe) { 127 final String errorMsg = "Could not rename recovered edits " + editsWriter.path + " to " + dst; 128 LOG.error(errorMsg, ioe); 129 updateStatusWithMsg(errorMsg); 130 thrown.add(ioe); 131 return null; 132 } 133 return dst; 134 } 135 136 @Override 137 public boolean keepRegionEvent(WAL.Entry entry) { 138 ArrayList<Cell> cells = entry.getEdit().getCells(); 139 for (Cell cell : cells) { 140 if (WALEdit.isCompactionMarker(cell)) { 141 return true; 142 } 143 } 144 return false; 145 } 146 147 /** 148 * Update region's maximum edit log SeqNum. 149 */ 150 void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { 151 synchronized (regionMaximumEditLogSeqNum) { 152 String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); 153 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); 154 if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { 155 regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); 156 } 157 } 158 } 159 160 // delete the one with fewer wal entries 161 private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst) 162 throws IOException { 163 long dstMinLogSeqNum = -1L; 164 try (WALStreamReader reader = 165 walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) { 166 WAL.Entry entry = reader.next(); 167 if (entry != null) { 168 dstMinLogSeqNum = entry.getKey().getSequenceId(); 169 } 170 } catch (EOFException e) { 171 LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, 172 e); 173 } 174 if (editsWriter.minLogSeqNum < dstMinLogSeqNum) { 175 LOG.warn("Found existing old edits file. It could be the result of a previous failed" 176 + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" 177 + walSplitter.walFS.getFileStatus(dst).getLen()); 178 if (!walSplitter.walFS.delete(dst, false)) { 179 LOG.warn("Failed deleting of old {}", dst); 180 throw new IOException("Failed deleting of old " + dst); 181 } 182 } else { 183 LOG 184 .warn("Found existing old edits file and we have less entries. Deleting " + editsWriter.path 185 + ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen()); 186 if (!walSplitter.walFS.delete(editsWriter.path, false)) { 187 LOG.warn("Failed deleting of {}", editsWriter.path); 188 throw new IOException("Failed deleting of " + editsWriter.path); 189 } 190 } 191 } 192 193 /** 194 * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting 195 * statistics about the data written to this output. 196 */ 197 final class RecoveredEditsWriter { 198 /* Count of edits written to this path */ 199 long editsWritten = 0; 200 /* Count of edits skipped to this path */ 201 long editsSkipped = 0; 202 /* Number of nanos spent writing to this log */ 203 long nanosSpent = 0; 204 205 final byte[] encodedRegionName; 206 final Path path; 207 final WALProvider.Writer writer; 208 final long minLogSeqNum; 209 210 RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer, 211 long minLogSeqNum) { 212 this.encodedRegionName = encodedRegionName; 213 this.path = path; 214 this.writer = writer; 215 this.minLogSeqNum = minLogSeqNum; 216 } 217 218 private void incrementEdits(int edits) { 219 editsWritten += edits; 220 } 221 222 private void incrementSkippedEdits(int skipped) { 223 editsSkipped += skipped; 224 totalSkippedEdits.addAndGet(skipped); 225 } 226 227 private void incrementNanoTime(long nanos) { 228 nanosSpent += nanos; 229 } 230 231 void writeRegionEntries(List<WAL.Entry> entries) throws IOException { 232 long startTime = System.nanoTime(); 233 int editsCount = 0; 234 for (WAL.Entry logEntry : entries) { 235 filterCellByStore(logEntry); 236 if (!logEntry.getEdit().isEmpty()) { 237 try { 238 writer.append(logEntry); 239 } catch (IOException e) { 240 logAndThrowWriterAppendFailure(logEntry, e); 241 } 242 updateRegionMaximumEditLogSeqNum(logEntry); 243 editsCount++; 244 } else { 245 incrementSkippedEdits(1); 246 } 247 } 248 // Pass along summary statistics 249 incrementEdits(editsCount); 250 incrementNanoTime(System.nanoTime() - startTime); 251 } 252 253 private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e) 254 throws IOException { 255 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 256 final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log"; 257 LOG.error(HBaseMarkers.FATAL, errorMsg, e); 258 updateStatusWithMsg(errorMsg); 259 throw e; 260 } 261 262 private void filterCellByStore(WAL.Entry logEntry) { 263 Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores() 264 .get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); 265 if (MapUtils.isEmpty(maxSeqIdInStores)) { 266 return; 267 } 268 // Create the array list for the cells that aren't filtered. 269 // We make the assumption that most cells will be kept. 270 ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); 271 for (Cell cell : logEntry.getEdit().getCells()) { 272 if (WALEdit.isMetaEditFamily(cell)) { 273 keptCells.add(cell); 274 } else { 275 byte[] family = CellUtil.cloneFamily(cell); 276 Long maxSeqId = maxSeqIdInStores.get(family); 277 // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, 278 // or the master was crashed before and we can not get the information. 279 if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { 280 keptCells.add(cell); 281 } 282 } 283 } 284 285 // Anything in the keptCells array list is still live. 286 // So rather than removing the cells from the array list 287 // which would be an O(n^2) operation, we just replace the list 288 logEntry.getEdit().setCells(keptCells); 289 } 290 } 291}