/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
 * deleting it from hfile archive directory.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
  private ZKWatcher zkw;
  private ReplicationQueueStorage rqs;
  private boolean stopped = false;

  /**
   * Filters the given files down to those whose name is not present in the set of hfile references
   * returned by the replication queue storage.
   * @param files candidate files
   * @return {@code files} unchanged when no configuration has been set on this cleaner; an empty
   *         list when the replication queue storage is unavailable or cannot be read; otherwise a
   *         lazily filtered view of {@code files} that excludes {@code null} entries and files
   *         whose name is still referenced
   */
  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }
    // setConf(Configuration, ZKWatcher) stores the configuration before the queue storage is
    // created and only logs a failure, so the storage may still be null here. Without a reference
    // set we cannot tell which files are safe to remove, so keep all of them.
    if (this.rqs == null) {
      LOG.warn("Replication queue storage is not initialized, skipping checking deletable files");
      return Collections.emptyList();
    }

    final Set<String> hfileRefs;
    try {
      // The concurrently created new hfile entries in ZK may not be included in the return list,
      // but they won't be deleted because they're not in the checking set.
      hfileRefs = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files",
        e);
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in
        // the guava Predicate.
        if (file == null) {
          return false;
        }
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (foundHFileRefInQueue) {
          LOG.debug("Found hfile reference in ZK, keeping: {}", hfile);
        } else {
          LOG.debug("Did not find hfile reference in ZK, deleting: {}", hfile);
        }
        return !foundHFileRefInQueue;
      }
    });
  }

  /**
   * Configures this cleaner. When replication of bulk load hfiles is not enabled in
   * {@code config}, a warning is logged and nothing is initialized. Otherwise a private copy of the
   * configuration and a dedicated {@link ZKWatcher} are created and handed to
   * {@link #setConf(Configuration, ZKWatcher)}.
   * @param config the configuration to read settings from
   */
  @Override
  public void setConf(Configuration config) {
    // If replication of bulk load hfiles is disabled, keep all members null
    if (
      !(config.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
        HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))
    ) {
      LOG.warn("{} is not enabled. Better to remove {} from {} configuration.",
        HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, ReplicationHFileCleaner.class,
        HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
      return;
    }
    // Make my own Configuration. Then I'll have my own connection to zk that
    // I can close myself when time comes.
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZKWatcher(conf, "replicationHFileCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring {}", this.getClass().getName(), e);
    }
  }

  /**
   * Stores the configuration and initializes the replication queue storage on top of the given
   * ZooKeeper watcher. A failure during initialization is logged and not rethrown.
   * @param conf the configuration to store and to build the queue storage from
   * @param zk   the ZooKeeper watcher to use; it is closed by {@link #stop(String)}
   */
  @InterfaceAudience.Private
  public void setConf(Configuration conf, ZKWatcher zk) {
    super.setConf(conf);
    try {
      initReplicationQueueStorage(conf, zk);
    } catch (Exception e) {
      LOG.error("Error while configuring {}", this.getClass().getName(), e);
    }
  }

  /** Remembers the watcher and obtains the replication queue storage from the factory. */
  private void initReplicationQueueStorage(Configuration conf, ZKWatcher zk) {
    this.zkw = zk;
    this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
  }

  /**
   * Marks this cleaner as stopped and closes the ZooKeeper watcher if one was set. Calls after the
   * first one return immediately.
   * @param why the reason for stopping (not used by this implementation)
   */
  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping {}", this.zkw);
      this.zkw.close();
    }
  }

  /** Returns whether {@link #stop(String)} has been called on this cleaner. */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  /**
   * Decides whether a single file may be deleted.
   * @param fStat the file to check
   * @return {@code true} when no configuration has been set on this cleaner or when the file name
   *         is not among the hfile references; {@code false} when the file is still referenced or
   *         when the replication queue storage is unavailable or cannot be read
   */
  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // all members of this class are null if replication is disabled,
    // so do not stop from deleting the file
    if (getConf() == null) {
      return true;
    }
    // The configuration may be set while the queue storage failed to initialize; keep the file in
    // that case since we cannot verify that it is unreferenced.
    if (rqs == null) {
      LOG.warn("Replication queue storage is not initialized, skipping checking deletable "
        + "file for {}", fStat.getPath());
      return false;
    }

    try {
      hfileRefsFromQueue = rqs.getAllHFileRefs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
        + "file for {}", fStat.getPath(), e);
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }
}