001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.coordination; 020 021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; 022import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; 023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; 024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; 025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; 026import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; 027import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 028 029import java.io.IOException; 030import java.util.List; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.SplitLogCounters; 036import org.apache.hadoop.hbase.SplitLogTask; 037import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status; 038import org.apache.hadoop.hbase.exceptions.DeserializationException; 039import org.apache.hadoop.hbase.log.HBaseMarkers; 040import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; 041import org.apache.hadoop.hbase.master.SplitLogManager.Task; 042import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.hbase.wal.WALSplitUtil; 045import org.apache.hadoop.hbase.zookeeper.ZKListener; 046import org.apache.hadoop.hbase.zookeeper.ZKMetadata; 047import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 048import org.apache.hadoop.hbase.zookeeper.ZKUtil; 049import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 050import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 051import org.apache.hadoop.util.StringUtils; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.apache.zookeeper.AsyncCallback; 054import org.apache.zookeeper.CreateMode; 055import org.apache.zookeeper.KeeperException; 056import org.apache.zookeeper.KeeperException.NoNodeException; 057import org.apache.zookeeper.ZooDefs.Ids; 058import org.apache.zookeeper.data.Stat; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * ZooKeeper based implementation of 064 * {@link SplitLogManagerCoordination} 065 */ 066@InterfaceAudience.Private 067public class ZKSplitLogManagerCoordination extends ZKListener implements 068 SplitLogManagerCoordination { 069 070 public static final int DEFAULT_TIMEOUT = 120000; 071 public static final int DEFAULT_ZK_RETRIES = 3; 072 public static final int DEFAULT_MAX_RESUBMIT = 3; 073 074 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class); 075 076 private final TaskFinisher taskFinisher; 077 private final Configuration conf; 078 079 private long zkretries; 080 private long resubmitThreshold; 081 private long timeout; 082 083 SplitLogManagerDetails details; 084 085 public boolean ignoreZKDeleteForTesting = false; 086 087 public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) { 088 super(watcher); 089 this.conf = conf; 090 taskFinisher = new TaskFinisher() { 091 @Override 092 public Status finish(ServerName workerName, String logfile) { 093 try { 094 WALSplitUtil.finishSplitLogFile(logfile, conf); 095 } catch (IOException e) { 096 LOG.warn("Could not finish splitting of log file " + logfile, e); 097 return Status.ERR; 098 } 099 return Status.DONE; 100 } 101 }; 102 } 103 104 @Override 105 public void init() throws IOException { 106 this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); 107 this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT); 108 this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT); 109 if (this.watcher != null) { 110 this.watcher.registerListener(this); 111 lookForOrphans(); 112 } 113 } 114 115 @Override 116 public String prepareTask(String taskname) { 117 return ZKSplitLog.getEncodedNodeName(watcher, taskname); 118 } 119 120 @Override 121 public int remainingTasksInCoordination() { 122 int count = 0; 123 try { 124 List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, 125 watcher.getZNodePaths().splitLogZNode); 126 if (tasks != null) { 127 int listSize = tasks.size(); 128 for (int i = 0; i < listSize; i++) { 129 if (!ZKSplitLog.isRescanNode(tasks.get(i))) { 130 count++; 131 } 132 } 133 } 134 } catch (KeeperException ke) { 135 LOG.warn("Failed to check remaining tasks", ke); 136 count = -1; 137 } 138 return count; 139 } 140 141 /** 142 * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants 143 * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create 144 * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this 145 * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. 146 * @param path 147 */ 148 private void handleUnassignedTask(String path) { 149 if (ZKSplitLog.isRescanNode(watcher, path)) { 150 return; 151 } 152 Task task = findOrCreateOrphanTask(path); 153 if (task.isOrphan() && (task.incarnation.get() == 0)) { 154 LOG.info("Resubmitting unassigned orphan task " + path); 155 // ignore failure to resubmit. The timeout-monitor will handle it later 156 // albeit in a more crude fashion 157 resubmitTask(path, task, FORCE); 158 } 159 } 160 161 @Override 162 public void deleteTask(String path) { 163 deleteNode(path, zkretries); 164 } 165 166 @Override 167 public boolean resubmitTask(String path, Task task, ResubmitDirective directive) { 168 // its ok if this thread misses the update to task.deleted. It will fail later 169 if (task.status != IN_PROGRESS) { 170 return false; 171 } 172 int version; 173 if (directive != FORCE) { 174 // We're going to resubmit: 175 // 1) immediately if the worker server is now marked as dead 176 // 2) after a configurable timeout if the server is not marked as dead but has still not 177 // finished the task. This allows to continue if the worker cannot actually handle it, 178 // for any reason. 179 final long time = EnvironmentEdgeManager.currentTime() - task.last_update; 180 final boolean alive = 181 details.getMaster().getServerManager() != null ? details.getMaster().getServerManager() 182 .isServerOnline(task.cur_worker_name) : true; 183 if (alive && time < timeout) { 184 LOG.trace("Skipping the resubmit of " + task.toString() + " because the server " 185 + task.cur_worker_name + " is not marked as dead, we waited for " + time 186 + " while the timeout is " + timeout); 187 return false; 188 } 189 190 if (task.unforcedResubmits.get() >= resubmitThreshold) { 191 if (!task.resubmitThresholdReached) { 192 task.resubmitThresholdReached = true; 193 SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment(); 194 LOG.info("Skipping resubmissions of task " + path + " because threshold " 195 + resubmitThreshold + " reached"); 196 } 197 return false; 198 } 199 // race with heartbeat() that might be changing last_version 200 version = task.last_version; 201 } else { 202 SplitLogCounters.tot_mgr_resubmit_force.increment(); 203 version = -1; 204 } 205 LOG.info("Resubmitting task " + path); 206 task.incarnation.incrementAndGet(); 207 boolean result = resubmit(path, version); 208 if (!result) { 209 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime()); 210 return false; 211 } 212 // don't count forced resubmits 213 if (directive != FORCE) { 214 task.unforcedResubmits.incrementAndGet(); 215 } 216 task.setUnassigned(); 217 rescan(Long.MAX_VALUE); 218 SplitLogCounters.tot_mgr_resubmit.increment(); 219 return true; 220 } 221 222 223 @Override 224 public void checkTasks() { 225 rescan(Long.MAX_VALUE); 226 }; 227 228 /** 229 * signal the workers that a task was resubmitted by creating the RESCAN node. 230 */ 231 private void rescan(long retries) { 232 // The RESCAN node will be deleted almost immediately by the 233 // SplitLogManager as soon as it is created because it is being 234 // created in the DONE state. This behavior prevents a buildup 235 // of RESCAN nodes. But there is also a chance that a SplitLogWorker 236 // might miss the watch-trigger that creation of RESCAN node provides. 237 // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks 238 // therefore this behavior is safe. 239 SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName()); 240 this.watcher 241 .getRecoverableZooKeeper() 242 .getZooKeeper() 243 .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, 244 CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries)); 245 } 246 247 @Override 248 public void submitTask(String path) { 249 createNode(path, zkretries); 250 } 251 252 @Override 253 public void checkTaskStillAvailable(String path) { 254 // A negative retry count will lead to ignoring all error processing. 255 this.watcher 256 .getRecoverableZooKeeper() 257 .getZooKeeper() 258 .getData(path, this.watcher, new GetDataAsyncCallback(), 259 Long.valueOf(-1) /* retry count */); 260 SplitLogCounters.tot_mgr_get_data_queued.increment(); 261 } 262 263 private void deleteNode(String path, Long retries) { 264 SplitLogCounters.tot_mgr_node_delete_queued.increment(); 265 // Once a task znode is ready for delete, that is it is in the TASK_DONE 266 // state, then no one should be writing to it anymore. That is no one 267 // will be updating the znode version any more. 268 this.watcher.getRecoverableZooKeeper().getZooKeeper() 269 .delete(path, -1, new DeleteAsyncCallback(), retries); 270 } 271 272 private void deleteNodeSuccess(String path) { 273 if (ignoreZKDeleteForTesting) { 274 return; 275 } 276 Task task; 277 task = details.getTasks().remove(path); 278 if (task == null) { 279 if (ZKSplitLog.isRescanNode(watcher, path)) { 280 SplitLogCounters.tot_mgr_rescan_deleted.increment(); 281 } 282 SplitLogCounters.tot_mgr_missing_state_in_delete.increment(); 283 LOG.debug("Deleted task without in memory state " + path); 284 return; 285 } 286 synchronized (task) { 287 task.status = DELETED; 288 task.notify(); 289 } 290 SplitLogCounters.tot_mgr_task_deleted.increment(); 291 } 292 293 private void deleteNodeFailure(String path) { 294 LOG.info("Failed to delete node " + path + " and will retry soon."); 295 return; 296 } 297 298 private void createRescanSuccess(String path) { 299 SplitLogCounters.tot_mgr_rescan.increment(); 300 getDataSetWatch(path, zkretries); 301 } 302 303 private void createRescanFailure() { 304 LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen"); 305 } 306 307 /** 308 * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions 309 * @param statusCode integer value of a ZooKeeper exception code 310 * @param action description message about the retried action 311 * @return true when need to abandon retries otherwise false 312 */ 313 private boolean needAbandonRetries(int statusCode, String action) { 314 if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) { 315 LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for " 316 + "action=" + action); 317 return true; 318 } 319 return false; 320 } 321 322 private void createNode(String path, Long retry_count) { 323 SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName()); 324 ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), 325 retry_count); 326 SplitLogCounters.tot_mgr_node_create_queued.increment(); 327 return; 328 } 329 330 private void createNodeSuccess(String path) { 331 LOG.debug("Put up splitlog task at znode " + path); 332 getDataSetWatch(path, zkretries); 333 } 334 335 private void createNodeFailure(String path) { 336 // TODO the Manager should split the log locally instead of giving up 337 LOG.warn("Failed to create task node " + path); 338 setDone(path, FAILURE); 339 } 340 341 private void getDataSetWatch(String path, Long retry_count) { 342 this.watcher.getRecoverableZooKeeper().getZooKeeper() 343 .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); 344 SplitLogCounters.tot_mgr_get_data_queued.increment(); 345 } 346 347 private void getDataSetWatchSuccess(String path, byte[] data, int version) 348 throws DeserializationException { 349 if (data == null) { 350 if (version == Integer.MIN_VALUE) { 351 // assume all done. The task znode suddenly disappeared. 352 setDone(path, SUCCESS); 353 return; 354 } 355 SplitLogCounters.tot_mgr_null_data.increment(); 356 LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path); 357 setDone(path, FAILURE); 358 return; 359 } 360 data = ZKMetadata.removeMetaData(data); 361 SplitLogTask slt = SplitLogTask.parseFrom(data); 362 if (slt.isUnassigned()) { 363 LOG.debug("Task not yet acquired " + path + ", ver=" + version); 364 handleUnassignedTask(path); 365 } else if (slt.isOwned()) { 366 heartbeat(path, version, slt.getServerName()); 367 } else if (slt.isResigned()) { 368 LOG.info("Task " + path + " entered state=" + slt.toString()); 369 resubmitOrFail(path, FORCE); 370 } else if (slt.isDone()) { 371 LOG.info("Task " + path + " entered state=" + slt.toString()); 372 if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { 373 if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) { 374 setDone(path, SUCCESS); 375 } else { 376 resubmitOrFail(path, CHECK); 377 } 378 } else { 379 setDone(path, SUCCESS); 380 } 381 } else if (slt.isErr()) { 382 LOG.info("Task " + path + " entered state=" + slt.toString()); 383 resubmitOrFail(path, CHECK); 384 } else { 385 LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = " 386 + path + " data = " + slt.toString()); 387 setDone(path, FAILURE); 388 } 389 } 390 391 private void resubmitOrFail(String path, ResubmitDirective directive) { 392 if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) { 393 setDone(path, FAILURE); 394 } 395 } 396 397 private void getDataSetWatchFailure(String path) { 398 LOG.warn("Failed to set data watch " + path); 399 setDone(path, FAILURE); 400 } 401 402 private void setDone(String path, TerminationStatus status) { 403 Task task = details.getTasks().get(path); 404 if (task == null) { 405 if (!ZKSplitLog.isRescanNode(watcher, path)) { 406 SplitLogCounters.tot_mgr_unacquired_orphan_done.increment(); 407 LOG.debug("Unacquired orphan task is done " + path); 408 } 409 } else { 410 synchronized (task) { 411 if (task.status == IN_PROGRESS) { 412 if (status == SUCCESS) { 413 SplitLogCounters.tot_mgr_log_split_success.increment(); 414 LOG.info("Done splitting " + path); 415 } else { 416 SplitLogCounters.tot_mgr_log_split_err.increment(); 417 LOG.warn("Error splitting " + path); 418 } 419 task.status = status; 420 if (task.batch != null) { 421 synchronized (task.batch) { 422 if (status == SUCCESS) { 423 task.batch.done++; 424 } else { 425 task.batch.error++; 426 } 427 task.batch.notify(); 428 } 429 } 430 } 431 } 432 } 433 // delete the task node in zk. It's an async 434 // call and no one is blocked waiting for this node to be deleted. All 435 // task names are unique (log.<timestamp>) there is no risk of deleting 436 // a future task. 437 // if a deletion fails, TimeoutMonitor will retry the same deletion later 438 deleteNode(path, zkretries); 439 return; 440 } 441 442 private Task findOrCreateOrphanTask(String path) { 443 return computeIfAbsent(details.getTasks(), path, Task::new, () -> { 444 LOG.info("Creating orphan task " + path); 445 SplitLogCounters.tot_mgr_orphan_task_acquired.increment(); 446 }); 447 } 448 449 private void heartbeat(String path, int new_version, ServerName workerName) { 450 Task task = findOrCreateOrphanTask(path); 451 if (new_version != task.last_version) { 452 if (task.isUnassigned()) { 453 LOG.info("Task " + path + " acquired by " + workerName); 454 } 455 task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); 456 SplitLogCounters.tot_mgr_heartbeat.increment(); 457 } else { 458 // duplicate heartbeats - heartbeats w/o zk node version 459 // changing - are possible. The timeout thread does 460 // getDataSetWatch() just to check whether a node still 461 // exists or not 462 } 463 return; 464 } 465 466 private void lookForOrphans() { 467 List<String> orphans; 468 try { 469 orphans = ZKUtil.listChildrenNoWatch(this.watcher, 470 this.watcher.getZNodePaths().splitLogZNode); 471 if (orphans == null) { 472 LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode); 473 return; 474 } 475 } catch (KeeperException e) { 476 LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode + " " 477 + StringUtils.stringifyException(e)); 478 return; 479 } 480 int rescan_nodes = 0; 481 int listSize = orphans.size(); 482 for (int i = 0; i < listSize; i++) { 483 String path = orphans.get(i); 484 String nodepath = ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, path); 485 if (ZKSplitLog.isRescanNode(watcher, nodepath)) { 486 rescan_nodes++; 487 LOG.debug("Found orphan rescan node " + path); 488 } else { 489 LOG.info("Found orphan task " + path); 490 } 491 getDataSetWatch(nodepath, zkretries); 492 } 493 LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes 494 + " rescan nodes"); 495 } 496 497 @Override 498 public void nodeDataChanged(String path) { 499 Task task; 500 task = details.getTasks().get(path); 501 if (task != null || ZKSplitLog.isRescanNode(watcher, path)) { 502 if (task != null) { 503 task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime()); 504 } 505 getDataSetWatch(path, zkretries); 506 } 507 } 508 509 private boolean resubmit(String path, int version) { 510 try { 511 // blocking zk call but this is done from the timeout thread 512 SplitLogTask slt = 513 new SplitLogTask.Unassigned(this.details.getServerName()); 514 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { 515 LOG.debug("Failed to resubmit task " + path + " version changed"); 516 return false; 517 } 518 } catch (NoNodeException e) { 519 LOG.warn("Failed to resubmit because znode doesn't exist " + path 520 + " task done (or forced done by removing the znode)"); 521 try { 522 getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); 523 } catch (DeserializationException e1) { 524 LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1); 525 return false; 526 } 527 return false; 528 } catch (KeeperException.BadVersionException e) { 529 LOG.debug("Failed to resubmit task " + path + " version changed"); 530 return false; 531 } catch (KeeperException e) { 532 SplitLogCounters.tot_mgr_resubmit_failed.increment(); 533 LOG.warn("Failed to resubmit " + path, e); 534 return false; 535 } 536 return true; 537 } 538 539 540 /** 541 * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this 542 * interface to finish off a partially done task by 543 * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a 544 * serialization point at the end of the task processing. Must be restartable and idempotent. 545 */ 546 public interface TaskFinisher { 547 /** 548 * status that can be returned finish() 549 */ 550 enum Status { 551 /** 552 * task completed successfully 553 */ 554 DONE(), 555 /** 556 * task completed with error 557 */ 558 ERR(); 559 } 560 561 /** 562 * finish the partially done task. workername provides clue to where the partial results of the 563 * partially done tasks are present. taskname is the name of the task that was put up in 564 * zookeeper. 565 * <p> 566 * @param workerName 567 * @param taskname 568 * @return DONE if task completed successfully, ERR otherwise 569 */ 570 Status finish(ServerName workerName, String taskname); 571 } 572 573 /** 574 * Asynchronous handler for zk create node results. Retries on failures. 575 */ 576 public class CreateAsyncCallback implements AsyncCallback.StringCallback { 577 private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class); 578 579 @Override 580 public void processResult(int rc, String path, Object ctx, String name) { 581 SplitLogCounters.tot_mgr_node_create_result.increment(); 582 if (rc != 0) { 583 if (needAbandonRetries(rc, "Create znode " + path)) { 584 createNodeFailure(path); 585 return; 586 } 587 if (rc == KeeperException.Code.NODEEXISTS.intValue()) { 588 // What if there is a delete pending against this pre-existing 589 // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE 590 // state. Only operations that will be carried out on this node by 591 // this manager are get-znode-data, task-finisher and delete-znode. 592 // And all code pieces correctly handle the case of suddenly 593 // disappearing task-znode. 594 LOG.debug("Found pre-existing znode " + path); 595 SplitLogCounters.tot_mgr_node_already_exists.increment(); 596 } else { 597 Long retry_count = (Long) ctx; 598 LOG.warn("Create rc=" + KeeperException.Code.get(rc) + " for " + path 599 + " remaining retries=" + retry_count); 600 if (retry_count == 0) { 601 SplitLogCounters.tot_mgr_node_create_err.increment(); 602 createNodeFailure(path); 603 } else { 604 SplitLogCounters.tot_mgr_node_create_retry.increment(); 605 createNode(path, retry_count - 1); 606 } 607 return; 608 } 609 } 610 createNodeSuccess(path); 611 } 612 } 613 614 /** 615 * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures. 616 */ 617 public class GetDataAsyncCallback implements AsyncCallback.DataCallback { 618 private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class); 619 620 @Override 621 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 622 SplitLogCounters.tot_mgr_get_data_result.increment(); 623 if (rc != 0) { 624 if (needAbandonRetries(rc, "GetData from znode " + path)) { 625 return; 626 } 627 if (rc == KeeperException.Code.NONODE.intValue()) { 628 SplitLogCounters.tot_mgr_get_data_nonode.increment(); 629 LOG.warn("Task znode " + path + " vanished or not created yet."); 630 // ignore since we should not end up in a case where there is in-memory task, 631 // but no znode. The only case is between the time task is created in-memory 632 // and the znode is created. See HBASE-11217. 633 return; 634 } 635 Long retry_count = (Long) ctx; 636 637 if (retry_count < 0) { 638 LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path 639 + ". Ignoring error. No error handling. No retrying."); 640 return; 641 } 642 LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path 643 + " remaining retries=" + retry_count); 644 if (retry_count == 0) { 645 SplitLogCounters.tot_mgr_get_data_err.increment(); 646 getDataSetWatchFailure(path); 647 } else { 648 SplitLogCounters.tot_mgr_get_data_retry.increment(); 649 getDataSetWatch(path, retry_count - 1); 650 } 651 return; 652 } 653 try { 654 getDataSetWatchSuccess(path, data, stat.getVersion()); 655 } catch (DeserializationException e) { 656 LOG.warn("Deserialization problem", e); 657 } 658 return; 659 } 660 } 661 662 /** 663 * Asynchronous handler for zk delete node results. Retries on failures. 664 */ 665 public class DeleteAsyncCallback implements AsyncCallback.VoidCallback { 666 private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class); 667 668 @Override 669 public void processResult(int rc, String path, Object ctx) { 670 SplitLogCounters.tot_mgr_node_delete_result.increment(); 671 if (rc != 0) { 672 if (needAbandonRetries(rc, "Delete znode " + path)) { 673 details.getFailedDeletions().add(path); 674 return; 675 } 676 if (rc != KeeperException.Code.NONODE.intValue()) { 677 SplitLogCounters.tot_mgr_node_delete_err.increment(); 678 Long retry_count = (Long) ctx; 679 LOG.warn("Delete rc=" + KeeperException.Code.get(rc) + " for " + path 680 + " remaining retries=" + retry_count); 681 if (retry_count == 0) { 682 LOG.warn("Delete failed " + path); 683 details.getFailedDeletions().add(path); 684 deleteNodeFailure(path); 685 } else { 686 deleteNode(path, retry_count - 1); 687 } 688 return; 689 } else { 690 LOG.info(path + " does not exist. Either was created but deleted behind our" 691 + " back by another pending delete OR was deleted" 692 + " in earlier retry rounds. zkretries = " + ctx); 693 } 694 } else { 695 LOG.debug("Deleted " + path); 696 } 697 deleteNodeSuccess(path); 698 } 699 } 700 701 /** 702 * Asynchronous handler for zk create RESCAN-node results. Retries on failures. 703 * <p> 704 * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal for all the 705 * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks. 706 */ 707 public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback { 708 private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class); 709 710 @Override 711 public void processResult(int rc, String path, Object ctx, String name) { 712 if (rc != 0) { 713 if (needAbandonRetries(rc, "CreateRescan znode " + path)) { 714 return; 715 } 716 Long retry_count = (Long) ctx; 717 LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" 718 + retry_count); 719 if (retry_count == 0) { 720 createRescanFailure(); 721 } else { 722 rescan(retry_count - 1); 723 } 724 return; 725 } 726 // path is the original arg, name is the actual name that was created 727 createRescanSuccess(name); 728 } 729 } 730 731 @Override 732 public void setDetails(SplitLogManagerDetails details) { 733 this.details = details; 734 } 735 736 @Override 737 public SplitLogManagerDetails getDetails() { 738 return details; 739 } 740 741 /** 742 * Temporary function that is used by unit tests only 743 */ 744 public void setIgnoreDeleteForTesting(boolean b) { 745 ignoreZKDeleteForTesting = b; 746 } 747}