1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.SortedMap;
24 import java.util.SortedSet;
25 import java.util.TreeMap;
26 import java.util.TreeSet;
27
28 import com.google.common.annotations.VisibleForTesting;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.Abortable;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.exceptions.DeserializationException;
36 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
42 import org.apache.zookeeper.KeeperException;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
67
68
69 private String myQueuesZnode;
70
71 private final static String RS_LOCK_ZNODE = "lock";
72
73 private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
74
75 public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
76 Abortable abortable) {
77 super(zk, conf, abortable);
78 }
79
80 @Override
81 public void init(String serverName) throws ReplicationException {
82 this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
83 try {
84 ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
85 } catch (KeeperException e) {
86 throw new ReplicationException("Could not initialize replication queues.", e);
87 }
88 }
89
90 @Override
91 public void removeQueue(String queueId) {
92 try {
93 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
94 } catch (KeeperException e) {
95 this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
96 }
97 }
98
99 @Override
100 public void addLog(String queueId, String filename) throws ReplicationException {
101 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
102 znode = ZKUtil.joinZNode(znode, filename);
103 try {
104 ZKUtil.createWithParents(this.zookeeper, znode);
105 } catch (KeeperException e) {
106 throw new ReplicationException(
107 "Could not add log because znode could not be created. queueId=" + queueId
108 + ", filename=" + filename);
109 }
110 }
111
112 @Override
113 public void removeLog(String queueId, String filename) {
114 try {
115 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
116 znode = ZKUtil.joinZNode(znode, filename);
117 ZKUtil.deleteNode(this.zookeeper, znode);
118 } catch (KeeperException e) {
119 this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
120 + filename + ")", e);
121 }
122 }
123
124 @Override
125 public void setLogPosition(String queueId, String filename, long position) {
126 try {
127 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
128 znode = ZKUtil.joinZNode(znode, filename);
129
130 ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
131 } catch (KeeperException e) {
132 this.abortable.abort("Failed to write replication wal position (filename=" + filename
133 + ", position=" + position + ")", e);
134 }
135 }
136
137 @Override
138 public long getLogPosition(String queueId, String filename) throws ReplicationException {
139 String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
140 String znode = ZKUtil.joinZNode(clusterZnode, filename);
141 byte[] bytes = null;
142 try {
143 bytes = ZKUtil.getData(this.zookeeper, znode);
144 } catch (KeeperException e) {
145 throw new ReplicationException("Internal Error: could not get position in log for queueId="
146 + queueId + ", filename=" + filename, e);
147 } catch (InterruptedException e) {
148 Thread.currentThread().interrupt();
149 return 0;
150 }
151 try {
152 return ZKUtil.parseWALPositionFrom(bytes);
153 } catch (DeserializationException de) {
154 LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
155 + "znode content, continuing.");
156 }
157
158
159 return 0;
160 }
161
162 @Override
163 public boolean isThisOurZnode(String znode) {
164 return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
165 }
166
167 @Override
168 public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
169 SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
170
171 if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
172 LOG.info("Atomically moving " + regionserverZnode + "'s wals to my queue");
173 newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
174 } else {
175 LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
176 if (!lockOtherRS(regionserverZnode)) {
177 return newQueues;
178 }
179 newQueues = copyQueuesFromRS(regionserverZnode);
180 deleteAnotherRSQueues(regionserverZnode);
181 }
182 return newQueues;
183 }
184
185 @Override
186 public void removeAllQueues() {
187 try {
188 ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
189 } catch (KeeperException e) {
190
191 if (e instanceof KeeperException.SessionExpiredException) {
192 return;
193 }
194 this.abortable.abort("Failed to delete replication queues for region server: "
195 + this.myQueuesZnode, e);
196 }
197 }
198
199 @Override
200 public List<String> getLogsInQueue(String queueId) {
201 String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
202 List<String> result = null;
203 try {
204 result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
205 } catch (KeeperException e) {
206 this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
207 }
208 return result;
209 }
210
211 @Override
212 public List<String> getAllQueues() {
213 List<String> listOfQueues = null;
214 try {
215 listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
216 } catch (KeeperException e) {
217 this.abortable.abort("Failed to get a list of queues for region server: "
218 + this.myQueuesZnode, e);
219 }
220 return listOfQueues;
221 }
222
223
224
225
226
227
228 @VisibleForTesting
229 public boolean lockOtherRS(String znode) {
230 try {
231 String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
232 if (parent.equals(this.myQueuesZnode)) {
233 LOG.warn("Won't lock because this is us, we're dead!");
234 return false;
235 }
236 String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
237 ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
238 } catch (KeeperException e) {
239
240
241
242
243
244 if (e instanceof KeeperException.NoNodeException
245 || e instanceof KeeperException.NodeExistsException) {
246 LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
247 + e.getMessage());
248 } else {
249 LOG.info("Failed lock other rs", e);
250 }
251 return false;
252 }
253 return true;
254 }
255
256 public String getLockZNode(String znode) {
257 return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
258 }
259
260 @VisibleForTesting
261 public boolean checkLockExists(String znode) throws KeeperException {
262 return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
263 }
264
265
266
267
268
269 private void deleteAnotherRSQueues(String regionserverZnode) {
270 String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
271 try {
272 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
273 for (String cluster : clusters) {
274
275 if (cluster.equals(RS_LOCK_ZNODE)) {
276 continue;
277 }
278 String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
279 ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
280 }
281
282 ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
283 } catch (KeeperException e) {
284 if (e instanceof KeeperException.NoNodeException
285 || e instanceof KeeperException.NotEmptyException) {
286
287
288
289 if (e.getPath().equals(fullpath)) {
290 return;
291 }
292 }
293 this.abortable.abort("Failed to delete replication queues for region server: "
294 + regionserverZnode, e);
295 }
296 }
297
298
299
300
301
302
303
304 private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
305 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
306
307 String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
308 List<String> peerIdsToProcess = null;
309 List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
310 try {
311 peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
312 if (peerIdsToProcess == null) return queues;
313 for (String peerId : peerIdsToProcess) {
314 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
315 if (!peerExists(replicationQueueInfo.getPeerId())) {
316
317
318
319 LOG.warn("Peer " + peerId
320 + " didn't exist, will move its queue to avoid the failure of multi op");
321 }
322 String newPeerId = peerId + "-" + znode;
323 String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
324
325 String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
326 List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
327 if (wals == null || wals.size() == 0) {
328 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
329 continue;
330 }
331
332 SortedSet<String> logQueue = new TreeSet<String>();
333 queues.put(newPeerId, logQueue);
334 ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
335 listOfOps.add(op);
336
337 for (String wal : wals) {
338 String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
339 byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
340 LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
341 String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
342 listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
343
344 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
345 logQueue.add(wal);
346 }
347
348 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
349 }
350
351
352
353 listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
354 LOG.debug(" The multi list size is: " + listOfOps.size());
355 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
356 LOG.info("Atomically moved the dead regionserver logs. ");
357 } catch (KeeperException e) {
358
359 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
360 queues.clear();
361 } catch (InterruptedException e) {
362 LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
363 queues.clear();
364 Thread.currentThread().interrupt();
365 }
366 return queues;
367 }
368
369
370
371
372
373
374
375 private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
376
377
378 SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
379 try {
380 String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
381 List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
382
383 if (clusters == null || clusters.size() <= 1) {
384 return queues;
385 }
386
387 clusters.remove(RS_LOCK_ZNODE);
388 for (String cluster : clusters) {
389 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
390 if (!peerExists(replicationQueueInfo.getPeerId())) {
391 LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
392
393 continue;
394 }
395
396
397
398 String newCluster = cluster + "-" + znode;
399 String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
400 String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
401 List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
402
403 if (wals == null || wals.size() == 0) {
404 continue;
405 }
406 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
407 HConstants.EMPTY_BYTE_ARRAY);
408 SortedSet<String> logQueue = new TreeSet<String>();
409 queues.put(newCluster, logQueue);
410 for (String wal : wals) {
411 String z = ZKUtil.joinZNode(clusterPath, wal);
412 byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
413 long position = 0;
414 try {
415 position = ZKUtil.parseWALPositionFrom(positionBytes);
416 } catch (DeserializationException e) {
417 LOG.warn("Failed parse of wal position from the following znode: " + z
418 + ", Exception: " + e);
419 }
420 LOG.debug("Creating " + wal + " with data " + position);
421 String child = ZKUtil.joinZNode(newClusterZnode, wal);
422
423
424 ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
425 logQueue.add(wal);
426 }
427 }
428 } catch (KeeperException e) {
429 this.abortable.abort("Copy queues from rs", e);
430 } catch (InterruptedException e) {
431 LOG.warn(e);
432 Thread.currentThread().interrupt();
433 }
434 return queues;
435 }
436
437
438
439
440
441
442 static byte[] lockToByteArray(final String lockOwner) {
443 byte[] bytes =
444 ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
445 return ProtobufUtil.prependPBMagic(bytes);
446 }
447 }