001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coordination;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
022
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.ThreadLocalRandom;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.atomic.LongAdder;
028import org.apache.commons.lang3.mutable.MutableInt;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.SplitLogCounters;
035import org.apache.hadoop.hbase.SplitLogTask;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.log.HBaseMarkers;
038import org.apache.hadoop.hbase.regionserver.RegionServerServices;
039import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
040import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
041import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
042import org.apache.hadoop.hbase.util.CancelableProgressable;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
045import org.apache.hadoop.hbase.zookeeper.ZKListener;
046import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
047import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
048import org.apache.hadoop.hbase.zookeeper.ZKUtil;
049import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
050import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
051import org.apache.hadoop.util.StringUtils;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.apache.zookeeper.AsyncCallback;
054import org.apache.zookeeper.KeeperException;
055import org.apache.zookeeper.data.Stat;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}. It listens for changes
 * under the split log znode in ZooKeeper and tries to grab and execute the tasks found there.
062 */
063@InterfaceAudience.Private
064public class ZkSplitLogWorkerCoordination extends ZKListener implements SplitLogWorkerCoordination {
065
  // Logger shared by all instances of this coordination class.
  private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class);

  // Timeout handed to taskReadySeq.wait() in taskLoop() while waiting for new tasks.
  private static final int checkInterval = 5000; // 5 seconds
  // Value returned by attemptToOwnTask() when the task znode could not be owned / heartbeated.
  private static final int FAILED_TO_OWN_TASK = -1;

  // Worker set by init(); its stopTask() is invoked when the current task is found preempted.
  private SplitLogWorker worker;

  // Executor set by init() and handed to every WALSplitterHandler created in submitTask().
  private TaskExecutor splitTaskExecutor;

  // Incremented whenever the children of the split log znode change; also used as the monitor
  // that taskLoop() waits on and nodeChildrenChanged() notifies.
  private final AtomicInteger taskReadySeq = new AtomicInteger(0);
  // Path of the task znode most recently passed to grabTask().
  private volatile String currentTask = null;
  // Result of the last attemptToOwnTask() call made from grabTask().
  private int currentVersion;
  // Set to true by stopProcessingTasks(); polled by taskLoop() and getTaskList().
  private volatile boolean shouldStop = false;
  // Guards workerInGrabTask and the currentTask comparisons done by the ZK callbacks.
  private final Object grabTaskLock = new Object();
  // True while grabTask() is running; the ZK data callbacks are no-ops when it is false.
  private boolean workerInGrabTask = false;
  // Minimum time between two heartbeats sent by the progress reporter built in submitTask();
  // compared against EnvironmentEdgeManager.currentTime() deltas (presumably milliseconds).
  private int reportPeriod;
  // Region server services set by init(); supplies the server name and the executor service.
  private RegionServerServices server = null;
  // Counter handed to each WALSplitterHandler and compared against maxConcurrentTasks.
  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
  // Read from HBASE_SPLIT_WAL_MAX_SPLITTER in init(); 0 until init() is called.
  private int maxConcurrentTasks = 0;

  // Server name given at construction; used to check task ownership in getDataSetWatchSuccess().
  private final ServerName serverName;
