001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Future; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.io.MultipleIOException; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Class that manages the output streams from the log splitting process. 040 * Every region may have many recovered edits file. But the opening writers is bounded. 041 * Bounded means the output streams will be no more than the size of threadpool. 042 */ 043@InterfaceAudience.Private 044class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { 045 private static final Logger LOG = 046 LoggerFactory.getLogger(BoundedRecoveredEditsOutputSink.class); 047 048 // Since the splitting process may create multiple output files, we need a map 049 // to track the output count of each region. 050 private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>(); 051 // Need a counter to track the opening writers. 052 private final AtomicInteger openingWritersNum = new AtomicInteger(0); 053 054 public BoundedRecoveredEditsOutputSink(WALSplitter walSplitter, 055 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 056 super(walSplitter, controller, entryBuffers, numWriters); 057 } 058 059 @Override 060 public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { 061 List<WAL.Entry> entries = buffer.entries; 062 if (entries.isEmpty()) { 063 LOG.warn("got an empty buffer, skipping"); 064 return; 065 } 066 // The key point is create a new writer, write edits then close writer. 067 RecoveredEditsWriter writer = 068 createRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName, 069 entries.get(0).getKey().getSequenceId()); 070 if (writer != null) { 071 openingWritersNum.incrementAndGet(); 072 writer.writeRegionEntries(entries); 073 regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName), 074 (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten); 075 List<IOException> thrown = new ArrayList<>(); 076 Path dst = closeRecoveredEditsWriter(writer, thrown); 077 splits.add(dst); 078 openingWritersNum.decrementAndGet(); 079 if (!thrown.isEmpty()) { 080 throw MultipleIOException.createIOException(thrown); 081 } 082 } 083 } 084 085 @Override 086 public List<Path> close() throws IOException { 087 boolean isSuccessful = true; 088 try { 089 isSuccessful = finishWriterThreads(false); 090 } finally { 091 isSuccessful &= writeRemainingEntryBuffers(); 092 } 093 return isSuccessful ? splits : null; 094 } 095 096 /** 097 * Write out the remaining RegionEntryBuffers and close the writers. 098 * 099 * @return true when there is no error. 100 */ 101 private boolean writeRemainingEntryBuffers() throws IOException { 102 for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) { 103 closeCompletionService.submit(() -> { 104 append(buffer); 105 return null; 106 }); 107 } 108 boolean progressFailed = false; 109 try { 110 for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { 111 Future<Void> future = closeCompletionService.take(); 112 future.get(); 113 if (!progressFailed && reporter != null && !reporter.progress()) { 114 progressFailed = true; 115 } 116 } 117 } catch (InterruptedException e) { 118 IOException iie = new InterruptedIOException(); 119 iie.initCause(e); 120 throw iie; 121 } catch (ExecutionException e) { 122 throw new IOException(e.getCause()); 123 } finally { 124 closeThreadPool.shutdownNow(); 125 } 126 return !progressFailed; 127 } 128 129 @Override 130 public Map<String, Long> getOutputCounts() { 131 return regionEditsWrittenMap; 132 } 133 134 @Override 135 public int getNumberOfRecoveredRegions() { 136 return regionEditsWrittenMap.size(); 137 } 138 139 @Override 140 public int getNumOpenWriters() { 141 return openingWritersNum.get(); 142 } 143}