/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

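/**
 * ZooKeeper-based implementation of {@link SplitLogManagerCoordination}. Each WAL-splitting
 * task is materialized as a znode under the split log znode: this class creates the task
 * nodes, watches them for state changes reported by the workers, resubmits tasks whose
 * worker died or timed out, and finalizes and deletes tasks once splitting has completed.
 * It also maintains the recovering-regions znodes and the recovery mode used during log
 * recovery.
 */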
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
    SplitLogManagerCoordination {

  public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {

    ZkSplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
        Set<String> failedDeletions, ServerName serverName) {
      super(tasks, master, failedDeletions, serverName);
    }
  }

  public static final int DEFAULT_TIMEOUT = 120000;
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class);

  private Server server;
  private long zkretries;
  private long resubmitThreshold;
  private long timeout;
  private TaskFinisher taskFinisher;

  SplitLogManagerDetails details;

  private volatile long lastRecoveringNodeCreationTime = 0;
  private Configuration conf;
  public boolean ignoreZKDeleteForTesting = false;

  private RecoveryMode recoveryMode;

  private boolean isDrainingDone = false;

  public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
      ZooKeeperWatcher watcher) {
    super(watcher);
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file " + logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
    this.server = manager.getServer();
    this.conf = server.getConfiguration();
  }

  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    setRecoveryMode(true);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

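  /**
   * Resubmit an unassigned orphan task, i.e. a task found in ZooKeeper that has no in-memory
   * state and that no worker has ever owned. Rescan nodes are ignored, and only the first
   * incarnation of an orphan is force-resubmitted; later incarnations go through the normal
   * timeout-driven resubmission path.
   */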
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("resubmitting unassigned orphan task " + path);
      resubmitTask(path, task, FORCE);
    }
  }

  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      final boolean alive =
          details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
              .isServerOnline(task.cur_worker_name) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
            + task.cur_worker_name + " is not marked as dead, we waited for " + time
            + " while the timeout is " + timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
          LOG.info("Skipping resubmissions of task " + path + " because threshold "
              + resubmitThreshold + " reached");
        }
        return false;
      }

      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
      version = -1;
    }
    LOG.info("resubmitting task " + path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(this.details.getServerName(), path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }

    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
    return true;
  }

  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

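  /**
   * Signal the workers that a task was resubmitted by creating a short-lived RESCAN znode.
   * The node is ephemeral-sequential and is created already in the DONE state, so it flows
   * through the normal task-completion path and is deleted almost immediately; its only
   * purpose is to generate a children-changed event on the split log znode so that idle
   * workers re-check for unassigned tasks.
   */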
  private void rescan(long retries) {
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode());
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  @Override
  public void checkTaskStillAvailable(String path) {
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(), Long.valueOf(-1));
    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
  }

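  /**
   * Clean up znodes under the recovering-regions znode once log splitting has caught up.
   * If no split log tasks remain, the master is initialized, and no dead servers are being
   * processed, all recovering-region markers are removed; otherwise only the markers for the
   * servers in {@code recoveredServerNameSet} are removed, optionally restricted to (or
   * excluding) the meta region when {@code isMetaRecovery} is non-null.
   */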
  @Override
  public void removeRecoveringRegions(final Set<String> recoveredServerNameSet,
      Boolean isMetaRecovery) throws IOException {
    final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
            count++;
          }
        }
      }
      if (count == 0 && this.details.getMaster().isInitialized()
          && !this.details.getMaster().getServerManager().areDeadServersInProgress()) {
        ZKSplitLog.deleteRecoveringRegionZNodes(watcher, null);
        lastRecoveringNodeCreationTime = Long.MAX_VALUE;
      } else if (!recoveredServerNameSet.isEmpty()) {
        List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
        if (regions != null) {
          int listSize = regions.size();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing recovering " + regions + " and servers "
                + recoveredServerNameSet + ", isMetaRecovery=" + isMetaRecovery);
          }
          for (int i = 0; i < listSize; i++) {
            String region = regions.get(i);
            if (isMetaRecovery != null) {
              if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
                  || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
                continue;
              }
            }
            String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
            List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
            if (failedServers == null || failedServers.isEmpty()) {
              ZKUtil.deleteNode(watcher, nodePath);
              continue;
            }
            if (recoveredServerNameSet.containsAll(failedServers)) {
              ZKUtil.deleteNodeRecursively(watcher, nodePath);
            } else {
              int tmpFailedServerSize = failedServers.size();
              for (int j = 0; j < tmpFailedServerSize; j++) {
                String failedServer = failedServers.get(j);
                if (recoveredServerNameSet.contains(failedServer)) {
                  String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
                  ZKUtil.deleteNode(watcher, tmpPath);
                }
              }
            }
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
      throw new IOException(ke);
    }
  }

  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .delete(path, -1, new DeleteAsyncCallback(), retries);
  }

  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
      LOG.debug("deleted task without in memory state " + path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node " + path + " and will retry soon.");
    return;
  }

  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.incrementAndGet();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.fatal("logic failure, rescan failure must not happen");
  }

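  /**
   * Helper for the async ZooKeeper callbacks: once the session has expired there is no point
   * in retrying because the master is expected to shut down, so callers abandon their retries.
   * @return true when the given ZooKeeper return code is SESSIONEXPIRED
   */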
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
          + "action=" + action);
      return true;
    }
    return false;
  }

  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
    return;
  }

  private void createNodeSuccess(String path) {
    LOG.debug("put up splitlog task at znode " + path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    LOG.warn("failed to create task node " + path);
    setDone(path, FAILURE);
  }

  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
  }

  private void getDataSetWatchSuccess(String path, byte[] data, int version)
      throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.incrementAndGet();
      LOG.fatal("logic error - got null data " + path);
      setDone(path, FAILURE);
      return;
    }
    data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("task not yet acquired " + path + " ver = " + version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("task " + path + " entered state: " + slt.toString());
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("task " + path + " entered state: " + slt.toString());
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("task " + path + " entered state: " + slt.toString());
      resubmitOrFail(path, CHECK);
    } else {
      LOG.fatal("logic error - unexpected zk state for path = " + path + " data = "
          + slt.toString());
      setDone(path, FAILURE);
    }
  }

  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("failed to set data watch " + path);
    setDone(path, FAILURE);
  }

  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
        LOG.debug("unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
            LOG.info("Done splitting " + path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }

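    // Delete the task znode asynchronously. Nothing blocks on this deletion; if it fails,
    // the path is added to the failed-deletions set (see DeleteAsyncCallback) and retried later.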
    deleteNode(path, zkretries);
    return;
  }

  Task findOrCreateOrphanTask(String path) {
    Task orphanTask = new Task();
    Task task = details.getTasks().putIfAbsent(path, orphanTask);
    if (task == null) {
      LOG.info("creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
      task = orphanTask;
    }
    return task;
  }

  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("task " + path + " acquired by " + workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
      SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
    }
    return;
  }

  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.splitLogZNode);
      if (orphans == null) {
        LOG.warn("could not get children of " + this.watcher.splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("could not get children of " + this.watcher.splitLogZNode + " "
          + StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    int listSize = orphans.size();
    for (int i = 0; i < listSize; i++) {
      String path = orphans.get(i);
      String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("found orphan rescan node " + path);
      } else {
        LOG.info("found orphan task " + path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
        + " rescan nodes");
  }

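  /**
   * Mark the given regions as recovering from {@code serverName}. For each region a znode is
   * created (or updated) under the recovering-regions znode carrying the highest known flushed
   * sequence id, plus a child znode per failed server with the sequence id to recover from.
   * Transient ZooKeeper errors are retried up to the configured number of times.
   */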
  @Override
  public void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> userRegions)
      throws IOException, InterruptedIOException {
    this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTime();
    for (HRegionInfo region : userRegions) {
      String regionEncodeName = region.getEncodedName();
      long retries = this.zkretries;

      do {
        String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
        long lastRecordedFlushedSequenceId = -1;
        try {
          long lastSequenceId =
              this.details.getMaster().getServerManager()
                  .getLastFlushedSequenceId(regionEncodeName.getBytes()).getLastFlushedSequenceId();

          byte[] data = ZKUtil.getData(this.watcher, nodePath);
          if (data == null) {
            ZKUtil.createSetData(this.watcher, nodePath,
              ZKUtil.positionToByteArray(lastSequenceId));
          } else {
            lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
            if (lastRecordedFlushedSequenceId < lastSequenceId) {
              ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
            }
          }

          nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
          if (lastSequenceId <= lastRecordedFlushedSequenceId) {
            lastSequenceId = lastRecordedFlushedSequenceId;
          }
          ZKUtil.createSetData(this.watcher, nodePath,
            ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
          if (LOG.isDebugEnabled()) {
            LOG.debug("Marked " + regionEncodeName + " recovering from " + serverName
                + ": " + nodePath);
          }

          break;
        } catch (KeeperException e) {
          if (retries <= 1) {
            throw new IOException(e);
          }
          try {
            Thread.sleep(20);
          } catch (InterruptedException e1) {
            throw new InterruptedIOException();
          }
        } catch (InterruptedException e) {
          throw new InterruptedIOException();
        }
      } while ((--retries) > 0);
    }
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

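  /**
   * Remove recovering-region znodes that no longer have a pending split log task behind them.
   * The set of servers that still have unfinished WALs is derived from the current split log
   * tasks; any recovering region whose failed servers are all outside that set is considered
   * stale and its znode is deleted.
   */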
  @Override
  public void removeStaleRecoveringRegions(final Set<String> knownFailedServers)
      throws IOException, InterruptedIOException {
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          String t = tasks.get(i);
          byte[] data;
          try {
            data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
          } catch (InterruptedException e) {
            throw new InterruptedIOException();
          }
          if (data != null) {
            SplitLogTask slt = null;
            try {
              slt = SplitLogTask.parseFrom(data);
            } catch (DeserializationException e) {
              LOG.warn("Failed parse data for znode " + t, e);
            }
            if (slt != null && slt.isDone()) {
              continue;
            }
          }

          t = ZKSplitLog.getFileName(t);
          ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(new Path(t));
          if (serverName != null) {
            knownFailedServers.add(serverName.getServerName());
          } else {
            LOG.warn("Found invalid WAL log file name:" + t);
          }
        }
      }

      List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
      if (regions != null) {
        int listSize = regions.size();
        for (int i = 0; i < listSize; i++) {
          String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regions.get(i));
          List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
          if (regionFailedServers == null || regionFailedServers.isEmpty()) {
            ZKUtil.deleteNode(watcher, nodePath);
            continue;
          }
          boolean needMoreRecovery = false;
          int tmpFailedServerSize = regionFailedServers.size();
          for (int j = 0; j < tmpFailedServerSize; j++) {
            if (knownFailedServers.contains(regionFailedServers.get(j))) {
              needMoreRecovery = true;
              break;
            }
          }
          if (!needMoreRecovery) {
            ZKUtil.deleteNodeRecursively(watcher, nodePath);
          }
        }
      }
    } catch (KeeperException e) {
      throw new IOException(e);
    }
  }

  @Override
  public synchronized boolean isReplaying() {
    return this.recoveryMode == RecoveryMode.LOG_REPLAY;
  }

  @Override
  public synchronized boolean isSplitting() {
    return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
  }

  private List<String> listSplitLogTasks() throws KeeperException {
    List<String> taskOrRescanList = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
    if (taskOrRescanList == null || taskOrRescanList.isEmpty()) {
      return Collections.<String> emptyList();
    }
    List<String> taskList = new ArrayList<String>();
    for (String taskOrRescan : taskOrRescanList) {
      if (!ZKSplitLog.isRescanNode(taskOrRescan)) {
        taskList.add(taskOrRescan);
      }
    }
    return taskList;
  }

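  /**
   * Determine the recovery mode (LOG_SPLITTING vs LOG_REPLAY) to use. The previously used mode
   * is inferred from any leftover recovering-region znodes or in-flight split log tasks so that
   * recovery started under the old mode can be drained before the configured mode takes effect;
   * once nothing is outstanding, the mode from the configuration is adopted.
   * @param isForInitialization true when called from {@link #init()} at master startup
   */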
  @Override
  public void setRecoveryMode(boolean isForInitialization) throws IOException {
    synchronized (this) {
      if (this.isDrainingDone) {
        return;
      }
    }
    if (this.watcher == null) {
      synchronized (this) {
        this.isDrainingDone = true;
        this.recoveryMode = RecoveryMode.LOG_SPLITTING;
      }
      return;
    }
    boolean hasSplitLogTask = false;
    boolean hasRecoveringRegions = false;
    RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
    RecoveryMode recoveryModeInConfig =
        (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;

    try {
      List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
      if (regions != null && !regions.isEmpty()) {
        hasRecoveringRegions = true;
        previousRecoveryMode = RecoveryMode.LOG_REPLAY;
      }
      if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
        List<String> tasks = listSplitLogTasks();
        if (!tasks.isEmpty()) {
          hasSplitLogTask = true;
          if (isForInitialization) {
            int listSize = tasks.size();
            for (int i = 0; i < listSize; i++) {
              String task = tasks.get(i);
              try {
                byte[] data =
                    ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, task));
                if (data == null) continue;
                SplitLogTask slt = SplitLogTask.parseFrom(data);
                previousRecoveryMode = slt.getMode();
                if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
                  previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
                }
                break;
              } catch (DeserializationException e) {
                LOG.warn("Failed parse data for znode " + task, e);
              } catch (InterruptedException e) {
                throw new InterruptedIOException();
              }
            }
          }
        }
      }
    } catch (KeeperException e) {
      throw new IOException(e);
    }

    synchronized (this) {
      if (this.isDrainingDone) {
        return;
      }
      if (!hasSplitLogTask && !hasRecoveringRegions) {
        this.isDrainingDone = true;
        this.recoveryMode = recoveryModeInConfig;
        return;
      } else if (!isForInitialization) {
        return;
      }

      if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
        this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
        this.recoveryMode = previousRecoveryMode;
      } else {
        this.recoveryMode = recoveryModeInConfig;
      }
    }
  }

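  /**
   * Whether distributed log replay should be used. This implementation disables it
   * unconditionally, so the configuration is not consulted and the configured recovery mode
   * is always LOG_SPLITTING.
   */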
  private boolean isDistributedLogReplay(Configuration conf) {
    return false;
  }

  private boolean resubmit(ServerName serverName, String path, int version) {
    try {
      SplitLogTask slt =
          new SplitLogTask.Unassigned(this.details.getServerName(), getRecoveryMode());
      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
        LOG.debug("failed to resubmit task " + path + " version changed");
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("failed to resubmit because znode doesn't exist " + path
          + " task done (or forced done by removing the znode)");
      try {
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
        return false;
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("failed to resubmit task " + path + " version changed");
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
      LOG.warn("failed to resubmit " + path, e);
      return false;
    }
    return true;
  }

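  /**
   * Hook used to finalize a task once a worker has reported it done. The default
   * implementation (see the constructor) calls WALSplitter.finishSplitLogFile() to archive the
   * split WAL. {@code DONE} means the task is fully finished; {@code ERR} means the finishing
   * step failed and the task should be rechecked or resubmitted.
   */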
  public interface TaskFinisher {

    enum Status {
      DONE(),
      ERR();
    }

    Status finish(ServerName workerName, String taskname);
  }

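  /**
   * Asynchronous handler for the create of a task znode. A pre-existing node is treated as
   * success (another master attempt may have created it); other errors are retried until the
   * retry count carried in the callback context is exhausted.
   */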
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          LOG.debug("found pre-existing znode " + path);
          SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

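  /**
   * Asynchronous handler for getting the data of a task znode. A vanished node is ignored
   * (the task was already finished or never created), a negative retry count in the callback
   * context means fire-and-forget, and other errors are retried until the retry count runs out.
   */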
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
          LOG.warn("task znode " + path + " vanished or not created yet.");
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
              + ". Ignoring error. No error handling. No retrying.");
          return;
        }
        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
            + " remaining retries=" + retry_count);
        if (retry_count == 0) {
          SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
      return;
    }
  }

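  /**
   * Asynchronous handler for the delete of a task znode. A missing node counts as a
   * successful delete; other errors are retried, and once retries are exhausted the path is
   * added to the failed-deletions set so it can be cleaned up later.
   */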
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
          Long retry_count = (Long) ctx;
          LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            LOG.warn("delete failed " + path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info(path + " does not exist. Either was created but deleted behind our"
              + " back by another pending delete OR was deleted"
              + " in earlier retry rounds. zkretries = " + ctx);
        }
      } else {
        LOG.debug("deleted " + path);
      }
      deleteNodeSuccess(path);
    }
  }

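  /**
   * Asynchronous handler for the create of a RESCAN znode. On success the returned sequential
   * node name is watched like a normal task so it flows through the usual completion and
   * deletion path; failures are retried, and a final failure is treated as a logic error.
   */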
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
            + retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          rescan(retry_count - 1);
        }
        return;
      }
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  @Override
  public synchronized RecoveryMode getRecoveryMode() {
    return recoveryMode;
  }

  @Override
  public long getLastRecoveryTime() {
    return lastRecoveringNodeCreationTime;
  }

  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}