/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that manages the output streams from the log splitting process.
 * Every region only has one recovered edits file PER split WAL (if we split
 * multiple WALs during a log-splitting session, on open, a Region may
 * have multiple recovered.edits files to replay -- one per split WAL).
 * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
 *   the number of writers active at one time (makes for better throughput).
 */
@InterfaceAudience.Private
class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);

  /** Open writers, keyed by the string form of the encoded region name. */
  private final ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>();

  public RecoveredEditsOutputSink(WALSplitter walSplitter,
    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(walSplitter, controller, entryBuffers, numWriters);
  }

  /**
   * Write every entry of the given region buffer to that region's recovered edits writer,
   * creating the writer on first use. An empty buffer is logged and skipped; a buffer for which
   * no writer is available is dropped without writing.
   * @param buffer the per-region batch of WAL entries to persist
   * @throws IOException if obtaining the writer or writing the entries fails
   */
  @Override
  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
    List<WAL.Entry> entries = buffer.entries;
    if (entries.isEmpty()) {
      LOG.warn("got an empty buffer, skipping");
      return;
    }
    // The sequence id of the first entry is what the writer is created with.
    long firstSeqId = entries.get(0).getKey().getSequenceId();
    RecoveredEditsWriter writer =
      getRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName, firstSeqId);
    if (writer != null) {
      writer.writeRegionEntries(entries);
    }
  }

  /**
   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
   * long as multiple threads are always acting on different regions.
   * @param tableName table the region belongs to
   * @param region    encoded region name; its string form is the key in {@link #writers}
   * @param seqId     sequence id passed through when a new writer has to be created
   * @return null if this region shouldn't output any logs
   * @throws IOException if creating a new writer fails
   */
  private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
    long seqId) throws IOException {
    // Compute the map key once; it is needed for both the lookup and the insert.
    String regionKey = Bytes.toString(region);
    RecoveredEditsWriter ret = writers.get(regionKey);
    if (ret != null) {
      return ret;
    }
    ret = createRecoveredEditsWriter(tableName, region, seqId);
    if (ret == null) {
      return null;
    }
    LOG.trace("Created {}", ret.path);
    writers.put(regionKey, ret);
    return ret;
  }

  /**
   * Finish the writer threads and then close every open writer. The writers are closed even when
   * finishing the writer threads throws.
   * @return the list of split outputs when everything succeeded, otherwise {@code null}
   * @throws IOException if finishing the writer threads or closing the writers fails
   */
  @Override
  public List<Path> close() throws IOException {
    boolean isSuccessful = true;
    try {
      isSuccessful = finishWriterThreads(false);
    } finally {
      // Non-short-circuit AND: closeWriters() must always run.
      isSuccessful &= closeWriters();
    }
    return isSuccessful ? splits : null;
  }

  /**
   * Close all of the output streams.
   * <p>
   * One close task per writer is submitted to the close completion service; this method then
   * waits for exactly as many completions as it submitted. The close thread pool is shut down
   * before returning, whether or not the wait succeeded.
   * @return true when there is no error.
   * @throws IOException if the wait is interrupted (as {@link InterruptedIOException}), if a
   *                     close task fails, or if any writer reported an error while closing
   */
  private boolean closeWriters() throws IOException {
    // The list is handed to tasks submitted to the close pool, so wrap it in a synchronized
    // list in case several of them record a failure at the same time.
    List<IOException> thrown = Collections.synchronizedList(Lists.<IOException> newArrayList());
    // Count what was actually queued rather than re-reading writers.size() later, so the
    // drain loop below waits for neither more nor fewer futures than were submitted.
    int submitted = 0;
    for (RecoveredEditsWriter writer : writers.values()) {
      closeCompletionService.submit(() -> {
        Path dst = closeRecoveredEditsWriter(writer, thrown);
        LOG.trace("Closed {}", dst);
        splits.add(dst);
        return null;
      });
      submitted++;
    }
    boolean progressFailed = false;
    try {
      for (int i = 0; i < submitted; i++) {
        Future<Void> future = closeCompletionService.take();
        future.get();
        // Once a progress report has failed, stop reporting but keep draining the futures.
        if (!progressFailed && reporter != null && !reporter.progress()) {
          progressFailed = true;
        }
      }
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }
    if (!thrown.isEmpty()) {
      throw MultipleIOException.createIOException(thrown);
    }
    return !progressFailed;
  }

  /**
   * @return a sorted map from writer key (string form of the encoded region name) to the
   *         number of edits that writer reports as written
   */
  @Override
  public Map<String, Long> getOutputCounts() {
    TreeMap<String, Long> ret = new TreeMap<>();
    for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
      ret.put(entry.getKey(), entry.getValue().editsWritten);
    }
    return ret;
  }

  /**
   * @return the number of regions a writer has been created for, i.e. the size of the writer map
   */
  @Override
  public int getNumberOfRecoveredRegions() {
    return writers.size();
  }

  /**
   * @return the number of writers currently tracked by this sink; entries are never removed
   *         from the writer map here, so this equals {@link #getNumberOfRecoveredRegions()}
   */
  @Override
  public int getNumOpenWriters() {
    return writers.size();
  }
}