/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
 * before deleting it from the hfile archive directory.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
  private ZKWatcher zkw;
  private ReplicationQueueStorage rqs;
  private boolean stopped = false;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // All members of this class are null if replication of bulk loaded hfiles is disabled,
    // so we cannot filter the files.
    if (this.getConf() == null) {
      return files;
    }

    final Set<String> hfileRefs;
    try {
      // Hfile entries created concurrently in ZK may not be included in the returned set, but
      // the corresponding files won't be deleted because they are not in the set of files being
      // checked here either.
      hfileRefs = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn(
        "Failed to read hfile references from zookeeper, skipping checking deletable files", e);
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // Just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in
        // the guava Predicate.
        if (file == null) {
          return false;
        }
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (LOG.isDebugEnabled()) {
          if (foundHFileRefInQueue) {
            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
          } else {
            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
          }
        }
        return !foundHFileRefInQueue;
      }
    });
  }

  @Override
  public void setConf(Configuration config) {
    // If replication of bulk load hfiles is disabled, keep all members null.
    if (!(config.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
      LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is not enabled; consider removing "
        + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
        + " configuration.");
      return;
    }
    // Make my own Configuration. Then I'll have my own connection to zk that
    // I can close myself when time comes.
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZKWatcher(conf, "replicationHFileCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  @VisibleForTesting
  public void setConf(Configuration conf, ZKWatcher zk) {
    super.setConf(conf);
    try {
      initReplicationQueueStorage(conf, zk);
    } catch (Exception e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  private void initReplicationQueueStorage(Configuration conf, ZKWatcher zk) {
    this.zkw = zk;
    this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
  }

  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping " + this.zkw);
      this.zkw.close();
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // All members of this class are null if replication of bulk loaded hfiles is disabled,
    // so do not stop the file from being deleted.
    if (getConf() == null) {
      return true;
    }

    try {
      hfileRefsFromQueue = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
        + "file for " + fStat.getPath(), e);
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }
}
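
/*
 * Illustrative configuration sketch (not part of the class above): this cleaner only does useful
 * work when bulk load replication is enabled, otherwise setConf(Configuration) logs a warning and
 * leaves the cleaner inert. Assuming HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS resolves to
 * "hbase.master.hfilecleaner.plugins" and HConstants.REPLICATION_BULKLOAD_ENABLE_KEY to
 * "hbase.replication.bulkload.enabled", an hbase-site.xml entry would look roughly like:
 *
 *   <property>
 *     <name>hbase.master.hfilecleaner.plugins</name>
 *     <value>org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner</value>
 *   </property>
 *   <property>
 *     <name>hbase.replication.bulkload.enabled</name>
 *     <value>true</value>
 *   </property>
 */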