1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.master;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileStatus;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.Abortable;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.client.HConnectionManager;
30 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
31 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
32 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
33 import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
34 import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
35 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
36 import org.apache.zookeeper.KeeperException;
37
38 import java.io.IOException;
39 import java.util.HashSet;
40 import java.util.List;
41 import java.util.Set;
42
43
44
45
46
47 @InterfaceAudience.Private
48 public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
49 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
50 private ZooKeeperWatcher zkw;
51 private ReplicationQueuesClient replicationQueues;
52 private ReplicationStateInterface replicationState;
53 private final Set<String> hlogs = new HashSet<String>();
54 private boolean stopped = false;
55 private boolean aborted;
56
57
58 @Override
59 public boolean isLogDeletable(FileStatus fStat) {
60
61 try {
62 if (!replicationState.getState()) {
63 return false;
64 }
65 } catch (KeeperException e) {
66 abort("Cannot get the state of replication", e);
67 return false;
68 }
69
70
71
72 if (this.getConf() == null) {
73 return true;
74 }
75 String log = fStat.getPath().getName();
76
77
78 if (this.hlogs.contains(log)) {
79 return false;
80 }
81
82
83
84
85 return !refreshHLogsAndSearch(log);
86 }
87
88
89
90
91
92
93
94
95 private boolean refreshHLogsAndSearch(String searchedLog) {
96 this.hlogs.clear();
97 final boolean lookForLog = searchedLog != null;
98 List<String> rss = replicationQueues.getListOfReplicators();
99 if (rss == null) {
100 LOG.debug("Didn't find any region server that replicates, deleting: " +
101 searchedLog);
102 return false;
103 }
104 for (String rs: rss) {
105 List<String> listOfPeers = replicationQueues.getAllQueues(rs);
106
107 if (listOfPeers == null) {
108 continue;
109 }
110 for (String id : listOfPeers) {
111 List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
112 if (peersHlogs != null) {
113 this.hlogs.addAll(peersHlogs);
114 }
115
116 if(lookForLog && this.hlogs.contains(searchedLog)) {
117 LOG.debug("Found log in ZK, keeping: " + searchedLog);
118 return true;
119 }
120 }
121 }
122 LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
123 return false;
124 }
125
126 @Override
127 public void setConf(Configuration config) {
128
129 if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
130 return;
131 }
132
133
134 Configuration conf = new Configuration(config);
135 super.setConf(conf);
136 try {
137 this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
138 this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
139 this.replicationState = new ReplicationStateImpl(zkw, conf, this);
140 this.replicationState.init();
141 } catch (KeeperException e) {
142 LOG.error("Error while configuring " + this.getClass().getName(), e);
143 } catch (IOException e) {
144 LOG.error("Error while configuring " + this.getClass().getName(), e);
145 }
146 refreshHLogsAndSearch(null);
147 }
148
149
150 @Override
151 public void stop(String why) {
152 if (this.stopped) return;
153 this.stopped = true;
154 if (this.zkw != null) {
155 LOG.info("Stopping " + this.zkw);
156 this.zkw.close();
157 }
158 if (this.replicationState != null) {
159 LOG.info("Stopping " + this.replicationState);
160 try {
161 this.replicationState.close();
162 } catch (IOException e) {
163 LOG.error("Error while stopping " + this.replicationState, e);
164 }
165 }
166
167 HConnectionManager.deleteConnection(this.getConf());
168 }
169
170 @Override
171 public boolean isStopped() {
172 return this.stopped;
173 }
174
175 @Override
176 public void abort(String why, Throwable e) {
177 LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
178 this.aborted = true;
179 stop(why);
180 }
181
182 @Override
183 public boolean isAborted() {
184 return this.aborted;
185 }
186 }