1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master;
19
20 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.locks.ReentrantLock;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.classification.InterfaceAudience;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.fs.FileStatus;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.fs.PathFilter;
46 import org.apache.hadoop.hbase.Chore;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.SplitLogCounters;
51 import org.apache.hadoop.hbase.SplitLogTask;
52 import org.apache.hadoop.hbase.Stoppable;
53 import org.apache.hadoop.hbase.exceptions.DeserializationException;
54 import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
55 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
56 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
57 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
58 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
59 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
60 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
61 import org.apache.hadoop.hbase.util.FSUtils;
62 import org.apache.hadoop.hbase.util.Threads;
63 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
64 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
65 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
66 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
67 import org.apache.hadoop.util.StringUtils;
68 import org.apache.zookeeper.AsyncCallback;
69 import org.apache.zookeeper.CreateMode;
70 import org.apache.zookeeper.KeeperException;
71 import org.apache.zookeeper.KeeperException.NoNodeException;
72 import org.apache.zookeeper.ZooDefs.Ids;
73 import org.apache.zookeeper.data.Stat;
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 @InterfaceAudience.Private
104 public class SplitLogManager extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

// Fallback values used by the constructor when the corresponding configuration key is unset:
// hbase.splitlog.manager.timeout (compared against elapsed milliseconds in resubmit()),
// hbase.splitlog.zk.retries, hbase.splitlog.max.resubmit and
// hbase.splitlog.manager.unassigned.timeout (presumably milliseconds as well — confirm).
public static final int DEFAULT_TIMEOUT = 120000;
public static final int DEFAULT_ZK_RETRIES = 3;
public static final int DEFAULT_MAX_RESUBMIT = 3;
public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000);

private final Stoppable stopper;
private final MasterServices master;
private final ServerName serverName;
// Called when a task znode reaches the DONE state to finish splitting that log file.
private final TaskFinisher taskFinisher;
// Overwritten by getFileList() with the file system of the last directory it visits; later
// used to delete the log directories after a successful split.
private FileSystem fs;
private Configuration conf;

// Retry count handed to the asynchronous ZK create / getData / delete requests.
private long zkretries;
// Maximum number of non-forced resubmissions allowed for a single task.
private long resubmit_threshold;
// A task whose worker is still online is not resubmitted (non-forced) until this many
// milliseconds have passed since its last update.
private long timeout;
private long unassignedTimeout;
// Time of the most recent successful task or rescan znode creation.
private long lastNodeCreateTime = Long.MAX_VALUE;
// When true, deleteNodeSuccess() returns early and keeps the in-memory task state.
public boolean ignoreZKDeleteForTesting = false;
// Updated when regions are marked recovering; reset to Long.MAX_VALUE once all
// recovering-region znodes have been garbage collected.
private volatile long lastRecoveringNodeCreationTime = Long.MAX_VALUE;

// NOTE(review): not referenced in the visible part of this file; presumably a millisecond
// threshold used elsewhere when deciding to re-check recovering regions — confirm.
private long checkRecoveringTimeThreshold = 15000;
// Servers whose recovering-region znodes could not be removed because of a ZK error.
private final Set<ServerName> failedRecoveringRegionDeletions = Collections
.synchronizedSet(new HashSet<ServerName>());

// Held while this class reads/updates the recovering-regions znodes.
protected final ReentrantLock recoveringRegionLock = new ReentrantLock();

// Whether distributed log replay is enabled (HConstants.DISTRIBUTED_LOG_REPLAY_KEY).
final boolean distributedLogReplay;

// In-memory state of every split task, keyed by the task's znode path.
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor;

// NOTE(review): not referenced in the visible part of this file; presumably workers reported
// dead whose tasks need resubmission, guarded by deadWorkersLock — confirm.
private volatile Set<ServerName> deadWorkers = null;
private final Object deadWorkersLock = new Object();

// Replaced by a synchronized set in the constructor; its users are outside the visible code.
private Set<String> failedDeletions = null;
147
148
149
150
151
152
153
154
155
156
157
158
159
/**
 * Creates a manager whose task finisher completes each split by calling
 * {@code HLogSplitter.finishSplitLogFile} for the task's log file.
 *
 * @param zkw ZooKeeper watcher used for all task znodes
 * @param conf configuration used for tunables and for finishing split log files
 * @param stopper consulted to abort waits and retries when stopped
 * @param master master services (server manager lookups)
 * @param serverName name of this master, written into task znodes
 */
