001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication.master; 020 021import java.io.IOException; 022import java.util.Collections; 023import java.util.Set; 024 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileStatus; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; 030import org.apache.hadoop.hbase.replication.ReplicationFactory; 031import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; 032import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.apache.zookeeper.KeeperException; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 041import org.apache.hbase.thirdparty.com.google.common.base.Predicate; 042import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 043 044/** 045 * Implementation of a log cleaner that checks if a log is still scheduled for 046 * replication before deleting it when its TTL is over. 047 */ 048@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 049public class ReplicationLogCleaner extends BaseLogCleanerDelegate { 050 private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); 051 private ZKWatcher zkw; 052 private ReplicationQueuesClient replicationQueues; 053 private boolean stopped = false; 054 private Set<String> wals; 055 private long readZKTimestamp = 0; 056 057 @Override 058 public void preClean() { 059 readZKTimestamp = EnvironmentEdgeManager.currentTime(); 060 try { 061 // The concurrently created new WALs may not be included in the return list, 062 // but they won't be deleted because they're not in the checking set. 063 wals = replicationQueues.getAllWALs(); 064 } catch (KeeperException e) { 065 LOG.warn("Failed to read zookeeper, skipping checking deletable files"); 066 wals = null; 067 } 068 } 069 070 @Override 071 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { 072 // all members of this class are null if replication is disabled, 073 // so we cannot filter the files 074 if (this.getConf() == null) { 075 return files; 076 } 077 078 if (wals == null) { 079 return Collections.emptyList(); 080 } 081 return Iterables.filter(files, new Predicate<FileStatus>() { 082 @Override 083 public boolean apply(FileStatus file) { 084 String wal = file.getPath().getName(); 085 boolean logInReplicationQueue = wals.contains(wal); 086 if (logInReplicationQueue) { 087 LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal); 088 } 089 return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp); 090 } 091 }); 092 } 093 094 @Override 095 public void setConf(Configuration config) { 096 // Make my own Configuration. Then I'll have my own connection to zk that 097 // I can close myself when comes time. 098 Configuration conf = new Configuration(config); 099 try { 100 setConf(conf, new ZKWatcher(conf, "replicationLogCleaner", null)); 101 } catch (IOException e) { 102 LOG.error("Error while configuring " + this.getClass().getName(), e); 103 } 104 } 105 106 @VisibleForTesting 107 public void setConf(Configuration conf, ZKWatcher zk) { 108 super.setConf(conf); 109 try { 110 this.zkw = zk; 111 this.replicationQueues = ReplicationFactory.getReplicationQueuesClient( 112 new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw)); 113 this.replicationQueues.init(); 114 } catch (Exception e) { 115 LOG.error("Error while configuring " + this.getClass().getName(), e); 116 } 117 } 118 119 @VisibleForTesting 120 public void setConf(Configuration conf, ZKWatcher zk, 121 ReplicationQueuesClient replicationQueuesClient) { 122 super.setConf(conf); 123 this.zkw = zk; 124 this.replicationQueues = replicationQueuesClient; 125 } 126 127 @Override 128 public void stop(String why) { 129 if (this.stopped) return; 130 this.stopped = true; 131 if (this.zkw != null) { 132 LOG.info("Stopping " + this.zkw); 133 this.zkw.close(); 134 } 135 } 136 137 @Override 138 public boolean isStopped() { 139 return this.stopped; 140 } 141 142 public static class WarnOnlyAbortable implements Abortable { 143 144 @Override 145 public void abort(String why, Throwable e) { 146 LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why); 147 LOG.debug(e.toString(), e); 148 } 149 150 @Override 151 public boolean isAborted() { 152 return false; 153 } 154 } 155}