/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Implementation of a log cleaner that checks if a log is still scheduled for
 * replication before deleting it when its TTL is over.
 * <p>
 * Usage within a cleaning pass: {@link #preClean()} takes a snapshot of the WAL names held by the
 * replication queue storage together with the time the snapshot was taken, and
 * {@link #getDeletableFiles(Iterable)} then filters candidate files against that snapshot.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
  private ZKWatcher zkw;
  private ReplicationQueueStorage queueStorage;
  private boolean stopped = false;
  /** WAL names read by the last {@link #preClean()}; null when that read did not succeed. */
  private Set<String> wals;
  /** Time at which the last {@link #preClean()} started reading the WAL names. */
  private long readZKTimestamp = 0;

  /**
   * Refreshes the snapshot of WAL names used by {@link #getDeletableFiles(Iterable)}.
   * <p>
   * The timestamp is recorded before the read. If the queue storage was never initialized, or
   * the read fails, the snapshot is set to null so that no file is reported as deletable (as long
   * as a configuration is present).
   */
  @Override
  public void preClean() {
    readZKTimestamp = EnvironmentEdgeManager.currentTime();
    if (queueStorage == null) {
      // setConf() logs and continues on failure, so the storage may never have been created.
      // Treat that like a failed read instead of throwing a NullPointerException.
      LOG.warn("Replication queue storage is not initialized, skipping checking deletable files");
      wals = null;
      return;
    }
    try {
      // The concurrently created new WALs may not be included in the return list,
      // but they won't be deleted because they're not in the checking set.
      wals = queueStorage.getAllWALs();
    } catch (ReplicationException e) {
      LOG.warn("Failed to read zookeeper, skipping checking deletable files", e);
      wals = null;
    }
  }

  /**
   * Filters the given files down to those this cleaner considers deletable.
   * <p>
   * A file is deletable when its name is not in the WAL snapshot taken by {@link #preClean()} and
   * its modification time is strictly before the snapshot timestamp.
   * @param files candidate files
   * @return {@code files} unchanged when no configuration is set; an empty list when no WAL
   *         snapshot is available; otherwise a lazily evaluated filtered view of {@code files}
   */
  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // all members of this class are null if replication is disabled,
    // so we cannot filter the files
    if (this.getConf() == null) {
      return files;
    }

    // The returned view is evaluated lazily, so capture the snapshot now. Reading the fields
    // during iteration could observe a later preClean() (including one that reset wals to null).
    final Set<String> walsInQueues = this.wals;
    final long snapshotTimestamp = this.readZKTimestamp;
    if (walsInQueues == null) {
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        // just for overriding the findbugs NP warnings, as the parameter is marked as Nullable in
        // the guava Predicate.
        if (file == null) {
          return false;
        }
        String wal = file.getPath().getName();
        boolean logInReplicationQueue = walsInQueues.contains(wal);
        if (logInReplicationQueue) {
          LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
        }
        return !logInReplicationQueue && (file.getModificationTime() < snapshotTimestamp);
      }
    });
  }

  /**
   * Configures this cleaner with a private copy of the given configuration and a ZooKeeper
   * watcher created from that copy.
   * <p>
   * If the watcher cannot be created the error is logged and this cleaner is left unconfigured.
   * @param config configuration to copy
   */
  @Override
  public void setConf(Configuration config) {
    // Make my own Configuration.  Then I'll have my own connection to zk that
    // I can close myself when comes time.
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring {}", this.getClass().getName(), e);
    }
  }

  /**
   * Configures this cleaner with the given configuration and ZooKeeper watcher.
   * <p>
   * The watcher is retained and closed by {@link #stop(String)}. A failure to create the
   * replication queue storage is logged and not rethrown.
   * @param conf configuration to set on this cleaner
   * @param zk watcher used to create the replication queue storage
   */
  @InterfaceAudience.Private
  public void setConf(Configuration conf, ZKWatcher zk) {
    super.setConf(conf);
    try {
      this.zkw = zk;
      this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
    } catch (Exception e) {
      LOG.error("Error while configuring {}", this.getClass().getName(), e);
    }
  }

  /**
   * Marks this cleaner as stopped and closes its ZooKeeper watcher, if any. Calls after the first
   * one are no-ops.
   * @param why reason for stopping; not used by this implementation
   */
  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping {}", this.zkw);
      this.zkw.close();
    }
  }

  /**
   * @return true once {@link #stop(String)} has been called
   */
  @Override
  public boolean isStopped() {
    return this.stopped;
  }
}