/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Implementation of a file cleaner that checks if a hfile is still scheduled for replication
 * before deleting it from hfile archive directory.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
  private ZKWatcher zkw;
  private ReplicationQueueStorage rqs;
  private boolean stopped = false;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }

    final Set<String> hfileRefs;
    try {
      // The concurrently created new hfile entries in ZK may not be included in the return list,
      // but they won't be deleted because they're not in the checking set.
      hfileRefs = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (LOG.isDebugEnabled()) {
          if (foundHFileRefInQueue) {
            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
          } else {
            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
          }
        }
        return !foundHFileRefInQueue;
      }
    });
  }

  @Override
  public void setConf(Configuration config) {
    // If either replication or replication of bulk load hfiles is disabled, keep all members null
    if (!(config.getBoolean(
        HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
        HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
      LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
          + " is not enabled. Better to remove "
          + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
          + " configuration.");
      return;
    }
    // Make my own Configuration. Then I'll have my own connection to zk that
    // I can close myself when time comes.
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZKWatcher(conf, "replicationHFileCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  @VisibleForTesting
  public void setConf(Configuration conf, ZKWatcher zk) {
    super.setConf(conf);
    try {
      initReplicationQueueStorage(conf, zk);
    } catch (Exception e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  private void initReplicationQueueStorage(Configuration conf, ZKWatcher zk) {
    this.zkw = zk;
    this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
  }

  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping " + this.zkw);
      this.zkw.close();
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // all members of this class are null if replication is disabled,
    // so do not stop from deleting the file
    if (getConf() == null) {
      return true;
    }

    try {
      hfileRefsFromQueue = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
          + "file for " + fStat.getPath());
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }
}
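// Illustrative sketch, not part of the original class: how this cleaner would typically be
// enabled on the master. The property names are the values behind
// HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS ("hbase.master.hfilecleaner.plugins") and
// HConstants.REPLICATION_BULKLOAD_ENABLE_KEY ("hbase.replication.bulkload.enabled"); the
// hbase-site.xml snippet and the choice of companion cleaner are assumptions for the example.
//
//   <property>
//     <name>hbase.master.hfilecleaner.plugins</name>
//     <value>org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner,
//            org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner</value>
//   </property>
//   <property>
//     <name>hbase.replication.bulkload.enabled</name>
//     <value>true</value>
//   </property>
//
// With bulk load replication left disabled, setConf(Configuration) above returns early, the
// queue storage stays null, and the cleaner is effectively a no-op that should be removed from
// the plugin list, as the warning in setConf suggests.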