/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coordination;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}. It listens for changes
 * to the split log znodes in ZooKeeper, and grabs and executes split tasks as they become
 * available.
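 * <p>
 * A minimal sketch of the worker-side lifecycle, assuming {@code serverName}, {@code watcher},
 * {@code server}, {@code conf}, {@code splitTaskExecutor} and {@code worker} are already wired
 * up (see {@link SplitLogWorker} for the actual wiring):
 * <pre>{@code
 * ZkSplitLogWorkerCoordination coordination =
 *     new ZkSplitLogWorkerCoordination(serverName, watcher);
 * coordination.init(server, conf, splitTaskExecutor, worker);
 * coordination.registerListener();
 * // Blocks, grabbing and executing split tasks, until stopProcessingTasks() is called.
 * coordination.taskLoop();
 * }</pre>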
 */
@InterfaceAudience.Private
public class ZkSplitLogWorkerCoordination extends ZKListener implements
    SplitLogWorkerCoordination {

  private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class);

  private static final int checkInterval = 5000; // 5 seconds
  private static final int FAILED_TO_OWN_TASK = -1;

  private SplitLogWorker worker;

  private TaskExecutor splitTaskExecutor;

  // Incremented on every change to the split log znode's children; taskLoop() also waits on it
  // as a monitor for new tasks.
  private final AtomicInteger taskReadySeq = new AtomicInteger(0);
  // Path of the task znode this worker is currently grabbing or executing, if any.
  private volatile String currentTask = null;
  private int currentVersion;
  private volatile boolean shouldStop = false;
  private final Object grabTaskLock = new Object();
  private boolean workerInGrabTask = false;
  private int reportPeriod;
  private RegionServerServices server = null;
  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
  private int maxConcurrentTasks = 0;

  private final ServerName serverName;

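  /**
   * @param serverName name of the region server this coordination runs on
   * @param watcher ZooKeeper watcher to register listeners with and issue zk operations through
   */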
  public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) {
    super(watcher);
    this.serverName = serverName;
  }

  /**
   * Override handler from {@link ZKListener}
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (path.equals(watcher.znodePaths.splitLogZNode)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("tasks arrived or departed on " + path);
      }
      synchronized (taskReadySeq) {
        this.taskReadySeq.incrementAndGet();
        taskReadySeq.notify();
      }
    }
  }

  /**
   * Override handler from {@link ZKListener}
   */
  @Override
  public void nodeDataChanged(String path) {
    // there will be a self-generated dataChanged event every time attemptToOwnTask()
    // heartbeats the task znode by upping its version
    synchronized (grabTaskLock) {
      if (workerInGrabTask) {
        // currentTask can change
        String taskpath = currentTask;
        if (taskpath != null && taskpath.equals(path)) {
          getDataSetWatchAsync();
        }
      }
    }
  }

  /**
   * Override initializer from {@link SplitLogWorkerCoordination}
   */
  @Override
  public void init(RegionServerServices server, Configuration conf,
      TaskExecutor splitExecutor, SplitLogWorker worker) {
    this.server = server;
    this.worker = worker;
    this.splitTaskExecutor = splitExecutor;
    maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
    reportPeriod =
        conf.getInt("hbase.splitlog.report.period",
          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
  }

  /* Support functions for ZooKeeper async callback */

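  /**
   * Re-set the data watch on the current task if the async getData issued from
   * {@link #getDataSetWatchAsync()} failed for the given path.
   */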
  void getDataSetWatchFailure(String path) {
    synchronized (grabTaskLock) {
      if (workerInGrabTask) {
        // currentTask can change but that's ok
        String taskpath = currentTask;
        if (taskpath != null && taskpath.equals(path)) {
          LOG.info("retrying data watch on " + path);
          SplitLogCounters.tot_wkr_get_data_retry.increment();
          getDataSetWatchAsync();
        } else {
          // no point setting a watch on a task this worker is no longer working on
        }
      }
    }
  }

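  /**
   * Asynchronously fetch the data of the current task znode, re-setting the data watch on it;
   * the result is handled by {@link GetDataAsyncCallback}.
   */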
  public void getDataSetWatchAsync() {
    watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
    SplitLogCounters.tot_wkr_get_data_queued.increment();
  }

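  /**
   * Callback for a successful async getData on the current task znode. If the task has been
   * preempted by another worker, ask this worker to stop processing it.
   */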
  void getDataSetWatchSuccess(String path, byte[] data) {
    SplitLogTask slt;
    try {
      slt = SplitLogTask.parseFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed to parse data for znode " + path, e);
      return;
    }
    synchronized (grabTaskLock) {
      if (workerInGrabTask) {
        // currentTask can change but that's ok
        String taskpath = currentTask;
        if (taskpath != null && taskpath.equals(path)) {
          // have to compare data. cannot compare version because then there
          // will be a race with attemptToOwnTask()
          // cannot just check whether the node has been transitioned to
          // UNASSIGNED because by the time this worker sets the data watch
          // the node might have made two transitions - from owned by this
          // worker to unassigned to owned by another worker
          if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
              && !slt.isResigned(serverName)) {
            LOG.info("task " + taskpath + " preempted from " + serverName
                + ", current task state and owner=" + slt.toString());
            worker.stopTask();
          }
        }
      }
    }
  }

  /**
   * Try to grab a 'lock' on the task znode to own and execute the task.
   * <p>
   * @param path zk node for the task
   * @return true if the task was successfully grabbed, false otherwise
   */
  private boolean grabTask(String path) {
    Stat stat = new Stat();
    byte[] data;
    synchronized (grabTaskLock) {
      currentTask = path;
      workerInGrabTask = true;
      if (Thread.interrupted()) {
        return false;
      }
    }
    try {
      try {
        if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
          SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
          return false;
        }
      } catch (KeeperException e) {
        LOG.warn("Failed to get data for znode " + path, e);
        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
        return false;
      }
      SplitLogTask slt;
      try {
        slt = SplitLogTask.parseFrom(data);
      } catch (DeserializationException e) {
        LOG.warn("Failed to parse data for znode " + path, e);
        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
        return false;
      }
      if (!slt.isUnassigned()) {
        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
        return false;
      }

      currentVersion =
          attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
      if (currentVersion < 0) {
        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
        return false;
      }

      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
        ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
            new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
        splitTaskDetails.setTaskNode(currentTask);
        splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));

        endTask(new SplitLogTask.Done(server.getServerName()),
          SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
        return false;
      }

      LOG.info("worker " + server.getServerName() + " acquired task " + path);
      SplitLogCounters.tot_wkr_task_acquired.increment();
      getDataSetWatchAsync();

      submitTask(path, currentVersion, reportPeriod);

      // after a successful submit, sleep a little bit to allow other RSs to grab the rest of
      // the tasks
      try {
        int sleepTime = RandomUtils.nextInt(0, 500) + 500;
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while yielding for other region servers", e);
        Thread.currentThread().interrupt();
      }
      return true;
    } finally {
      synchronized (grabTaskLock) {
        workerInGrabTask = false;
        // clear the interrupt from stopTask() otherwise the next task will suffer
        Thread.interrupted();
      }
    }
  }

  /**
   * Submit a log split task to the executor service.
   * @param curTask task to submit
   * @param curTaskZKVersion current version of task
   * @param reportPeriod minimum interval, in ms, between heartbeats of the task znode
   */
  void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);

    CancelableProgressable reporter = new CancelableProgressable() {
      private long last_report_at = 0;

      @Override
      public boolean progress() {
        long t = EnvironmentEdgeManager.currentTime();
        if ((t - last_report_at) > reportPeriod) {
          last_report_at = t;
          int latestZKVersion =
              attemptToOwnTask(false, watcher, server.getServerName(), curTask,
                zkVersion.intValue());
          if (latestZKVersion < 0) {
            LOG.warn("Failed to heartbeat the task " + curTask);
            return false;
          }
          zkVersion.setValue(latestZKVersion);
        }
        return true;
      }
    };
    ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
        new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
    splitTaskDetails.setTaskNode(curTask);
    splitTaskDetails.setCurTaskZKVersion(zkVersion);

    WALSplitterHandler hsh =
        new WALSplitterHandler(server, this, splitTaskDetails, reporter,
            this.tasksInProgress, splitTaskExecutor);
    server.getExecutorService().submit(hsh);
  }

  /**
   * This function calculates how many splitters this RS should create based on the expected
   * average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by
   * configuration. <br>
   * At any given time, an RS allows spawning MIN(Expected Tasks/RS, Hard Upper Bound) splitters.
   * For example, with 10 outstanding tasks, 4 live region servers and maxConcurrentTasks=2, the
   * expected load is ceil(10/4)=3 tasks per RS, but this RS will still run at most 2 at a time.
   * @param numTasks total number of split tasks available
   * @return number of tasks this RS can grab
   */
  private int getNumExpectedTasksPerRS(int numTasks) {
    // at least one RS (itself) is available
    int availableRSs = 1;
    try {
      List<String> regionServers =
          ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.rsZNode);
      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
    } catch (KeeperException e) {
      // do nothing
      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
    }
    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
    return Math.max(1, expectedTasksPerRS); // at least one
  }

  /**
   * @param expectedTasksPerRS Average number of tasks to be handled by each RS
   * @return true if more splitters are available, otherwise false.
   */
  private boolean areSplittersAvailable(int expectedTasksPerRS) {
    return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
        - this.tasksInProgress.get()) > 0;
  }

  /**
   * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
   * <p>
   * This method is also used to periodically heartbeat the task progress by transitioning the
   * node from OWNED to OWNED.
   * <p>
   * @param isFirstTime shows whether it's the first attempt.
   * @param zkw zk watcher
   * @param server name of this region server
   * @param task znode path of the task to own
   * @param taskZKVersion version of the task in zk
   * @return non-negative integer value when task can be owned by current region server otherwise -1
   */
  protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw,
      ServerName server, String task, int taskZKVersion) {
    int latestZKVersion = FAILED_TO_OWN_TASK;
    try {
      SplitLogTask slt = new SplitLogTask.Owned(server);
      Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
      if (stat == null) {
        LOG.warn("zk.setData() returned null for path " + task);
        SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
        return FAILED_TO_OWN_TASK;
      }
      latestZKVersion = stat.getVersion();
      SplitLogCounters.tot_wkr_task_heartbeat.increment();
      return latestZKVersion;
    } catch (KeeperException e) {
      if (!isFirstTime) {
        if (e.code().equals(KeeperException.Code.NONODE)) {
          LOG.warn("NONODE failed to assert ownership for " + task, e);
        } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
        } else {
          LOG.warn("failed to assert ownership for " + task, e);
        }
      }
    } catch (InterruptedException e1) {
      LOG.warn("Interrupted while trying to assert ownership of " + task + " "
          + StringUtils.stringifyException(e1));
      Thread.currentThread().interrupt();
    }
    SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
    return FAILED_TO_OWN_TASK;
  }

  /**
   * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time.
   * This policy puts an upper limit on the number of simultaneous log splitting tasks that could
   * be happening in a cluster.
   * <p>
   * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task
   * that has been put up.
   * @throws InterruptedException if the worker thread is interrupted while waiting or sleeping
   */
  @Override
  public void taskLoop() throws InterruptedException {
    while (!shouldStop) {
      int seq_start = taskReadySeq.get();
      List<String> paths;
      paths = getTaskList();
      if (paths == null) {
        LOG.warn("Could not get tasks, did someone remove " + watcher.znodePaths.splitLogZNode
            + " ... worker thread exiting.");
        return;
      }
      // shuffle the paths to prevent different split log workers from starting on the same log
      // file after the meta log (if any)
      Collections.shuffle(paths);
      // pick the meta wal first
      int offset = 0;
      for (int i = 0; i < paths.size(); i++) {
        if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
          offset = i;
          break;
        }
      }
      int numTasks = paths.size();
      int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
      boolean taskGrabbed = false;
      for (int i = 0; i < numTasks; i++) {
        while (!shouldStop) {
          if (this.areSplittersAvailable(expectedTasksPerRS)) {
            LOG.debug("Current region server " + server.getServerName()
                + " is ready to take more tasks, will get task list and try to grab tasks again.");
            int idx = (i + offset) % paths.size();
            // don't call ZKSplitLog.getNodeName() because that will lead to
            // double encoding of the path name
            taskGrabbed |= grabTask(ZNodePaths.joinZNode(
                watcher.znodePaths.splitLogZNode, paths.get(idx)));
            break;
          } else {
            LOG.debug("Current region server " + server.getServerName() + " has "
                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
            Thread.sleep(100);
          }
        }
        if (shouldStop) {
          return;
        }
      }
      if (!taskGrabbed && !shouldStop) {
        // did not grab any task; sleep a little to reduce the zk request rate
        Thread.sleep(1000);
      }
      SplitLogCounters.tot_wkr_task_grabing.increment();
      synchronized (taskReadySeq) {
        while (seq_start == taskReadySeq.get()) {
          taskReadySeq.wait(checkInterval);
        }
      }
    }
  }

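  /**
   * Fetch the list of task znodes under the split log znode, setting a watch for new children.
   * Retries every second until the list is obtained or the worker is asked to stop.
   */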
  private List<String> getTaskList() throws InterruptedException {
    List<String> childrenPaths = null;
    long sleepTime = 1000;
    // Loop until the list of children is obtained, or bail out if the worker
    // thread is asked to stop.
    while (!shouldStop) {
      try {
        childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
          watcher.znodePaths.splitLogZNode);
        if (childrenPaths != null) {
          return childrenPaths;
        }
      } catch (KeeperException e) {
        LOG.warn("Could not get children of znode " + watcher.znodePaths.splitLogZNode, e);
      }
      LOG.debug("Retry listChildren of znode " + watcher.znodePaths.splitLogZNode
          + " after sleep for " + sleepTime + "ms!");
      Thread.sleep(sleepTime);
    }
    return childrenPaths;
  }

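  /**
   * Mark the WAL file of the given task as corrupted, delegating to
   * {@link ZKSplitLog#markCorrupted(Path, String, FileSystem)}.
   */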
  @Override
  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
    ZKSplitLog.markCorrupted(rootDir, name, fs);
  }

  @Override
  public boolean isReady() throws InterruptedException {
    int result = -1;
    try {
      result = ZKUtil.checkExists(watcher, watcher.znodePaths.splitLogZNode);
    } catch (KeeperException e) {
      // ignore the exception; result stays -1 and we report not ready
      LOG.warn("Exception when checking for " + watcher.znodePaths.splitLogZNode
          + " ... retrying", e);
    }
    if (result == -1) {
      LOG.info(watcher.znodePaths.splitLogZNode
          + " znode does not exist, waiting for master to create");
      Thread.sleep(1000);
    }
    return (result != -1);
  }

  @Override
  public int getTaskReadySeq() {
    return taskReadySeq.get();
  }

  @Override
  public void registerListener() {
    watcher.registerListener(this);
  }

  @Override
  public void removeListener() {
    watcher.unregisterListener(this);
  }

  @Override
  public void stopProcessingTasks() {
    this.shouldStop = true;
  }

  @Override
  public boolean isStop() {
    return shouldStop;
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results.
   */
  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_wkr_get_data_result.increment();
      if (rc != 0) {
        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
        getDataSetWatchFailure(path);
        return;
      }
      data = ZKMetadata.removeMetaData(data);
      getDataSetWatchSuccess(path, data);
    }
  }

  /*
   * Next part is related to WALSplitterHandler
   */
  /**
   * endTask() can fail and the only way to recover out of it is for the
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node.
   * @param slt final state to transition the task znode to
   * @param ctr counter to increment on a successful transition; may be null
   * @param details details of the task, including its znode path and current zk version
   */
  @Override
  public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) {
    ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
    String task = zkDetails.getTaskNode();
    int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
    try {
      if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
        LOG.info("successfully transitioned task " + task + " to final state " + slt);
        if (ctr != null) {
          ctr.increment();
        }
        return;
      }
      LOG.warn("failed to transition task " + task + " to end state " + slt
          + " because of version mismatch");
    } catch (KeeperException.BadVersionException bve) {
      LOG.warn("transition task " + task + " to " + slt + " failed because of version mismatch",
        bve);
    } catch (KeeperException.NoNodeException e) {
      LOG.error(HBaseMarkers.FATAL,
        "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
    } catch (KeeperException e) {
      LOG.warn("failed to end task, " + task + " " + slt, e);
    }
    SplitLogCounters.tot_wkr_final_transition_failed.increment();
  }

  /**
   * When the ZK-based implementation wants to complete the task, it needs to know the task znode
   * and the current znode data version (needed for the subsequent update operation).
   */
  public static class ZkSplitTaskDetails implements SplitTaskDetails {
    private String taskNode;
    private MutableInt curTaskZKVersion;

    public ZkSplitTaskDetails() {
    }

    public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
      this.taskNode = taskNode;
      this.curTaskZKVersion = curTaskZKVersion;
    }

    public String getTaskNode() {
      return taskNode;
    }

    public void setTaskNode(String taskNode) {
      this.taskNode = taskNode;
    }

    public MutableInt getCurTaskZKVersion() {
      return curTaskZKVersion;
    }

    public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
      this.curTaskZKVersion = curTaskZKVersion;
    }

    @Override
    public String getWALFile() {
      return ZKSplitLog.getFileName(taskNode);
    }
  }

}