001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coordination;
019
020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
026import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
027
028import java.io.IOException;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.SplitLogCounters;
034import org.apache.hadoop.hbase.SplitLogTask;
035import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.log.HBaseMarkers;
038import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
039import org.apache.hadoop.hbase.master.SplitLogManager.Task;
040import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.hadoop.hbase.wal.WALSplitUtil;
043import org.apache.hadoop.hbase.zookeeper.ZKListener;
044import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
045import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
046import org.apache.hadoop.hbase.zookeeper.ZKUtil;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
049import org.apache.hadoop.util.StringUtils;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.apache.zookeeper.AsyncCallback;
052import org.apache.zookeeper.CreateMode;
053import org.apache.zookeeper.KeeperException;
054import org.apache.zookeeper.KeeperException.NoNodeException;
055import org.apache.zookeeper.ZooDefs.Ids;
056import org.apache.zookeeper.data.Stat;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061 * ZooKeeper based implementation of {@link SplitLogManagerCoordination}
062 */
063@InterfaceAudience.Private
064public class ZKSplitLogManagerCoordination extends ZKListener
065  implements SplitLogManagerCoordination {
066
067  public static final int DEFAULT_TIMEOUT = 120000;
068  public static final int DEFAULT_ZK_RETRIES = 3;
069  public static final int DEFAULT_MAX_RESUBMIT = 3;
070
071  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class);
072
073  private final TaskFinisher taskFinisher;
074  private final Configuration conf;
075
076  private long zkretries;
077  private long resubmitThreshold;
078  private long timeout;
079
080  SplitLogManagerDetails details;
081
082  public boolean ignoreZKDeleteForTesting = false;
083
084  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
085    super(watcher);
086    this.conf = conf;
087    taskFinisher = new TaskFinisher() {
088      @Override
089      public Status finish(ServerName workerName, String logfile) {
090        try {
091          WALSplitUtil.finishSplitLogFile(logfile, conf);
092        } catch (IOException e) {
093          LOG.warn("Could not finish splitting of log file " + logfile, e);
094          return Status.ERR;
095        }
096        return Status.DONE;
097      }
098    };
099  }
100
101  @Override
102  public void init() throws IOException {
103    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
104    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
105    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
106    if (this.watcher != null) {
107      this.watcher.registerListener(this);
108      lookForOrphans();
109    }
110  }
111
112  @Override
113  public String prepareTask(String taskname) {
114    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
115  }
116
117  @Override
118  public int remainingTasksInCoordination() {
119    int count = 0;
120    try {
121      List<String> tasks =
122        ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().splitLogZNode);
123      if (tasks != null) {
124        int listSize = tasks.size();
125        for (int i = 0; i < listSize; i++) {
126          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
127            count++;
128          }
129        }
130      }
131    } catch (KeeperException ke) {
132      LOG.warn("Failed to check remaining tasks", ke);
133      count = -1;
134    }
135    return count;
136  }
137
138  /**
139   * It is possible for a task to stay in UNASSIGNED state indefinitely - say SplitLogManager wants
140   * to resubmit a task. It forces the task to UNASSIGNED state but it dies before it could create
141   * the RESCAN task node to signal the SplitLogWorkers to pick up the task. To prevent this
142   * scenario the SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. n
143   */
144  private void handleUnassignedTask(String path) {
145    if (ZKSplitLog.isRescanNode(watcher, path)) {
146      return;
147    }
148    Task task = findOrCreateOrphanTask(path);
149    if (task.isOrphan() && (task.incarnation.get() == 0)) {
150      LOG.info("Resubmitting unassigned orphan task " + path);
151      // ignore failure to resubmit. The timeout-monitor will handle it later
152      // albeit in a more crude fashion
153      resubmitTask(path, task, FORCE);
154    }
155  }
156
157  @Override
158  public void deleteTask(String path) {
159    deleteNode(path, zkretries);
160  }
161
162  @Override
163  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
164    // its ok if this thread misses the update to task.deleted. It will fail later
165    if (task.status != IN_PROGRESS) {
166      return false;
167    }
168    int version;
169    if (directive != FORCE) {
170      // We're going to resubmit:
171      // 1) immediately if the worker server is now marked as dead
172      // 2) after a configurable timeout if the server is not marked as dead but has still not
173      // finished the task. This allows to continue if the worker cannot actually handle it,
174      // for any reason.
175      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
176      final boolean alive = details.getMaster().getServerManager() != null
177        ? details.getMaster().getServerManager().isServerOnline(task.cur_worker_name)
178        : true;
179      if (alive && time < timeout) {
180        LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server "
181          + task.cur_worker_name + " is not marked as dead, we waited for " + time
182          + " while the timeout is " + timeout);
183        return false;
184      }
185
186      if (task.unforcedResubmits.get() >= resubmitThreshold) {
187        if (!task.resubmitThresholdReached) {
188          task.resubmitThresholdReached = true;
189          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
190          LOG.info("Skipping resubmissions of task " + path + " because threshold "
191            + resubmitThreshold + " reached");
192        }
193        return false;
194      }
195      // race with heartbeat() that might be changing last_version
196      version = task.last_version;
197    } else {
198      SplitLogCounters.tot_mgr_resubmit_force.increment();
199      version = -1;
200    }
201    LOG.info("Resubmitting task " + path);
202    task.incarnation.incrementAndGet();
203    boolean result = resubmit(path, version);
204    if (!result) {
205      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
206      return false;
207    }
208    // don't count forced resubmits
209    if (directive != FORCE) {
210      task.unforcedResubmits.incrementAndGet();
211    }
212    task.setUnassigned();
213    rescan(Long.MAX_VALUE);
214    SplitLogCounters.tot_mgr_resubmit.increment();
215    return true;
216  }
217
218  @Override
219  public void checkTasks() {
220    rescan(Long.MAX_VALUE);
221  };
222
223  /**
224   * signal the workers that a task was resubmitted by creating the RESCAN node.
225   */
226  private void rescan(long retries) {
227    // The RESCAN node will be deleted almost immediately by the
228    // SplitLogManager as soon as it is created because it is being
229    // created in the DONE state. This behavior prevents a buildup
230    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
231    // might miss the watch-trigger that creation of RESCAN node provides.
232    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
233    // therefore this behavior is safe.
234    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
235    this.watcher.getRecoverableZooKeeper().getZooKeeper().create(ZKSplitLog.getRescanNode(watcher),
236      slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
237      new CreateRescanAsyncCallback(), Long.valueOf(retries));
238  }
239
240  @Override
241  public void submitTask(String path) {
242    createNode(path, zkretries);
243  }
244
245  @Override
246  public void checkTaskStillAvailable(String path) {
247    // A negative retry count will lead to ignoring all error processing.
248    this.watcher.getRecoverableZooKeeper().getZooKeeper().getData(path, this.watcher,
249      new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
250    SplitLogCounters.tot_mgr_get_data_queued.increment();
251  }
252
253  private void deleteNode(String path, Long retries) {
254    SplitLogCounters.tot_mgr_node_delete_queued.increment();
255    // Once a task znode is ready for delete, that is it is in the TASK_DONE
256    // state, then no one should be writing to it anymore. That is no one
257    // will be updating the znode version any more.
258    this.watcher.getRecoverableZooKeeper().getZooKeeper().delete(path, -1,
259      new DeleteAsyncCallback(), retries);
260  }
261
262  private void deleteNodeSuccess(String path) {
263    if (ignoreZKDeleteForTesting) {
264      return;
265    }
266    Task task;
267    task = details.getTasks().remove(path);
268    if (task == null) {
269      if (ZKSplitLog.isRescanNode(watcher, path)) {
270        SplitLogCounters.tot_mgr_rescan_deleted.increment();
271      }
272      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
273      LOG.debug("Deleted task without in memory state " + path);
274      return;
275    }
276    synchronized (task) {
277      task.status = DELETED;
278      task.notify();
279    }
280    SplitLogCounters.tot_mgr_task_deleted.increment();
281  }
282
283  private void deleteNodeFailure(String path) {
284    LOG.info("Failed to delete node " + path + " and will retry soon.");
285    return;
286  }
287
288  private void createRescanSuccess(String path) {
289    SplitLogCounters.tot_mgr_rescan.increment();
290    getDataSetWatch(path, zkretries);
291  }
292
293  private void createRescanFailure() {
294    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
295  }
296
297  /**
298   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
299   * @param statusCode integer value of a ZooKeeper exception code
300   * @param action     description message about the retried action
301   * @return true when need to abandon retries otherwise false
302   */
303  private boolean needAbandonRetries(int statusCode, String action) {
304    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
305      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
306        + "action=" + action);
307      return true;
308    }
309    return false;
310  }
311
312  private void createNode(String path, Long retry_count) {
313    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
314    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
315      retry_count);
316    SplitLogCounters.tot_mgr_node_create_queued.increment();
317    return;
318  }
319
320  private void createNodeSuccess(String path) {
321    LOG.debug("Put up splitlog task at znode " + path);
322    getDataSetWatch(path, zkretries);
323  }
324
325  private void createNodeFailure(String path) {
326    // TODO the Manager should split the log locally instead of giving up
327    LOG.warn("Failed to create task node " + path);
328    setDone(path, FAILURE);
329  }
330
331  private void getDataSetWatch(String path, Long retry_count) {
332    this.watcher.getRecoverableZooKeeper().getZooKeeper().getData(path, this.watcher,
333      new GetDataAsyncCallback(), retry_count);
334    SplitLogCounters.tot_mgr_get_data_queued.increment();
335  }
336
337  private void getDataSetWatchSuccess(String path, byte[] data, int version)
338    throws DeserializationException {
339    if (data == null) {
340      if (version == Integer.MIN_VALUE) {
341        // assume all done. The task znode suddenly disappeared.
342        setDone(path, SUCCESS);
343        return;
344      }
345      SplitLogCounters.tot_mgr_null_data.increment();
346      LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path);
347      setDone(path, FAILURE);
348      return;
349    }
350    data = ZKMetadata.removeMetaData(data);
351    SplitLogTask slt = SplitLogTask.parseFrom(data);
352    if (slt.isUnassigned()) {
353      LOG.debug("Task not yet acquired " + path + ", ver=" + version);
354      handleUnassignedTask(path);
355    } else if (slt.isOwned()) {
356      heartbeat(path, version, slt.getServerName());
357    } else if (slt.isResigned()) {
358      LOG.info("Task " + path + " entered state=" + slt.toString());
359      resubmitOrFail(path, FORCE);
360    } else if (slt.isDone()) {
361      LOG.info("Task " + path + " entered state=" + slt.toString());
362      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
363        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
364          setDone(path, SUCCESS);
365        } else {
366          resubmitOrFail(path, CHECK);
367        }
368      } else {
369        setDone(path, SUCCESS);
370      }
371    } else if (slt.isErr()) {
372      LOG.info("Task " + path + " entered state=" + slt.toString());
373      resubmitOrFail(path, CHECK);
374    } else {
375      LOG.error(HBaseMarkers.FATAL,
376        "logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
377      setDone(path, FAILURE);
378    }
379  }
380
381  private void resubmitOrFail(String path, ResubmitDirective directive) {
382    if (resubmitTask(path, findOrCreateOrphanTask(path), directive) == false) {
383      setDone(path, FAILURE);
384    }
385  }
386
387  private void getDataSetWatchFailure(String path) {
388    LOG.warn("Failed to set data watch " + path);
389    setDone(path, FAILURE);
390  }
391
392  private void setDone(String path, TerminationStatus status) {
393    Task task = details.getTasks().get(path);
394    if (task == null) {
395      if (!ZKSplitLog.isRescanNode(watcher, path)) {
396        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
397        LOG.debug("Unacquired orphan task is done " + path);
398      }
399    } else {
400      synchronized (task) {
401        if (task.status == IN_PROGRESS) {
402          if (status == SUCCESS) {
403            SplitLogCounters.tot_mgr_log_split_success.increment();
404            LOG.info("Done splitting " + path);
405          } else {
406            SplitLogCounters.tot_mgr_log_split_err.increment();
407            LOG.warn("Error splitting " + path);
408          }
409          task.status = status;
410          if (task.batch != null) {
411            synchronized (task.batch) {
412              if (status == SUCCESS) {
413                task.batch.done++;
414              } else {
415                task.batch.error++;
416              }
417              task.batch.notify();
418            }
419          }
420        }
421      }
422    }
423    // delete the task node in zk. It's an async
424    // call and no one is blocked waiting for this node to be deleted. All
425    // task names are unique (log.<timestamp>) there is no risk of deleting
426    // a future task.
427    // if a deletion fails, TimeoutMonitor will retry the same deletion later
428    deleteNode(path, zkretries);
429    return;
430  }
431
432  private Task findOrCreateOrphanTask(String path) {
433    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
434      LOG.info("Creating orphan task " + path);
435      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
436    });
437  }
438
439  private void heartbeat(String path, int new_version, ServerName workerName) {
440    Task task = findOrCreateOrphanTask(path);
441    if (new_version != task.last_version) {
442      if (task.isUnassigned()) {
443        LOG.info("Task " + path + " acquired by " + workerName);
444      }
445      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
446      SplitLogCounters.tot_mgr_heartbeat.increment();
447    } else {
448      // duplicate heartbeats - heartbeats w/o zk node version
449      // changing - are possible. The timeout thread does
450      // getDataSetWatch() just to check whether a node still
451      // exists or not
452    }
453    return;
454  }
455
456  private void lookForOrphans() {
457    List<String> orphans;
458    try {
459      orphans =
460        ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.getZNodePaths().splitLogZNode);
461      if (orphans == null) {
462        LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode);
463        return;
464      }
465    } catch (KeeperException e) {
466      LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode + " "
467        + StringUtils.stringifyException(e));
468      return;
469    }
470    int rescan_nodes = 0;
471    int listSize = orphans.size();
472    for (int i = 0; i < listSize; i++) {
473      String path = orphans.get(i);
474      String nodepath = ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, path);
475      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
476        rescan_nodes++;
477        LOG.debug("Found orphan rescan node " + path);
478      } else {
479        LOG.info("Found orphan task " + path);
480      }
481      getDataSetWatch(nodepath, zkretries);
482    }
483    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
484      + " rescan nodes");
485  }
486
487  @Override
488  public void nodeDataChanged(String path) {
489    Task task;
490    task = details.getTasks().get(path);
491    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
492      if (task != null) {
493        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
494      }
495      getDataSetWatch(path, zkretries);
496    }
497  }
498
499  private boolean resubmit(String path, int version) {
500    try {
501      // blocking zk call but this is done from the timeout thread
502      SplitLogTask slt = new SplitLogTask.Unassigned(this.details.getServerName());
503      if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
504        LOG.debug("Failed to resubmit task " + path + " version changed");
505        return false;
506      }
507    } catch (NoNodeException e) {
508      LOG.warn("Failed to resubmit because znode doesn't exist " + path
509        + " task done (or forced done by removing the znode)");
510      try {
511        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
512      } catch (DeserializationException e1) {
513        LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
514        return false;
515      }
516      return false;
517    } catch (KeeperException.BadVersionException e) {
518      LOG.debug("Failed to resubmit task " + path + " version changed");
519      return false;
520    } catch (KeeperException e) {
521      SplitLogCounters.tot_mgr_resubmit_failed.increment();
522      LOG.warn("Failed to resubmit " + path, e);
523      return false;
524    }
525    return true;
526  }
527
528  /**
529   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
530   * interface to finish off a partially done task by
531   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a serialization
532   * point at the end of the task processing. Must be restartable and idempotent.
533   */
534  public interface TaskFinisher {
535    /**
536     * status that can be returned finish()
537     */
538    enum Status {
539      /**
540       * task completed successfully
541       */
542      DONE(),
543      /**
544       * task completed with error
545       */
546      ERR();
547    }
548
549    /**
550     * finish the partially done task. workername provides clue to where the partial results of the
551     * partially done tasks are present. taskname is the name of the task that was put up in
552     * zookeeper.
553     * <p>
554     * nn * @return DONE if task completed successfully, ERR otherwise
555     */
556    Status finish(ServerName workerName, String taskname);
557  }
558
559  /**
560   * Asynchronous handler for zk create node results. Retries on failures.
561   */
562  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
563    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);
564
565    @Override
566    public void processResult(int rc, String path, Object ctx, String name) {
567      SplitLogCounters.tot_mgr_node_create_result.increment();
568      if (rc != 0) {
569        if (needAbandonRetries(rc, "Create znode " + path)) {
570          createNodeFailure(path);
571          return;
572        }
573        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
574          // What if there is a delete pending against this pre-existing
575          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
576          // state. Only operations that will be carried out on this node by
577          // this manager are get-znode-data, task-finisher and delete-znode.
578          // And all code pieces correctly handle the case of suddenly
579          // disappearing task-znode.
580          LOG.debug("Found pre-existing znode " + path);
581          SplitLogCounters.tot_mgr_node_already_exists.increment();
582        } else {
583          Long retry_count = (Long) ctx;
584          LOG.warn("Create rc=" + KeeperException.Code.get(rc) + " for " + path
585            + " remaining retries=" + retry_count);
586          if (retry_count == 0) {
587            SplitLogCounters.tot_mgr_node_create_err.increment();
588            createNodeFailure(path);
589          } else {
590            SplitLogCounters.tot_mgr_node_create_retry.increment();
591            createNode(path, retry_count - 1);
592          }
593          return;
594        }
595      }
596      createNodeSuccess(path);
597    }
598  }
599
600  /**
601   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
602   */
603  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
604    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);
605
606    @Override
607    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
608      SplitLogCounters.tot_mgr_get_data_result.increment();
609      if (rc != 0) {
610        if (needAbandonRetries(rc, "GetData from znode " + path)) {
611          return;
612        }
613        if (rc == KeeperException.Code.NONODE.intValue()) {
614          SplitLogCounters.tot_mgr_get_data_nonode.increment();
615          LOG.warn("Task znode " + path + " vanished or not created yet.");
616          // ignore since we should not end up in a case where there is in-memory task,
617          // but no znode. The only case is between the time task is created in-memory
618          // and the znode is created. See HBASE-11217.
619          return;
620        }
621        Long retry_count = (Long) ctx;
622
623        if (retry_count < 0) {
624          LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
625            + ". Ignoring error. No error handling. No retrying.");
626          return;
627        }
628        LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path + " remaining retries="
629          + retry_count);
630        if (retry_count == 0) {
631          SplitLogCounters.tot_mgr_get_data_err.increment();
632          getDataSetWatchFailure(path);
633        } else {
634          SplitLogCounters.tot_mgr_get_data_retry.increment();
635          getDataSetWatch(path, retry_count - 1);
636        }
637        return;
638      }
639      try {
640        getDataSetWatchSuccess(path, data, stat.getVersion());
641      } catch (DeserializationException e) {
642        LOG.warn("Deserialization problem", e);
643      }
644      return;
645    }
646  }
647
648  /**
649   * Asynchronous handler for zk delete node results. Retries on failures.
650   */
651  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
652    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);
653
654    @Override
655    public void processResult(int rc, String path, Object ctx) {
656      SplitLogCounters.tot_mgr_node_delete_result.increment();
657      if (rc != 0) {
658        if (needAbandonRetries(rc, "Delete znode " + path)) {
659          details.getFailedDeletions().add(path);
660          return;
661        }
662        if (rc != KeeperException.Code.NONODE.intValue()) {
663          SplitLogCounters.tot_mgr_node_delete_err.increment();
664          Long retry_count = (Long) ctx;
665          LOG.warn("Delete rc=" + KeeperException.Code.get(rc) + " for " + path
666            + " remaining retries=" + retry_count);
667          if (retry_count == 0) {
668            LOG.warn("Delete failed " + path);
669            details.getFailedDeletions().add(path);
670            deleteNodeFailure(path);
671          } else {
672            deleteNode(path, retry_count - 1);
673          }
674          return;
675        } else {
676          LOG.info(path + " does not exist. Either was created but deleted behind our"
677            + " back by another pending delete OR was deleted"
678            + " in earlier retry rounds. zkretries = " + ctx);
679        }
680      } else {
681        LOG.debug("Deleted " + path);
682      }
683      deleteNodeSuccess(path);
684    }
685  }
686
687  /**
688   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
689   * <p>
690   * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal for all the
691   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
692   */
693  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
694    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);
695
696    @Override
697    public void processResult(int rc, String path, Object ctx, String name) {
698      if (rc != 0) {
699        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
700          return;
701        }
702        Long retry_count = (Long) ctx;
703        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
704          + retry_count);
705        if (retry_count == 0) {
706          createRescanFailure();
707        } else {
708          rescan(retry_count - 1);
709        }
710        return;
711      }
712      // path is the original arg, name is the actual name that was created
713      createRescanSuccess(name);
714    }
715  }
716
717  @Override
718  public void setDetails(SplitLogManagerDetails details) {
719    this.details = details;
720  }
721
722  @Override
723  public SplitLogManagerDetails getDetails() {
724    return details;
725  }
726
727  /**
728   * Temporary function that is used by unit tests only
729   */
730  public void setIgnoreDeleteForTesting(boolean b) {
731    ignoreZKDeleteForTesting = b;
732  }
733}