/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
 * before deleting it from the hfile archive directory.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
  private Connection conn;
  private boolean shareConn;
  private ReplicationQueueStorage rqs;
  private boolean stopped = false;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    if (
      !(getConf().getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
        HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))
    ) {
      LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is not enabled. Better to remove "
        + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
        + " configuration.");
      return files;
    }

    final Set<String> hfileRefs;
    try {
      // Hfile references created concurrently in ZK may be missing from the snapshot returned
      // here, but the corresponding files won't be deleted, because they are not in the set of
      // files being checked.
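      // getAllHFileRefs() loads the queued hfile references across all peers in a single call,
      // so the filter below tests every candidate against one consistent snapshot instead of
      // querying ZooKeeper once per file.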
      hfileRefs = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files",
        e);
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // The null check is just to suppress the findbugs NP warning, as the parameter is marked
        // as Nullable in the guava Predicate.
        if (file == null) {
          return false;
        }
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (LOG.isDebugEnabled()) {
          if (foundHFileRefInQueue) {
            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
          } else {
            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
          }
        }
        return !foundHFileRefInQueue;
      }
    });
  }

  @Override
  public void init(Map<String, Object> params) {
    super.init(params);
    try {
      if (MapUtils.isNotEmpty(params)) {
        Object master = params.get(HMaster.MASTER);
        if (master instanceof Server) {
          conn = ((Server) master).getConnection();
          shareConn = true;
        }
      }
      if (conn == null) {
        conn = ConnectionFactory.createConnection(getConf());
      }
      this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(conn, getConf());
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (!shareConn && this.conn != null) {
      LOG.info("Stopping " + this.conn);
      IOUtils.closeQuietly(conn);
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // All members of this class are null if replication is disabled,
    // so do not block deletion of the file.
    if (getConf() == null) {
      return true;
    }

    try {
      hfileRefsFromQueue = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
        + "file for " + fStat.getPath(), e);
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }
}
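
/*
 * Illustrative sketch, not part of the upstream class: one way to register this cleaner on the
 * master and turn on bulk-load replication so that getDeletableFiles() actually consults the
 * hfile reference queue. The example class name is hypothetical; the property keys come from the
 * constants the cleaner itself reads above. In a real deployment you would append the cleaner to
 * the existing plugin list in hbase-site.xml rather than overwrite it programmatically.
 */
class ReplicationHFileCleanerConfigExample {
  public static void main(String[] args) {
    org.apache.hadoop.conf.Configuration conf =
      org.apache.hadoop.hbase.HBaseConfiguration.create();
    // Add the cleaner to the master's chain of HFile cleaner delegates
    // ("hbase.master.hfilecleaner.plugins").
    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, ReplicationHFileCleaner.class.getName());
    // Enable bulk-load replication ("hbase.replication.bulkload.enabled"); when it is off, the
    // cleaner logs a warning and reports every archived hfile as deletable.
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    System.out.println("Cleaner plugins: " + conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS));
  }
}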