/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s.
 * Runs with a bounded number of HFile writers open at any one time, rather than letting the
 * count grow unbounded.
 * @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate
 *      recovered.edits files.
 */
@InterfaceAudience.Private
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
  private static final Logger LOG =
    LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);

  private final WALSplitter walSplitter;

  // Since the splitting process may create multiple output files, we need a map
  // to track the output count of each region.
  private final ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
  // A counter to track the number of currently open writers.
  private final AtomicInteger openingWritersNum = new AtomicInteger(0);
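
  /**
   * A minimal construction sketch, for illustration only: in practice {@link WALSplitter}
   * builds and drives its output sink internally. The {@code splitter} reference, the buffer
   * heap size, and the two-argument {@link EntryBuffers} constructor used below are assumptions
   * of the sketch, not requirements of this class.
   * <pre>
   * WALSplitter.PipelineController controller = new WALSplitter.PipelineController();
   * EntryBuffers entryBuffers = new EntryBuffers(controller, 128 * 1024 * 1024);
   * OutputSink sink =
   *   new BoundedRecoveredHFilesOutputSink(splitter, controller, entryBuffers, 3);
   * </pre>
   * The last argument bounds the number of concurrent writer threads, per {@link OutputSink}.
   */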
  public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }

  @Override
  public void append(RegionEntryBuffer buffer) throws IOException {
    Map<String, CellSet> familyCells = new HashMap<>();
    Map<String, Long> familySeqIds = new HashMap<>();
    boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
    // First iterate all Cells to find which column families are present and to stamp each Cell
    // with its sequence id.
    for (WAL.Entry entry : buffer.entries) {
      long seqId = entry.getKey().getSequenceId();
      List<Cell> cells = entry.getEdit().getCells();
      for (Cell cell : cells) {
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          continue;
        }
        PrivateCellUtil.setSequenceId(cell, seqId);
        String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
        // A comparator needs to be specified for meta.
        familyCells
          .computeIfAbsent(familyName,
            key -> new CellSet(
              isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR))
          .add(cell);
        familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
      }
    }

    // Create a new hfile writer for each column family, write edits, then close the writer.
    String regionName = Bytes.toString(buffer.encodedRegionName);
    for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
      String familyName = cellsEntry.getKey();
      StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
        familySeqIds.get(familyName), familyName, isMetaTable);
      LOG.trace("Created {}", writer.getPath());
      openingWritersNum.incrementAndGet();
      try {
        for (Cell cell : cellsEntry.getValue()) {
          writer.append(cell);
        }
        // Append the max seqid to the hfile; it is used during recovery.
        writer.appendMetadata(familySeqIds.get(familyName), false);
        regionEditsWrittenMap.compute(regionName,
          (k, v) -> v == null ? buffer.entries.size() : v + buffer.entries.size());
        splits.add(writer.getPath());
      } finally {
        // Always release the open-writer count and close the writer, even if an append failed.
        openingWritersNum.decrementAndGet();
        writer.close();
        LOG.trace("Closed {}, edits={}", writer.getPath(), cellsEntry.getValue().size());
      }
    }
  }

  @Override
  public List<Path> close() throws IOException {
    boolean isSuccessful = true;
    try {
      isSuccessful = finishWriterThreads(false);
    } finally {
      isSuccessful &= writeRemainingEntryBuffers();
    }
    return isSuccessful ? splits : null;
  }

  /**
   * Write out the remaining RegionEntryBuffers and close the writers.
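   * <p>
   * Each remaining buffer is submitted to the close thread pool as its own append task; the
   * method then drains the CompletionService with one take()/get() per task, so an IOException
   * thrown by append on a worker thread is rethrown here, and the progress reporter is polled
   * as each task completes.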
   * @return true when there is no error.
   */
  private boolean writeRemainingEntryBuffers() throws IOException {
    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
      closeCompletionService.submit(() -> {
        append(buffer);
        return null;
      });
    }
    boolean progressFailed = false;
    try {
      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
        Future<Void> future = closeCompletionService.take();
        future.get();
        if (!progressFailed && reporter != null && !reporter.progress()) {
          progressFailed = true;
        }
      }
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }
    return !progressFailed;
  }

  @Override
  public Map<String, Long> getOutputCounts() {
    return regionEditsWrittenMap;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return regionEditsWrittenMap.size();
  }

  @Override
  public int getNumOpenWriters() {
    return openingWritersNum.get();
  }

  @Override
  public boolean keepRegionEvent(Entry entry) {
    return false;
  }

  /**
   * @return A writer for a plain HFile, without compression or encodings; good enough for
   *         recovery given the hfile has metadata on how it was written.
   */
  private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
    long seqId, String familyName, boolean isMetaTable) throws IOException {
    Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
      tableName, regionName, familyName);
    StoreFileWriter.Builder writerBuilder =
      new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
        .withOutputDir(outputDir);
    HFileContext hFileContext = new HFileContextBuilder()
      .withChecksumType(HStore.getChecksumType(walSplitter.conf))
      .withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf))
      .withCellComparator(
        isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR)
      .build();
    return writerBuilder.withFileContext(hFileContext).build();
  }
}