1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.master;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileStatus;
27 import org.apache.hadoop.hbase.Abortable;
28 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
31 import org.apache.hadoop.hbase.replication.ReplicationException;
32 import org.apache.hadoop.hbase.replication.ReplicationFactory;
33 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35 import java.io.IOException;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Set;
39
40 import com.google.common.base.Predicate;
41 import com.google.common.collect.ImmutableSet;
42 import com.google.common.collect.Iterables;
43 import com.google.common.collect.Sets;
44 import org.apache.zookeeper.KeeperException;
45
46
47
48
49
50 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
51 public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
52 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
53 private ZooKeeperWatcher zkw;
54 private ReplicationQueuesClient replicationQueues;
55 private boolean stopped = false;
56
57
58 @Override
59 public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
60
61
62 if (this.getConf() == null) {
63 return files;
64 }
65
66 final Set<String> wals;
67 try {
68
69
70 wals = loadWALsFromQueues();
71 } catch (KeeperException e) {
72 LOG.warn("Failed to read zookeeper, skipping checking deletable files");
73 return Collections.emptyList();
74 }
75 return Iterables.filter(files, new Predicate<FileStatus>() {
76 @Override
77 public boolean apply(FileStatus file) {
78 String wal = file.getPath().getName();
79 boolean logInReplicationQueue = wals.contains(wal);
80 if (LOG.isDebugEnabled()) {
81 if (logInReplicationQueue) {
82 LOG.debug("Found log in ZK, keeping: " + wal);
83 } else {
84 LOG.debug("Didn't find this log in ZK, deleting: " + wal);
85 }
86 }
87 return !logInReplicationQueue;
88 }});
89 }
90
91
92
93
94
95
96
97 private Set<String> loadWALsFromQueues() throws KeeperException {
98 for (int retry = 0; ; retry++) {
99 int v0 = replicationQueues.getQueuesZNodeCversion();
100 List<String> rss = replicationQueues.getListOfReplicators();
101 if (rss == null) {
102 LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
103 return ImmutableSet.of();
104 }
105 Set<String> wals = Sets.newHashSet();
106 for (String rs : rss) {
107 List<String> listOfPeers = replicationQueues.getAllQueues(rs);
108
109 if (listOfPeers == null) {
110 continue;
111 }
112 for (String id : listOfPeers) {
113 List<String> peersWals = replicationQueues.getLogsInQueue(rs, id);
114 if (peersWals != null) {
115 wals.addAll(peersWals);
116 }
117 }
118 }
119 int v1 = replicationQueues.getQueuesZNodeCversion();
120 if (v0 == v1) {
121 return wals;
122 }
123 LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
124 v0, v1, retry));
125 }
126 }
127
128 @Override
129 public void setConf(Configuration config) {
130
131 if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
132 HConstants.REPLICATION_ENABLE_DEFAULT)) {
133 LOG.warn("Not configured - allowing all wals to be deleted");
134 return;
135 }
136
137
138 Configuration conf = new Configuration(config);
139 try {
140 setConf(conf, new ZooKeeperWatcher(conf, "replicationLogCleaner", null));
141 } catch (IOException e) {
142 LOG.error("Error while configuring " + this.getClass().getName(), e);
143 }
144 }
145
146 @VisibleForTesting
147 public void setConf(Configuration conf, ZooKeeperWatcher zk) {
148 super.setConf(conf);
149 try {
150 this.zkw = zk;
151 this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf,
152 new WarnOnlyAbortable());
153 this.replicationQueues.init();
154 } catch (ReplicationException e) {
155 LOG.error("Error while configuring " + this.getClass().getName(), e);
156 }
157 }
158
159 @Override
160 public void stop(String why) {
161 if (this.stopped) return;
162 this.stopped = true;
163 if (this.zkw != null) {
164 LOG.info("Stopping " + this.zkw);
165 this.zkw.close();
166 }
167 }
168
169 @Override
170 public boolean isStopped() {
171 return this.stopped;
172 }
173
174 private static class WarnOnlyAbortable implements Abortable {
175
176 @Override
177 public void abort(String why, Throwable e) {
178 LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why);
179 if (LOG.isDebugEnabled()) {
180 LOG.debug(e);
181 }
182 }
183
184 @Override
185 public boolean isAborted() {
186 return false;
187 }
188 }
189 }