public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
    Stoppable stopper, MasterServices master, ServerName serverName) {
  this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
    @Override
    public Status finish(ServerName workerName, String logfile) {
      Status outcome = Status.DONE;
      try {
        HLogSplitter.finishSplitLogFile(logfile, conf);
      } catch (IOException e) {
        // Report the failure; the caller decides whether to resubmit the task.
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        outcome = Status.ERR;
      }
      return outcome;
    }
  });
}
175
176
177
178
179
180
181
182
183
184
185
186
187 public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
188 Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) {
189 super(zkw);
190 this.taskFinisher = tf;
191 this.conf = conf;
192 this.stopper = stopper;
193 this.master = master;
194 this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
195 this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
196 this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
197 this.unassignedTimeout =
198 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
199 LOG.info("timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout);
200
201 this.serverName = serverName;
202 this.timeoutMonitor = new TimeoutMonitor(
203 conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
204
205 this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
206 this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
207 HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
208 LOG.info("distributedLogReplay = " + this.distributedLogReplay);
209 }
210
211 public void finishInitialization(boolean masterRecovery) {
212 if (!masterRecovery) {
213 Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
214 + ".splitLogManagerTimeoutMonitor");
215 }
216
217 if (this.watcher != null) {
218 this.watcher.registerListener(this);
219 lookForOrphans();
220 }
221 }
222
223 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
224 List<FileStatus> fileStatus = new ArrayList<FileStatus>();
225 for (Path hLogDir : logDirs) {
226 this.fs = hLogDir.getFileSystem(conf);
227 if (!fs.exists(hLogDir)) {
228 LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
229 continue;
230 }
231 FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
232 if (logfiles == null || logfiles.length == 0) {
233 LOG.info(hLogDir + " is empty dir, no logs to split");
234 } else {
235 for (FileStatus status : logfiles)
236 fileStatus.add(status);
237 }
238 }
239 FileStatus[] a = new FileStatus[fileStatus.size()];
240 return fileStatus.toArray(a);
241 }
242
243
244
245
246
247
248
249
250
251 public long splitLogDistributed(final Path logDir) throws IOException {
252 List<Path> logDirs = new ArrayList<Path>();
253 logDirs.add(logDir);
254 return splitLogDistributed(logDirs);
255 }
256
257
258
259
260
261
262
263
264
265
266
267 public long splitLogDistributed(final List<Path> logDirs) throws IOException {
268 if (logDirs.isEmpty()) {
269 return 0;
270 }
271 Set<ServerName> serverNames = new HashSet<ServerName>();
272 for (Path logDir : logDirs) {
273 try {
274 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
275 if (serverName != null) {
276 serverNames.add(serverName);
277 }
278 } catch (IllegalArgumentException e) {
279
280 LOG.warn("Cannot parse server name from " + logDir);
281 }
282 }
283 return splitLogDistributed(serverNames, logDirs, null);
284 }
285
286
287
288
289
290
291
292
293
294
295
296
297 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
298 PathFilter filter) throws IOException {
299 MonitoredTask status = TaskMonitor.get().createStatus(
300 "Doing distributed log split in " + logDirs);
301 FileStatus[] logfiles = getFileList(logDirs, filter);
302 status.setStatus("Checking directory contents...");
303 LOG.debug("Scheduling batch of logs to split");
304 SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
305 LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
306 long t = EnvironmentEdgeManager.currentTimeMillis();
307 long totalSize = 0;
308 TaskBatch batch = new TaskBatch();
309 for (FileStatus lf : logfiles) {
310
311
312
313
314
315 totalSize += lf.getLen();
316 String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
317 if (!enqueueSplitTask(pathToLog, batch)) {
318 throw new IOException("duplicate log split scheduled for " + lf.getPath());
319 }
320 }
321 waitForSplittingCompletion(batch, status);
322
323 this.removeRecoveringRegionsFromZK(serverNames);
324
325 if (batch.done != batch.installed) {
326 batch.isDead = true;
327 SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
328 LOG.warn("error while splitting logs in " + logDirs +
329 " installed = " + batch.installed + " but only " + batch.done + " done");
330 String msg = "error or interrupted while splitting logs in "
331 + logDirs + " Task = " + batch;
332 status.abort(msg);
333 throw new IOException(msg);
334 }
335 for(Path logDir: logDirs){
336 status.setStatus("Cleaning up log directory...");
337 try {
338 if (fs.exists(logDir) && !fs.delete(logDir, false)) {
339 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
340 }
341 } catch (IOException ioe) {
342 FileStatus[] files = fs.listStatus(logDir);
343 if (files != null && files.length > 0) {
344 LOG.warn("returning success without actually splitting and " +
345 "deleting all the log files in path " + logDir);
346 } else {
347 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
348 }
349 }
350 SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
351 }
352 String msg = "finished splitting (more than or equal to) " + totalSize +
353 " bytes in " + batch.installed + " log files in " + logDirs + " in " +
354 (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
355 status.markComplete(msg);
356 LOG.info(msg);
357 return totalSize;
358 }
359
360
361
362
363
364
365
366
367 boolean enqueueSplitTask(String taskname, TaskBatch batch) {
368 SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
369
370
371 String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
372 Task oldtask = createTaskIfAbsent(path, batch);
373 if (oldtask == null) {
374
375 createNode(path, zkretries);
376 return true;
377 }
378 return false;
379 }
380
381 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
382 synchronized (batch) {
383 while ((batch.done + batch.error) != batch.installed) {
384 try {
385 status.setStatus("Waiting for distributed tasks to finish. "
386 + " scheduled=" + batch.installed
387 + " done=" + batch.done
388 + " error=" + batch.error);
389 int remaining = batch.installed - (batch.done + batch.error);
390 int actual = activeTasks(batch);
391 if (remaining != actual) {
392 LOG.warn("Expected " + remaining
393 + " active tasks, but actually there are " + actual);
394 }
395 int remainingInZK = remainingTasksInZK();
396 if (remainingInZK >= 0 && actual > remainingInZK) {
397 LOG.warn("Expected at least" + actual
398 + " tasks in ZK, but actually there are " + remainingInZK);
399 }
400 if (remainingInZK == 0 || actual == 0) {
401 LOG.warn("No more task remaining (ZK or task map), splitting "
402 + "should have completed. Remaining tasks in ZK " + remainingInZK
403 + ", active tasks in map " + actual);
404 if (remainingInZK == 0 && actual == 0) {
405 return;
406 }
407 }
408 batch.wait(100);
409 if (stopper.isStopped()) {
410 LOG.warn("Stopped while waiting for log splits to be completed");
411 return;
412 }
413 } catch (InterruptedException e) {
414 LOG.warn("Interrupted while waiting for log splits to be completed");
415 Thread.currentThread().interrupt();
416 return;
417 }
418 }
419 }
420 }
421
422 private int activeTasks(final TaskBatch batch) {
423 int count = 0;
424 for (Task t: tasks.values()) {
425 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
426 count++;
427 }
428 }
429 return count;
430 }
431
432 private int remainingTasksInZK() {
433 int count = 0;
434 try {
435 List<String> tasks =
436 ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
437 if (tasks != null) {
438 for (String t: tasks) {
439 if (!ZKSplitLog.isRescanNode(watcher, t)) {
440 count++;
441 }
442 }
443 }
444 } catch (KeeperException ke) {
445 LOG.warn("Failed to check remaining tasks", ke);
446 count = -1;
447 }
448 return count;
449 }
450
451
452
453
454
455
456 private void removeRecoveringRegionsFromZK(final Set<ServerName> serverNames) {
457
458 if (!this.distributedLogReplay) {
459
460 return;
461 }
462
463 int count = 0;
464 Set<String> recoveredServerNameSet = new HashSet<String>();
465 if (serverNames != null) {
466 for (ServerName tmpServerName : serverNames) {
467 recoveredServerNameSet.add(tmpServerName.getServerName());
468 }
469 }
470
471 try {
472 this.recoveringRegionLock.lock();
473
474 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
475 if (tasks != null) {
476 for (String t : tasks) {
477 if (!ZKSplitLog.isRescanNode(watcher, t)) {
478 count++;
479 }
480 }
481 }
482 if (count == 0 && this.master.isInitialized()
483 && !this.master.getServerManager().areDeadServersInProgress()) {
484
485 deleteRecoveringRegionZNodes(null);
486
487
488 lastRecoveringNodeCreationTime = Long.MAX_VALUE;
489 } else if (!recoveredServerNameSet.isEmpty()) {
490
491 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
492 if (regions != null) {
493 for (String region : regions) {
494 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
495 List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
496 if (failedServers == null || failedServers.isEmpty()) {
497 ZKUtil.deleteNode(watcher, nodePath);
498 continue;
499 }
500 if (recoveredServerNameSet.containsAll(failedServers)) {
501 ZKUtil.deleteNodeRecursively(watcher, nodePath);
502 } else {
503 for (String failedServer : failedServers) {
504 if (recoveredServerNameSet.contains(failedServer)) {
505 String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
506 ZKUtil.deleteNode(watcher, tmpPath);
507 }
508 }
509 }
510 }
511 }
512 }
513 } catch (KeeperException ke) {
514 LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
515 if (serverNames != null && !serverNames.isEmpty()) {
516 this.failedRecoveringRegionDeletions.addAll(serverNames);
517 }
518 } finally {
519 this.recoveringRegionLock.unlock();
520 }
521 }
522
523
524
525
526
527
528
529 void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
530 throws KeeperException {
531
532 if (!this.distributedLogReplay) {
533
534 return;
535 }
536
537 Set<String> knownFailedServers = new HashSet<String>();
538 if (failedServers != null) {
539 for (ServerName tmpServerName : failedServers) {
540 knownFailedServers.add(tmpServerName.getServerName());
541 }
542 }
543
544 this.recoveringRegionLock.lock();
545 try {
546 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
547 if (tasks != null) {
548 for (String t : tasks) {
549 byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
550 if (data != null) {
551 SplitLogTask slt = null;
552 try {
553 slt = SplitLogTask.parseFrom(data);
554 } catch (DeserializationException e) {
555 LOG.warn("Failed parse data for znode " + t, e);
556 }
557 if (slt != null && slt.isDone()) {
558 continue;
559 }
560 }
561
562 t = ZKSplitLog.getFileName(t);
563 ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
564 if (serverName != null) {
565 knownFailedServers.add(serverName.getServerName());
566 } else {
567 LOG.warn("Found invalid WAL log file name:" + t);
568 }
569 }
570 }
571
572
573 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
574 if (regions != null) {
575 for (String region : regions) {
576 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
577 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
578 if (regionFailedServers == null || regionFailedServers.isEmpty()) {
579 ZKUtil.deleteNode(watcher, nodePath);
580 continue;
581 }
582 boolean needMoreRecovery = false;
583 for (String tmpFailedServer : regionFailedServers) {
584 if (knownFailedServers.contains(tmpFailedServer)) {
585 needMoreRecovery = true;
586 break;
587 }
588 }
589 if (!needMoreRecovery) {
590 ZKUtil.deleteNode(watcher, nodePath);
591 }
592 }
593 }
594 } finally {
595 this.recoveringRegionLock.unlock();
596 }
597 }
598
599 private void deleteRecoveringRegionZNodes(List<String> regions) {
600 try {
601 if (regions == null) {
602
603 LOG.info("Garbage collecting all recovering regions.");
604 ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
605 } else {
606 for (String curRegion : regions) {
607 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
608 ZKUtil.deleteNodeRecursively(watcher, nodePath);
609 }
610 }
611 } catch (KeeperException e) {
612 LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
613 }
614 }
615
616 private void setDone(String path, TerminationStatus status) {
617 Task task = tasks.get(path);
618 if (task == null) {
619 if (!ZKSplitLog.isRescanNode(watcher, path)) {
620 SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
621 LOG.debug("unacquired orphan task is done " + path);
622 }
623 } else {
624 synchronized (task) {
625 if (task.status == IN_PROGRESS) {
626 if (status == SUCCESS) {
627 SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
628 LOG.info("Done splitting " + path);
629 } else {
630 SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
631 LOG.warn("Error splitting " + path);
632 }
633 task.status = status;
634 if (task.batch != null) {
635 synchronized (task.batch) {
636 if (status == SUCCESS) {
637 task.batch.done++;
638 } else {
639 task.batch.error++;
640 }
641 task.batch.notify();
642 }
643 }
644 }
645 }
646 }
647
648
649
650
651
652 deleteNode(path, zkretries);
653 return;
654 }
655
656 private void createNode(String path, Long retry_count) {
657 SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
658 ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
659 SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
660 return;
661 }
662
663 private void createNodeSuccess(String path) {
664 lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
665 LOG.debug("put up splitlog task at znode " + path);
666 getDataSetWatch(path, zkretries);
667 }
668
669 private void createNodeFailure(String path) {
670
671 LOG.warn("failed to create task node" + path);
672 setDone(path, FAILURE);
673 }
674
675
676 private void getDataSetWatch(String path, Long retry_count) {
677 this.watcher.getRecoverableZooKeeper().getZooKeeper().
678 getData(path, this.watcher,
679 new GetDataAsyncCallback(), retry_count);
680 SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
681 }
682
683 private void tryGetDataSetWatch(String path) {
684
685 this.watcher.getRecoverableZooKeeper().getZooKeeper().
686 getData(path, this.watcher,
687 new GetDataAsyncCallback(), Long.valueOf(-1)
688 SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
689 }
690
691 private void getDataSetWatchSuccess(String path, byte[] data, int version)
692 throws DeserializationException {
693 if (data == null) {
694 if (version == Integer.MIN_VALUE) {
695
696 setDone(path, SUCCESS);
697 return;
698 }
699 SplitLogCounters.tot_mgr_null_data.incrementAndGet();
700 LOG.fatal("logic error - got null data " + path);
701 setDone(path, FAILURE);
702 return;
703 }
704 data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
705 SplitLogTask slt = SplitLogTask.parseFrom(data);
706 if (slt.isUnassigned()) {
707 LOG.debug("task not yet acquired " + path + " ver = " + version);
708 handleUnassignedTask(path);
709 } else if (slt.isOwned()) {
710 heartbeat(path, version, slt.getServerName());
711 } else if (slt.isResigned()) {
712 LOG.info("task " + path + " entered state: " + slt.toString());
713 resubmitOrFail(path, FORCE);
714 } else if (slt.isDone()) {
715 LOG.info("task " + path + " entered state: " + slt.toString());
716 if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
717 if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
718 setDone(path, SUCCESS);
719 } else {
720 resubmitOrFail(path, CHECK);
721 }
722 } else {
723 setDone(path, SUCCESS);
724 }
725 } else if (slt.isErr()) {
726 LOG.info("task " + path + " entered state: " + slt.toString());
727 resubmitOrFail(path, CHECK);
728 } else {
729 LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
730 setDone(path, FAILURE);
731 }
732 }
733
734 private void getDataSetWatchFailure(String path) {
735 LOG.warn("failed to set data watch " + path);
736 setDone(path, FAILURE);
737 }
738
739
740
741
742
743
744
745
746
747
748 private void handleUnassignedTask(String path) {
749 if (ZKSplitLog.isRescanNode(watcher, path)) {
750 return;
751 }
752 Task task = findOrCreateOrphanTask(path);
753 if (task.isOrphan() && (task.incarnation == 0)) {
754 LOG.info("resubmitting unassigned orphan task " + path);
755
756
757 resubmit(path, task, FORCE);
758 }
759 }
760
761
762
763
764
765
766
767 private boolean needAbandonRetries(int statusCode, String action) {
768 if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
769 LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
770 + "action=" + action);
771 return true;
772 }
773 return false;
774 }
775
776 private void heartbeat(String path, int new_version, ServerName workerName) {
777 Task task = findOrCreateOrphanTask(path);
778 if (new_version != task.last_version) {
779 if (task.isUnassigned()) {
780 LOG.info("task " + path + " acquired by " + workerName);
781 }
782 task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
783 SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
784 } else {
785
786
787
788
789 }
790 return;
791 }
792
793 private boolean resubmit(String path, Task task, ResubmitDirective directive) {
794
795 if (task.status != IN_PROGRESS) {
796 return false;
797 }
798 int version;
799 if (directive != FORCE) {
800
801
802
803
804
805 final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
806 final boolean alive = master.getServerManager() != null ?
807 master.getServerManager().isServerOnline(task.cur_worker_name) : true;
808 if (alive && time < timeout) {
809 LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " +
810 task.cur_worker_name + " is not marked as dead, we waited for " + time +
811 " while the timeout is " + timeout);
812 return false;
813 }
814 if (task.unforcedResubmits >= resubmit_threshold) {
815 if (!task.resubmitThresholdReached) {
816 task.resubmitThresholdReached = true;
817 SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
818 LOG.info("Skipping resubmissions of task " + path +
819 " because threshold " + resubmit_threshold + " reached");
820 }
821 return false;
822 }
823
824 version = task.last_version;
825 } else {
826 version = -1;
827 }
828 LOG.info("resubmitting task " + path);
829 task.incarnation++;
830 try {
831
832 SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
833 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
834 LOG.debug("failed to resubmit task " + path +
835 " version changed");
836 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
837 return false;
838 }
839 } catch (NoNodeException e) {
840 LOG.warn("failed to resubmit because znode doesn't exist " + path +
841 " task done (or forced done by removing the znode)");
842 try {
843 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
844 } catch (DeserializationException e1) {
845 LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
846 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
847 return false;
848 }
849 return false;
850 } catch (KeeperException.BadVersionException e) {
851 LOG.debug("failed to resubmit task " + path + " version changed");
852 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
853 return false;
854 } catch (KeeperException e) {
855 SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
856 LOG.warn("failed to resubmit " + path, e);
857 return false;
858 }
859
860 if (directive != FORCE) {
861 task.unforcedResubmits++;
862 }
863 task.setUnassigned();
864 createRescanNode(Long.MAX_VALUE);
865 SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
866 return true;
867 }
868
869 private void resubmitOrFail(String path, ResubmitDirective directive) {
870 if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
871 setDone(path, FAILURE);
872 }
873 }
874
875 private void deleteNode(String path, Long retries) {
876 SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
877
878
879
880 this.watcher.getRecoverableZooKeeper().getZooKeeper().
881 delete(path, -1, new DeleteAsyncCallback(),
882 retries);
883 }
884
885 private void deleteNodeSuccess(String path) {
886 if (ignoreZKDeleteForTesting) {
887 return;
888 }
889 Task task;
890 task = tasks.remove(path);
891 if (task == null) {
892 if (ZKSplitLog.isRescanNode(watcher, path)) {
893 SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
894 }
895 SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
896 LOG.debug("deleted task without in memory state " + path);
897 return;
898 }
899 synchronized (task) {
900 task.status = DELETED;
901 task.notify();
902 }
903 SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
904 }
905
906 private void deleteNodeFailure(String path) {
907 LOG.info("Failed to delete node " + path + " and will retry soon.");
908 return;
909 }
910
911
912
913
914
915
916 private void createRescanNode(long retries) {
917
918
919
920
921
922
923
924 SplitLogTask slt = new SplitLogTask.Done(this.serverName);
925 this.watcher.getRecoverableZooKeeper().getZooKeeper().
926 create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
927 Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
928 new CreateRescanAsyncCallback(), Long.valueOf(retries));
929 }
930
931 private void createRescanSuccess(String path) {
932 lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
933 SplitLogCounters.tot_mgr_rescan.incrementAndGet();
934 getDataSetWatch(path, zkretries);
935 }
936
937 private void createRescanFailure() {
938 LOG.fatal("logic failure, rescan failure must not happen");
939 }
940
941
942
943
944
945
/**
 * Installs a task for {@code path} in {@code batch}, adopting or replacing an existing
 * orphan task where possible.
 *
 * @param path task znode path
 * @param batch batch that will wait for the task
 * @return null if the caller may proceed (the task was installed, adopted, or an orphan with
 *     status SUCCESS already exists); otherwise the conflicting existing task
 */
