1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coordination;
20
21 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
22 import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
23 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
24 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
25 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
26 import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
27
28 import java.io.IOException;
29 import java.io.InterruptedIOException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentMap;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.CoordinatedStateManager;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.Server;
44 import org.apache.hadoop.hbase.ServerName;
45 import org.apache.hadoop.hbase.SplitLogCounters;
46 import org.apache.hadoop.hbase.SplitLogTask;
47 import org.apache.hadoop.hbase.classification.InterfaceAudience;
48 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
49 import org.apache.hadoop.hbase.exceptions.DeserializationException;
50 import org.apache.hadoop.hbase.master.MasterServices;
51 import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
52 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
53 import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
54 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
55 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
57 import org.apache.hadoop.hbase.wal.WALSplitter;
58 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
59 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
60 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
61 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
62 import org.apache.hadoop.util.StringUtils;
63 import org.apache.zookeeper.AsyncCallback;
64 import org.apache.zookeeper.CreateMode;
65 import org.apache.zookeeper.KeeperException;
66 import org.apache.zookeeper.KeeperException.NoNodeException;
67 import org.apache.zookeeper.ZooDefs.Ids;
68 import org.apache.zookeeper.data.Stat;
69
70
71
72
73
74 @InterfaceAudience.Private
75 public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
76 SplitLogManagerCoordination {
77
78 public static class ZkSplitLogManagerDetails extends SplitLogManagerDetails {
79
80 ZkSplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
81 Set<String> failedDeletions, ServerName serverName) {
82 super(tasks, master, failedDeletions, serverName);
83 }
84 }
85
86 public static final int DEFAULT_TIMEOUT = 120000;
87 public static final int DEFAULT_ZK_RETRIES = 3;
88 public static final int DEFAULT_MAX_RESUBMIT = 3;
89
90 private static final Log LOG = LogFactory.getLog(SplitLogManagerCoordination.class);
91
92 private Server server;
93 private long zkretries;
94 private long resubmitThreshold;
95 private long timeout;
96 private TaskFinisher taskFinisher;
97
98 SplitLogManagerDetails details;
99
100
101
102 private volatile long lastRecoveringNodeCreationTime = 0;
103 private Configuration conf;
104 public boolean ignoreZKDeleteForTesting = false;
105
106 private RecoveryMode recoveryMode;
107
108 private boolean isDrainingDone = false;
109
/**
 * Creates the coordination on top of the given ZooKeeper watcher.
 * <p>
 * Captures the manager's server and its configuration, and installs a {@link TaskFinisher} that
 * calls {@code WALSplitter.finishSplitLogFile} and maps an {@link IOException} to
 * {@code Status.ERR}.
 * @param manager coordinated state manager supplying the {@link Server}
 * @param watcher watcher passed to the {@link ZooKeeperListener} superclass
 */
