001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.List; 023import java.util.Map; 024import java.util.TreeMap; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Future; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.apache.hadoop.io.MultipleIOException; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 038 039/** 040 * Class that manages the output streams from the log splitting process. Every region only has one 041 * recovered edits file PER split WAL (if we split multiple WALs during a log-splitting session, on 042 * open, a Region may have multiple recovered.edits files to replay -- one per split WAL). 043 * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on the 044 * number of writers active at one time (makes for better throughput). 045 */ 046@InterfaceAudience.Private 047class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { 048 private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); 049 private ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>(); 050 051 public RecoveredEditsOutputSink(WALSplitter walSplitter, 052 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 053 super(walSplitter, controller, entryBuffers, numWriters); 054 } 055 056 @Override 057 public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 058 List<WAL.Entry> entries = buffer.entries; 059 if (entries.isEmpty()) { 060 LOG.warn("got an empty buffer, skipping"); 061 return; 062 } 063 RecoveredEditsWriter writer = getRecoveredEditsWriter(buffer.tableName, 064 buffer.encodedRegionName, entries.get(0).getKey().getSequenceId()); 065 if (writer != null) { 066 writer.writeRegionEntries(entries); 067 } 068 } 069 070 /** 071 * Get a writer and path for a log starting at the given entry. This function is threadsafe so 072 * long as multiple threads are always acting on different regions. 073 * @return null if this region shouldn't output any logs 074 */ 075 private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region, 076 long seqId) throws IOException { 077 RecoveredEditsWriter ret = writers.get(Bytes.toString(region)); 078 if (ret != null) { 079 return ret; 080 } 081 ret = createRecoveredEditsWriter(tableName, region, seqId); 082 if (ret == null) { 083 return null; 084 } 085 LOG.trace("Created {}", ret.path); 086 writers.put(Bytes.toString(region), ret); 087 return ret; 088 } 089 090 @Override 091 public List<Path> close() throws IOException { 092 boolean isSuccessful = true; 093 try { 094 isSuccessful = finishWriterThreads(false); 095 } finally { 096 isSuccessful &= closeWriters(); 097 } 098 return isSuccessful ? splits : null; 099 } 100 101 /** 102 * Close all of the output streams. 103 * @return true when there is no error. 104 */ 105 private boolean closeWriters() throws IOException { 106 List<IOException> thrown = Lists.newArrayList(); 107 for (RecoveredEditsWriter writer : writers.values()) { 108 closeCompletionService.submit(() -> { 109 Path dst = closeRecoveredEditsWriter(writer, thrown); 110 LOG.trace("Closed {}", dst); 111 splits.add(dst); 112 return null; 113 }); 114 } 115 boolean progressFailed = false; 116 try { 117 for (int i = 0, n = this.writers.size(); i < n; i++) { 118 Future<Void> future = closeCompletionService.take(); 119 future.get(); 120 if (!progressFailed && reporter != null && !reporter.progress()) { 121 progressFailed = true; 122 } 123 } 124 } catch (InterruptedException e) { 125 IOException iie = new InterruptedIOException(); 126 iie.initCause(e); 127 throw iie; 128 } catch (ExecutionException e) { 129 throw new IOException(e.getCause()); 130 } finally { 131 closeThreadPool.shutdownNow(); 132 } 133 if (!thrown.isEmpty()) { 134 throw MultipleIOException.createIOException(thrown); 135 } 136 return !progressFailed; 137 } 138 139 @Override 140 public Map<String, Long> getOutputCounts() { 141 TreeMap<String, Long> ret = new TreeMap<>(); 142 for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) { 143 ret.put(entry.getKey(), entry.getValue().editsWritten); 144 } 145 return ret; 146 } 147 148 @Override 149 public int getNumberOfRecoveredRegions() { 150 return writers.size(); 151 } 152 153 @Override 154 public int getNumOpenWriters() { 155 return writers.size(); 156 } 157}