/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * ZooKeeper based implementation of {@link SplitLogManagerCoordination}.
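 * <p>
 * A rough sketch (for illustration only; the real wiring, including where the {@code watcher},
 * {@code conf}, {@code details} and {@code walPath} values come from, lives in
 * {@link org.apache.hadoop.hbase.master.SplitLogManager}) of how a caller drives this class:
 *
 * <pre>{@code
 * ZKSplitLogManagerCoordination coordination = new ZKSplitLogManagerCoordination(conf, watcher);
 * coordination.setDetails(details);                 // server name, task map, failed deletions
 * coordination.init();                              // register ZK listener, resubmit orphans
 * String node = coordination.prepareTask(walPath);  // encoded task znode name for the WAL file
 * coordination.submitTask(node);                    // create the UNASSIGNED task znode
 * }</pre>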
 */
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZKListener implements
    SplitLogManagerCoordination {

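  // Defaults for the task timeout (in ms), the number of ZK operation retries, and the limit on
  // unforced resubmits of a task; init() lets configuration override all three.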
  public static final int DEFAULT_TIMEOUT = 120000;
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class);

  private final TaskFinisher taskFinisher;
  private final Configuration conf;

  private long zkretries;
  private long resubmitThreshold;
  private long timeout;

  SplitLogManagerDetails details;

  public boolean ignoreZKDeleteForTesting = false;

  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
    super(watcher);
    this.conf = conf;
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitter.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file " + logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
  }

  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }
  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely: say the
   * SplitLogManager wants to resubmit a task, forces it to the UNASSIGNED state, but dies before
   * it can create the RESCAN task node that signals the SplitLogWorkers to pick the task up. To
   * prevent this scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at
   * startup.
   * @param path full path of the task znode
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("Resubmitting unassigned orphan task " + path);
      // ignore failure to resubmit. The timeout-monitor will handle it later
      // albeit in a more crude fashion
      resubmitTask(path, task, FORCE);
    }
  }

  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    // It's OK if this thread misses the update to task.deleted. It will fail later.
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      // 1) immediately if the worker server is now marked as dead
      // 2) after a configurable timeout if the server is not marked as dead but has still not
      // finished the task. This allows us to continue even if the worker cannot actually handle
      // it, for any reason.
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      final boolean alive =
          details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
              .isServerOnline(task.cur_worker_name) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
            + task.cur_worker_name + " is not marked as dead, we waited for " + time
            + " while the timeout is " + timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
          LOG.info("Skipping resubmissions of task " + path + " because threshold "
              + resubmitThreshold + " reached");
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.increment();
      version = -1;
    }
    LOG.info("Resubmitting task " + path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.increment();
    return true;
  }

  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

  /**
   * Signal the workers that a task was resubmitted by creating the RESCAN node.
   */
  private void rescan(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that creation of the RESCAN node provides.
    // Since the TimeoutMonitor keeps resubmitting UNASSIGNED tasks,
    // this behavior is safe.
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  @Override
  public void checkTaskStillAvailable(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(),
          Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

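  /**
   * Asynchronously delete the task znode, retrying up to {@code retries} times on failure.
   */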
  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.increment();
    // Once a task znode is ready for deletion, i.e. it is in the TASK_DONE state, no one should
    // be writing to it anymore, so the znode version will no longer change.
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .delete(path, -1, new DeleteAsyncCallback(), retries);
  }

  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.increment();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
      LOG.debug("Deleted task without in-memory state " + path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node " + path + " and will retry soon.");
    return;
  }

  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.increment();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when retries should be abandoned, otherwise false
   */
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
          + "action=" + action);
      return true;
    }
    return false;
  }

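  /**
   * Asynchronously create an UNASSIGNED task znode at {@code path}, retrying up to
   * {@code retry_count} times on failure.
   */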
  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.increment();
    return;
  }

  private void createNodeSuccess(String path) {
    LOG.debug("Put up splitlog task at znode " + path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("Failed to create task node " + path);
    setDone(path, FAILURE);
  }

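  /**
   * Asynchronously read the task znode and re-set a watch on it; the result is handled by
   * {@link GetDataAsyncCallback}.
   */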
  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

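  /**
   * Act on the state recorded in the task znode: hand UNASSIGNED tasks to
   * {@link #handleUnassignedTask(String)}, record a heartbeat for owned tasks, resubmit RESIGNED
   * and ERR tasks, and run the {@link TaskFinisher} for tasks the worker marked DONE.
   */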
  private void getDataSetWatchSuccess(String path, byte[] data, int version)
      throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.increment();
      LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path);
      setDone(path, FAILURE);
      return;
    }
    data = ZKMetadata.removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("Task not yet acquired " + path + ", ver=" + version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, CHECK);
    } else {
      LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = "
          + path + " data = " + slt.toString());
      setDone(path, FAILURE);
    }
  }

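  /**
   * Resubmit the task if the directive and task state allow it; otherwise mark it FAILURE, which
   * also queues deletion of its znode.
   */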
  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("Failed to set data watch " + path);
    setDone(path, FAILURE);
  }

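  /**
   * Record the terminal status of an in-progress task, update its batch counters, and queue
   * asynchronous deletion of the task znode.
   */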
  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
        LOG.debug("Unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.increment();
            LOG.info("Done splitting " + path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.increment();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // Delete the task node in zk. It's an async call and no one is blocked waiting for this node
    // to be deleted. All task names are unique (log.<timestamp>), so there is no risk of deleting
    // a future task. If a deletion fails, the TimeoutMonitor will retry the same deletion later.
    deleteNode(path, zkretries);
    return;
  }

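  /**
   * Return the in-memory {@link Task} for {@code path}, creating it as an orphan task if this
   * manager has no record of it yet.
   */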
  private Task findOrCreateOrphanTask(String path) {
    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
      LOG.info("Creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    });
  }

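  /**
   * Record a worker heartbeat, observed as a change in the task znode version.
   */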
  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("Task " + path + " acquired by " + workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
      SplitLogCounters.tot_mgr_heartbeat.increment();
    } else {
      // duplicate heartbeats - heartbeats w/o zk node version
      // changing - are possible. The timeout thread does
      // getDataSetWatch() just to check whether a node still
      // exists or not
    }
    return;
  }

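  /**
   * Scan the splitlog znode at startup for task nodes left over from an earlier run and set
   * watches on them so they get resubmitted or finished.
   */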
  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.znodePaths.splitLogZNode);
      if (orphans == null) {
        LOG.warn("Could not get children of " + this.watcher.znodePaths.splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Could not get children of " + this.watcher.znodePaths.splitLogZNode + " "
          + StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    int listSize = orphans.size();
    for (int i = 0; i < listSize; i++) {
      String path = orphans.get(i);
      String nodepath = ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("Found orphan rescan node " + path);
      } else {
        LOG.info("Found orphan task " + path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
        + " rescan nodes");
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

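  /**
   * Flip the task znode back to UNASSIGNED at the expected znode {@code version}; a version of -1
   * (used for forced resubmits) matches any version.
   */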
  private boolean resubmit(String path, int version) {
    try {
      // blocking zk call but this is done from the timeout thread
      SplitLogTask slt = new SplitLogTask.Unassigned(this.details.getServerName());
      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
        LOG.debug("Failed to resubmit task " + path + " version changed");
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("Failed to resubmit because znode doesn't exist " + path
          + "; task done (or forced done by removing the znode)");
      try {
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
        return false;
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("Failed to resubmit task " + path + " version changed");
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.increment();
      LOG.warn("Failed to resubmit " + path, e);
      return false;
    }
    return true;
  }

  /**
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
   * interface to finish off a task left partially done by a
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a serialization
   * point at the end of the task processing. Implementations must be restartable and idempotent.
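   * <p>
   * The finisher installed by the {@code ZKSplitLogManagerCoordination} constructor simply
   * delegates to {@code WALSplitter.finishSplitLogFile(logfile, conf)}.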
   */
  public interface TaskFinisher {
    /**
     * Status that can be returned by {@link #finish(ServerName, String)}.
     */
    enum Status {
      /**
       * Task completed successfully.
       */
      DONE(),
      /**
       * Task completed with an error.
       */
      ERR();
    }

    /**
     * Finish the partially done task. The worker name provides a clue to where the partial
     * results of the partially done task are present, and the task name is the name of the task
     * znode that was put up in ZooKeeper.
     * @param workerName the worker that partially completed the task; indicates where its partial
     *          results are present
     * @param taskname name of the task znode that was put up in ZooKeeper
     * @return DONE if task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
  }

  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
          // state. Only operations that will be carried out on this node by
          // this manager are get-znode-data, task-finisher and delete-znode.
          // And all code pieces correctly handle the case of suddenly
          // disappearing task-znode.
          LOG.debug("Found pre-existing znode " + path);
          SplitLogCounters.tot_mgr_node_already_exists.increment();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("Create rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            SplitLogCounters.tot_mgr_node_create_err.increment();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.increment();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.increment();
          LOG.warn("Task znode " + path + " vanished or not created yet.");
          // ignore since we should not end up in a case where there is an in-memory task,
          // but no znode. The only case is between the time the task is created in-memory
          // and the znode is created. See HBASE-11217.
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
              + ". Ignoring error. No error handling. No retrying.");
          return;
        }
        LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
            + " remaining retries=" + retry_count);
        if (retry_count == 0) {
          SplitLogCounters.tot_mgr_get_data_err.increment();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.increment();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
      return;
    }
  }

  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.increment();
          Long retry_count = (Long) ctx;
          LOG.warn("Delete rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            LOG.warn("Delete failed " + path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info(path + " does not exist. Either it was created and then deleted behind our"
              + " back by another pending delete, OR it was deleted"
              + " in an earlier retry round. zkretries = " + ctx);
        }
      } else {
        LOG.debug("Deleted " + path);
      }
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL flag. It is a signal for all the
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
   */
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
            + retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          rescan(retry_count - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  /**
   * Temporary function that is used by unit tests only
   */
  @VisibleForTesting
  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}