001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030import java.util.concurrent.atomic.AtomicInteger; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.CellComparatorImpl; 034import org.apache.hadoop.hbase.CellUtil; 035import org.apache.hadoop.hbase.MetaCellComparator; 036import org.apache.hadoop.hbase.PrivateCellUtil; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.io.hfile.CacheConfig; 039import org.apache.hadoop.hbase.io.hfile.HFileContext; 040import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 041import org.apache.hadoop.hbase.regionserver.CellSet; 042import org.apache.hadoop.hbase.regionserver.HStore; 043import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer; 046import org.apache.hadoop.hbase.wal.WAL.Entry; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s. 053 * Runs with a bounded number of HFile writers at any one time rather than let the count run up. 054 * @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate 055 * recovered.edits files. 056 */ 057@InterfaceAudience.Private 058public class BoundedRecoveredHFilesOutputSink extends OutputSink { 059 private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class); 060 061 private final WALSplitter walSplitter; 062 063 // Since the splitting process may create multiple output files, we need a map 064 // to track the output count of each region. 065 private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>(); 066 // Need a counter to track the opening writers. 067 private final AtomicInteger openingWritersNum = new AtomicInteger(0); 068 069 public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter, 070 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 071 super(controller, entryBuffers, numWriters); 072 this.walSplitter = walSplitter; 073 } 074 075 @Override 076 public void append(RegionEntryBuffer buffer) throws IOException { 077 Map<String, CellSet> familyCells = new HashMap<>(); 078 Map<String, Long> familySeqIds = new HashMap<>(); 079 boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME); 080 // First iterate all Cells to find which column families are present and to stamp Cell with 081 // sequence id. 082 for (WAL.Entry entry : buffer.entries) { 083 long seqId = entry.getKey().getSequenceId(); 084 List<Cell> cells = entry.getEdit().getCells(); 085 for (Cell cell : cells) { 086 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { 087 continue; 088 } 089 PrivateCellUtil.setSequenceId(cell, seqId); 090 String familyName = Bytes.toString(CellUtil.cloneFamily(cell)); 091 // comparator need to be specified for meta 092 familyCells 093 .computeIfAbsent(familyName, 094 key -> new CellSet( 095 isMetaTable ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR)) 096 .add(cell); 097 familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId)); 098 } 099 } 100 101 // Create a new hfile writer for each column family, write edits then close writer. 102 String regionName = Bytes.toString(buffer.encodedRegionName); 103 for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) { 104 String familyName = cellsEntry.getKey(); 105 StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName, 106 familySeqIds.get(familyName), familyName, isMetaTable); 107 LOG.trace("Created {}", writer.getPath()); 108 openingWritersNum.incrementAndGet(); 109 try { 110 for (Cell cell : cellsEntry.getValue()) { 111 writer.append(cell); 112 } 113 // Append the max seqid to hfile, used when recovery. 114 writer.appendMetadata(familySeqIds.get(familyName), false); 115 regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName), 116 (k, v) -> v == null ? buffer.entries.size() : v + buffer.entries.size()); 117 splits.add(writer.getPath()); 118 openingWritersNum.decrementAndGet(); 119 } finally { 120 writer.close(); 121 LOG.trace("Closed {}, edits={}", writer.getPath(), familyCells.size()); 122 } 123 } 124 } 125 126 @Override 127 public List<Path> close() throws IOException { 128 boolean isSuccessful = true; 129 try { 130 isSuccessful = finishWriterThreads(false); 131 } finally { 132 isSuccessful &= writeRemainingEntryBuffers(); 133 } 134 return isSuccessful ? splits : null; 135 } 136 137 /** 138 * Write out the remaining RegionEntryBuffers and close the writers. 139 * 140 * @return true when there is no error. 141 */ 142 private boolean writeRemainingEntryBuffers() throws IOException { 143 for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) { 144 closeCompletionService.submit(() -> { 145 append(buffer); 146 return null; 147 }); 148 } 149 boolean progressFailed = false; 150 try { 151 for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { 152 Future<Void> future = closeCompletionService.take(); 153 future.get(); 154 if (!progressFailed && reporter != null && !reporter.progress()) { 155 progressFailed = true; 156 } 157 } 158 } catch (InterruptedException e) { 159 IOException iie = new InterruptedIOException(); 160 iie.initCause(e); 161 throw iie; 162 } catch (ExecutionException e) { 163 throw new IOException(e.getCause()); 164 } finally { 165 closeThreadPool.shutdownNow(); 166 } 167 return !progressFailed; 168 } 169 170 @Override 171 public Map<String, Long> getOutputCounts() { 172 return regionEditsWrittenMap; 173 } 174 175 @Override 176 public int getNumberOfRecoveredRegions() { 177 return regionEditsWrittenMap.size(); 178 } 179 180 @Override 181 public int getNumOpenWriters() { 182 return openingWritersNum.get(); 183 } 184 185 @Override 186 public boolean keepRegionEvent(Entry entry) { 187 return false; 188 } 189 190 /** 191 * @return Returns a base HFile without compressions or encodings; good enough for recovery 192 * given hfile has metadata on how it was written. 193 */ 194 private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName, 195 long seqId, String familyName, boolean isMetaTable) throws IOException { 196 Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf, 197 tableName, regionName, familyName); 198 StoreFileWriter.Builder writerBuilder = 199 new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS) 200 .withOutputDir(outputDir); 201 HFileContext hFileContext = new HFileContextBuilder(). 202 withChecksumType(HStore.getChecksumType(walSplitter.conf)). 203 withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf)). 204 withCellComparator(isMetaTable? 205 MetaCellComparator.META_COMPARATOR: CellComparatorImpl.COMPARATOR).build(); 206 return writerBuilder.withFileContext(hFileContext).build(); 207 } 208}