001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.List; 023import java.util.Map; 024import java.util.TreeMap; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Future; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.io.MultipleIOException; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 038 039/** 040 * Class that manages the output streams from the log splitting process. Every region only has one 041 * recovered edits file PER split WAL (if we split multiple WALs during a log-splitting session, on 042 * open, a Region may have multiple recovered.edits files to replay -- one per split WAL). 043 * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on the 044 * number of writers active at one time (makes for better throughput). 045 */ 046@InterfaceAudience.Private 047class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { 048 private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); 049 private ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>(); 050 051 public RecoveredEditsOutputSink(WALSplitter walSplitter, 052 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 053 super(walSplitter, controller, entryBuffers, numWriters); 054 } 055 056 @Override 057 public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 058 List<WAL.Entry> entries = buffer.entryBuffer; 059 if (entries.isEmpty()) { 060 LOG.warn("got an empty buffer, skipping"); 061 return; 062 } 063 RecoveredEditsWriter writer = getRecoveredEditsWriter(buffer.tableName, 064 buffer.encodedRegionName, entries.get(0).getKey().getSequenceId()); 065 if (writer != null) { 066 writer.writeRegionEntries(entries); 067 } 068 } 069 070 /** 071 * Get a writer and path for a log starting at the given entry. This function is threadsafe so 072 * long as multiple threads are always acting on different regions. 073 * @return null if this region shouldn't output any logs 074 */ 075 private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region, 076 long seqId) throws IOException { 077 RecoveredEditsWriter ret = writers.get(Bytes.toString(region)); 078 if (ret != null) { 079 return ret; 080 } 081 ret = createRecoveredEditsWriter(tableName, region, seqId); 082 if (ret == null) { 083 return null; 084 } 085 LOG.trace("Created {}", ret.path); 086 writers.put(Bytes.toString(region), ret); 087 return ret; 088 } 089 090 @Override 091 public List<Path> close() throws IOException { 092 boolean isSuccessful; 093 try { 094 isSuccessful = finishWriterThreads(); 095 } catch (IOException e) { 096 closeWriters(false); 097 throw e; 098 } 099 if (!isSuccessful) { 100 // Even if an exception is not thrown, finishWriterThreads() not being successful is an 101 // error case where the WAL files should not be finalized. 102 closeWriters(false); 103 return null; 104 } 105 isSuccessful = closeWriters(true); 106 return isSuccessful ? splits : null; 107 } 108 109 /** 110 * Close all the output streams. 111 * @param finalizeEdits true in the successful close case, false when we don't want to rename and 112 * finalize the temporary, possibly corrupted WAL files, such as when there 113 * was a previous failure or exception. Please see HBASE-28569. 114 * @return true when there is no error. 115 */ 116 boolean closeWriters(boolean finalizeEdits) throws IOException { 117 List<IOException> thrown = Lists.newArrayList(); 118 for (RecoveredEditsWriter writer : writers.values()) { 119 closeCompletionService.submit(() -> { 120 if (!finalizeEdits) { 121 abortRecoveredEditsWriter(writer, thrown); 122 LOG.trace("Aborted edits at {}", writer.path); 123 return null; 124 } 125 Path dst = closeRecoveredEditsWriterAndFinalizeEdits(writer, thrown); 126 LOG.trace("Closed {}", dst); 127 splits.add(dst); 128 return null; 129 }); 130 } 131 boolean progressFailed = false; 132 try { 133 for (int i = 0, n = this.writers.size(); i < n; i++) { 134 Future<Void> future = closeCompletionService.take(); 135 future.get(); 136 if (!progressFailed && reporter != null && !reporter.progress()) { 137 progressFailed = true; 138 } 139 } 140 } catch (InterruptedException e) { 141 IOException iie = new InterruptedIOException(); 142 iie.initCause(e); 143 throw iie; 144 } catch (ExecutionException e) { 145 throw new IOException(e.getCause()); 146 } finally { 147 closeThreadPool.shutdownNow(); 148 } 149 if (!thrown.isEmpty()) { 150 throw MultipleIOException.createIOException(thrown); 151 } 152 return !progressFailed; 153 } 154 155 @Override 156 public Map<String, Long> getOutputCounts() { 157 TreeMap<String, Long> ret = new TreeMap<>(); 158 for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) { 159 ret.put(entry.getKey(), entry.getValue().editsWritten); 160 } 161 return ret; 162 } 163 164 @Override 165 public int getNumberOfRecoveredRegions() { 166 return writers.size(); 167 } 168 169 @Override 170 int getNumOpenWriters() { 171 return writers.size(); 172 } 173}