public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
    ZooKeeperWatcher watcher) {
  super(watcher);
  this.server = manager.getServer();
  this.conf = this.server.getConfiguration();
  this.taskFinisher = new TaskFinisher() {
    @Override
    public Status finish(ServerName workerName, String logfile) {
      Status outcome = Status.DONE;
      try {
        // The configuration is looked up at call time, exactly as before.
        WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
      } catch (IOException e) {
        LOG.warn("Could not finish splitting of log file " + logfile, e);
        outcome = Status.ERR;
      }
      return outcome;
    }
  };
}
128
129 @Override
130 public void init() throws IOException {
131 this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
132 this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
133 this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
134 setRecoveryMode(true);
135 if (this.watcher != null) {
136 this.watcher.registerListener(this);
137 lookForOrphans();
138 }
139 }
140
141 @Override
142 public String prepareTask(String taskname) {
143 return ZKSplitLog.getEncodedNodeName(watcher, taskname);
144 }
145
146 @Override
147 public int remainingTasksInCoordination() {
148 int count = 0;
149 try {
150 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
151 if (tasks != null) {
152 int listSize = tasks.size();
153 for (int i = 0; i < listSize; i++) {
154 if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
155 count++;
156 }
157 }
158 }
159 } catch (KeeperException ke) {
160 LOG.warn("Failed to check remaining tasks", ke);
161 count = -1;
162 }
163 return count;
164 }
165
166
167
168
169
170
171
172
173 private void handleUnassignedTask(String path) {
174 if (ZKSplitLog.isRescanNode(watcher, path)) {
175 return;
176 }
177 Task task = findOrCreateOrphanTask(path);
178 if (task.isOrphan() && (task.incarnation == 0)) {
179 LOG.info("resubmitting unassigned orphan task " + path);
180
181
182 resubmitTask(path, task, FORCE);
183 }
184 }
185
186 @Override
187 public void deleteTask(String path) {
188 deleteNode(path, zkretries);
189 }
190
191 @Override
192 public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
193
194 if (task.status != IN_PROGRESS) {
195 return false;
196 }
197 int version;
198 if (directive != FORCE) {
199
200
201
202
203
204 final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
205 final boolean alive =
206 details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
207 .isServerOnline(task.cur_worker_name) : true;
208 if (alive && time < timeout) {
209 LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
210 + task.cur_worker_name + " is not marked as dead, we waited for " + time
211 + " while the timeout is " + timeout);
212 return false;
213 }
214
215 if (task.unforcedResubmits.get() >= resubmitThreshold) {
216 if (!task.resubmitThresholdReached) {
217 task.resubmitThresholdReached = true;
218 SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
219 LOG.info("Skipping resubmissions of task " + path + " because threshold "
220 + resubmitThreshold + " reached");
221 }
222 return false;
223 }
224
225 version = task.last_version;
226 } else {
227 SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
228 version = -1;
229 }
230 LOG.info("resubmitting task " + path);
231 task.incarnation++;
232 boolean result = resubmit(this.details.getServerName(), path, version);
233 if (!result) {
234 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
235 return false;
236 }
237
238 if (directive != FORCE) {
239 task.unforcedResubmits.incrementAndGet();
240 }
241 task.setUnassigned();
242 rescan(Long.MAX_VALUE);
243 SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
244 return true;
245 }
246
247
248 @Override
249 public void checkTasks() {
250 rescan(Long.MAX_VALUE);
251 };
252
253
254
255
256 private void rescan(long retries) {
257
258
259
260
261
262
263
264 SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode());
265 this.watcher
266 .getRecoverableZooKeeper()
267 .getZooKeeper()
268 .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
269 CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
270 }
271
272 @Override
273 public void submitTask(String path) {
274 createNode(path, zkretries);
275 }
276
277 @Override
278 public void checkTaskStillAvailable(String path) {
279
280 this.watcher
281 .getRecoverableZooKeeper()
282 .getZooKeeper()
283 .getData(path, this.watcher, new GetDataAsyncCallback(),
284 Long.valueOf(-1)
285 SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
286 }
287
288
289
290
291
292
293
294
295 @Override
296 public void removeRecoveringRegions(final Set<String> recoveredServerNameSet,
297 Boolean isMetaRecovery)
298 throws IOException {
299 final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
300 int count = 0;
301 try {
302 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
303 if (tasks != null) {
304 int listSize = tasks.size();
305 for (int i = 0; i < listSize; i++) {
306 if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
307 count++;
308 }
309 }
310 }
311 if (count == 0 && this.details.getMaster().isInitialized()
312 && !this.details.getMaster().getServerManager().areDeadServersInProgress()) {
313
314 ZKSplitLog.deleteRecoveringRegionZNodes(watcher, null);
315
316
317 lastRecoveringNodeCreationTime = Long.MAX_VALUE;
318 } else if (!recoveredServerNameSet.isEmpty()) {
319
320 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
321 if (regions != null) {
322 int listSize = regions.size();
323 if (LOG.isDebugEnabled()) {
324 LOG.debug("Processing recovering " + regions + " and servers " +
325 recoveredServerNameSet + ", isMetaRecovery=" + isMetaRecovery);
326 }
327 for (int i = 0; i < listSize; i++) {
328 String region = regions.get(i);
329 if (isMetaRecovery != null) {
330 if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
331 || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
332
333
334 continue;
335 }
336 }
337 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
338 List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
339 if (failedServers == null || failedServers.isEmpty()) {
340 ZKUtil.deleteNode(watcher, nodePath);
341 continue;
342 }
343 if (recoveredServerNameSet.containsAll(failedServers)) {
344 ZKUtil.deleteNodeRecursively(watcher, nodePath);
345 } else {
346 int tmpFailedServerSize = failedServers.size();
347 for (int j = 0; j < tmpFailedServerSize; j++) {
348 String failedServer = failedServers.get(j);
349 if (recoveredServerNameSet.contains(failedServer)) {
350 String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
351 ZKUtil.deleteNode(watcher, tmpPath);
352 }
353 }
354 }
355 }
356 }
357 }
358 } catch (KeeperException ke) {
359 LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
360 throw new IOException(ke);
361 }
362 }
363
364 private void deleteNode(String path, Long retries) {
365 SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
366
367
368
369 this.watcher.getRecoverableZooKeeper().getZooKeeper()
370 .delete(path, -1, new DeleteAsyncCallback(), retries);
371 }
372
373 private void deleteNodeSuccess(String path) {
374 if (ignoreZKDeleteForTesting) {
375 return;
376 }
377 Task task;
378 task = details.getTasks().remove(path);
379 if (task == null) {
380 if (ZKSplitLog.isRescanNode(watcher, path)) {
381 SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
382 }
383 SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
384 LOG.debug("deleted task without in memory state " + path);
385 return;
386 }
387 synchronized (task) {
388 task.status = DELETED;
389 task.notify();
390 }
391 SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
392 }
393
394 private void deleteNodeFailure(String path) {
395 LOG.info("Failed to delete node " + path + " and will retry soon.");
396 return;
397 }
398
399 private void createRescanSuccess(String path) {
400 SplitLogCounters.tot_mgr_rescan.incrementAndGet();
401 getDataSetWatch(path, zkretries);
402 }
403
404 private void createRescanFailure() {
405 LOG.fatal("logic failure, rescan failure must not happen");
406 }
407
408
409
410
411
412
413
414 private boolean needAbandonRetries(int statusCode, String action) {
415 if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
416 LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
417 + "action=" + action);
418 return true;
419 }
420 return false;
421 }
422
423 private void createNode(String path, Long retry_count) {
424 SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
425 ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
426 retry_count);
427 SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
428 return;
429 }
430
431 private void createNodeSuccess(String path) {
432 LOG.debug("put up splitlog task at znode " + path);
433 getDataSetWatch(path, zkretries);
434 }
435
436 private void createNodeFailure(String path) {
437
438 LOG.warn("failed to create task node" + path);
439 setDone(path, FAILURE);
440 }
441
442 private void getDataSetWatch(String path, Long retry_count) {
443 this.watcher.getRecoverableZooKeeper().getZooKeeper()
444 .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
445 SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
446 }
447
448
449 private void getDataSetWatchSuccess(String path, byte[] data, int version)
450 throws DeserializationException {
451 if (data == null) {
452 if (version == Integer.MIN_VALUE) {
453
454 setDone(path, SUCCESS);
455 return;
456 }
457 SplitLogCounters.tot_mgr_null_data.incrementAndGet();
458 LOG.fatal("logic error - got null data " + path);
459 setDone(path, FAILURE);
460 return;
461 }
462 data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
463 SplitLogTask slt = SplitLogTask.parseFrom(data);
464 if (slt.isUnassigned()) {
465 LOG.debug("task not yet acquired " + path + " ver = " + version);
466 handleUnassignedTask(path);
467 } else if (slt.isOwned()) {
468 heartbeat(path, version, slt.getServerName());
469 } else if (slt.isResigned()) {
470 LOG.info("task " + path + " entered state: " + slt.toString());
471 resubmitOrFail(path, FORCE);
472 } else if (slt.isDone()) {
473 LOG.info("task " + path + " entered state: " + slt.toString());
474 if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
475 if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
476 setDone(path, SUCCESS);
477 } else {
478 resubmitOrFail(path, CHECK);
479 }
480 } else {
481 setDone(path, SUCCESS);
482 }
483 } else if (slt.isErr()) {
484 LOG.info("task " + path + " entered state: " + slt.toString());
485 resubmitOrFail(path, CHECK);
486 } else {
487 LOG.fatal("logic error - unexpected zk state for path = " + path + " data = "
488 + slt.toString());
489 setDone(path, FAILURE);
490 }
491 }
492
493 private void resubmitOrFail(String path, ResubmitDirective directive) {
494 if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) {
495 setDone(path, FAILURE);
496 }
497 }
498
499 private void getDataSetWatchFailure(String path) {
500 LOG.warn("failed to set data watch " + path);
501 setDone(path, FAILURE);
502 }
503
504 private void setDone(String path, TerminationStatus status) {
505 Task task = details.getTasks().get(path);
506 if (task == null) {
507 if (!ZKSplitLog.isRescanNode(watcher, path)) {
508 SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
509 LOG.debug("unacquired orphan task is done " + path);
510 }
511 } else {
512 synchronized (task) {
513 if (task.status == IN_PROGRESS) {
514 if (status == SUCCESS) {
515 SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
516 LOG.info("Done splitting " + path);
517 } else {
518 SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
519 LOG.warn("Error splitting " + path);
520 }
521 task.status = status;
522 if (task.batch != null) {
523 synchronized (task.batch) {
524 if (status == SUCCESS) {
525 task.batch.done++;
526 } else {
527 task.batch.error++;
528 }
529 task.batch.notify();
530 }
531 }
532 }
533 }
534 }
535
536
537
538
539
540 deleteNode(path, zkretries);
541 return;
542 }
543
544 Task findOrCreateOrphanTask(String path) {
545 Task orphanTask = new Task();
546 Task task;
547 task = details.getTasks().putIfAbsent(path, orphanTask);
548 if (task == null) {
549 LOG.info("creating orphan task " + path);
550 SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
551 task = orphanTask;
552 }
553 return task;
554 }
555
556 private void heartbeat(String path, int new_version, ServerName workerName) {
557 Task task = findOrCreateOrphanTask(path);
558 if (new_version != task.last_version) {
559 if (task.isUnassigned()) {
560 LOG.info("task " + path + " acquired by " + workerName);
561 }
562 task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
563 SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
564 } else {
565
566
567
568
569 }
570 return;
571 }
572
573 private void lookForOrphans() {
574 List<String> orphans;
575 try {
576 orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.splitLogZNode);
577 if (orphans == null) {
578 LOG.warn("could not get children of " + this.watcher.splitLogZNode);
579 return;
580 }
581 } catch (KeeperException e) {
582 LOG.warn("could not get children of " + this.watcher.splitLogZNode + " "
583 + StringUtils.stringifyException(e));
584 return;
585 }
586 int rescan_nodes = 0;
587 int listSize = orphans.size();
588 for (int i = 0; i < listSize; i++) {
589 String path = orphans.get(i);
590 String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
591 if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
592 rescan_nodes++;
593 LOG.debug("found orphan rescan node " + path);
594 } else {
595 LOG.info("found orphan task " + path);
596 }
597 getDataSetWatch(nodepath, zkretries);
598 }
599 LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
600 + " rescan nodes");
601 }
602
603
604
605
606
607
608
609 @Override
610 public void markRegionsRecovering(final ServerName serverName, Set<HRegionInfo> userRegions)
611 throws IOException, InterruptedIOException {
612 this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTime();
613 for (HRegionInfo region : userRegions) {
614 String regionEncodeName = region.getEncodedName();
615 long retries = this.zkretries;
616
617 do {
618 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
619 long lastRecordedFlushedSequenceId = -1;
620 try {
621 long lastSequenceId =
622 this.details.getMaster().getServerManager()
623 .getLastFlushedSequenceId(regionEncodeName.getBytes()).getLastFlushedSequenceId();
624
625
626
627
628
629 byte[] data = ZKUtil.getData(this.watcher, nodePath);
630 if (data == null) {
631 ZKUtil
632 .createSetData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
633 } else {
634 lastRecordedFlushedSequenceId =
635 ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
636 if (lastRecordedFlushedSequenceId < lastSequenceId) {
637
638 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
639 }
640 }
641
642 nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
643 if (lastSequenceId <= lastRecordedFlushedSequenceId) {
644
645 lastSequenceId = lastRecordedFlushedSequenceId;
646 }
647 ZKUtil.createSetData(this.watcher, nodePath,
648 ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
649 if (LOG.isDebugEnabled()) {
650 LOG.debug("Marked " + regionEncodeName + " as recovering from " + serverName +
651 ": " + nodePath);
652 }
653
654 break;
655 } catch (KeeperException e) {
656
657 if (retries <= 1) {
658 throw new IOException(e);
659 }
660
661 try {
662 Thread.sleep(20);
663 } catch (InterruptedException e1) {
664 throw new InterruptedIOException();
665 }
666 } catch (InterruptedException e) {
667 throw new InterruptedIOException();
668 }
669 } while ((--retries) > 0);
670 }
671 }
672
673 @Override
674 public void nodeDataChanged(String path) {
675 Task task;
676 task = details.getTasks().get(path);
677 if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
678 if (task != null) {
679 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
680 }
681 getDataSetWatch(path, zkretries);
682 }
683 }
684
685
686
687
688
689
690 @Override
691 public void removeStaleRecoveringRegions(final Set<String> knownFailedServers)
692 throws IOException, InterruptedIOException {
693
694 try {
695 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
696 if (tasks != null) {
697 int listSize = tasks.size();
698 for (int i = 0; i < listSize; i++) {
699 String t = tasks.get(i);
700 byte[] data;
701 try {
702 data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
703 } catch (InterruptedException e) {
704 throw new InterruptedIOException();
705 }
706 if (data != null) {
707 SplitLogTask slt = null;
708 try {
709 slt = SplitLogTask.parseFrom(data);
710 } catch (DeserializationException e) {
711 LOG.warn("Failed parse data for znode " + t, e);
712 }
713 if (slt != null && slt.isDone()) {
714 continue;
715 }
716 }
717
718 t = ZKSplitLog.getFileName(t);
719 ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(new Path(t));
720 if (serverName != null) {
721 knownFailedServers.add(serverName.getServerName());
722 } else {
723 LOG.warn("Found invalid WAL log file name:" + t);
724 }
725 }
726 }
727
728
729 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
730 if (regions != null) {
731 int listSize = regions.size();
732 for (int i = 0; i < listSize; i++) {
733 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regions.get(i));
734 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
735 if (regionFailedServers == null || regionFailedServers.isEmpty()) {
736 ZKUtil.deleteNode(watcher, nodePath);
737 continue;
738 }
739 boolean needMoreRecovery = false;
740 int tmpFailedServerSize = regionFailedServers.size();
741 for (int j = 0; j < tmpFailedServerSize; j++) {
742 if (knownFailedServers.contains(regionFailedServers.get(j))) {
743 needMoreRecovery = true;
744 break;
745 }
746 }
747 if (!needMoreRecovery) {
748 ZKUtil.deleteNodeRecursively(watcher, nodePath);
749 }
750 }
751 }
752 } catch (KeeperException e) {
753 throw new IOException(e);
754 }
755 }
756
757 @Override
758 public synchronized boolean isReplaying() {
759 return this.recoveryMode == RecoveryMode.LOG_REPLAY;
760 }
761
762 @Override
763 public synchronized boolean isSplitting() {
764 return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
765 }
766
767 private List<String> listSplitLogTasks() throws KeeperException {
768 List<String> taskOrRescanList = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
769 if (taskOrRescanList == null || taskOrRescanList.isEmpty()) {
770 return Collections.<String> emptyList();
771 }
772 List<String> taskList = new ArrayList<String>();
773 for (String taskOrRescan : taskOrRescanList) {
774
775 if (!ZKSplitLog.isRescanNode(taskOrRescan)) {
776 taskList.add(taskOrRescan);
777 }
778 }
779 return taskList;
780 }
781
782
783
784
785
786
787
788 @Override
789 public void setRecoveryMode(boolean isForInitialization) throws IOException {
790 synchronized(this) {
791 if (this.isDrainingDone) {
792
793
794 return;
795 }
796 }
797 if (this.watcher == null) {
798
799 synchronized(this) {
800 this.isDrainingDone = true;
801 this.recoveryMode = RecoveryMode.LOG_SPLITTING;
802 }
803 return;
804 }
805 boolean hasSplitLogTask = false;
806 boolean hasRecoveringRegions = false;
807 RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
808 RecoveryMode recoveryModeInConfig =
809 (isDistributedLogReplay(conf)) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
810
811
812 try {
813 List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
814 if (regions != null && !regions.isEmpty()) {
815 hasRecoveringRegions = true;
816 previousRecoveryMode = RecoveryMode.LOG_REPLAY;
817 }
818 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
819
820 List<String> tasks = listSplitLogTasks();
821 if (!tasks.isEmpty()) {
822 hasSplitLogTask = true;
823 if (isForInitialization) {
824
825 int listSize = tasks.size();
826 for (int i = 0; i < listSize; i++) {
827 String task = tasks.get(i);
828 try {
829 byte[] data =
830 ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, task));
831 if (data == null) continue;
832 SplitLogTask slt = SplitLogTask.parseFrom(data);
833 previousRecoveryMode = slt.getMode();
834 if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
835
836
837
838 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
839 }
840 break;
841 } catch (DeserializationException e) {
842 LOG.warn("Failed parse data for znode " + task, e);
843 } catch (InterruptedException e) {
844 throw new InterruptedIOException();
845 }
846 }
847 }
848 }
849 }
850 } catch (KeeperException e) {
851 throw new IOException(e);
852 }
853
854 synchronized (this) {
855 if (this.isDrainingDone) {
856 return;
857 }
858 if (!hasSplitLogTask && !hasRecoveringRegions) {
859 this.isDrainingDone = true;
860 this.recoveryMode = recoveryModeInConfig;
861 return;
862 } else if (!isForInitialization) {
863
864 return;
865 }
866
867 if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
868 this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
869 this.recoveryMode = previousRecoveryMode;
870 } else {
871 this.recoveryMode = recoveryModeInConfig;
872 }
873 }
874 }
875
876
877
878
879
880
881 private boolean isDistributedLogReplay(Configuration conf) {
882 boolean dlr =
883 conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
884 HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
885 if (LOG.isDebugEnabled()) {
886 LOG.debug("Distributed log replay=" + dlr);
887 }
888 return dlr;
889 }
890
891 private boolean resubmit(ServerName serverName, String path, int version) {
892 try {
893
894 SplitLogTask slt =
895 new SplitLogTask.Unassigned(this.details.getServerName(), getRecoveryMode());
896 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
897 LOG.debug("failed to resubmit task " + path + " version changed");
898 return false;
899 }
900 } catch (NoNodeException e) {
901 LOG.warn("failed to resubmit because znode doesn't exist " + path
902 + " task done (or forced done by removing the znode)");
903 try {
904 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
905 } catch (DeserializationException e1) {
906 LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
907 return false;
908 }
909 return false;
910 } catch (KeeperException.BadVersionException e) {
911 LOG.debug("failed to resubmit task " + path + " version changed");
912 return false;
913 } catch (KeeperException e) {
914 SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
915 LOG.warn("failed to resubmit " + path, e);
916 return false;
917 }
918 return true;
919 }
920
921
922
923
924
925
926
927
928
929 public interface TaskFinisher {
930
931
932
933 enum Status {
934
935
936
937 DONE(),
938
939
940
941 ERR();
942 }
943
944
945
946
947
948
949
950
951
952
953 Status finish(ServerName workerName, String taskname);
954 }
955
956
957
958
959 public class CreateAsyncCallback implements AsyncCallback.StringCallback {
960 private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
961
962 @Override
963 public void processResult(int rc, String path, Object ctx, String name) {
964 SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
965 if (rc != 0) {
966 if (needAbandonRetries(rc, "Create znode " + path)) {
967 createNodeFailure(path);
968 return;
969 }
970 if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
971
972
973
974
975
976
977 LOG.debug("found pre-existing znode " + path);
978 SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
979 } else {
980 Long retry_count = (Long) ctx;
981 LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path
982 + " remaining retries=" + retry_count);
983 if (retry_count == 0) {
984 SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
985 createNodeFailure(path);
986 } else {
987 SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
988 createNode(path, retry_count - 1);
989 }
990 return;
991 }
992 }
993 createNodeSuccess(path);
994 }
995 }
996
997
998
999
1000 public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1001 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1002
1003 @Override
1004 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
1005 SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1006 if (rc != 0) {
1007 if (needAbandonRetries(rc, "GetData from znode " + path)) {
1008 return;
1009 }
1010 if (rc == KeeperException.Code.NONODE.intValue()) {
1011 SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1012 LOG.warn("task znode " + path + " vanished or not created yet.");
1013
1014
1015
1016 return;
1017 }
1018 Long retry_count = (Long) ctx;
1019
1020 if (retry_count < 0) {
1021 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
1022 + ". Ignoring error. No error handling. No retrying.");
1023 return;
1024 }
1025 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
1026 + " remaining retries=" + retry_count);
1027 if (retry_count == 0) {
1028 SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1029 getDataSetWatchFailure(path);
1030 } else {
1031 SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1032 getDataSetWatch(path, retry_count - 1);
1033 }
1034 return;
1035 }
1036 try {
1037 getDataSetWatchSuccess(path, data, stat.getVersion());
1038 } catch (DeserializationException e) {
1039 LOG.warn("Deserialization problem", e);
1040 }
1041 return;
1042 }
1043 }
1044
1045
1046
1047
1048 public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1049 private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1050
1051 @Override
1052 public void processResult(int rc, String path, Object ctx) {
1053 SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1054 if (rc != 0) {
1055 if (needAbandonRetries(rc, "Delete znode " + path)) {
1056 details.getFailedDeletions().add(path);
1057 return;
1058 }
1059 if (rc != KeeperException.Code.NONODE.intValue()) {
1060 SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1061 Long retry_count = (Long) ctx;
1062 LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
1063 + " remaining retries=" + retry_count);
1064 if (retry_count == 0) {
1065 LOG.warn("delete failed " + path);
1066 details.getFailedDeletions().add(path);
1067 deleteNodeFailure(path);
1068 } else {
1069 deleteNode(path, retry_count - 1);
1070 }
1071 return;
1072 } else {
1073 LOG.info(path + " does not exist. Either was created but deleted behind our"
1074 + " back by another pending delete OR was deleted"
1075 + " in earlier retry rounds. zkretries = " + ctx);
1076 }
1077 } else {
1078 LOG.debug("deleted " + path);
1079 }
1080 deleteNodeSuccess(path);
1081 }
1082 }
1083
1084
1085
1086
1087
1088
1089
1090 public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1091 private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1092
1093 @Override
1094 public void processResult(int rc, String path, Object ctx, String name) {
1095 if (rc != 0) {
1096 if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1097 return;
1098 }
1099 Long retry_count = (Long) ctx;
1100 LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
1101 + retry_count);
1102 if (retry_count == 0) {
1103 createRescanFailure();
1104 } else {
1105 rescan(retry_count - 1);
1106 }
1107 return;
1108 }
1109
1110 createRescanSuccess(name);
1111 }
1112 }
1113
1114 @Override
1115 public void setDetails(SplitLogManagerDetails details) {
1116 this.details = details;
1117 }
1118
1119 @Override
1120 public SplitLogManagerDetails getDetails() {
1121 return details;
1122 }
1123
1124 @Override
1125 public synchronized RecoveryMode getRecoveryMode() {
1126 return recoveryMode;
1127 }
1128
1129 @Override
1130 public long getLastRecoveryTime() {
1131 return lastRecoveringNodeCreationTime;
1132 }
1133
1134
1135
1136
1137 public void setIgnoreDeleteForTesting(boolean b) {
1138 ignoreZKDeleteForTesting = b;
1139 }
1140 }