/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for cell sink that separates the provided cells into multiple files.
 * <p>
 * Subclasses supply the set of underlying writers through {@link #writers()}; this class takes
 * care of committing, aborting and forwarding {@link #beforeShipped()} to each of them. Both a
 * {@code null} collection and {@code null} elements inside the collection are tolerated and
 * treated as "no writer".
 */
@InterfaceAudience.Private
public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractMultiFileWriter.class);

  /** Factory that is used to produce single StoreFile.Writer-s */
  protected WriterFactory writerFactory;

  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
  protected StoreScanner sourceScanner;

  /** Produces the individual file writers used by a multi-file writer. */
  public interface WriterFactory {
    /**
     * Creates a new single-file writer.
     * @return the newly created writer
     * @throws IOException if the writer cannot be created
     */
    StoreFileWriter createWriter() throws IOException;

    /**
     * Creates a new single-file writer for the given storage policy.
     * <p>
     * The default implementation ignores {@code fileStoragePolicy} and delegates to
     * {@link #createWriter()}.
     * @param fileStoragePolicy storage policy requested for the new file
     * @return the newly created writer
     * @throws IOException if the writer cannot be created
     */
    default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
      throws IOException {
      return createWriter();
    }
  }

  /**
   * Initializes multi-writer before usage.
   * @param sourceScanner Optional store scanner to obtain the information about read progress.
   * @param factory       Factory used to produce individual file writers.
   */
  public void init(StoreScanner sourceScanner, WriterFactory factory) {
    this.writerFactory = factory;
    this.sourceScanner = sourceScanner;
  }

  /**
   * Commit all writers.
   * <p>
   * Notice that here we use the same <code>maxSeqId</code> for all output files since we haven't
   * found an easy way to find enough sequence ids for different output files in some corner
   * cases. See comments in HBASE-15400 for more details.
   * <p>
   * Equivalent to calling {@link #commitWriters(long, boolean, Collection)} with an empty
   * collection of store files.
   * @param maxSeqId        sequence id recorded in the metadata of every output file
   * @param majorCompaction major-compaction flag recorded in the metadata of every output file
   * @return paths of the committed files, in the iteration order of {@link #writers()}
   * @throws IOException if committing any writer fails
   */
  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
    return commitWriters(maxSeqId, majorCompaction, Collections.emptyList());
  }

  /**
   * Commit all writers: for every non-null writer, append the metadata, invoke
   * {@link #preCloseWriter(StoreFileWriter)}, record its path and close it.
   * <p>
   * {@link #preCommitWriters()} is invoked first, before {@link #writers()} is consulted. The
   * same <code>maxSeqId</code> is used for all output files.
   * @param maxSeqId        sequence id recorded in the metadata of every output file
   * @param majorCompaction major-compaction flag recorded in the metadata of every output file
   * @param storeFiles      store files handed to each writer when its metadata is appended
   * @return paths of the committed files, in the iteration order of {@link #writers()}; empty if
   *         there are no writers
   * @throws IOException if a hook or any writer fails; writers after the failing one are left
   *                     untouched
   */
  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
    Collection<HStoreFile> storeFiles) throws IOException {
    // Must run before writers() is read: the hook is allowed to change the set of writers.
    preCommitWriters();
    Collection<StoreFileWriter> writers = writers();
    // Treat a null collection like an empty one, matching beforeShipped().
    int writerCount = writers == null ? 0 : writers.size();
    LOG.debug("Commit {} writers, maxSeqId={}, majorCompaction={}", writerCount, maxSeqId,
      majorCompaction);
    List<Path> paths = new ArrayList<>(writerCount);
    if (writers == null) {
      return paths;
    }
    for (StoreFileWriter writer : writers) {
      if (writer == null) {
        continue;
      }
      writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
      preCloseWriter(writer);
      paths.add(writer.getPath());
      writer.close();
    }
    return paths;
  }

  /**
   * Close all writers without throwing any exceptions. This is used when compaction failed usually.
   * <p>
   * A failure while closing one writer is logged and does not prevent the remaining writers from
   * being closed.
   * @return paths of the writers that were asked to close; a path is recorded before the close
   *         is attempted, so it is included even when that close fails
   */
  public List<Path> abortWriters() {
    List<Path> paths = new ArrayList<>();
    Collection<StoreFileWriter> writers = writers();
    // Treat a null collection like an empty one, matching beforeShipped().
    if (writers == null) {
      return paths;
    }
    for (StoreFileWriter writer : writers) {
      if (writer == null) {
        continue;
      }
      try {
        paths.add(writer.getPath());
        writer.close();
      } catch (Exception ex) {
        // Best effort: keep going so that the other writers still get closed.
        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
      }
    }
    return paths;
  }

  /**
   * Returns the writers managed by this multi-file writer.
   * @return the underlying writers; callers in this class tolerate a {@code null} collection as
   *         well as {@code null} elements
   */
  protected abstract Collection<StoreFileWriter> writers();

  /**
   * Subclasses override this method to be called at the end of a successful sequence of append; all
   * appends are processed before this method is called.
   * @throws IOException if the subclass fails to finish its pending work
   */
  protected void preCommitWriters() throws IOException {
  }

  /**
   * Subclasses override this method to be called before we close the given writer. Usually you can
   * append extra metadata to the writer.
   * @param writer the writer that is about to be closed
   * @throws IOException if the subclass fails to prepare the writer
   */
  protected void preCloseWriter(StoreFileWriter writer) throws IOException {
  }

  /**
   * Forwards the notification to every non-null underlying writer.
   * @throws IOException if any writer fails while handling the notification
   */
  @Override
  public void beforeShipped() throws IOException {
    Collection<StoreFileWriter> writers = writers();
    if (writers == null) {
      return;
    }
    for (StoreFileWriter writer : writers) {
      if (writer != null) {
        writer.beforeShipped();
      }
    }
  }
}