/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ZooKeeper based implementation of {@link SplitLogManagerCoordination}.
 */
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZKListener implements
    SplitLogManagerCoordination {

  /** Default task timeout, in milliseconds, before an unfinished task is resubmitted. */
  public static final int DEFAULT_TIMEOUT = 120000;
  /** Default number of retries for asynchronous ZooKeeper operations. */
  public static final int DEFAULT_ZK_RETRIES = 3;
  /** Default maximum number of unforced resubmits before a task is marked failed. */
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  private static final Logger LOG = LoggerFactory.getLogger(ZKSplitLogManagerCoordination.class);

  private final TaskFinisher taskFinisher;
  private final Configuration conf;

  // Tunables, read from the configuration in init().
  private long zkretries;
  private long resubmitThreshold;
  private long timeout;

  SplitLogManagerDetails details;

  public boolean ignoreZKDeleteForTesting = false;

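  /**
   * Creates the ZK-backed coordination. The default {@link TaskFinisher} finishes a split task
   * by calling {@link WALSplitUtil#finishSplitLogFile(String, Configuration)} on the log file.
   */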
  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
    super(watcher);
    this.conf = conf;
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitUtil.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file " + logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
  }

  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher,
              watcher.getZNodePaths().splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely - say the
   * SplitLogManager wants to resubmit a task, forces it to the UNASSIGNED state, but dies before
   * it can create the RESCAN task node that signals the SplitLogWorkers to pick the task up. To
   * prevent this scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at
   * startup.
   * @param path full path of the task node
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("Resubmitting unassigned orphan task " + path);
      // ignore failure to resubmit. The timeout-monitor will handle it later
      // albeit in a more crude fashion
      resubmitTask(path, task, FORCE);
    }
  }

  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    // It's OK if this thread misses the update to task.deleted; it will fail later.
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      // 1) immediately if the worker server is now marked as dead
      // 2) after a configurable timeout if the server is not marked as dead but has still not
      // finished the task. This lets us make progress even if the worker cannot actually
      // handle it, for any reason.
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      final boolean alive =
          details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
              .isServerOnline(task.cur_worker_name) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
            + task.cur_worker_name + " is not marked as dead, we waited for " + time
            + " while the timeout is " + timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
          LOG.info("Skipping resubmissions of task " + path + " because threshold "
              + resubmitThreshold + " reached");
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.increment();
      version = -1;
    }
    LOG.info("Resubmitting task " + path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.increment();
    return true;
  }

  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

  /**
   * Signal the workers that a task was resubmitted by creating the RESCAN node.
   */
  private void rescan(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that creation of the RESCAN node provides.
    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks,
    // this behavior is safe.
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  @Override
  public void checkTaskStillAvailable(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(),
          Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

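  /**
   * Asynchronously deletes the task znode; {@code retries} rides along as the callback context
   * and is decremented by {@link DeleteAsyncCallback} on retriable failures.
   */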
  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.increment();
    // Once a task znode is ready for deletion, i.e. it is in the TASK_DONE state, no one should
    // be writing to it anymore, so no one will be updating the znode version any further.
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .delete(path, -1, new DeleteAsyncCallback(), retries);
  }

  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.increment();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
      LOG.debug("Deleted task without in memory state " + path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node " + path + " and will retry soon.");
  }

  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.increment();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when retries should be abandoned, otherwise false
   */
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
          + "action=" + action);
      return true;
    }
    return false;
  }

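  /**
   * Asynchronously creates an UNASSIGNED task znode; {@code retry_count} is passed as the
   * callback context so {@link CreateAsyncCallback} can retry on failure.
   */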
  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.increment();
  }

  private void createNodeSuccess(String path) {
    LOG.debug("Put up splitlog task at znode " + path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("Failed to create task node " + path);
    setDone(path, FAILURE);
  }

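  /**
   * Fetches the task znode data and re-arms the watch on it; {@code retry_count} is carried as
   * the callback context for {@link GetDataAsyncCallback}. A negative count disables retries.
   */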
  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  private void getDataSetWatchSuccess(String path, byte[] data, int version)
      throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.increment();
      LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path);
      setDone(path, FAILURE);
      return;
    }
    data = ZKMetadata.removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("Task not yet acquired " + path + ", ver=" + version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, CHECK);
    } else {
      LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = "
          + path + " data = " + slt.toString());
      setDone(path, FAILURE);
    }
  }

  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("Failed to set data watch " + path);
    setDone(path, FAILURE);
  }

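  /**
   * Records the terminal {@code status} of a task: updates the in-memory task and its batch
   * counters, then queues the task znode for deletion.
   */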
  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
        LOG.debug("Unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.increment();
            LOG.info("Done splitting " + path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.increment();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // Delete the task node in zk. It is an async call and no one is blocked waiting for this
    // node to be deleted. All task names are unique (log.<timestamp>) so there is no risk of
    // deleting a future task. If a deletion fails, the TimeoutMonitor will retry it later.
    deleteNode(path, zkretries);
  }

  private Task findOrCreateOrphanTask(String path) {
    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
      LOG.info("Creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    });
  }

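  /**
   * Notes a worker heartbeat on a task. A changed znode version means the worker made progress
   * (or just acquired the task); an unchanged version is a no-op.
   */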
  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("Task " + path + " acquired by " + workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
      SplitLogCounters.tot_mgr_heartbeat.increment();
    } else {
      // Duplicate heartbeats - heartbeats without the zk node version changing - are possible.
      // The timeout thread does getDataSetWatch() just to check whether a node still exists.
    }
  }

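  /**
   * Scans the split log znode for tasks left behind by a previous master and re-attaches a
   * watch to each of them, counting rescan nodes separately from real tasks.
   */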
  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher,
              this.watcher.getZNodePaths().splitLogZNode);
      if (orphans == null) {
        LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode + " "
          + StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    int listSize = orphans.size();
    for (int i = 0; i < listSize; i++) {
      String path = orphans.get(i);
      String nodepath = ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("Found orphan rescan node " + path);
      } else {
        LOG.info("Found orphan task " + path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
        + " rescan nodes");
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

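  /**
   * Flips the task znode back to UNASSIGNED so another worker can pick it up. A version of -1
   * (used for FORCE) overwrites unconditionally; otherwise the write fails if the znode version
   * has moved on since it was last read.
   */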
  private boolean resubmit(String path, int version) {
    try {
      // blocking zk call but this is done from the timeout thread
      SplitLogTask slt =
          new SplitLogTask.Unassigned(this.details.getServerName());
      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
        LOG.debug("Failed to resubmit task " + path + ", version changed");
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("Failed to resubmit because znode doesn't exist " + path
          + " task done (or forced done by removing the znode)");
      try {
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
        return false;
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("Failed to resubmit task " + path + ", version changed");
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.increment();
      LOG.warn("Failed to resubmit " + path, e);
      return false;
    }
    return true;
  }

  /**
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
   * interface to finish off a task partially done by a
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a serialization
   * point at the end of task processing. Must be restartable and idempotent.
   */
  public interface TaskFinisher {
    /**
     * Status that can be returned by finish()
     */
    enum Status {
      /**
       * task completed successfully
       */
      DONE,
      /**
       * task completed with error
       */
      ERR
    }

    /**
     * Finish the partially done task. The worker name provides a clue to where the partial
     * results of the partially done task are present.
     * @param workerName name of the worker that did the partial work
     * @param taskname name of the task, as put up in ZooKeeper
     * @return DONE if the task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
  }

  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
          // state. Only operations that will be carried out on this node by
          // this manager are get-znode-data, task-finisher and delete-znode.
          // And all code pieces correctly handle the case of suddenly
          // disappearing task-znode.
          LOG.debug("Found pre-existing znode " + path);
          SplitLogCounters.tot_mgr_node_already_exists.increment();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("Create rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            SplitLogCounters.tot_mgr_node_create_err.increment();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.increment();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.increment();
          LOG.warn("Task znode " + path + " vanished or not created yet.");
          // ignore since we should not end up in a case where there is in-memory task,
          // but no znode. The only case is between the time task is created in-memory
          // and the znode is created. See HBASE-11217.
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
              + ". Ignoring error. No error handling. No retrying.");
          return;
        }
        LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
            + " remaining retries=" + retry_count);
        if (retry_count == 0) {
          SplitLogCounters.tot_mgr_get_data_err.increment();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.increment();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
    }
  }

  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.increment();
          Long retry_count = (Long) ctx;
          LOG.warn("Delete rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            LOG.warn("Delete failed " + path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info(path + " does not exist. Either was created but deleted behind our"
              + " back by another pending delete OR was deleted"
              + " in earlier retry rounds. zkretries = " + ctx);
        }
      } else {
        LOG.debug("Deleted " + path);
      }
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL flag. It is a signal for all the
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
   */
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
            + retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          rescan(retry_count - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  /**
   * Temporary function that is used by unit tests only
   */
  @VisibleForTesting
  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}