/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of a file cleaner that checks if an hfile is still scheduled for replication
 * before deleting it from the hfile archive directory.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
  private ZKWatcher zkw;
  private ReplicationQueuesClient rqc;
  private boolean stopped = false;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }

    final Set<String> hfileRefs;
    try {
      // The concurrently created new hfile entries in ZK may not be included in the returned
      // list, but they won't be deleted because they're not in the checking set.
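      // loadHFileRefsFromPeers() retries until the hfile-refs znode cversion is stable across a
      // full read, so the set below is a consistent snapshot as of the start of the call.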
      hfileRefs = loadHFileRefsFromPeers();
    } catch (KeeperException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping check of deletable "
          + "files", e);
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (LOG.isDebugEnabled()) {
          if (foundHFileRefInQueue) {
            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
          } else {
            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
          }
        }
        // Keep the file (return false) only while some peer still references it.
        return !foundHFileRefInQueue;
      }
    });
  }

  /**
   * Load all hfile references in all replication queues from ZK. This method guarantees to return
   * a snapshot which contains all hfile references in zookeeper at the start of this call.
   * However, some hfile references created during the call may not be included.
   */
  private Set<String> loadHFileRefsFromPeers() throws KeeperException {
    Set<String> hfileRefs = Sets.newHashSet();
    List<String> listOfPeers;
    for (int retry = 0;; retry++) {
      // Optimistic snapshot read: compare the cversion of the hfile-refs znode before and after
      // listing all references; if it changed in between, something was added or removed
      // concurrently, so start over.
      int v0 = rqc.getHFileRefsNodeChangeVersion();
      hfileRefs.clear();
      listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
      if (listOfPeers == null) {
        LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
        return ImmutableSet.of();
      }
      for (String id : listOfPeers) {
        List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
        if (peerHFileRefs != null) {
          hfileRefs.addAll(peerHFileRefs);
        }
      }
      int v1 = rqc.getHFileRefsNodeChangeVersion();
      if (v0 == v1) {
        return hfileRefs;
      }
      LOG.debug(String.format("Replication hfile references node cversion changed from "
          + "%d to %d, retry = %d", v0, v1, retry));
    }
  }

  @Override
  public void setConf(Configuration config) {
    // If replication of bulk loaded hfiles is disabled, keep all members null so that
    // getDeletableFiles() and isFileDeletable() let every file through.
    if (!(config.getBoolean(
        HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
        HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
      LOG.warn(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
          + " is not enabled. Better to remove "
          + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
          + " configuration.");
      return;
    }
    // Make my own Configuration. Then I'll have my own connection to zk that
    // I can close myself when time comes.
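    // (The connection in question is the ZKWatcher created just below; stop() closes it.)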
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZKWatcher(conf, "replicationHFileCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  @VisibleForTesting
  public void setConf(Configuration conf, ZKWatcher zk) {
    super.setConf(conf);
    try {
      initReplicationQueuesClient(conf, zk);
    } catch (Exception e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  private void initReplicationQueuesClient(Configuration conf, ZKWatcher zk)
      throws Exception {
    this.zkw = zk;
    this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
        conf, new WarnOnlyAbortable(), zkw));
  }

  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping " + this.zkw);
      this.zkw.close();
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // all members of this class are null if replication is disabled,
    // so do not stop from deleting the file
    if (getConf() == null) {
      return true;
    }

    try {
      hfileRefsFromQueue = loadHFileRefsFromPeers();
    } catch (KeeperException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping check of deletable "
          + "file for " + fStat.getPath(), e);
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }

  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("ReplicationHFileCleaner received abort, ignoring. Reason: " + why);
      LOG.debug(e.toString(), e);
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}
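// A minimal configuration sketch (an assumption about deployment, not part of this class) showing
// how this cleaner is typically wired in via hbase-site.xml. The plugin key below is the value of
// HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, and the boolean switch checked in setConf() is
// HConstants.REPLICATION_BULKLOAD_ENABLE_KEY; append this class to any cleaners already listed
// rather than replacing them:
//
//   <property>
//     <name>hbase.master.hfilecleaner.plugins</name>
//     <value>org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner</value>
//   </property>
//   <property>
//     <name>hbase.replication.bulkload.enabled</name>
//     <value>true</value>
//   </property>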