/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Base class for output sinks that write recovered edits files. It offers helpers to create and
 * close a {@link RecoveredEditsWriter} and keeps, per encoded region name, the highest sequence id
 * that has been appended so far; that value is used to compute the final name of a recovered
 * edits file when its writer is closed.
 */
@InterfaceAudience.Private
abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractRecoveredEditsOutputSink.class);
  private final WALSplitter walSplitter;
  /** Highest sequence id appended so far, keyed by encoded region name (as a String). */
  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();

  /**
   * @param walSplitter splitter that supplies the file system, configuration and writer factory
   *          used by this sink
   * @param controller forwarded to {@link OutputSink}
   * @param entryBuffers forwarded to {@link OutputSink}
   * @param numWriters forwarded to {@link OutputSink}
   */
  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }

  /**
   * Creates a writer for a new temporary recovered edits file of the given region. If a file
   * already exists at the computed path it is deleted first (a failed delete is only logged).
   * @param tableName table the region belongs to
   * @param region encoded region name
   * @param seqId sequence id used to build the file path; also stored as the writer's
   *          {@code minLogSeqNum}
   * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
   * @throws IOException if the file system or the writer creation fails
   */
  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
      long seqId) throws IOException {
    Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
      walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
      walSplitter.conf);
    if (walSplitter.walFS.exists(regionEditsPath)) {
      LOG.warn("Found old edits file. It could be the result of a previous failed split attempt."
          + " Deleting {}, length={}", regionEditsPath,
        walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
      if (!walSplitter.walFS.delete(regionEditsPath, false)) {
        LOG.warn("Failed delete of old {}", regionEditsPath);
      }
    }
    WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
    LOG.info(msg);
    updateStatusWithMsg(msg);
    return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
  }

  /**
   * Closes the given writer and moves its file to the completed recovered edits path.
   * <p>
   * A file that received no edits is deleted instead of being renamed. If a file already exists at
   * the destination, one of the two files is removed before the rename, see
   * {@link #deleteOneWithFewerEntries(RecoveredEditsWriter, Path)}.
   * @param editsWriter writer to close
   * @param thrown collects the {@link IOException}s raised while closing the writer or while
   *          moving the file into place; such failures make this method return {@code null}
   *          instead of throwing
   * @return the completed recovered edits path, or {@code null} if the writer could not be closed,
   *         no edits were written, or the move into place failed
   * @throws IOException if an empty recovered edits file could not be deleted
   */
  protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
      List<IOException> thrown) throws IOException {
    try {
      editsWriter.writer.close();
    } catch (IOException ioe) {
      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
      LOG.error(errorMsg, ioe);
      updateStatusWithMsg(errorMsg);
      thrown.add(ioe);
      return null;
    }
    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
        + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in "
        + (editsWriter.nanosSpent / 1000 / 1000) + " ms)";
    LOG.info(msg);
    updateStatusWithMsg(msg);
    if (editsWriter.editsWritten == 0) {
      // just remove the empty recovered.edits file
      if (walSplitter.walFS.exists(editsWriter.path)
          && !walSplitter.walFS.delete(editsWriter.path, false)) {
        final String errorMsg = "Failed deleting empty " + editsWriter.path;
        LOG.warn(errorMsg);
        updateStatusWithMsg(errorMsg);
        throw new IOException(errorMsg);
      }
      return null;
    }

    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
    try {
      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
        deleteOneWithFewerEntries(editsWriter, dst);
      }
      // Skip the unit tests which create a splitter that reads and
      // writes the data without touching disk.
      // TestHLogSplit#testThreading is an example.
      // The same check also skips the rename when deleteOneWithFewerEntries removed our file.
      if (walSplitter.walFS.exists(editsWriter.path)) {
        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
          final String errorMsg =
              "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
          updateStatusWithMsg(errorMsg);
          // Caught by the handler below, which records it in 'thrown'.
          throw new IOException(errorMsg);
        }
        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
        LOG.info(renameEditMsg);
        updateStatusWithMsg(renameEditMsg);
      }
    } catch (IOException ioe) {
      final String errorMsg = "Could not rename recovered edits " + editsWriter.path
          + " to " + dst;
      LOG.error(errorMsg, ioe);
      updateStatusWithMsg(errorMsg);
      thrown.add(ioe);
      return null;
    }
    return dst;
  }

  /**
   * @return {@code true} if at least one cell of the entry's edit is a compaction marker according
   *         to {@link WALEdit#isCompactionMarker(Cell)}, {@code false} otherwise
   */
  @Override
  public boolean keepRegionEvent(WAL.Entry entry) {
    for (Cell cell : entry.getEdit().getCells()) {
      if (WALEdit.isCompactionMarker(cell)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Update region's maximum edit log SeqNum: the entry's sequence id is stored when nothing is
   * recorded yet for its region or when it is greater than the recorded value.
   */
  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
    // The lock makes the read-compare-write below atomic for callers of this method.
    synchronized (regionMaximumEditLogSeqNum) {
      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
      long sequenceId = entry.getKey().getSequenceId();
      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
      if (currentMaxSeqNum == null || sequenceId > currentMaxSeqNum) {
        regionMaximumEditLogSeqNum.put(regionName, sequenceId);
      }
    }
  }

  /**
   * Resolves a clash between the file just written and an already existing file at {@code dst} by
   * deleting the one with fewer wal entries. The two files are compared through the sequence id of
   * their first entry: the file that starts at the lower sequence id is kept. If no first entry
   * can be read from {@code dst} (empty file or EOF), {@code dst} is the one that is deleted.
   * @param editsWriter writer of the file that was just written
   * @param dst existing file at the destination path
   * @throws IOException if reading {@code dst} fails with something other than an EOF, or if the
   *           chosen file cannot be deleted
   */
  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
      throws IOException {
    long dstMinLogSeqNum = -1L;
    boolean dstHasEntries = false;
    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
      WAL.Entry entry = reader.next();
      if (entry != null) {
        dstMinLogSeqNum = entry.getKey().getSequenceId();
        dstHasEntries = true;
      }
    } catch (EOFException e) {
      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
        e);
    }
    // The only caller gets here after writing at least one edit, so a dst without a readable
    // first entry is the file with fewer entries. Comparing against the -1 placeholder instead
    // would always delete the file that was just written and keep the unreadable one.
    if (!dstHasEntries || editsWriter.minLogSeqNum < dstMinLogSeqNum) {
      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
          + " split attempt or we have duplicated wal entries. Deleting {}, length={}", dst,
        walSplitter.walFS.getFileStatus(dst).getLen());
      if (!walSplitter.walFS.delete(dst, false)) {
        final String errorMsg = "Failed deleting of old " + dst;
        LOG.warn(errorMsg);
        throw new IOException(errorMsg);
      }
    } else {
      LOG.warn("Found existing old edits file and we have less entries. Deleting {}, length={}",
        editsWriter.path, walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
      if (!walSplitter.walFS.delete(editsWriter.path, false)) {
        final String errorMsg = "Failed deleting of " + editsWriter.path;
        LOG.warn(errorMsg);
        throw new IOException(errorMsg);
      }
    }
  }

  /**
   * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
   * statistics about the data written to this output.
   */
  final class RecoveredEditsWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Count of edits skipped to this path */
    long editsSkipped = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    final byte[] encodedRegionName;
    final Path path;
    final WALProvider.Writer writer;
    /* Sequence id this writer was created with */
    final long minLogSeqNum;

    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
        long minLogSeqNum) {
      this.encodedRegionName = encodedRegionName;
      this.path = path;
      this.writer = writer;
      this.minLogSeqNum = minLogSeqNum;
    }

    private void incrementEdits(int edits) {
      editsWritten += edits;
    }

    /** Adds to this writer's skipped counter and to the sink-wide {@code totalSkippedEdits}. */
    private void incrementSkippedEdits(int skipped) {
      editsSkipped += skipped;
      totalSkippedEdits.addAndGet(skipped);
    }

    private void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }

    /**
     * Appends the given entries to the wrapped writer. Each entry first has its cells filtered by
     * {@link #filterCellByStore(WAL.Entry)}; an entry left without cells is counted as skipped
     * instead of being appended. The written count and the elapsed time are recorded once all
     * entries have been processed.
     * @param entries entries to write
     * @throws IOException if an append fails
     */
    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
      long startTime = System.nanoTime();
      int editsCount = 0;
      for (WAL.Entry logEntry : entries) {
        filterCellByStore(logEntry);
        if (!logEntry.getEdit().isEmpty()) {
          try {
            writer.append(logEntry);
          } catch (IOException e) {
            logAndThrowWriterAppendFailure(logEntry, e);
          }
          updateRegionMaximumEditLogSeqNum(logEntry);
          editsCount++;
        } else {
          incrementSkippedEdits(1);
        }
      }
      // Pass along summary statistics
      incrementEdits(editsCount);
      incrementNanoTime(System.nanoTime() - startTime);
    }

    /**
     * Logs an append failure and rethrows it. A {@link RemoteException} is unwrapped first so that
     * the underlying exception is what gets logged and thrown.
     */
    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
        throws IOException {
      final IOException failure =
          e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
      LOG.error(HBaseMarkers.FATAL, errorMsg, failure);
      updateStatusWithMsg(errorMsg);
      throw failure;
    }

    /**
     * Replaces the cells of the entry's edit with the subset that is still needed. Cells of the
     * meta edit family are always kept. Any other cell is kept when no maximum sequence id is
     * known for its family, or when that maximum is lower than the entry's sequence id. Nothing is
     * changed when no per-store maximum sequence ids are known for the entry's region.
     */
    private void filterCellByStore(WAL.Entry logEntry) {
      Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
          .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
      if (MapUtils.isEmpty(maxSeqIdInStores)) {
        return;
      }
      // Create the array list for the cells that aren't filtered.
      // We make the assumption that most cells will be kept.
      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
      for (Cell cell : logEntry.getEdit().getCells()) {
        if (WALEdit.isMetaEditFamily(cell)) {
          keptCells.add(cell);
        } else {
          byte[] family = CellUtil.cloneFamily(cell);
          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
          // or the master was crashed before and we can not get the information.
          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
            keptCells.add(cell);
          }
        }
      }

      // Anything in the keptCells array list is still live.
      // So rather than removing the cells from the array list
      // which would be an O(n^2) operation, we just replace the list
      logEntry.getEdit().setCells(keptCells);
    }
  }
}