001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CompletionService; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.apache.hadoop.io.MultipleIOException; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Class that manages the output streams from the log splitting process. 040 * Bounded means the output streams will be no more than the size of threadpool 041 */ 042@InterfaceAudience.Private 043public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink { 044 private static final Logger LOG = 045 LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class); 046 047 private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>(); 048 049 public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter, 050 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 051 super(walSplitter, controller, entryBuffers, numWriters); 052 } 053 054 @Override 055 public List<Path> finishWritingAndClose() throws IOException { 056 boolean isSuccessful; 057 List<Path> result; 058 try { 059 isSuccessful = finishWriting(false); 060 } finally { 061 result = close(); 062 } 063 if (isSuccessful) { 064 splits = result; 065 } 066 return splits; 067 } 068 069 @Override 070 boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown, 071 List<Path> paths) throws InterruptedException, ExecutionException { 072 for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers 073 .entrySet()) { 074 LOG.info("Submitting writeThenClose of {}", 075 Bytes.toString(buffer.getValue().encodedRegionName)); 076 completionService.submit(new Callable<Void>() { 077 @Override 078 public Void call() throws Exception { 079 Path dst = writeThenClose(buffer.getValue()); 080 paths.add(dst); 081 return null; 082 } 083 }); 084 } 085 boolean progress_failed = false; 086 for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { 087 Future<Void> future = completionService.take(); 088 future.get(); 089 if (!progress_failed && reporter != null && !reporter.progress()) { 090 progress_failed = true; 091 } 092 } 093 094 return progress_failed; 095 } 096 097 /** 098 * since the splitting process may create multiple output files, we need a map 099 * regionRecoverStatMap to track the output count of each region. 100 * @return a map from encoded region ID to the number of edits written out for that region. 101 */ 102 @Override 103 public Map<byte[], Long> getOutputCounts() { 104 Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>(); 105 for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) { 106 regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue()); 107 } 108 return regionRecoverStatMapResult; 109 } 110 111 /** 112 * @return the number of recovered regions 113 */ 114 @Override 115 public int getNumberOfRecoveredRegions() { 116 return regionRecoverStatMap.size(); 117 } 118 119 /** 120 * Append the buffer to a new recovered edits file, then close it after all done 121 * @param buffer contain all entries of a certain region 122 * @throws IOException when closeWriter failed 123 */ 124 @Override 125 public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException { 126 writeThenClose(buffer); 127 } 128 129 private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException { 130 WALSplitter.WriterAndPath wap = appendBuffer(buffer, false); 131 if (wap != null) { 132 String encodedRegionName = Bytes.toString(buffer.encodedRegionName); 133 Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten); 134 if (value != null) { 135 Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten; 136 regionRecoverStatMap.put(encodedRegionName, newValue); 137 } 138 } 139 140 Path dst = null; 141 List<IOException> thrown = new ArrayList<>(); 142 if (wap != null) { 143 dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown); 144 } 145 if (!thrown.isEmpty()) { 146 throw MultipleIOException.createIOException(thrown); 147 } 148 return dst; 149 } 150}