private Task createTaskIfAbsent(String path, TaskBatch batch) {
Task oldtask;
// Optimistically publish a new task owned by this batch.
Task newtask = new Task();
newtask.batch = batch;
oldtask = tasks.putIfAbsent(path, newtask);
if (oldtask == null) {
batch.installed++;
return null;
}
// A task is already mapped to this path.
synchronized (oldtask) {
if (oldtask.isOrphan()) {
if (oldtask.status == SUCCESS) {
// The orphan already succeeded. batch.installed is not incremented, so
// waitForSplittingCompletion() does not wait for it.
return (null);
}
if (oldtask.status == IN_PROGRESS) {
// Adopt the running orphan into this batch.
oldtask.batch = batch;
batch.installed++;
LOG.debug("Previously orphan task " + path + " is now being waited upon");
return null;
}
// A failed task remains in the map until its znode delete succeeds;
// deleteNodeSuccess() removes it, sets DELETED and notifies this monitor.
while (oldtask.status == FAILURE) {
LOG.debug("wait for status of task " + path + " to change to DELETED");
SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
try {
oldtask.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted when waiting for znode delete callback");
// Give up waiting; the status check below reports the conflict.
break;
}
}
if (oldtask.status != DELETED) {
LOG.warn("Failure because previously failed task" +
" state still present. Waiting for znode delete callback" +
" path=" + path);
return oldtask;
}
// The old task has been removed from the map; retry the install.
Task t = tasks.putIfAbsent(path, newtask);
if (t == null) {
batch.installed++;
return null;
}
LOG.fatal("Logic error. Deleted task still present in tasks map");
assert false : "Deleted task still present in tasks map";
return t;
}
// A non-orphan task already belongs to another batch.
LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
return oldtask;
}
}
1005
1006 Task findOrCreateOrphanTask(String path) {
1007 Task orphanTask = new Task();
1008 Task task;
1009 task = tasks.putIfAbsent(path, orphanTask);
1010 if (task == null) {
1011 LOG.info("creating orphan task " + path);
1012 SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1013 task = orphanTask;
1014 }
1015 return task;
1016 }
1017
1018 @Override
1019 public void nodeDataChanged(String path) {
1020 Task task;
1021 task = tasks.get(path);
1022 if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1023 if (task != null) {
1024 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1025 }
1026 getDataSetWatch(path, zkretries);
1027 }
1028 }
1029
1030 public void stop() {
1031 if (timeoutMonitor != null) {
1032 timeoutMonitor.interrupt();
1033 }
1034 }
1035
1036 private void lookForOrphans() {
1037 List<String> orphans;
1038 try {
1039 orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1040 this.watcher.splitLogZNode);
1041 if (orphans == null) {
1042 LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1043 return;
1044 }
1045 } catch (KeeperException e) {
1046 LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1047 " " + StringUtils.stringifyException(e));
1048 return;
1049 }
1050 int rescan_nodes = 0;
1051 for (String path : orphans) {
1052 String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1053 if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1054 rescan_nodes++;
1055 LOG.debug("found orphan rescan node " + path);
1056 } else {
1057 LOG.info("found orphan task " + path);
1058 }
1059 getDataSetWatch(nodepath, zkretries);
1060 }
1061 LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1062 rescan_nodes + " rescan nodes");
1063 }
1064
1065
1066
1067
1068
1069
1070
1071 void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1072 throws KeeperException {
1073 if (userRegions == null || !this.distributedLogReplay) {
1074 return;
1075 }
1076
1077 try {
1078 this.recoveringRegionLock.lock();
1079
1080 this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1081
1082 for (HRegionInfo region : userRegions) {
1083 String regionEncodeName = region.getEncodedName();
1084 long retries = this.zkretries;
1085
1086 do {
1087 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1088 long lastRecordedFlushedSequenceId = -1;
1089 try {
1090 long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1091 regionEncodeName.getBytes());
1092
1093
1094
1095
1096
1097 byte[] data = ZKUtil.getData(this.watcher, nodePath);
1098 if (data == null) {
1099 ZKUtil.createSetData(this.watcher, nodePath,
1100 ZKUtil.positionToByteArray(lastSequenceId));
1101 } else {
1102 lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1103 if (lastRecordedFlushedSequenceId < lastSequenceId) {
1104
1105 ZKUtil.setData(this.watcher, nodePath,
1106 ZKUtil.positionToByteArray(lastSequenceId));
1107 }
1108 }
1109
1110 nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1111 if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1112
1113 lastSequenceId = lastRecordedFlushedSequenceId;
1114 }
1115 ZKUtil.createSetData(this.watcher, nodePath,
1116 ZKUtil.positionToByteArray(lastSequenceId));
1117 LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1118 + serverName);
1119
1120
1121 break;
1122 } catch (KeeperException e) {
1123
1124 if (retries <= 1) {
1125 throw e;
1126 }
1127
1128 try {
1129 Thread.sleep(20);
1130 } catch (Exception ignoreE) {
1131
1132 }
1133 }
1134 } while ((--retries) > 0 && (!this.stopper.isStopped()));
1135 }
1136 } finally {
1137 this.recoveringRegionLock.unlock();
1138 }
1139 }
1140
1141
1142
1143
1144
1145 public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1146 long lastRecordedFlushedSequenceId = -1l;
1147 try {
1148 lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1149 } catch (DeserializationException e) {
1150 lastRecordedFlushedSequenceId = -1l;
1151 LOG.warn("Can't parse last flushed sequence Id", e);
1152 }
1153 return lastRecordedFlushedSequenceId;
1154 }
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164 public static long getLastFlushedSequenceId(ZooKeeperWatcher zkw, String serverName,
1165 String encodedRegionName) throws IOException {
1166
1167
1168
1169
1170
1171
1172
1173 long lastFlushedSequenceId = -1;
1174 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1175 nodePath = ZKUtil.joinZNode(nodePath, serverName);
1176 try {
1177 byte[] data = ZKUtil.getData(zkw, nodePath);
1178 if (data != null) {
1179 lastFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1180 }
1181 } catch (KeeperException e) {
1182 throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1183 + serverName + "; region=" + encodedRegionName, e);
1184 }
1185 return lastFlushedSequenceId;
1186 }
1187
1188
1189
1190
1191
1192
1193
1194 static class TaskBatch {
1195 int installed = 0;
1196 int done = 0;
1197 int error = 0;
1198 volatile boolean isDead = false;
1199
1200 @Override
1201 public String toString() {
1202 return ("installed = " + installed + " done = " + done + " error = " + error);
1203 }
1204 }
1205
1206
1207
1208
1209 static class Task {
1210 volatile long last_update;
1211 volatile int last_version;
1212 volatile ServerName cur_worker_name;
1213 volatile TaskBatch batch;
1214 volatile TerminationStatus status;
1215 volatile int incarnation;
1216 volatile int unforcedResubmits;
1217 volatile boolean resubmitThresholdReached;
1218
1219 @Override
1220 public String toString() {
1221 return ("last_update = " + last_update +
1222 " last_version = " + last_version +
1223 " cur_worker_name = " + cur_worker_name +
1224 " status = " + status +
1225 " incarnation = " + incarnation +
1226 " resubmits = " + unforcedResubmits +
1227 " batch = " + batch);
1228 }
1229
1230 Task() {
1231 incarnation = 0;
1232 last_version = -1;
1233 status = IN_PROGRESS;
1234 setUnassigned();
1235 }
1236
1237 public boolean isOrphan() {
1238 return (batch == null || batch.isDead);
1239 }
1240
1241 public boolean isUnassigned() {
1242 return (cur_worker_name == null);
1243 }
1244
1245 public void heartbeatNoDetails(long time) {
1246 last_update = time;
1247 }
1248
1249 public void heartbeat(long time, int version, ServerName worker) {
1250 last_version = version;
1251 last_update = time;
1252 cur_worker_name = worker;
1253 }
1254
1255 public void setUnassigned() {
1256 cur_worker_name = null;
1257 last_update = -1;
1258 }
1259 }
1260
1261 void handleDeadWorker(ServerName workerName) {
1262
1263
1264 synchronized (deadWorkersLock) {
1265 if (deadWorkers == null) {
1266 deadWorkers = new HashSet<ServerName>(100);
1267 }
1268 deadWorkers.add(workerName);
1269 }
1270 LOG.info("dead splitlog worker " + workerName);
1271 }
1272
1273 void handleDeadWorkers(Set<ServerName> serverNames) {
1274 synchronized (deadWorkersLock) {
1275 if (deadWorkers == null) {
1276 deadWorkers = new HashSet<ServerName>(100);
1277 }
1278 deadWorkers.addAll(serverNames);
1279 }
1280 LOG.info("dead splitlog workers " + serverNames);
1281 }
1282
1283
1284
1285
1286
1287 private class TimeoutMonitor extends Chore {
1288 public TimeoutMonitor(final int period, Stoppable stopper) {
1289 super("SplitLogManager Timeout Monitor", period, stopper);
1290 }
1291
1292 @Override
1293 protected void chore() {
1294 int resubmitted = 0;
1295 int unassigned = 0;
1296 int tot = 0;
1297 boolean found_assigned_task = false;
1298 Set<ServerName> localDeadWorkers;
1299
1300 synchronized (deadWorkersLock) {
1301 localDeadWorkers = deadWorkers;
1302 deadWorkers = null;
1303 }
1304
1305 for (Map.Entry<String, Task> e : tasks.entrySet()) {
1306 String path = e.getKey();
1307 Task task = e.getValue();
1308 ServerName cur_worker = task.cur_worker_name;
1309 tot++;
1310
1311
1312
1313
1314
1315 if (task.isUnassigned()) {
1316 unassigned++;
1317 continue;
1318 }
1319 found_assigned_task = true;
1320 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1321 SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1322 if (resubmit(path, task, FORCE)) {
1323 resubmitted++;
1324 } else {
1325 handleDeadWorker(cur_worker);
1326 LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1327 cur_worker + ", will retry.");
1328 }
1329 } else if (resubmit(path, task, CHECK)) {
1330 resubmitted++;
1331 }
1332 }
1333 if (tot > 0) {
1334 LOG.debug("total tasks = " + tot + " unassigned = " + unassigned);
1335 }
1336 if (resubmitted > 0) {
1337 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1338 }
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348 if (tot > 0 && !found_assigned_task &&
1349 ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
1350 unassignedTimeout)) {
1351 for (Map.Entry<String, Task> e : tasks.entrySet()) {
1352 String path = e.getKey();
1353 Task task = e.getValue();
1354
1355
1356
1357
1358 if (task.isUnassigned() && (task.status != FAILURE)) {
1359
1360 tryGetDataSetWatch(path);
1361 }
1362 }
1363 createRescanNode(Long.MAX_VALUE);
1364 SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1365 LOG.debug("resubmitting unassigned task(s) after timeout");
1366 }
1367
1368
1369 if (failedDeletions.size() > 0) {
1370 List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1371 for (String tmpPath : tmpPaths) {
1372
1373 deleteNode(tmpPath, zkretries);
1374 }
1375 failedDeletions.removeAll(tmpPaths);
1376 }
1377
1378
1379 long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1380 - lastRecoveringNodeCreationTime;
1381 if (!failedRecoveringRegionDeletions.isEmpty()
1382 || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1383
1384 Set<ServerName> previouslyFailedDeletoins = null;
1385 if (!failedRecoveringRegionDeletions.isEmpty()) {
1386 previouslyFailedDeletoins = new HashSet<ServerName>(failedRecoveringRegionDeletions);
1387 failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletoins);
1388 }
1389 removeRecoveringRegionsFromZK(previouslyFailedDeletoins);
1390 }
1391 }
1392 }
1393
1394
1395
1396
1397
1398 class CreateAsyncCallback implements AsyncCallback.StringCallback {
1399 private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1400
1401 @Override
1402 public void processResult(int rc, String path, Object ctx, String name) {
1403 SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1404 if (rc != 0) {
1405 if (needAbandonRetries(rc, "Create znode " + path)) {
1406 createNodeFailure(path);
1407 return;
1408 }
1409 if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1410
1411
1412
1413
1414
1415
1416 LOG.debug("found pre-existing znode " + path);
1417 SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1418 } else {
1419 Long retry_count = (Long)ctx;
1420 LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1421 path + " remaining retries=" + retry_count);
1422 if (retry_count == 0) {
1423 SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1424 createNodeFailure(path);
1425 } else {
1426 SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1427 createNode(path, retry_count - 1);
1428 }
1429 return;
1430 }
1431 }
1432 createNodeSuccess(path);
1433 }
1434 }
1435
1436
1437
1438
1439
1440 class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1441 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1442
1443 @Override
1444 public void processResult(int rc, String path, Object ctx, byte[] data,
1445 Stat stat) {
1446 SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1447 if (rc != 0) {
1448 if (needAbandonRetries(rc, "GetData from znode " + path)) {
1449 return;
1450 }
1451 if (rc == KeeperException.Code.NONODE.intValue()) {
1452 SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1453
1454
1455
1456 LOG.warn("task znode " + path + " vanished.");
1457 try {
1458 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1459 } catch (DeserializationException e) {
1460 LOG.warn("Deserialization problem", e);
1461 }
1462 return;
1463 }
1464 Long retry_count = (Long) ctx;
1465
1466 if (retry_count < 0) {
1467 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1468 path + ". Ignoring error. No error handling. No retrying.");
1469 return;
1470 }
1471 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1472 path + " remaining retries=" + retry_count);
1473 if (retry_count == 0) {
1474 SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1475 getDataSetWatchFailure(path);
1476 } else {
1477 SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1478 getDataSetWatch(path, retry_count - 1);
1479 }
1480 return;
1481 }
1482 try {
1483 getDataSetWatchSuccess(path, data, stat.getVersion());
1484 } catch (DeserializationException e) {
1485 LOG.warn("Deserialization problem", e);
1486 }
1487 return;
1488 }
1489 }
1490
1491
1492
1493
1494
1495 class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1496 private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1497
1498 @Override
1499 public void processResult(int rc, String path, Object ctx) {
1500 SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1501 if (rc != 0) {
1502 if (needAbandonRetries(rc, "Delete znode " + path)) {
1503 failedDeletions.add(path);
1504 return;
1505 }
1506 if (rc != KeeperException.Code.NONODE.intValue()) {
1507 SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1508 Long retry_count = (Long) ctx;
1509 LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1510 path + " remaining retries=" + retry_count);
1511 if (retry_count == 0) {
1512 LOG.warn("delete failed " + path);
1513 failedDeletions.add(path);
1514 deleteNodeFailure(path);
1515 } else {
1516 deleteNode(path, retry_count - 1);
1517 }
1518 return;
1519 } else {
1520 LOG.info(path +
1521 " does not exist. Either was created but deleted behind our" +
1522 " back by another pending delete OR was deleted" +
1523 " in earlier retry rounds. zkretries = " + (Long) ctx);
1524 }
1525 } else {
1526 LOG.debug("deleted " + path);
1527 }
1528 deleteNodeSuccess(path);
1529 }
1530 }
1531
1532
1533
1534
1535
1536
1537
1538
1539 class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1540 private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1541
1542 @Override
1543 public void processResult(int rc, String path, Object ctx, String name) {
1544 if (rc != 0) {
1545 if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1546 return;
1547 }
1548 Long retry_count = (Long)ctx;
1549 LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1550 " remaining retries=" + retry_count);
1551 if (retry_count == 0) {
1552 createRescanFailure();
1553 } else {
1554 createRescanNode(retry_count - 1);
1555 }
1556 return;
1557 }
1558
1559 createRescanSuccess(name);
1560 }
1561 }
1562
1563
1564
1565
1566
1567
1568
1569 static public interface TaskFinisher {
1570
1571
1572
1573 static public enum Status {
1574
1575
1576
1577 DONE(),
1578
1579
1580
1581 ERR();
1582 }
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592 public Status finish(ServerName workerName, String taskname);
1593 }
1594
/**
 * Directive passed to resubmit(). The timeout monitor uses {@code FORCE} for tasks owned by a
 * dead worker and {@code CHECK} for all other assigned tasks.
 */
enum ResubmitDirective {
  CHECK,
  FORCE
}
1599
/**
 * Lifecycle state of a split-log task. {@link #toString()} returns a fixed lower-case label for
 * each constant (e.g. {@code "in_progress"}).
 */
enum TerminationStatus {
  IN_PROGRESS("in_progress"),
  SUCCESS("success"),
  FAILURE("failure"),
  DELETED("deleted");

  /**
   * Label returned by {@link #toString()}. It is final because enum constants are process-wide
   * singletons; a mutable field would let any caller change every task's printed status.
   */
  final String statusMsg;

  TerminationStatus(String msg) {
    statusMsg = msg;
  }

  @Override
  public String toString() {
    return statusMsg;
  }
}
1616
1617
1618
1619
1620 public void finishInitialization() {
1621 finishInitialization(false);
1622 }
1623
1624 }