/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for cell sink that separates the provided cells into multiple files.
 * <p>
 * Subclasses supply the current set of writers through {@link #writers()} and may hook into the
 * commit sequence via {@link #preCommitWriters()} and {@link #preCloseWriter(StoreFileWriter)}.
 */
@InterfaceAudience.Private
public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractMultiFileWriter.class);

  /** Factory that is used to produce single StoreFile.Writer-s */
  protected WriterFactory writerFactory;

  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
  protected StoreScanner sourceScanner;

  /** Produces the individual {@link StoreFileWriter}s that this multi-writer writes to. */
  public interface WriterFactory {
    /**
     * @return a writer for one output file
     * @throws IOException if the writer cannot be created
     */
    StoreFileWriter createWriter() throws IOException;
  }

  /**
   * Initializes multi-writer before usage.
   * @param sourceScanner Optional store scanner to obtain the information about read progress.
   * @param factory Factory used to produce individual file writers.
   */
  public void init(StoreScanner sourceScanner, WriterFactory factory) {
    this.writerFactory = factory;
    this.sourceScanner = sourceScanner;
  }

  /**
   * Commit all writers.
   * <p>
   * Notice that here we use the same <code>maxSeqId</code> for all output files since we haven't
   * found an easy way to find enough sequence ids for different output files in some corner
   * cases. See comments in HBASE-15400 for more details.
   * <p>
   * Equivalent to {@link #commitWriters(long, boolean, Collection)} with an empty collection of
   * store files.
   * @param maxSeqId sequence id recorded in the metadata of every output file
   * @param majorCompaction major-compaction flag recorded in the metadata of every output file
   * @return the paths of all committed (non-null) writers, in iteration order of
   *         {@link #writers()}
   * @throws IOException if a hook, appending metadata, or closing a writer fails
   */
  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
    // Typed empty set instead of the raw Collections.EMPTY_SET to avoid an unchecked conversion.
    return commitWriters(maxSeqId, majorCompaction, Collections.<HStoreFile>emptySet());
  }

  /**
   * Commit all writers: runs {@link #preCommitWriters()}, then for every non-null writer appends
   * metadata, runs {@link #preCloseWriter(StoreFileWriter)}, records its path and closes it.
   * <p>
   * The same <code>maxSeqId</code> is used for all output files; see HBASE-15400.
   * @param maxSeqId sequence id passed to each writer's <code>appendMetadata</code>
   * @param majorCompaction major-compaction flag passed to each writer's
   *          <code>appendMetadata</code>
   * @param storeFiles store files passed through to each writer's <code>appendMetadata</code>
   * @return the paths of all committed (non-null) writers, in iteration order of
   *         {@link #writers()}
   * @throws IOException if a hook, appending metadata, or closing a writer fails; writers after
   *           the failing one are left untouched by this method
   */
  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
      Collection<HStoreFile> storeFiles) throws IOException {
    preCommitWriters();
    Collection<StoreFileWriter> writers = this.writers();
    // Parameterized logging: arguments are cheap, so no explicit isDebugEnabled() guard is needed.
    LOG.debug("Commit {} writers, maxSeqId={}, majorCompaction={}", writers.size(), maxSeqId,
      majorCompaction);
    List<Path> paths = new ArrayList<>();
    for (StoreFileWriter writer : writers) {
      if (writer == null) {
        continue;
      }
      writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
      preCloseWriter(writer);
      // The path is recorded before close(), so it is not returned if close() throws.
      paths.add(writer.getPath());
      writer.close();
    }
    return paths;
  }

  /**
   * Close all writers without throwing any exceptions. This is used when compaction failed usually.
   * <p>
   * A failure to close one writer is logged and does not prevent the remaining writers from being
   * closed.
   * @return the paths of all non-null writers, including those whose close failed
   */
  public List<Path> abortWriters() {
    List<Path> paths = new ArrayList<>();
    Collection<StoreFileWriter> writers = writers();
    if (writers == null) {
      // Nothing to abort; mirrors the null guard in beforeShipped() so that this method keeps
      // its "never throws" contract.
      return paths;
    }
    for (StoreFileWriter writer : writers) {
      try {
        if (writer != null) {
          paths.add(writer.getPath());
          writer.close();
        }
      } catch (Exception ex) {
        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
      }
    }
    return paths;
  }

  /**
   * @return the writers currently managed by this multi-writer; individual elements may be null
   */
  protected abstract Collection<StoreFileWriter> writers();

  /**
   * Subclasses override this method to be called at the end of a successful sequence of append; all
   * appends are processed before this method is called.
   * @throws IOException if the subclass fails to finish its pending work
   */
  protected void preCommitWriters() throws IOException {
  }

  /**
   * Subclasses override this method to be called before we close the given writer. Usually you can
   * append extra metadata to the writer.
   * @param writer the writer that is about to be closed; never null
   * @throws IOException if the subclass fails to prepare the writer
   */
  protected void preCloseWriter(StoreFileWriter writer) throws IOException {
  }

  /**
   * Forwards the {@link ShipperListener#beforeShipped()} notification to every non-null writer.
   * Does nothing if {@link #writers()} returns null.
   * @throws IOException if any writer fails while handling the notification
   */
  @Override
  public void beforeShipped() throws IOException {
    Collection<StoreFileWriter> writers = writers();
    if (writers != null) {
      for (StoreFileWriter writer : writers) {
        if (writer != null) {
          writer.beforeShipped();
        }
      }
    }
  }
}