1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.backup.example;
19
20 import java.io.IOException;
21 import java.util.List;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
28 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
29 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
30 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
31 import org.apache.zookeeper.KeeperException;
32
33
34
35
36
37
38
39
40 @InterfaceAudience.Private
41 public class TableHFileArchiveTracker extends ZooKeeperListener {
42 private static final Log LOG = LogFactory.getLog(TableHFileArchiveTracker.class);
43 public static final String HFILE_ARCHIVE_ZNODE_PARENT = "hfilearchive";
44 private HFileArchiveTableMonitor monitor;
45 private String archiveHFileZNode;
46 private boolean stopped = false;
47
48 private TableHFileArchiveTracker(ZooKeeperWatcher watcher, HFileArchiveTableMonitor monitor) {
49 super(watcher);
50 watcher.registerListener(this);
51 this.monitor = monitor;
52 this.archiveHFileZNode = ZKTableArchiveClient.getArchiveZNode(watcher.getConfiguration(),
53 watcher);
54 }
55
56
57
58
59
60 public void start() throws KeeperException {
61
62 LOG.debug("Starting hfile archive tracker...");
63 this.checkEnabledAndUpdate();
64 LOG.debug("Finished starting hfile archive tracker!");
65 }
66
67 @Override
68 public void nodeCreated(String path) {
69
70 if (!path.startsWith(archiveHFileZNode)) return;
71
72 LOG.debug("Archive node: " + path + " created");
73
74 String table = path.substring(archiveHFileZNode.length());
75
76
77 if (table.length() == 0) {
78
79 checkEnabledAndUpdate();
80 return;
81 }
82
83 try {
84 addAndReWatchTable(path);
85 } catch (KeeperException e) {
86 LOG.warn("Couldn't read zookeeper data for table for path:" + path
87 + ", not preserving a table.", e);
88 }
89 }
90
91 @Override
92 public void nodeChildrenChanged(String path) {
93 if (!path.startsWith(archiveHFileZNode)) return;
94
95 LOG.debug("Archive node: " + path + " children changed.");
96
97 try {
98 updateWatchedTables();
99 } catch (KeeperException e) {
100 LOG.error("Failed to update tables to archive", e);
101 }
102 }
103
104
105
106
107
108
109
110
111
112 private void addAndReWatchTable(String tableZnode) throws KeeperException {
113 getMonitor().addTable(ZKUtil.getNodeName(tableZnode));
114
115
116 if (!ZKUtil.watchAndCheckExists(watcher, tableZnode)) {
117 safeStopTrackingTable(tableZnode);
118 }
119 }
120
121
122
123
124
125
126
127 private void safeStopTrackingTable(String tableZnode) throws KeeperException {
128 getMonitor().removeTable(ZKUtil.getNodeName(tableZnode));
129
130 if (ZKUtil.checkExists(watcher, tableZnode) >= 0) {
131 addAndReWatchTable(tableZnode);
132 }
133 }
134
135 @Override
136 public void nodeDeleted(String path) {
137 if (!path.startsWith(archiveHFileZNode)) return;
138
139 LOG.debug("Archive node: " + path + " deleted");
140 String table = path.substring(archiveHFileZNode.length());
141
142 if (table.length() == 0) {
143
144
145 clearTables();
146
147
148
149 checkEnabledAndUpdate();
150 return;
151 }
152
153
154
155
156
157
158 getMonitor().removeTable(ZKUtil.getNodeName(path));
159 }
160
161
162
163
164
165 private void checkEnabledAndUpdate() {
166 try {
167 if (ZKUtil.watchAndCheckExists(watcher, archiveHFileZNode)) {
168 LOG.debug(archiveHFileZNode + " znode does exist, checking for tables to archive");
169
170
171
172
173 updateWatchedTables();
174 } else {
175 LOG.debug("Archiving not currently enabled, waiting");
176 }
177 } catch (KeeperException e) {
178 LOG.warn("Failed to watch for archiving znode", e);
179 }
180 }
181
182
183
184
185
186
187 private void updateWatchedTables() throws KeeperException {
188
189 LOG.debug("Updating watches on tables to archive.");
190
191 List<String> tables = ZKUtil.listChildrenAndWatchThem(watcher, archiveHFileZNode);
192 LOG.debug("Starting archive for tables:" + tables);
193
194 if (tables != null && tables.size() > 0) {
195 getMonitor().setArchiveTables(tables);
196 } else {
197 LOG.debug("No tables to archive.");
198
199 clearTables();
200 }
201 }
202
203
204
205
206
207
208 private void clearTables() {
209 getMonitor().clearArchive();
210 }
211
212
213
214
215
216
217 public boolean keepHFiles(String tableName) {
218 return getMonitor().shouldArchiveTable(tableName);
219 }
220
221
222
223
224 public final HFileArchiveTableMonitor getMonitor() {
225 return this.monitor;
226 }
227
228
229
230
231
232
233
234
235
236 public static TableHFileArchiveTracker create(Configuration conf)
237 throws ZooKeeperConnectionException, IOException {
238 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "hfileArchiveCleaner", null);
239 return create(zkw, new HFileArchiveTableMonitor());
240 }
241
242
243
244
245
246
247
248
249
250 private static TableHFileArchiveTracker create(ZooKeeperWatcher zkw,
251 HFileArchiveTableMonitor monitor) {
252 return new TableHFileArchiveTracker(zkw, monitor);
253 }
254
255 public ZooKeeperWatcher getZooKeeperWatcher() {
256 return this.watcher;
257 }
258
259
260
261
262 public void stop() {
263 if (this.stopped) return;
264 this.stopped = true;
265 this.watcher.close();
266 }
267 }