/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for cell sink that separates the provided cells into multiple files.
 * <p>
 * Subclasses supply the set of underlying writers through {@link #writers()} and may hook into the
 * commit sequence through {@link #preCommitWriters()} and {@link #preCloseWriter(StoreFileWriter)}.
 */
@InterfaceAudience.Private
public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractMultiFileWriter.class);

  /** Factory that is used to produce single StoreFile.Writer-s */
  protected WriterFactory writerFactory;

  /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
  protected StoreScanner sourceScanner;

  /** Produces the individual file writers that this multi-writer distributes cells across. */
  public interface WriterFactory {
    /**
     * Creates a new single-file writer.
     * @return the newly created writer
     * @throws IOException if the writer cannot be created
     */
    public StoreFileWriter createWriter() throws IOException;
  }

  /**
   * Initializes multi-writer before usage.
   * @param sourceScanner Optional store scanner to obtain the information about read progress.
   * @param factory Factory used to produce individual file writers.
   */
  public void init(StoreScanner sourceScanner, WriterFactory factory) {
    this.writerFactory = factory;
    this.sourceScanner = sourceScanner;
  }

  /**
   * Commit all writers.
   * <p>
   * Notice that here we use the same <code>maxSeqId</code> for all output files since we haven't
   * found an easy way to find enough sequence ids for different output files in some corner cases.
   * See comments in HBASE-15400 for more details.
   * <p>
   * For every non-null writer, in iteration order: metadata is appended,
   * {@link #preCloseWriter(StoreFileWriter)} is invoked, the writer's path is recorded and the
   * writer is closed. An exception from any of these steps propagates immediately; writers later
   * in the iteration are then left untouched by this method.
   * @param maxSeqId the sequence id written into the metadata of every output file
   * @param majorCompaction the major-compaction flag written into the metadata of every output file
   * @return the paths of all non-null writers, in iteration order
   * @throws IOException if a hook, the metadata append or a close fails
   */
  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
    preCommitWriters();
    Collection<StoreFileWriter> writers = this.writers();
    // Parameterized logging: the message is only formatted when debug is enabled.
    LOG.debug("Commit {} writers, maxSeqId={}, majorCompaction={}", writers.size(), maxSeqId,
      majorCompaction);
    List<Path> paths = new ArrayList<>();
    for (StoreFileWriter writer : writers) {
      if (writer == null) {
        continue;
      }
      writer.appendMetadata(maxSeqId, majorCompaction);
      preCloseWriter(writer);
      paths.add(writer.getPath());
      writer.close();
    }
    return paths;
  }

  /**
   * Close all writers without throwing any exceptions. This is used when compaction failed usually.
   * <p>
   * A failure while handling one writer is logged and does not prevent the remaining writers from
   * being closed. A writer's path is recorded before its close is attempted, so the path is
   * returned even when the close itself fails.
   * @return the paths of all non-null writers whose path could be obtained; empty if there are no
   *         writers
   */
  public List<Path> abortWriters() {
    List<Path> paths = new ArrayList<>();
    Collection<StoreFileWriter> writers = writers();
    if (writers == null) {
      // Treat a null collection as "no writers" (the same way beforeShipped() does) so that the
      // documented no-throw behaviour also holds in that case.
      return paths;
    }
    for (StoreFileWriter writer : writers) {
      if (writer == null) {
        continue;
      }
      try {
        paths.add(writer.getPath());
        writer.close();
      } catch (Exception ex) {
        LOG.error("Failed to close the writer after an unfinished compaction.", ex);
      }
    }
    return paths;
  }

  /**
   * Returns the underlying writers managed by the subclass. Callers in this class skip null
   * elements.
   * @return the collection of writers
   */
  protected abstract Collection<StoreFileWriter> writers();

  /**
   * Subclasses override this method to be called at the end of a successful sequence of append; all
   * appends are processed before this method is called.
   * @throws IOException if the subclass hook fails
   */
  protected void preCommitWriters() throws IOException {
  }

  /**
   * Subclasses override this method to be called before we close the given writer. Usually you can
   * append extra metadata to the writer.
   * @param writer the writer that is about to be closed; never null when called from
   *          {@link #commitWriters(long, boolean)}
   * @throws IOException if the subclass hook fails
   */
  protected void preCloseWriter(StoreFileWriter writer) throws IOException {
  }

  /**
   * Forwards the {@code beforeShipped} notification to every non-null underlying writer. Does
   * nothing when {@link #writers()} returns null.
   * @throws IOException if any writer's {@code beforeShipped} fails
   */
  @Override
  public void beforeShipped() throws IOException {
    Collection<StoreFileWriter> writers = writers();
    if (writers != null) {
      for (StoreFileWriter writer : writers) {
        if (writer != null) {
          writer.beforeShipped();
        }
      }
    }
  }
}