/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ZooKeeper based implementation of
 * {@link SplitLogManagerCoordination}
 */
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZKListener implements
    SplitLogManagerCoordination {

  public static final int DEFAULT_TIMEOUT = 120000;
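  // Defaults for the knobs read in init(): "hbase.splitlog.zk.retries" (ZK operation retries)
  // and "hbase.splitlog.max.resubmit" (unforced task resubmits).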
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class);

  private final TaskFinisher taskFinisher;
  private final Configuration conf;

  private long zkretries;
  private long resubmitThreshold;
  private long timeout;

  SplitLogManagerDetails details;

  public boolean ignoreZKDeleteForTesting = false;

  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
    super(watcher);
    this.conf = conf;
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitter.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file " + logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
  }

  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely - say the
   * SplitLogManager wants to resubmit a task: it forces the task to the UNASSIGNED state but dies
   * before it can create the RESCAN task node that signals the SplitLogWorkers to pick up the
   * task. To prevent this scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks
   * at startup.
   * @param path full path of the task znode
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("Resubmitting unassigned orphan task " + path);
      // ignore failure to resubmit. The timeout-monitor will handle it later
      // albeit in a more crude fashion
      resubmitTask(path, task, FORCE);
    }
  }

  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    // it's OK if this thread misses the update to task.deleted. It will fail later
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      //   1) immediately if the worker server is now marked as dead
      //   2) after a configurable timeout if the server is not marked as dead but has still not
      //      finished the task. This allows us to continue if the worker cannot actually handle
      //      it, for any reason.
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      final boolean alive =
          details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
              .isServerOnline(task.cur_worker_name) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
            + task.cur_worker_name + " is not marked as dead, we waited for " + time
            + " while the timeout is " + timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
          LOG.info("Skipping resubmissions of task " + path + " because threshold "
              + resubmitThreshold + " reached");
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.increment();
      version = -1;
    }
    LOG.info("Resubmitting task " + path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.increment();
    return true;
  }

  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

  /**
   * Signal the workers that a task was resubmitted by creating the RESCAN node.
   */
  private void rescan(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that creation of the RESCAN node provides.
    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks,
    // this behavior is safe.
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  @Override
  public void checkTaskStillAvailable(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(),
          Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.increment();
    // Once a task znode is ready for delete, that is, it is in the TASK_DONE
    // state, no one should be writing to it anymore, so no one will be
    // updating the znode version any more.
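    // Version -1 tells ZooKeeper to delete the znode regardless of its current version; the
    // retry count rides along as the async callback context.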
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .delete(path, -1, new DeleteAsyncCallback(), retries);
  }

  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task;
    task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.increment();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
      LOG.debug("Deleted task without in memory state " + path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node " + path + " and will retry soon.");
    return;
  }

  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.increment();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true if retries should be abandoned, otherwise false
   */
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
          + "action=" + action);
      return true;
    }
    return false;
  }

  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.increment();
    return;
  }

  private void createNodeSuccess(String path) {
    LOG.debug("Put up splitlog task at znode " + path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("Failed to create task node " + path);
    setDone(path, FAILURE);
  }

  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  private void getDataSetWatchSuccess(String path, byte[] data, int version)
      throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
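        // (resubmit() also funnels NoNodeException here by calling this method with null data
        // and Integer.MIN_VALUE.)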
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.increment();
      LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path);
      setDone(path, FAILURE);
      return;
    }
    data = ZKMetadata.removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("Task not yet acquired " + path + ", ver=" + version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, CHECK);
    } else {
      LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = "
          + path + " data = " + slt.toString());
      setDone(path, FAILURE);
    }
  }

  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) {
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("Failed to set data watch " + path);
    setDone(path, FAILURE);
  }

  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
        LOG.debug("Unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.increment();
            LOG.info("Done splitting " + path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.increment();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // delete the task node in zk. It's an async call and no one is blocked
    // waiting for this node to be deleted. All task names are unique
    // (log.<timestamp>), so there is no risk of deleting a future task.
    // if a deletion fails, TimeoutMonitor will retry the same deletion later
    deleteNode(path, zkretries);
    return;
  }

  private Task findOrCreateOrphanTask(String path) {
    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
      LOG.info("Creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    });
  }

  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("Task " + path + " acquired by " + workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
      SplitLogCounters.tot_mgr_heartbeat.increment();
    } else {
      // duplicate heartbeats - heartbeats w/o zk node version
      // changing - are possible. The timeout thread does
      // getDataSetWatch() just to check whether a node still
      // exists or not
    }
    return;
  }

  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.znodePaths.splitLogZNode);
      if (orphans == null) {
        LOG.warn("Could not get children of " + this.watcher.znodePaths.splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Could not get children of " + this.watcher.znodePaths.splitLogZNode + " "
          + StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    int listSize = orphans.size();
    for (int i = 0; i < listSize; i++) {
      String path = orphans.get(i);
      String nodepath = ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("Found orphan rescan node " + path);
      } else {
        LOG.info("Found orphan task " + path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
        + " rescan nodes");
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task;
    task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

  private boolean resubmit(String path, int version) {
    try {
      // blocking zk call but this is done from the timeout thread
      SplitLogTask slt =
          new SplitLogTask.Unassigned(this.details.getServerName());
      if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
        LOG.debug("Failed to resubmit task " + path + " version changed");
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("Failed to resubmit because znode doesn't exist " + path
          + " task done (or forced done by removing the znode)");
      try {
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
        return false;
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("Failed to resubmit task " + path + " version changed");
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.increment();
      LOG.warn("Failed to resubmit " + path, e);
      return false;
    }
    return true;
  }
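
  // Rough lifecycle of a task znode as driven by this class (a summary of the handlers above,
  // not an exhaustive state machine):
  //   submitTask()/createNode()  -> znode created under splitLogZNode in the UNASSIGNED state
  //   a SplitLogWorker           -> moves it to OWNED, then to DONE, ERR or RESIGNED
  //   GetDataAsyncCallback       -> getDataSetWatchSuccess() dispatches on that state
  //   setDone()                  -> records SUCCESS/FAILURE and asynchronously deletes the znode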

  /**
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
   * interface to finish off a task left partially done by a
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a serialization
   * point at the end of the task processing. Must be restartable and idempotent.
   */
  public interface TaskFinisher {
    /**
     * Status that can be returned by finish()
     */
    enum Status {
      /**
       * task completed successfully
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
    }

    /**
     * Finish the partially done task. The worker name provides a clue to where the partial
     * results of the partially done task are present. The task name is the name of the task
     * that was put up in ZooKeeper.
     * <p>
     * @param workerName name of the worker that did the partial splitting
     * @param taskname name of the task that was put up in ZooKeeper
     * @return DONE if task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
  }

  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
          // state. The only operations that will be carried out on this node by
          // this manager are get-znode-data, task-finisher and delete-znode.
          // And all code pieces correctly handle the case of a suddenly
          // disappearing task-znode.
          LOG.debug("Found pre-existing znode " + path);
          SplitLogCounters.tot_mgr_node_already_exists.increment();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("Create rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            SplitLogCounters.tot_mgr_node_create_err.increment();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.increment();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.increment();
          LOG.warn("Task znode " + path + " vanished or not created yet.");
          // ignore since we should not end up in a case where there is an in-memory task,
          // but no znode. The only case is between the time the task is created in-memory
          // and the znode is created. See HBASE-11217.
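          // DeleteAsyncCallback below likewise treats NONODE as a successful delete, so both
          // paths tolerate a znode that has already vanished.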
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
              + ". Ignoring error. No error handling. No retrying.");
          return;
        }
        LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
            + " remaining retries=" + retry_count);
        if (retry_count == 0) {
          SplitLogCounters.tot_mgr_get_data_err.increment();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.increment();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
      return;
    }
  }

  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.increment();
          Long retry_count = (Long) ctx;
          LOG.warn("Delete rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            LOG.warn("Delete failed " + path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info(path + " does not exist. Either it was created but then deleted behind our"
              + " back by another pending delete OR it was deleted"
              + " in earlier retry rounds. zkretries = " + ctx);
        }
      } else {
        LOG.debug("Deleted " + path);
      }
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL flag (see rescan()). It is a signal
   * for all the {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new
   * tasks.
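   * <p>
   * The remaining retry count travels as the callback context; on failure the creation is
   * retried via rescan() until the count reaches zero, at which point createRescanFailure() is
   * called.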
   */
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
            + retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          rescan(retry_count - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  /**
   * Temporary function that is used by unit tests only
   */
  @VisibleForTesting
  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}