/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coordination;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination.TaskFinisher.Status;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper based implementation of {@link SplitLogManagerCoordination}.
 */
@InterfaceAudience.Private
public class ZKSplitLogManagerCoordination extends ZKListener implements
    SplitLogManagerCoordination {

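  // Defaults for the task timeout, the number of ZK operation retries, and the unforced
  // resubmit threshold; init() reads the configuration keys that can override them.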
  public static final int DEFAULT_TIMEOUT = 120000;
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;

  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManagerCoordination.class);

  private final TaskFinisher taskFinisher;
  private final Configuration conf;

  private long zkretries;
  private long resubmitThreshold;
  private long timeout;

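  /**
   * State shared with the SplitLogManager via {@link #setDetails(SplitLogManagerDetails)}: the
   * in-memory task map, the master's server name, and the list of failed znode deletions.
   */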
  SplitLogManagerDetails details;

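  /**
   * When true (used by tests only), {@link #deleteNodeSuccess(String)} skips the in-memory
   * cleanup normally performed after a task znode has been deleted.
   */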
  public boolean ignoreZKDeleteForTesting = false;

  public ZKSplitLogManagerCoordination(Configuration conf, ZKWatcher watcher) {
    super(watcher);
    this.conf = conf;
    taskFinisher = new TaskFinisher() {
      @Override
      public Status finish(ServerName workerName, String logfile) {
        try {
          WALSplitUtil.finishSplitLogFile(logfile, conf);
        } catch (IOException e) {
          LOG.warn("Could not finish splitting of log file " + logfile, e);
          return Status.ERR;
        }
        return Status.DONE;
      }
    };
  }

  @Override
  public void init() throws IOException {
    this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
    this.resubmitThreshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
    this.timeout = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, DEFAULT_TIMEOUT);
    if (this.watcher != null) {
      this.watcher.registerListener(this);
      lookForOrphans();
    }
  }

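  /** Returns the znode path, under the split-log znode, that encodes the given task name. */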
  @Override
  public String prepareTask(String taskname) {
    return ZKSplitLog.getEncodedNodeName(watcher, taskname);
  }

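  /**
   * Counts the task znodes under the split-log znode, excluding RESCAN nodes; returns -1 if the
   * listing fails.
   */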
  @Override
  public int remainingTasksInCoordination() {
    int count = 0;
    try {
      List<String> tasks = ZKUtil.listChildrenNoWatch(watcher,
              watcher.getZNodePaths().splitLogZNode);
      if (tasks != null) {
        int listSize = tasks.size();
        for (int i = 0; i < listSize; i++) {
          if (!ZKSplitLog.isRescanNode(tasks.get(i))) {
            count++;
          }
        }
      }
    } catch (KeeperException ke) {
      LOG.warn("Failed to check remaining tasks", ke);
      count = -1;
    }
    return count;
  }

  /**
   * It is possible for a task to stay in the UNASSIGNED state indefinitely - say the
   * SplitLogManager wants to resubmit a task: it forces the task to the UNASSIGNED state but dies
   * before it can create the RESCAN task node that signals the SplitLogWorkers to pick the task
   * back up. To prevent this scenario the SplitLogManager resubmits all orphan and UNASSIGNED
   * tasks at startup.
   * @param path the task znode path
   */
  private void handleUnassignedTask(String path) {
    if (ZKSplitLog.isRescanNode(watcher, path)) {
      return;
    }
    Task task = findOrCreateOrphanTask(path);
    if (task.isOrphan() && (task.incarnation.get() == 0)) {
      LOG.info("Resubmitting unassigned orphan task " + path);
      // ignore failure to resubmit; the timeout-monitor will handle it later,
      // albeit in a more crude fashion
      resubmitTask(path, task, FORCE);
    }
  }

  @Override
  public void deleteTask(String path) {
    deleteNode(path, zkretries);
  }

  @Override
  public boolean resubmitTask(String path, Task task, ResubmitDirective directive) {
    // it's OK if this thread misses the update to task.deleted; it will fail later
    if (task.status != IN_PROGRESS) {
      return false;
    }
    int version;
    if (directive != FORCE) {
      // We're going to resubmit:
      // 1) immediately if the worker server is now marked as dead
      // 2) after a configurable timeout if the server is not marked as dead but has still not
      // finished the task. This allows us to continue if the worker cannot actually handle it,
      // for any reason.
      final long time = EnvironmentEdgeManager.currentTime() - task.last_update;
      final boolean alive =
          details.getMaster().getServerManager() != null ? details.getMaster().getServerManager()
              .isServerOnline(task.cur_worker_name) : true;
      if (alive && time < timeout) {
        LOG.trace("Skipping the resubmit of " + task.toString() + " because the server "
            + task.cur_worker_name + " is not marked as dead, we waited for " + time
            + " while the timeout is " + timeout);
        return false;
      }

      if (task.unforcedResubmits.get() >= resubmitThreshold) {
        if (!task.resubmitThresholdReached) {
          task.resubmitThresholdReached = true;
          SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
          LOG.info("Skipping resubmissions of task " + path + " because threshold "
              + resubmitThreshold + " reached");
        }
        return false;
      }
      // race with heartbeat() that might be changing last_version
      version = task.last_version;
    } else {
      SplitLogCounters.tot_mgr_resubmit_force.increment();
      version = -1;
    }
    LOG.info("Resubmitting task " + path);
    task.incarnation.incrementAndGet();
    boolean result = resubmit(path, version);
    if (!result) {
      task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      return false;
    }
    // don't count forced resubmits
    if (directive != FORCE) {
      task.unforcedResubmits.incrementAndGet();
    }
    task.setUnassigned();
    rescan(Long.MAX_VALUE);
    SplitLogCounters.tot_mgr_resubmit.increment();
    return true;
  }

  @Override
  public void checkTasks() {
    rescan(Long.MAX_VALUE);
  }

  /**
   * Signal the workers that a task was resubmitted by creating the RESCAN node.
   */
  private void rescan(long retries) {
    // The RESCAN node will be deleted almost immediately by the
    // SplitLogManager as soon as it is created because it is being
    // created in the DONE state. This behavior prevents a buildup
    // of RESCAN nodes. But there is also a chance that a SplitLogWorker
    // might miss the watch-trigger that the creation of the RESCAN node provides.
    // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks,
    // this behavior is safe.
    SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName());
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
          CreateMode.EPHEMERAL_SEQUENTIAL, new CreateRescanAsyncCallback(), Long.valueOf(retries));
  }

  @Override
  public void submitTask(String path) {
    createNode(path, zkretries);
  }

  @Override
  public void checkTaskStillAvailable(String path) {
    // A negative retry count will lead to ignoring all error processing.
    this.watcher
        .getRecoverableZooKeeper()
        .getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(),
          Long.valueOf(-1) /* retry count */);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

  private void deleteNode(String path, Long retries) {
    SplitLogCounters.tot_mgr_node_delete_queued.increment();
    // Once a task znode is ready for delete, i.e. it is in the TASK_DONE state, no one
    // should be writing to it anymore, so the znode version will no longer change.
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .delete(path, -1, new DeleteAsyncCallback(), retries);
  }

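  /**
   * Called when a task znode was successfully deleted: removes the in-memory task, marks it
   * DELETED and notifies any thread waiting on it.
   */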
  private void deleteNodeSuccess(String path) {
    if (ignoreZKDeleteForTesting) {
      return;
    }
    Task task = details.getTasks().remove(path);
    if (task == null) {
      if (ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_rescan_deleted.increment();
      }
      SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
      LOG.debug("Deleted task without in memory state " + path);
      return;
    }
    synchronized (task) {
      task.status = DELETED;
      task.notify();
    }
    SplitLogCounters.tot_mgr_task_deleted.increment();
  }

  private void deleteNodeFailure(String path) {
    LOG.info("Failed to delete node " + path + " and will retry soon.");
  }

  private void createRescanSuccess(String path) {
    SplitLogCounters.tot_mgr_rescan.increment();
    getDataSetWatch(path, zkretries);
  }

  private void createRescanFailure() {
    LOG.error(HBaseMarkers.FATAL, "logic failure, rescan failure must not happen");
  }

  /**
   * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
   * @param statusCode integer value of a ZooKeeper exception code
   * @param action description message about the retried action
   * @return true when retries should be abandoned, otherwise false
   */
  private boolean needAbandonRetries(int statusCode, String action) {
    if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
      LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
          + "action=" + action);
      return true;
    }
    return false;
  }

  private void createNode(String path, Long retry_count) {
    SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName());
    ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
      retry_count);
    SplitLogCounters.tot_mgr_node_create_queued.increment();
  }

  private void createNodeSuccess(String path) {
    LOG.debug("Put up splitlog task at znode " + path);
    getDataSetWatch(path, zkretries);
  }

  private void createNodeFailure(String path) {
    // TODO the Manager should split the log locally instead of giving up
    LOG.warn("Failed to create task node " + path);
    setDone(path, FAILURE);
  }

  private void getDataSetWatch(String path, Long retry_count) {
    this.watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
    SplitLogCounters.tot_mgr_get_data_queued.increment();
  }

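  /**
   * Acts on the data read back from a task znode: resubmits UNASSIGNED tasks, records heartbeats
   * for OWNED tasks, resubmits or fails RESIGNED and ERR tasks, and runs the {@link TaskFinisher}
   * before marking DONE tasks successful.
   */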
  private void getDataSetWatchSuccess(String path, byte[] data, int version)
      throws DeserializationException {
    if (data == null) {
      if (version == Integer.MIN_VALUE) {
        // assume all done. The task znode suddenly disappeared.
        setDone(path, SUCCESS);
        return;
      }
      SplitLogCounters.tot_mgr_null_data.increment();
      LOG.error(HBaseMarkers.FATAL, "logic error - got null data " + path);
      setDone(path, FAILURE);
      return;
    }
    data = ZKMetadata.removeMetaData(data);
    SplitLogTask slt = SplitLogTask.parseFrom(data);
    if (slt.isUnassigned()) {
      LOG.debug("Task not yet acquired " + path + ", ver=" + version);
      handleUnassignedTask(path);
    } else if (slt.isOwned()) {
      heartbeat(path, version, slt.getServerName());
    } else if (slt.isResigned()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, FORCE);
    } else if (slt.isDone()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
        if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
          setDone(path, SUCCESS);
        } else {
          resubmitOrFail(path, CHECK);
        }
      } else {
        setDone(path, SUCCESS);
      }
    } else if (slt.isErr()) {
      LOG.info("Task " + path + " entered state=" + slt.toString());
      resubmitOrFail(path, CHECK);
    } else {
      LOG.error(HBaseMarkers.FATAL, "logic error - unexpected zk state for path = "
          + path + " data = " + slt.toString());
      setDone(path, FAILURE);
    }
  }

  private void resubmitOrFail(String path, ResubmitDirective directive) {
    if (!resubmitTask(path, findOrCreateOrphanTask(path), directive)) {
      setDone(path, FAILURE);
    }
  }

  private void getDataSetWatchFailure(String path) {
    LOG.warn("Failed to set data watch " + path);
    setDone(path, FAILURE);
  }

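  /**
   * Records the terminal {@code status} of the task in memory, updates its batch counters, and
   * queues asynchronous deletion of the task znode.
   */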
  private void setDone(String path, TerminationStatus status) {
    Task task = details.getTasks().get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
        LOG.debug("Unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.increment();
            LOG.info("Done splitting " + path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.increment();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              task.batch.notify();
            }
          }
        }
      }
    }
    // Delete the task node in ZK. It's an async call and no one is blocked waiting for this
    // node to be deleted. All task names are unique (log.<timestamp>), so there is no risk of
    // deleting a future task. If a deletion fails, the TimeoutMonitor will retry it later.
    deleteNode(path, zkretries);
  }

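  /**
   * Returns the in-memory {@link Task} for {@code path}, registering a new orphan task (and
   * bumping the orphan counter) if none exists yet.
   */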
  private Task findOrCreateOrphanTask(String path) {
    return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
      LOG.info("Creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
    });
  }

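  /**
   * Records a worker heartbeat for the task at {@code path}; only a change in the znode version
   * counts as a new heartbeat, otherwise the call is a no-op.
   */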
  private void heartbeat(String path, int new_version, ServerName workerName) {
    Task task = findOrCreateOrphanTask(path);
    if (new_version != task.last_version) {
      if (task.isUnassigned()) {
        LOG.info("Task " + path + " acquired by " + workerName);
      }
      task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
      SplitLogCounters.tot_mgr_heartbeat.increment();
    } else {
      // duplicate heartbeats - heartbeats w/o zk node version
      // changing - are possible. The timeout thread does
      // getDataSetWatch() just to check whether a node still
      // exists or not
    }
  }

  private void lookForOrphans() {
    List<String> orphans;
    try {
      orphans = ZKUtil.listChildrenNoWatch(this.watcher,
              this.watcher.getZNodePaths().splitLogZNode);
      if (orphans == null) {
        LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode);
        return;
      }
    } catch (KeeperException e) {
      LOG.warn("Could not get children of " + this.watcher.getZNodePaths().splitLogZNode + " "
          + StringUtils.stringifyException(e));
      return;
    }
    int rescan_nodes = 0;
    int listSize = orphans.size();
    for (int i = 0; i < listSize; i++) {
      String path = orphans.get(i);
      String nodepath = ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, path);
      if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
        rescan_nodes++;
        LOG.debug("Found orphan rescan node " + path);
      } else {
        LOG.info("Found orphan task " + path);
      }
      getDataSetWatch(nodepath, zkretries);
    }
    LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " + rescan_nodes
        + " rescan nodes");
  }

  @Override
  public void nodeDataChanged(String path) {
    Task task = details.getTasks().get(path);
    if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
      if (task != null) {
        task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
      }
      getDataSetWatch(path, zkretries);
    }
  }

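  /**
   * Flips the task znode back to UNASSIGNED by overwriting its data at the given {@code version};
   * returns false if the znode is missing, the version has changed, or the ZK call fails.
   */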
  private boolean resubmit(String path, int version) {
    try {
      // blocking zk call but this is done from the timeout thread
      SplitLogTask slt = new SplitLogTask.Unassigned(this.details.getServerName());
      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
        LOG.debug("Failed to resubmit task " + path + " version changed");
        return false;
      }
    } catch (NoNodeException e) {
      LOG.warn("Failed to resubmit because znode doesn't exist " + path
          + " task done (or forced done by removing the znode)");
      try {
        getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
      } catch (DeserializationException e1) {
        LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
      }
      return false;
    } catch (KeeperException.BadVersionException e) {
      LOG.debug("Failed to resubmit task " + path + " version changed");
      return false;
    } catch (KeeperException e) {
      SplitLogCounters.tot_mgr_resubmit_failed.increment();
      LOG.warn("Failed to resubmit " + path, e);
      return false;
    }
    return true;
  }

  /**
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} can use objects implementing this
   * interface to finish off a task left partially done by a
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}. This provides a serialization
   * point at the end of the task processing. Must be restartable and idempotent.
   */
  public interface TaskFinisher {
    /**
     * Status that can be returned by {@code finish()}.
     */
    enum Status {
      /**
       * task completed successfully
       */
      DONE(),
      /**
       * task completed with error
       */
      ERR();
    }

    /**
     * Finish the partially done task. The worker name provides a clue to where the partial
     * results of the partially done task are present; the task name is the name of the task
     * that was put up in ZooKeeper.
     * @param workerName the worker that worked on the task
     * @param taskname name of the task that was put up in ZooKeeper
     * @return DONE if the task completed successfully, ERR otherwise
     */
    Status finish(ServerName workerName, String taskname);
  }

  /**
   * Asynchronous handler for zk create node results. Retries on failures.
   */
  public class CreateAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      SplitLogCounters.tot_mgr_node_create_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Create znode " + path)) {
          createNodeFailure(path);
          return;
        }
        if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
          // What if there is a delete pending against this pre-existing
          // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
          // state. The only operations that will be carried out on this node by
          // this manager are get-znode-data, task-finisher and delete-znode.
          // And all code pieces correctly handle the case of a suddenly
          // disappearing task znode.
          LOG.debug("Found pre-existing znode " + path);
          SplitLogCounters.tot_mgr_node_already_exists.increment();
        } else {
          Long retry_count = (Long) ctx;
          LOG.warn("Create rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            SplitLogCounters.tot_mgr_node_create_err.increment();
            createNodeFailure(path);
          } else {
            SplitLogCounters.tot_mgr_node_create_retry.increment();
            createNode(path, retry_count - 1);
          }
          return;
        }
      }
      createNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results. Retries on failures.
   */
  public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_mgr_get_data_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "GetData from znode " + path)) {
          return;
        }
        if (rc == KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_get_data_nonode.increment();
          LOG.warn("Task znode " + path + " vanished or not created yet.");
          // ignore since we should not end up in a case where there is in-memory task,
          // but no znode. The only case is between the time task is created in-memory
          // and the znode is created. See HBASE-11217.
          return;
        }
        Long retry_count = (Long) ctx;

        if (retry_count < 0) {
          LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
              + ". Ignoring error. No error handling. No retrying.");
          return;
        }
        LOG.warn("Getdata rc=" + KeeperException.Code.get(rc) + " " + path
            + " remaining retries=" + retry_count);
        if (retry_count == 0) {
          SplitLogCounters.tot_mgr_get_data_err.increment();
          getDataSetWatchFailure(path);
        } else {
          SplitLogCounters.tot_mgr_get_data_retry.increment();
          getDataSetWatch(path, retry_count - 1);
        }
        return;
      }
      try {
        getDataSetWatchSuccess(path, data, stat.getVersion());
      } catch (DeserializationException e) {
        LOG.warn("Deserialization problem", e);
      }
    }
  }

  /**
   * Asynchronous handler for zk delete node results. Retries on failures.
   */
  public class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
    private final Logger LOG = LoggerFactory.getLogger(DeleteAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx) {
      SplitLogCounters.tot_mgr_node_delete_result.increment();
      if (rc != 0) {
        if (needAbandonRetries(rc, "Delete znode " + path)) {
          details.getFailedDeletions().add(path);
          return;
        }
        if (rc != KeeperException.Code.NONODE.intValue()) {
          SplitLogCounters.tot_mgr_node_delete_err.increment();
          Long retry_count = (Long) ctx;
          LOG.warn("Delete rc=" + KeeperException.Code.get(rc) + " for " + path
              + " remaining retries=" + retry_count);
          if (retry_count == 0) {
            LOG.warn("Delete failed " + path);
            details.getFailedDeletions().add(path);
            deleteNodeFailure(path);
          } else {
            deleteNode(path, retry_count - 1);
          }
          return;
        } else {
          LOG.info(path + " does not exist. Either was created but deleted behind our"
              + " back by another pending delete OR was deleted"
              + " in earlier retry rounds. zkretries = " + ctx);
        }
      } else {
        LOG.debug("Deleted " + path);
      }
      deleteNodeSuccess(path);
    }
  }

  /**
   * Asynchronous handler for zk create RESCAN-node results. Retries on failures.
   * <p>
   * A RESCAN node is created using the EPHEMERAL_SEQUENTIAL flag. It is a signal for all the
   * {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}s to rescan for new tasks.
   */
  public class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
    private final Logger LOG = LoggerFactory.getLogger(CreateRescanAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
          return;
        }
        Long retry_count = (Long) ctx;
        LOG.warn("rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries="
            + retry_count);
        if (retry_count == 0) {
          createRescanFailure();
        } else {
          rescan(retry_count - 1);
        }
        return;
      }
      // path is the original arg, name is the actual name that was created
      createRescanSuccess(name);
    }
  }

  @Override
  public void setDetails(SplitLogManagerDetails details) {
    this.details = details;
  }

  @Override
  public SplitLogManagerDetails getDetails() {
    return details;
  }

  /**
   * Temporary function used by unit tests only.
   */
  public void setIgnoreDeleteForTesting(boolean b) {
    ignoreZKDeleteForTesting = b;
  }
}