/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ZooKeeper based implementation of {@link SplitLogManagerCoordination}.
 * <p>
 * Split-log tasks are represented as znodes. This class creates, reads (with a watch), resubmits
 * and deletes those znodes through asynchronous ZooKeeper calls and mirrors their state into the
 * in-memory task map exposed by {@link SplitLogManagerDetails#getTasks()}.
 */
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZKListener implements
    SplitLogManagerCoordination {

  public static final int DEFAULT_TIMEOUT = 120000;
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  // NOTE(review): the logger is keyed on the interface rather than on this class; kept as-is so
  // existing logging configuration keeps matching - confirm this is intentional.
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class);

  private final TaskFinisher taskFinisher;
  private final Configuration conf;

  // Populated by init() from the configuration.
  private long zkretries;
  private long resubmitThreshold;
  private long timeout;

  SplitLogManagerDetails details;

  public boolean ignoreZKDeleteForTesting = false;

  /**
   * Creates the coordination and installs a {@link TaskFinisher} that delegates to
   * {@link WALSplitUtil#finishSplitLogFile(String, Configuration)}.
   * @param conf configuration used by {@link #init()} and by the task finisher
   * @param watcher ZooKeeper watcher all znode operations go through
   */
  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
    super(watcher);
    this.conf = conf;
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitUtil.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file {}", logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
  }

  /**
   * Reads the retry count, resubmit threshold and timeout from the configuration and, when a
   * watcher is present, registers this listener and scans for pre-existing task znodes.
   */
  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

  /**
   * Counts the children of the split-log znode that are not RESCAN nodes.
   * @return the number of remaining task znodes, or -1 when ZooKeeper could not be queried
   */
  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher,
        watcher.getZNodePaths().splitLogZNode);
      if (tasks != null) {
        for (String task : tasks) {
          if (!ZKSplitLog.isRescanNode(task)) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

  /**
   * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants
   * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create
   * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this
   * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
   * @param path znode path of the task found in UNASSIGNED state
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("Resubmitting unassigned orphan task {}", path);
      // ignore failure to resubmit. The timeout-monitor will handle it later
      // albeit in a more crude fashion
      resubmitTask(path, task, FORCE);
    }
  }

  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  /**
   * Puts a task back into the UNASSIGNED state and signals the workers with a RESCAN node.
   * @param path znode path of the task
   * @param task in-memory state of the task
   * @param directive {@code FORCE} resubmits unconditionally; any other value applies the
   *          liveness/timeout check and the resubmit threshold first
   * @return true if the task was resubmitted, false otherwise
   */
  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    // its ok if this thread misses the update to task.deleted. It will fail later
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      // 1) immediately if the worker server is now marked as dead
      // 2) after a configurable timeout if the server is not marked as dead but has still not
      // finished the task. This allows to continue if the worker cannot actually handle it,
      // for any reason.
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      final boolean alive =
        details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
          .isServerOnline(task.cur_worker_name) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of {} because the server {} is not marked as dead, "
          + "we waited for {} while the timeout is {}", task, task.cur_worker_name, time, timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
          LOG.info("Skipping resubmissions of task {} because threshold {} reached", path,
            resubmitThreshold);
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.increment();
      version = -1;
    }
    LOG.info("Resubmitting task {}", path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.increment();
    return true;
  }

  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

  /**
   * signal the workers that a task was resubmitted by creating the RESCAN node.
   * @param retries number of retries handed to {@link CreateRescanAsyncCallback} as its context
   */
  private void rescan(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that creation of RESCAN node provides.
    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
    // therefore this behavior is safe.
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
    this.watcher
      .getRecoverableZooKeeper()
      .getZooKeeper()
      .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  @Override
  public void checkTaskStillAvailable(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher
      .getRecoverableZooKeeper()
      .getZooKeeper()
      .getData(path, this.watcher, new GetDataAsyncCallback(),
        Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  /**
   * Queues an asynchronous, version-agnostic delete of the given znode.
   * @param path znode to delete
   * @param retries remaining retries, handed to {@link DeleteAsyncCallback} as its context
   */
  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.increment();
    // Once a task znode is ready for delete, that is it is in the TASK_DONE
    // state, then no one should be writing to it anymore. That is no one
    // will be updating the znode version any more.
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
      .delete(path, -1, new DeleteAsyncCallback(), retries);
  }

  /**
   * Removes the in-memory task for a deleted znode, marks it DELETED and wakes up a thread
   * waiting on the task object.
   */
  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.increment();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
      LOG.debug("Deleted task without in memory state {}", path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node {} and will retry soon.", path);
  }

  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.increment();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when need to abandon retries otherwise false
   */
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
        + "action={}", action);
      return true;
    }
    return false;
  }

  /**
   * Queues an asynchronous create of a task znode in the UNASSIGNED state.
   * @param path znode to create
   * @param retry_count remaining retries, handed to {@link CreateAsyncCallback} as its context
   */
  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.increment();
  }

  private void createNodeSuccess(String path) {
    LOG.debug("Put up splitlog task at znode {}", path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("Failed to create task node {}", path);
    setDone(path, FAILURE);
  }

  /**
   * Queues an asynchronous read of the znode that also (re)sets the watch on it.
   * @param path znode to read
   * @param retry_count remaining retries, handed to {@link GetDataAsyncCallback} as its context
   */
  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
      .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  /**
   * Dispatches on the task state stored in the znode.
   * @param path znode that was read
   * @param data raw znode content; {@code null} together with a version of
   *          {@link Integer#MIN_VALUE} means the znode disappeared and is treated as success
   * @param version znode version the data was read at
   * @throws DeserializationException if the znode content cannot be parsed
   */
  private void getDataSetWatchSuccess(String path, byte[] data, int version)
      throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.increment();
      LOG.error(HBaseMarkers.FATAL, "logic error - got null data {}", path);
      setDone(path, FAILURE);
      return;
    }
    data = ZKMetadata.removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("Task not yet acquired {}, ver={}", path, version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("Task {} entered state={}", path, slt);
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("Task {} entered state={}", path, slt);
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        Status finishStatus =
          taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path));
        if (finishStatus == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("Task {} entered state={}", path, slt);
      resubmitOrFail(path, CHECK);
    } else {
      LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = {} data = {}",
        path, slt);
      setDone(path, FAILURE);
    }
  }

  /** Resubmits the task and, if that is not possible, marks it as failed. */
  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("Failed to set data watch {}", path);
    setDone(path, FAILURE);
  }

  /**
   * Records the terminal status of a task (if it is still IN_PROGRESS), updates and notifies its
   * batch, and queues deletion of the task znode.
   */
  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
        LOG.debug("Unacquired orphan task is done {}", path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.increment();
            LOG.info("Done splitting {}", path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.increment();
            LOG.warn("Error splitting {}", path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // delete the task node in zk. It's an async
    // call and no one is blocked waiting for this node to be deleted. All
    // task names are unique (log.<timestamp>) there is no risk of deleting
    // a future task.
    // if a deletion fails, TimeoutMonitor will retry the same deletion later
    deleteNode(path, zkretries);
  }

  /** Returns the in-memory task for the path, registering a new one if none exists yet. */
  private Task findOrCreateOrphanTask(String path) {
    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
      LOG.info("Creating orphan task {}", path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    });
  }

  /**
   * Records a heartbeat for the task when the znode version has changed since the last one seen.
   */
  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version == task.last_version) {
      // duplicate heartbeats - heartbeats w/o zk node version
      // changing - are possible. The timeout thread does
      // getDataSetWatch() just to check whether a node still
      // exists or not
      return;
    }
    if (task.isUnassigned()) {
      LOG.info("Task {} acquired by {}", path, workerName);
    }
    task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
    SplitLogCounters.tot_mgr_heartbeat.increment();
  }

  /**
   * Lists the children of the split-log znode and sets a data watch on each of them so that
   * their state is picked up by {@link GetDataAsyncCallback}.
   */
  private void lookForOrphans() {
    final String splitLogZNode = this.watcher.getZNodePaths().splitLogZNode;
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher, splitLogZNode);
      if (orphans == null) {
        LOG.warn("Could not get children of {}", splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Could not get children of {} {}", splitLogZNode,
        StringUtils.stringifyException(e));
      return;
    }
    int rescanNodes = 0;
    for (String path : orphans) {
      String nodepath = ZNodePaths.joinZNode(splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescanNodes++;
        LOG.debug("Found orphan rescan node {}", path);
      } else {
        LOG.info("Found orphan task {}", path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found {} orphan tasks and {} rescan nodes", orphans.size() - rescanNodes,
      rescanNodes);
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

  /**
   * Synchronously rewrites the task znode to the UNASSIGNED state.
   * @param path znode of the task
   * @param version expected znode version passed to the conditional set
   * @return true if the znode was updated, false on any failure
   */
  private boolean resubmit(String path, int version) {
    try {
      // blocking zk call but this is done from the timeout thread
      SplitLogTask slt =
        new SplitLogTask.Unassigned(this.details.getServerName());
      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
        LOG.debug("Failed to resubmit task {} version changed", path);
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("Failed to resubmit because znode doesn't exist {}"
        + " task done (or forced done by removing the znode)", path);
      try {
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task {} because of deserialization issue", path, e1);
        return false;
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("Failed to resubmit task {} version changed", path);
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.increment();
      LOG.warn("Failed to resubmit {}", path, e);
      return false;
    }
    return true;
  }


  /**
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
   * interface to finish off a partially done task by
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a
   * serialization point at the end of the task processing. Must be restartable and idempotent.
   */
  public interface TaskFinisher {
    /**
     * status that can be returned by finish()
     */
    enum Status {
      /**
       * task completed successfully
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
    }

    /**
     * finish the partially done task. workername provides clue to where the partial results of the
     * partially done tasks are present. taskname is the name of the task that was put up in
     * zookeeper.
     * <p>
     * @param workerName name of the worker that processed the task
     * @param taskname name of the task that was put up in zookeeper
     * @return DONE if task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
  }

  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
          // state. Only operations that will be carried out on this node by
          // this manager are get-znode-data, task-finisher and delete-znode.
          // And all code pieces correctly handle the case of suddenly
          // disappearing task-znode.
          LOG.debug("Found pre-existing znode {}", path);
          SplitLogCounters.tot_mgr_node_already_exists.increment();
        } else {
          Long retryCount = (Long) ctx;
          LOG.warn("Create rc={} for {} remaining retries={}", KeeperException.Code.get(rc), path,
            retryCount);
          if (retryCount == 0) {
            SplitLogCounters.tot_mgr_node_create_err.increment();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.increment();
            createNode(path, retryCount - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.increment();
          LOG.warn("Task znode {} vanished or not created yet.", path);
          // ignore since we should not end up in a case where there is in-memory task,
          // but no znode. The only case is between the time task is created in-memory
          // and the znode is created. See HBASE-11217.
          return;
        }
        Long retryCount = (Long) ctx;

        if (retryCount < 0) {
          LOG.warn("Getdata rc={} {}. Ignoring error. No error handling. No retrying.",
            KeeperException.Code.get(rc), path);
          return;
        }
        LOG.warn("Getdata rc={} {} remaining retries={}", KeeperException.Code.get(rc), path,
          retryCount);
        if (retryCount == 0) {
          SplitLogCounters.tot_mgr_get_data_err.increment();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.increment();
          getDataSetWatch(path, retryCount - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
    }
  }

  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.increment();
          Long retryCount = (Long) ctx;
          LOG.warn("Delete rc={} for {} remaining retries={}", KeeperException.Code.get(rc), path,
            retryCount);
          if (retryCount == 0) {
            LOG.warn("Delete failed {}", path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retryCount - 1);
          }
          return;
        } else {
          LOG.info("{} does not exist. Either was created but deleted behind our"
            + " back by another pending delete OR was deleted"
            + " in earlier retry rounds. zkretries = {}", path, ctx);
        }
      } else {
        LOG.debug("Deleted {}", path);
      }
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL create mode. It is a signal for all
   * the {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
   */
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retryCount = (Long) ctx;
        LOG.warn("rc={} for {} remaining retries={}", KeeperException.Code.get(rc), path,
          retryCount);
        if (retryCount == 0) {
          createRescanFailure();
        } else {
          rescan(retryCount - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  /**
   * Temporary function that is used by unit tests only
   */
  @VisibleForTesting
  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}