1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.SortedMap;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.TreeSet;
27
28 import com.google.common.annotations.VisibleForTesting;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Abortable;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.exceptions.DeserializationException;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
42 import org.apache.zookeeper.KeeperException;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
67
68
69 private String myQueuesZnode;
70
71 public final static String RS_LOCK_ZNODE = "lock";
72
73 private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
74
/**
 * Builds the ZooKeeper-backed replication queues. All three arguments are handed unchanged to
 * the superclass constructor; this constructor sets up no further state of its own.
 *
 * @param zk ZooKeeper watcher forwarded to the superclass
 * @param conf configuration forwarded to the superclass
 * @param abortable abort handler forwarded to the superclass
 */
public ReplicationQueuesZKImpl(
    final ZooKeeperWatcher zk,
    Configuration conf,
    Abortable abortable) {
  super(zk, conf, abortable);
}
79
80 @Override
81 public void init(String serverName) throws ReplicationException {
82 this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
83 try {
84 ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
85 } catch (KeeperException e) {
86 throw new ReplicationException("Could not initialize replication queues.", e);
87 }
88 }
89
90 @Override
91 public List<String> getListOfReplicators() throws ReplicationException {
92 try {
93 return super.getListOfReplicatorsZK();
94 } catch (KeeperException e) {
95 LOG.warn("getListOfReplicators() from ZK failed", e);
96 throw new ReplicationException("getListOfReplicators() from ZK failed", e);
97 }
98 }
99
100 @Override
101 public void removeQueue(String queueId) {
102 try {
103 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
104 } catch (KeeperException e) {
105 this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
106 }
107 }
108
109 @Override
110 public void addLog(String queueId, String filename) throws ReplicationException {
111 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
112 znode = ZKUtil.joinZNode(znode, filename);
113 try {
114 ZKUtil.createWithParents(this.zookeeper, znode);
115 } catch (KeeperException e) {
116 throw new ReplicationException(
117 "Could not add log because znode could not be created. queueId=" + queueId
118 + ", filename=" + filename);
119 }
120 }
121
122 @Override
123 public void removeLog(String queueId, String filename) {
124 try {
125 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
126 znode = ZKUtil.joinZNode(znode, filename);
127 ZKUtil.deleteNode(this.zookeeper, znode);
128 } catch (KeeperException e) {
129 this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
130 + filename + ")", e);
131 }
132 }
133
134 @Override
135 public void setLogPosition(String queueId, String filename, long position) {
136 try {
137 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
138 znode = ZKUtil.joinZNode(znode, filename);
139
140 ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
141 } catch (KeeperException e) {
142 this.abortable.abort("Failed to write replication wal position (filename=" + filename
143 + ", position=" + position + ")", e);
144 }
145 }
146
147 @Override
148 public long getLogPosition(String queueId, String filename) throws ReplicationException {
149 String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
150 String znode = ZKUtil.joinZNode(clusterZnode, filename);
151 byte[] bytes = null;
152 try {
153 bytes = ZKUtil.getData(this.zookeeper, znode);
154 } catch (KeeperException e) {
155 throw new ReplicationException("Internal Error: could not get position in log for queueId="
156 + queueId + ", filename=" + filename, e);
157 } catch (InterruptedException e) {
158 Thread.currentThread().interrupt();
159 return 0;
160 }
161 try {
162 return ZKUtil.parseWALPositionFrom(bytes);
163 } catch (DeserializationException de) {
164 LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
165 + "znode content, continuing.");
166 }
167
168
169 return 0;
170 }
171
172 @Override
173 public boolean isThisOurZnode(String znode) {
174 return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
175 }
176
177 @Override
178 public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
179 SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
180
181 if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
182 LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
183 newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
184 } else {
185 LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
186 if (!lockOtherRS(regionserverZnode)) {
187 return newQueues;
188 }
189 newQueues = copyQueuesFromRS(regionserverZnode);
190 deleteAnotherRSQueues(regionserverZnode);
191 }
192 return newQueues;
193 }
194
195 @Override
196 public void removeAllQueues() {
197 try {
198 ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
199 } catch (KeeperException e) {
200
201 if (e instanceof KeeperException.SessionExpiredException) {
202 return;
203 }
204 this.abortable.abort("Failed to delete replication queues for region server: "
205 + this.myQueuesZnode, e);
206 }
207 }
208
209 @Override
210 public List<String> getLogsInQueue(String queueId) {
211 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
212 List<String> result = null;
213 try {
214 result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
215 } catch (KeeperException e) {
216 this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
217 }
218 return result;
219 }
220
221 @Override
222 public List<String> getAllQueues() {
223 List<String> listOfQueues = null;
224 try {
225 listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
226 } catch (KeeperException e) {
227 this.abortable.abort("Failed to get a list of queues for region server: "
228 + this.myQueuesZnode, e);
229 }
230 return listOfQueues;
231 }
232
233
234
235
236
237
238 @VisibleForTesting
239 public boolean lockOtherRS(String znode) {
240 try {
241 String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
242 if (parent.equals(this.myQueuesZnode)) {
243 LOG.warn("Won't lock because this is us, we're dead!");
244 return false;
245 }
246 String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
247 ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
248 } catch (KeeperException e) {
249
250
251
252
253
254 if (e instanceof KeeperException.NoNodeException
255 || e instanceof KeeperException.NodeExistsException) {
256 LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
257 + e.getMessage());
258 } else {
259 LOG.info("Failed lock other rs", e);
260 }
261 return false;
262 }
263 return true;
264 }
265
266 public String getLockZNode(String znode) {
267 return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
268 }
269
270 @VisibleForTesting
271 public boolean checkLockExists(String znode) throws KeeperException {
272 return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
273 }
274
275
276
277
278
279 private void deleteAnotherRSQueues(String regionserverZnode) {
280 String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
281 try {
282 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
283 for (String cluster : clusters) {
284
285 if (cluster.equals(RS_LOCK_ZNODE)) {
286 continue;
287 }
288 String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
289 ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
290 }
291
292 ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
293 } catch (KeeperException e) {
294 if (e instanceof KeeperException.NoNodeException
295 || e instanceof KeeperException.NotEmptyException) {
296
297
298
299 if (e.getPath().equals(fullpath)) {
300 return;
301 }
302 }
303 this.abortable.abort("Failed to delete replication queues for region server: "
304 + regionserverZnode, e);
305 }
306 }
307
308
309
310
311
312
313
314 private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
315 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
316
317 String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
318 List<String> peerIdsToProcess = null;
319 List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
320 try {
321 peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
322 if (peerIdsToProcess == null) return queues;
323 for (String peerId : peerIdsToProcess) {
324 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
325 if (!peerExists(replicationQueueInfo.getPeerId())) {
326
327
328
329 LOG.warn("Peer " + peerId
330 + " didn't exist, will move its queue to avoid the failure of multi op");
331 }
332 String newPeerId = peerId + "-" + znode;
333 String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
334
335 String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
336 List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
337 if (wals == null || wals.size() == 0) {
338 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
339 continue;
340 }
341
342 SortedSet<String> logQueue = new TreeSet<String>();
343 queues.put(newPeerId, logQueue);
344 ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
345 listOfOps.add(op);
346
347 for (String wal : wals) {
348 String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
349 byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
350 LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
351 String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
352 listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
353
354 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
355 logQueue.add(wal);
356 }
357
358 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
359 }
360
361
362
363 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
364 if (LOG.isTraceEnabled()) LOG.trace(" The multi list size is: " + listOfOps.size());
365 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
366 if (LOG.isTraceEnabled()) LOG.trace("Atomically moved the dead regionserver logs. ");
367 } catch (KeeperException e) {
368
369 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
370 queues.clear();
371 } catch (InterruptedException e) {
372 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
373 queues.clear();
374 Thread.currentThread().interrupt();
375 }
376 return queues;
377 }
378
379
380
381
382
383
384
385 private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
386
387
388 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
389 try {
390 String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
391 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
392
393 if (clusters == null || clusters.size() <= 1) {
394 return queues;
395 }
396
397 clusters.remove(RS_LOCK_ZNODE);
398 for (String cluster : clusters) {
399 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
400 if (!peerExists(replicationQueueInfo.getPeerId())) {
401 LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
402
403 continue;
404 }
405
406
407
408 String newCluster = cluster + "-" + znode;
409 String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
410 String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
411 List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
412
413 if (wals == null || wals.size() == 0) {
414 continue;
415 }
416 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
417 HConstants.EMPTY_BYTE_ARRAY);
418 SortedSet<String> logQueue = new TreeSet<String>();
419 queues.put(newCluster, logQueue);
420 for (String wal : wals) {
421 String z = ZKUtil.joinZNode(clusterPath, wal);
422 byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
423 long position = 0;
424 try {
425 position = ZKUtil.parseWALPositionFrom(positionBytes);
426 } catch (DeserializationException e) {
427 LOG.warn("Failed parse of wal position from the following znode: " + z
428 + ", Exception: " + e);
429 }
430 LOG.debug("Creating " + wal + " with data " + position);
431 String child = ZKUtil.joinZNode(newClusterZnode, wal);
432
433
434 ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
435 logQueue.add(wal);
436 }
437 }
438 } catch (KeeperException e) {
439 this.abortable.abort("Copy queues from rs", e);
440 } catch (InterruptedException e) {
441 LOG.warn(e);
442 Thread.currentThread().interrupt();
443 }
444 return queues;
445 }
446
447
448
449
450
451
452 static byte[] lockToByteArray(final String lockOwner) {
453 byte[] bytes =
454 ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
455 return ProtobufUtil.prependPBMagic(bytes);
456 }
457 }