087
088  public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) {
089    super(watcher);
090    this.serverName = serverName;
091  }
092
093  /**
094   * Override handler from {@link ZKListener}
095   */
096  @Override
097  public void nodeChildrenChanged(String path) {
098    if (path.equals(watcher.getZNodePaths().splitLogZNode)) {
099      if (LOG.isTraceEnabled()) {
100        LOG.trace("tasks arrived or departed on " + path);
101      }
102      synchronized (taskReadySeq) {
103        this.taskReadySeq.incrementAndGet();
104        taskReadySeq.notify();
105      }
106    }
107  }
108
109  /**
110   * Override handler from {@link ZKListener}
111   */
112  @Override
113  public void nodeDataChanged(String path) {
114    // there will be a self generated dataChanged event every time attemptToOwnTask()
115    // heartbeats the task znode by upping its version
116    synchronized (grabTaskLock) {
117      if (workerInGrabTask) {
118        // currentTask can change
119        String taskpath = currentTask;
120        if (taskpath != null && taskpath.equals(path)) {
121          getDataSetWatchAsync();
122        }
123      }
124    }
125  }
126
127  /**
128   * Override setter from {@link SplitLogWorkerCoordination}
129   */
130  @Override
131  public void init(RegionServerServices server, Configuration conf, TaskExecutor splitExecutor,
132    SplitLogWorker worker) {
133    this.server = server;
134    this.worker = worker;
135    this.splitTaskExecutor = splitExecutor;
136    maxConcurrentTasks =
137      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
138    reportPeriod = conf.getInt("hbase.splitlog.report.period",
139      conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
140        ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
141  }
142
143  /* Support functions for ZooKeeper async callback */
144
145  void getDataSetWatchFailure(String path) {
146    synchronized (grabTaskLock) {
147      if (workerInGrabTask) {
148        // currentTask can change but that's ok
149        String taskpath = currentTask;
150        if (taskpath != null && taskpath.equals(path)) {
151          LOG.info("retrying data watch on " + path);
152          SplitLogCounters.tot_wkr_get_data_retry.increment();
153          getDataSetWatchAsync();
154        } else {
155          // no point setting a watch on the task which this worker is not
156          // working upon anymore
157        }
158      }
159    }
160  }
161
162  public void getDataSetWatchAsync() {
163    watcher.getRecoverableZooKeeper().getZooKeeper().getData(currentTask, watcher,
164      new GetDataAsyncCallback(), null);
165    SplitLogCounters.tot_wkr_get_data_queued.increment();
166  }
167
168  void getDataSetWatchSuccess(String path, byte[] data) {
169    SplitLogTask slt;
170    try {
171      slt = SplitLogTask.parseFrom(data);
172    } catch (DeserializationException e) {
173      LOG.warn("Failed parse", e);
174      return;
175    }
176    synchronized (grabTaskLock) {
177      if (workerInGrabTask) {
178        // currentTask can change but that's ok
179        String taskpath = currentTask;
180        if (taskpath != null && taskpath.equals(path)) {
181          // have to compare data. cannot compare version because then there
182          // will be race with attemptToOwnTask()
183          // cannot just check whether the node has been transitioned to
184          // UNASSIGNED because by the time this worker sets the data watch
185          // the node might have made two transitions - from owned by this
186          // worker to unassigned to owned by another worker
187          if (
188            !slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
189              && !slt.isResigned(serverName)
190          ) {
191            LOG.info("task " + taskpath + " preempted from " + serverName
192              + ", current task state and owner=" + slt.toString());
193            worker.stopTask();
194          }
195        }
196      }
197    }
198  }
199
200  /**
201   * try to grab a 'lock' on the task zk node to own and execute the task.
202   * <p>
203   * @param path zk node for the task
204   * @return boolean value when grab a task success return true otherwise false
205   */
206  private boolean grabTask(String path) {
207    Stat stat = new Stat();
208    byte[] data;
209    synchronized (grabTaskLock) {
210      currentTask = path;
211      workerInGrabTask = true;
212      if (Thread.interrupted()) {
213        return false;
214      }
215    }
216    try {
217      try {
218        if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
219          SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
220          return false;
221        }
222      } catch (KeeperException e) {
223        LOG.warn("Failed to get data for znode " + path, e);
224        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
225        return false;
226      }
227      SplitLogTask slt;
228      try {
229        slt = SplitLogTask.parseFrom(data);
230      } catch (DeserializationException e) {
231        LOG.warn("Failed parse data for znode " + path, e);
232        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
233        return false;
234      }
235      if (!slt.isUnassigned()) {
236        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
237        return false;
238      }
239
240      currentVersion =
241        attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
242      if (currentVersion < 0) {
243        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
244        return false;
245      }
246
247      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
248        ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
249          new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
250        splitTaskDetails.setTaskNode(currentTask);
251        splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
252
253        endTask(new SplitLogTask.Done(server.getServerName()),
254          SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
255        return false;
256      }
257
258      LOG.info("worker " + server.getServerName() + " acquired task " + path);
259      SplitLogCounters.tot_wkr_task_acquired.increment();
260      getDataSetWatchAsync();
261
262      submitTask(path, currentVersion, reportPeriod);
263
264      // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
265      try {
266        int sleepTime = ThreadLocalRandom.current().nextInt(500) + 500;
267        Thread.sleep(sleepTime);
268      } catch (InterruptedException e) {
269        LOG.warn("Interrupted while yielding for other region servers", e);
270        Thread.currentThread().interrupt();
271      }
272      return true;
273    } finally {
274      synchronized (grabTaskLock) {
275        workerInGrabTask = false;
276        // clear the interrupt from stopTask() otherwise the next task will
277        // suffer
278        Thread.interrupted();
279      }
280    }
281  }
282
283  /**
284   * Submit a log split task to executor service
285   * @param curTask          task to submit
286   * @param curTaskZKVersion current version of task
287   */
288  void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
289    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
290
291    CancelableProgressable reporter = new CancelableProgressable() {
292      private long last_report_at = 0;
293
294      @Override
295      public boolean progress() {
296        long t = EnvironmentEdgeManager.currentTime();
297        if ((t - last_report_at) > reportPeriod) {
298          last_report_at = t;
299          int latestZKVersion =
300            attemptToOwnTask(false, watcher, server.getServerName(), curTask, zkVersion.intValue());
301          if (latestZKVersion < 0) {
302            LOG.warn("Failed to heartbeat the task" + curTask);
303            return false;
304          }
305          zkVersion.setValue(latestZKVersion);
306        }
307        return true;
308      }
309    };
310    ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
311      new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
312    splitTaskDetails.setTaskNode(curTask);
313    splitTaskDetails.setCurTaskZKVersion(zkVersion);
314
315    WALSplitterHandler hsh = new WALSplitterHandler(server, this, splitTaskDetails, reporter,
316      this.tasksInProgress, splitTaskExecutor);
317    server.getExecutorService().submit(hsh);
318  }
319
320  /**
321   * @return true if more splitters are available, otherwise false.
322   */
323  private boolean areSplittersAvailable() {
324    return maxConcurrentTasks - tasksInProgress.get() > 0;
325  }
326
327  /**
328   * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
329   * <p>
330   * This method is also used to periodically heartbeat the task progress by transitioning the node
331   * from OWNED to OWNED.
332   * <p>
333   * @param isFirstTime   shows whther it's the first attempt.
334   * @param zkw           zk wathcer
335   * @param server        name
336   * @param task          to own
337   * @param taskZKVersion version of the task in zk
338   * @return non-negative integer value when task can be owned by current region server otherwise -1
339   */
340  protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw, ServerName server,
341    String task, int taskZKVersion) {
342    int latestZKVersion = FAILED_TO_OWN_TASK;
343    try {
344      SplitLogTask slt = new SplitLogTask.Owned(server);
345      Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
346      if (stat == null) {
347        LOG.warn("zk.setData() returned null for path " + task);
348        SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
349        return FAILED_TO_OWN_TASK;
350      }
351      latestZKVersion = stat.getVersion();
352      SplitLogCounters.tot_wkr_task_heartbeat.increment();
353      return latestZKVersion;
354    } catch (KeeperException e) {
355      if (!isFirstTime) {
356        if (e.code().equals(KeeperException.Code.NONODE)) {
357          LOG.warn("NONODE failed to assert ownership for " + task, e);
358        } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
359          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
360        } else {
361          LOG.warn("failed to assert ownership for " + task, e);
362        }
363      }
364    } catch (InterruptedException e1) {
365      LOG.warn("Interrupted while trying to assert ownership of " + task + " "
366        + StringUtils.stringifyException(e1));
367      Thread.currentThread().interrupt();
368    }
369    SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
370    return FAILED_TO_OWN_TASK;
371  }
372
373  /**
374   * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This
375   * policy puts an upper-limit on the number of simultaneous log splitting that could be happening
376   * in a cluster.
377   * <p>
378   * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task
379   * that has been put up n
380   */
381  @Override
382  public void taskLoop() throws InterruptedException {
383    while (!shouldStop) {
384      int seq_start = taskReadySeq.get();
385      List<String> paths;
386      paths = getTaskList();
387      if (paths == null) {
388        LOG.warn("Could not get tasks, did someone remove " + watcher.getZNodePaths().splitLogZNode
389          + " ... worker thread exiting.");
390        return;
391      }
392      // shuffle the paths to prevent different split log worker start from the same log file after
393      // meta log (if any)
394      Collections.shuffle(paths);
395      // pick meta wal firstly
396      int offset = 0;
397      for (int i = 0; i < paths.size(); i++) {
398        if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
399          offset = i;
400          break;
401        }
402      }
403      int numTasks = paths.size();
404      boolean taskGrabbed = false;
405      for (int i = 0; i < numTasks; i++) {
406        while (!shouldStop) {
407          if (this.areSplittersAvailable()) {
408            if (LOG.isTraceEnabled()) {
409              LOG.trace("Current region server " + server.getServerName()
410                + " is ready to take more tasks, will get task list and try grab tasks again.");
411            }
412            int idx = (i + offset) % paths.size();
413            // don't call ZKSplitLog.getNodeName() because that will lead to
414            // double encoding of the path name
415            taskGrabbed |=
416              grabTask(ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, paths.get(idx)));
417            break;
418          } else {
419            if (LOG.isTraceEnabled()) {
420              LOG.trace("Current region server " + server.getServerName() + " has "
421                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
422            }
423            Thread.sleep(100);
424          }
425        }
426        if (shouldStop) {
427          return;
428        }
429      }
430      if (!taskGrabbed && !shouldStop) {
431        // do not grab any tasks, sleep a little bit to reduce zk request.
432        Thread.sleep(1000);
433      }
434      SplitLogCounters.tot_wkr_task_grabing.increment();
435      synchronized (taskReadySeq) {
436        while (seq_start == taskReadySeq.get()) {
437          taskReadySeq.wait(checkInterval);
438        }
439      }
440    }
441  }
442
443  private List<String> getTaskList() throws InterruptedException {
444    List<String> childrenPaths = null;
445    long sleepTime = 1000;
446    // It will be in loop till it gets the list of children or
447    // it will come out if worker thread exited.
448    while (!shouldStop) {
449      try {
450        childrenPaths =
451          ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().splitLogZNode);
452        if (childrenPaths != null) {
453          return childrenPaths;
454        }
455      } catch (KeeperException e) {
456        LOG.warn("Could not get children of znode " + watcher.getZNodePaths().splitLogZNode, e);
457      }
458      LOG.debug("Retry listChildren of znode " + watcher.getZNodePaths().splitLogZNode
459        + " after sleep for " + sleepTime + "ms!");
460      Thread.sleep(sleepTime);
461    }
462    return childrenPaths;
463  }
464
465  @Override
466  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
467    ZKSplitLog.markCorrupted(rootDir, name, fs);
468  }
469
470  @Override
471  public boolean isReady() throws InterruptedException {
472    int result = -1;
473    try {
474      result = ZKUtil.checkExists(watcher, watcher.getZNodePaths().splitLogZNode);
475    } catch (KeeperException e) {
476      // ignore
477      LOG.warn(
478        "Exception when checking for " + watcher.getZNodePaths().splitLogZNode + " ... retrying",
479        e);
480    }
481    if (result == -1) {
482      LOG.info(watcher.getZNodePaths().splitLogZNode
483        + " znode does not exist, waiting for master to create");
484      Thread.sleep(1000);
485    }
486    return (result != -1);
487  }
488
489  @Override
490  public int getTaskReadySeq() {
491    return taskReadySeq.get();
492  }
493
494  @Override
495  public void registerListener() {
496    watcher.registerListener(this);
497  }
498
499  @Override
500  public void removeListener() {
501    watcher.unregisterListener(this);
502  }
503
504  @Override
505  public void stopProcessingTasks() {
506    this.shouldStop = true;
507
508  }
509
510  @Override
511  public boolean isStop() {
512    return shouldStop;
513  }
514
515  /**
516   * Asynchronous handler for zk get-data-set-watch on node results.
517   */
518  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
519    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);
520
521    @Override
522    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
523      SplitLogCounters.tot_wkr_get_data_result.increment();
524      if (rc != 0) {
525        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
526        getDataSetWatchFailure(path);
527        return;
528      }
529      data = ZKMetadata.removeMetaData(data);
530      getDataSetWatchSuccess(path, data);
531    }
532  }
533
534  /*
535   * Next part is related to WALSplitterHandler
536   */
537  /**
538   * endTask() can fail and the only way to recover out of it is for the
539   * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node. nn
540   */
541  @Override
542  public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) {
543    ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
544    String task = zkDetails.getTaskNode();
545    int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
546    try {
547      if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
548        LOG.info("successfully transitioned task " + task + " to final state " + slt);
549        ctr.increment();
550        return;
551      }
552      LOG.warn("failed to transistion task " + task + " to end state " + slt
553        + " because of version mismatch ");
554    } catch (KeeperException.BadVersionException bve) {
555      LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
556        bve);
557    } catch (KeeperException.NoNodeException e) {
558      LOG.error(HBaseMarkers.FATAL,
559        "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
560    } catch (KeeperException e) {
561      LOG.warn("failed to end task, " + task + " " + slt, e);
562    }
563    SplitLogCounters.tot_wkr_final_transition_failed.increment();
564  }
565
566  /**
567   * When ZK-based implementation wants to complete the task, it needs to know task znode and
568   * current znode cversion (needed for subsequent update operation).
569   */
570  public static class ZkSplitTaskDetails implements SplitTaskDetails {
571    private String taskNode;
572    private MutableInt curTaskZKVersion;
573
574    public ZkSplitTaskDetails() {
575    }
576
577    public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
578      this.taskNode = taskNode;
579      this.curTaskZKVersion = curTaskZKVersion;
580    }
581
582    public String getTaskNode() {
583      return taskNode;
584    }
585
586    public void setTaskNode(String taskNode) {
587      this.taskNode = taskNode;
588    }
589
590    public MutableInt getCurTaskZKVersion() {
591      return curTaskZKVersion;
592    }
593
594    public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
595      this.curTaskZKVersion = curTaskZKVersion;
596    }
597
598    @Override
599    public String getWALFile() {
600      return ZKSplitLog.getFileName(taskNode);
601    }
602  }
603
604}