/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s. Runs with a
 * bounded number of HFile writers at any one time rather than letting the count run up.
 * @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate
 *      recovered.edits files.
 */
@InterfaceAudience.Private
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
  private static final Logger LOG =
    LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);

  private final WALSplitter walSplitter;

  // Since the splitting process may create multiple output files, we need a map
  // to track the output count of each region.
  private ConcurrentMap<String, Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
  // Need a counter to track the number of open writers.
  private final AtomicInteger openingWritersNum = new AtomicInteger(0);

  public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }

  @Override
  void append(RegionEntryBuffer buffer) throws IOException {
    Map<String, CellSet<ExtendedCell>> familyCells = new HashMap<>();
    Map<String, Long> familySeqIds = new HashMap<>();
    boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
    // First iterate all Cells to find which column families are present and to stamp each Cell
    // with its sequence id.
    for (WAL.Entry entry : buffer.entryBuffer) {
      long seqId = entry.getKey().getSequenceId();
      List<Cell> cells = entry.getEdit().getCells();
      for (Cell cell : cells) {
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          continue;
        }
        // Only ExtendedCell can set a sequence id, so it is safe to cast it to ExtendedCell later.
        PrivateCellUtil.setSequenceId(cell, seqId);
        String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
        // The comparator needs to be specified for meta.
        familyCells
          .computeIfAbsent(familyName,
            key -> new CellSet<>(
              isMetaTable ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR))
          .add((ExtendedCell) cell);
        familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
      }
    }

    // Create a new hfile writer for each column family, write the edits, then close the writer.
    String regionName = Bytes.toString(buffer.encodedRegionName);
    for (Map.Entry<String, CellSet<ExtendedCell>> cellsEntry : familyCells.entrySet()) {
      String familyName = cellsEntry.getKey();
      StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
        familySeqIds.get(familyName), familyName, isMetaTable);
      LOG.trace("Created {}", writer.getPath());
      openingWritersNum.incrementAndGet();
      try {
        for (ExtendedCell cell : cellsEntry.getValue()) {
          writer.append(cell);
        }
        // Append the max seqid to the hfile; used during recovery.
        writer.appendMetadata(familySeqIds.get(familyName), false);
        regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
          (k, v) -> v == null ? buffer.entryBuffer.size() : v + buffer.entryBuffer.size());
        splits.add(writer.getPath());
        openingWritersNum.decrementAndGet();
      } finally {
        writer.close();
        LOG.trace("Closed {}, edits={}", writer.getPath(), familyCells.size());
      }
    }
  }

  @Override
  public List<Path> close() throws IOException {
    boolean isSuccessful = true;
    try {
      isSuccessful = finishWriterThreads();
    } finally {
      isSuccessful &= writeRemainingEntryBuffers();
    }
    return isSuccessful ? splits : null;
  }

  /**
   * Write out the remaining RegionEntryBuffers and close the writers.
   * @return true when there is no error.
   */
  private boolean writeRemainingEntryBuffers() throws IOException {
    // Submit one append task per remaining region buffer.
    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
      closeCompletionService.submit(() -> {
        append(buffer);
        return null;
      });
    }
    boolean progressFailed = false;
    try {
      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
        Future<Void> future = closeCompletionService.take();
        future.get();
        if (!progressFailed && reporter != null && !reporter.progress()) {
          progressFailed = true;
        }
      }
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }
    return !progressFailed;
  }

  @Override
  public Map<String, Long> getOutputCounts() {
    return regionEditsWrittenMap;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return regionEditsWrittenMap.size();
  }

  @Override
  int getNumOpenWriters() {
    return openingWritersNum.get();
  }

  @Override
  boolean keepRegionEvent(Entry entry) {
    return false;
  }

  /**
   * @return Returns a base HFile without compression or encodings; good enough for recovery given
   *         the hfile has metadata on how it was written.
   */
  private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
    long seqId, String familyName, boolean isMetaTable) throws IOException {
    Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
      tableName, regionName, familyName);
    StoreFileWriter.Builder writerBuilder =
      new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
        .withOutputDir(outputDir);
    HFileContext hFileContext =
      new HFileContextBuilder().withChecksumType(StoreUtils.getChecksumType(walSplitter.conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(walSplitter.conf)).withCellComparator(
          isMetaTable ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR)
        .build();
    return writerBuilder.withFileContext(hFileContext).build();
  }
}