001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath; 021import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath; 022 023import java.io.EOFException; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.log.HBaseMarkers; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; 042 043@InterfaceAudience.Private 044abstract class AbstractRecoveredEditsOutputSink extends OutputSink { 045 private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); 046 private final WALSplitter walSplitter; 047 private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>(); 048 049 public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter, 050 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 051 super(controller, entryBuffers, numWriters); 052 this.walSplitter = walSplitter; 053 } 054 055 /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */ 056 protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region, 057 long seqId) throws IOException { 058 Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId, 059 walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(), 060 walSplitter.conf); 061 if (walSplitter.walFS.exists(regionEditsPath)) { 062 LOG.warn("Found old edits file. It could be the " 063 + "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length=" 064 + walSplitter.walFS.getFileStatus(regionEditsPath).getLen()); 065 if (!walSplitter.walFS.delete(regionEditsPath, false)) { 066 LOG.warn("Failed delete of old {}", regionEditsPath); 067 } 068 } 069 WALProvider.Writer w = walSplitter.createWriter(regionEditsPath); 070 final String msg = "Creating recovered edits writer path=" + regionEditsPath; 071 LOG.info(msg); 072 updateStatusWithMsg(msg); 073 return new RecoveredEditsWriter(region, regionEditsPath, w, seqId); 074 } 075 076 protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, 077 List<IOException> thrown) throws IOException { 078 try { 079 editsWriter.writer.close(); 080 } catch (IOException ioe) { 081 final String errorMsg = "Could not close recovered edits at " + editsWriter.path; 082 LOG.error(errorMsg, ioe); 083 updateStatusWithMsg(errorMsg); 084 thrown.add(ioe); 085 return null; 086 } 087 final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote " 088 + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " 089 + (editsWriter.nanosSpent / 1000 / 1000) + " ms)"; 090 LOG.info(msg); 091 updateStatusWithMsg(msg); 092 if (editsWriter.editsWritten == 0) { 093 // just remove the empty recovered.edits file 094 if ( 095 walSplitter.walFS.exists(editsWriter.path) 096 && !walSplitter.walFS.delete(editsWriter.path, false) 097 ) { 098 final String errorMsg = "Failed deleting empty " + editsWriter.path; 099 LOG.warn(errorMsg); 100 updateStatusWithMsg(errorMsg); 101 throw new IOException("Failed deleting empty " + editsWriter.path); 102 } 103 return null; 104 } 105 106 Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path, 107 regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName))); 108 try { 109 if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) { 110 deleteOneWithFewerEntries(editsWriter, dst); 111 } 112 // Skip the unit tests which create a splitter that reads and 113 // writes the data without touching disk. 114 // TestHLogSplit#testThreading is an example. 115 if (walSplitter.walFS.exists(editsWriter.path)) { 116 if (!walSplitter.walFS.rename(editsWriter.path, dst)) { 117 final String errorMsg = 118 "Failed renaming recovered edits " + editsWriter.path + " to " + dst; 119 updateStatusWithMsg(errorMsg); 120 throw new IOException(errorMsg); 121 } 122 final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst; 123 LOG.info(renameEditMsg); 124 updateStatusWithMsg(renameEditMsg); 125 } 126 } catch (IOException ioe) { 127 final String errorMsg = "Could not rename recovered edits " + editsWriter.path + " to " + dst; 128 LOG.error(errorMsg, ioe); 129 updateStatusWithMsg(errorMsg); 130 thrown.add(ioe); 131 return null; 132 } 133 return dst; 134 } 135 136 @Override 137 public boolean keepRegionEvent(WAL.Entry entry) { 138 ArrayList<Cell> cells = entry.getEdit().getCells(); 139 for (Cell cell : cells) { 140 if (WALEdit.isCompactionMarker(cell)) { 141 return true; 142 } 143 } 144 return false; 145 } 146 147 /** 148 * Update region's maximum edit log SeqNum. 149 */ 150 void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { 151 synchronized (regionMaximumEditLogSeqNum) { 152 String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); 153 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); 154 if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { 155 regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); 156 } 157 } 158 } 159 160 // delete the one with fewer wal entries 161 private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst) 162 throws IOException { 163 long dstMinLogSeqNum = -1L; 164 try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) { 165 WAL.Entry entry = reader.next(); 166 if (entry != null) { 167 dstMinLogSeqNum = entry.getKey().getSequenceId(); 168 } 169 } catch (EOFException e) { 170 LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, 171 e); 172 } 173 if (editsWriter.minLogSeqNum < dstMinLogSeqNum) { 174 LOG.warn("Found existing old edits file. It could be the result of a previous failed" 175 + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" 176 + walSplitter.walFS.getFileStatus(dst).getLen()); 177 if (!walSplitter.walFS.delete(dst, false)) { 178 LOG.warn("Failed deleting of old {}", dst); 179 throw new IOException("Failed deleting of old " + dst); 180 } 181 } else { 182 LOG 183 .warn("Found existing old edits file and we have less entries. Deleting " + editsWriter.path 184 + ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen()); 185 if (!walSplitter.walFS.delete(editsWriter.path, false)) { 186 LOG.warn("Failed deleting of {}", editsWriter.path); 187 throw new IOException("Failed deleting of " + editsWriter.path); 188 } 189 } 190 } 191 192 /** 193 * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting 194 * statistics about the data written to this output. 195 */ 196 final class RecoveredEditsWriter { 197 /* Count of edits written to this path */ 198 long editsWritten = 0; 199 /* Count of edits skipped to this path */ 200 long editsSkipped = 0; 201 /* Number of nanos spent writing to this log */ 202 long nanosSpent = 0; 203 204 final byte[] encodedRegionName; 205 final Path path; 206 final WALProvider.Writer writer; 207 final long minLogSeqNum; 208 209 RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer, 210 long minLogSeqNum) { 211 this.encodedRegionName = encodedRegionName; 212 this.path = path; 213 this.writer = writer; 214 this.minLogSeqNum = minLogSeqNum; 215 } 216 217 private void incrementEdits(int edits) { 218 editsWritten += edits; 219 } 220 221 private void incrementSkippedEdits(int skipped) { 222 editsSkipped += skipped; 223 totalSkippedEdits.addAndGet(skipped); 224 } 225 226 private void incrementNanoTime(long nanos) { 227 nanosSpent += nanos; 228 } 229 230 void writeRegionEntries(List<WAL.Entry> entries) throws IOException { 231 long startTime = System.nanoTime(); 232 int editsCount = 0; 233 for (WAL.Entry logEntry : entries) { 234 filterCellByStore(logEntry); 235 if (!logEntry.getEdit().isEmpty()) { 236 try { 237 writer.append(logEntry); 238 } catch (IOException e) { 239 logAndThrowWriterAppendFailure(logEntry, e); 240 } 241 updateRegionMaximumEditLogSeqNum(logEntry); 242 editsCount++; 243 } else { 244 incrementSkippedEdits(1); 245 } 246 } 247 // Pass along summary statistics 248 incrementEdits(editsCount); 249 incrementNanoTime(System.nanoTime() - startTime); 250 } 251 252 private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e) 253 throws IOException { 254 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 255 final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log"; 256 LOG.error(HBaseMarkers.FATAL, errorMsg, e); 257 updateStatusWithMsg(errorMsg); 258 throw e; 259 } 260 261 private void filterCellByStore(WAL.Entry logEntry) { 262 Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores() 263 .get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); 264 if (MapUtils.isEmpty(maxSeqIdInStores)) { 265 return; 266 } 267 // Create the array list for the cells that aren't filtered. 268 // We make the assumption that most cells will be kept. 269 ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); 270 for (Cell cell : logEntry.getEdit().getCells()) { 271 if (WALEdit.isMetaEditFamily(cell)) { 272 keptCells.add(cell); 273 } else { 274 byte[] family = CellUtil.cloneFamily(cell); 275 Long maxSeqId = maxSeqIdInStores.get(family); 276 // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, 277 // or the master was crashed before and we can not get the information. 278 if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { 279 keptCells.add(cell); 280 } 281 } 282 } 283 284 // Anything in the keptCells array list is still live. 285 // So rather than removing the cells from the array list 286 // which would be an O(n^2) operation, we just replace the list 287 logEntry.getEdit().setCells(keptCells); 288 } 289 } 290}