1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.master;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.client.HConnectionManager;
29 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
30 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
31 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
32 import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
33 import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
35 import org.apache.zookeeper.KeeperException;
36
37 import java.io.IOException;
38 import java.util.HashSet;
39 import java.util.List;
40 import java.util.Set;
41
42
43
44
45
46 @InterfaceAudience.Private
47 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
48 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
49 private ZooKeeperWatcher zkw;
50 private ReplicationQueuesClient replicationQueues;
51 private ReplicationStateInterface replicationState;
52 private final Set<String> hlogs = new HashSet<String>();
53 private boolean stopped = false;
54 private boolean aborted;
55
56
57 @Override
58 public boolean isLogDeletable(Path filePath) {
59
60 try {
61 if (!replicationState.getState()) {
62 return false;
63 }
64 } catch (KeeperException e) {
65 abort("Cannot get the state of replication", e);
66 return false;
67 }
68
69
70
71 if (this.getConf() == null) {
72 return true;
73 }
74 String log = filePath.getName();
75
76
77 if (this.hlogs.contains(log)) {
78 return false;
79 }
80
81
82
83
84 return !refreshHLogsAndSearch(log);
85 }
86
87
88
89
90
91
92
93
94 private boolean refreshHLogsAndSearch(String searchedLog) {
95 this.hlogs.clear();
96 final boolean lookForLog = searchedLog != null;
97 List<String> rss = replicationQueues.getListOfReplicators();
98 if (rss == null) {
99 LOG.debug("Didn't find any region server that replicates, deleting: " +
100 searchedLog);
101 return false;
102 }
103 for (String rs: rss) {
104 List<String> listOfPeers = replicationQueues.getAllQueues(rs);
105
106 if (listOfPeers == null) {
107 continue;
108 }
109 for (String id : listOfPeers) {
110 List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
111 if (peersHlogs != null) {
112 this.hlogs.addAll(peersHlogs);
113 }
114
115 if(lookForLog && this.hlogs.contains(searchedLog)) {
116 LOG.debug("Found log in ZK, keeping: " + searchedLog);
117 return true;
118 }
119 }
120 }
121 LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
122 return false;
123 }
124
125 @Override
126 public void setConf(Configuration config) {
127
128 if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
129 return;
130 }
131
132
133 Configuration conf = new Configuration(config);
134 super.setConf(conf);
135 try {
136 this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
137 this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
138 this.replicationState = new ReplicationStateImpl(zkw, conf, this);
139 } catch (KeeperException e) {
140 LOG.error("Error while configuring " + this.getClass().getName(), e);
141 } catch (IOException e) {
142 LOG.error("Error while configuring " + this.getClass().getName(), e);
143 }
144 refreshHLogsAndSearch(null);
145 }
146
147
148 @Override
149 public void stop(String why) {
150 if (this.stopped) return;
151 this.stopped = true;
152 if (this.zkw != null) {
153 LOG.info("Stopping " + this.zkw);
154 this.zkw.close();
155 }
156 if (this.replicationState != null) {
157 LOG.info("Stopping " + this.replicationState);
158 try {
159 this.replicationState.close();
160 } catch (IOException e) {
161 LOG.error("Error while stopping " + this.replicationState, e);
162 }
163 }
164
165 HConnectionManager.deleteConnection(this.getConf());
166 }
167
168 @Override
169 public boolean isStopped() {
170 return this.stopped;
171 }
172
173 @Override
174 public void abort(String why, Throwable e) {
175 LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
176 this.aborted = true;
177 stop(why);
178 }
179
180 @Override
181 public boolean isAborted() {
182 return this.aborted;
183 }
184 }