001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.coordination;
021
022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
024
025import java.util.Collections;
026import java.util.List;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.atomic.LongAdder;
029
030import org.apache.commons.lang3.RandomUtils;
031import org.apache.commons.lang3.mutable.MutableInt;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.SplitLogCounters;
038import org.apache.hadoop.hbase.SplitLogTask;
039import org.apache.hadoop.hbase.exceptions.DeserializationException;
040import org.apache.hadoop.hbase.log.HBaseMarkers;
041import org.apache.hadoop.hbase.regionserver.RegionServerServices;
042import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
043import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
044import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
045import org.apache.hadoop.hbase.util.CancelableProgressable;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
048import org.apache.hadoop.hbase.zookeeper.ZKListener;
049import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
050import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
051import org.apache.hadoop.hbase.zookeeper.ZKUtil;
052import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
053import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
054import org.apache.hadoop.util.StringUtils;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.apache.zookeeper.AsyncCallback;
057import org.apache.zookeeper.KeeperException;
058import org.apache.zookeeper.data.Stat;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
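/**
 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}.
 * It listens for changes in ZooKeeper (children of the split log znode and data of the task
 * znode currently being worked on) and tries to grab, heartbeat and finish split log tasks.
 */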
067@InterfaceAudience.Private
068public class ZkSplitLogWorkerCoordination extends ZKListener implements
069    SplitLogWorkerCoordination {
070
071  private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class);
072
073  private static final int checkInterval = 5000; // 5 seconds
074  private static final int FAILED_TO_OWN_TASK = -1;
075
076  private  SplitLogWorker worker;
077
078  private TaskExecutor splitTaskExecutor;
079
080  private final AtomicInteger taskReadySeq = new AtomicInteger(0);
081  private volatile String currentTask = null;
082  private int currentVersion;
083  private volatile boolean shouldStop = false;
084  private final Object grabTaskLock = new Object();
085  private boolean workerInGrabTask = false;
086  private int reportPeriod;
087  private RegionServerServices server = null;
088  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
089  private int maxConcurrentTasks = 0;
090
091  private final ServerName serverName;
092
093  public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) {
094    super(watcher);
095    this.serverName = serverName;
096  }
097
098  /**
099   * Override handler from {@link ZKListener}
100   */
101  @Override
102  public void nodeChildrenChanged(String path) {
103    if (path.equals(watcher.getZNodePaths().splitLogZNode)) {
104      if (LOG.isTraceEnabled()) {
105        LOG.trace("tasks arrived or departed on " + path);
106      }
107      synchronized (taskReadySeq) {
108        this.taskReadySeq.incrementAndGet();
109        taskReadySeq.notify();
110      }
111    }
112  }
113
114  /**
115   * Override handler from {@link ZKListener}
116   */
117  @Override
118  public void nodeDataChanged(String path) {
119    // there will be a self generated dataChanged event every time attemptToOwnTask()
120    // heartbeats the task znode by upping its version
121    synchronized (grabTaskLock) {
122      if (workerInGrabTask) {
123        // currentTask can change
124        String taskpath = currentTask;
125        if (taskpath != null && taskpath.equals(path)) {
126          getDataSetWatchAsync();
127        }
128      }
129    }
130  }
131
132  /**
133   * Override setter from {@link SplitLogWorkerCoordination}
134   */
135  @Override
136  public void init(RegionServerServices server, Configuration conf,
137      TaskExecutor splitExecutor, SplitLogWorker worker) {
138    this.server = server;
139    this.worker = worker;
140    this.splitTaskExecutor = splitExecutor;
141    maxConcurrentTasks =
142        conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
143    reportPeriod =
144        conf.getInt("hbase.splitlog.report.period",
145          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
146            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
147  }
148
149  /* Support functions for ZooKeeper async callback */
150
151  void getDataSetWatchFailure(String path) {
152    synchronized (grabTaskLock) {
153      if (workerInGrabTask) {
154        // currentTask can change but that's ok
155        String taskpath = currentTask;
156        if (taskpath != null && taskpath.equals(path)) {
157          LOG.info("retrying data watch on " + path);
158          SplitLogCounters.tot_wkr_get_data_retry.increment();
159          getDataSetWatchAsync();
160        } else {
161          // no point setting a watch on the task which this worker is not
162          // working upon anymore
163        }
164      }
165    }
166  }
167
168  public void getDataSetWatchAsync() {
169    watcher.getRecoverableZooKeeper().getZooKeeper()
170        .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
171    SplitLogCounters.tot_wkr_get_data_queued.increment();
172  }
173
174  void getDataSetWatchSuccess(String path, byte[] data) {
175    SplitLogTask slt;
176    try {
177      slt = SplitLogTask.parseFrom(data);
178    } catch (DeserializationException e) {
179      LOG.warn("Failed parse", e);
180      return;
181    }
182    synchronized (grabTaskLock) {
183      if (workerInGrabTask) {
184        // currentTask can change but that's ok
185        String taskpath = currentTask;
186        if (taskpath != null && taskpath.equals(path)) {
187          // have to compare data. cannot compare version because then there
188          // will be race with attemptToOwnTask()
189          // cannot just check whether the node has been transitioned to
190          // UNASSIGNED because by the time this worker sets the data watch
191          // the node might have made two transitions - from owned by this
192          // worker to unassigned to owned by another worker
193          if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
194              && !slt.isResigned(serverName)) {
195            LOG.info("task " + taskpath + " preempted from " + serverName
196                + ", current task state and owner=" + slt.toString());
197            worker.stopTask();
198          }
199        }
200      }
201    }
202  }
203
204  /**
205   * try to grab a 'lock' on the task zk node to own and execute the task.
206   * <p>
207   * @param path zk node for the task
208   * @return boolean value when grab a task success return true otherwise false
209   */
210  private boolean grabTask(String path) {
211    Stat stat = new Stat();
212    byte[] data;
213    synchronized (grabTaskLock) {
214      currentTask = path;
215      workerInGrabTask = true;
216      if (Thread.interrupted()) {
217        return false;
218      }
219    }
220    try {
221      try {
222        if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
223          SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
224          return false;
225        }
226      } catch (KeeperException e) {
227        LOG.warn("Failed to get data for znode " + path, e);
228        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
229        return false;
230      }
231      SplitLogTask slt;
232      try {
233        slt = SplitLogTask.parseFrom(data);
234      } catch (DeserializationException e) {
235        LOG.warn("Failed parse data for znode " + path, e);
236        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
237        return false;
238      }
239      if (!slt.isUnassigned()) {
240        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
241        return false;
242      }
243
244      currentVersion =
245          attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
246      if (currentVersion < 0) {
247        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
248        return false;
249      }
250
251      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
252        ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
253            new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
254        splitTaskDetails.setTaskNode(currentTask);
255        splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
256
257        endTask(new SplitLogTask.Done(server.getServerName()),
258          SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
259        return false;
260      }
261
262      LOG.info("worker " + server.getServerName() + " acquired task " + path);
263      SplitLogCounters.tot_wkr_task_acquired.increment();
264      getDataSetWatchAsync();
265
266      submitTask(path, currentVersion, reportPeriod);
267
268      // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
269      try {
270        int sleepTime = RandomUtils.nextInt(0, 500) + 500;
271        Thread.sleep(sleepTime);
272      } catch (InterruptedException e) {
273        LOG.warn("Interrupted while yielding for other region servers", e);
274        Thread.currentThread().interrupt();
275      }
276      return true;
277    } finally {
278      synchronized (grabTaskLock) {
279        workerInGrabTask = false;
280        // clear the interrupt from stopTask() otherwise the next task will
281        // suffer
282        Thread.interrupted();
283      }
284    }
285  }
286
287  /**
288   * Submit a log split task to executor service
289   * @param curTask task to submit
290   * @param curTaskZKVersion current version of task
291   */
292  void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
293    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
294
295    CancelableProgressable reporter = new CancelableProgressable() {
296      private long last_report_at = 0;
297
298      @Override
299      public boolean progress() {
300        long t = EnvironmentEdgeManager.currentTime();
301        if ((t - last_report_at) > reportPeriod) {
302          last_report_at = t;
303          int latestZKVersion =
304              attemptToOwnTask(false, watcher, server.getServerName(), curTask,
305                zkVersion.intValue());
306          if (latestZKVersion < 0) {
307            LOG.warn("Failed to heartbeat the task" + curTask);
308            return false;
309          }
310          zkVersion.setValue(latestZKVersion);
311        }
312        return true;
313      }
314    };
315    ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
316        new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
317    splitTaskDetails.setTaskNode(curTask);
318    splitTaskDetails.setCurTaskZKVersion(zkVersion);
319
320    WALSplitterHandler hsh =
321        new WALSplitterHandler(server, this, splitTaskDetails, reporter,
322            this.tasksInProgress, splitTaskExecutor);
323    server.getExecutorService().submit(hsh);
324  }
325
326  /**
327   * @return true if more splitters are available, otherwise false.
328   */
329  private boolean areSplittersAvailable() {
330    return maxConcurrentTasks - tasksInProgress.get() > 0;
331  }
332
333  /**
334   * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
335   * <p>
336   * This method is also used to periodically heartbeat the task progress by transitioning the node
337   * from OWNED to OWNED.
338   * <p>
339   * @param isFirstTime shows whther it's the first attempt.
340   * @param zkw zk wathcer
341   * @param server name
342   * @param task to own
343   * @param taskZKVersion version of the task in zk
344   * @return non-negative integer value when task can be owned by current region server otherwise -1
345   */
346  protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw,
347      ServerName server, String task, int taskZKVersion) {
348    int latestZKVersion = FAILED_TO_OWN_TASK;
349    try {
350      SplitLogTask slt = new SplitLogTask.Owned(server);
351      Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
352      if (stat == null) {
353        LOG.warn("zk.setData() returned null for path " + task);
354        SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
355        return FAILED_TO_OWN_TASK;
356      }
357      latestZKVersion = stat.getVersion();
358      SplitLogCounters.tot_wkr_task_heartbeat.increment();
359      return latestZKVersion;
360    } catch (KeeperException e) {
361      if (!isFirstTime) {
362        if (e.code().equals(KeeperException.Code.NONODE)) {
363          LOG.warn("NONODE failed to assert ownership for " + task, e);
364        } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
365          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
366        } else {
367          LOG.warn("failed to assert ownership for " + task, e);
368        }
369      }
370    } catch (InterruptedException e1) {
371      LOG.warn("Interrupted while trying to assert ownership of " + task + " "
372          + StringUtils.stringifyException(e1));
373      Thread.currentThread().interrupt();
374    }
375    SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
376    return FAILED_TO_OWN_TASK;
377  }
378
379  /**
380   * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This
381   * policy puts an upper-limit on the number of simultaneous log splitting that could be happening
382   * in a cluster.
383   * <p>
384   * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task
385   * that has been put up
386   * @throws InterruptedException
387   */
388  @Override
389  public void taskLoop() throws InterruptedException {
390    while (!shouldStop) {
391      int seq_start = taskReadySeq.get();
392      List<String> paths;
393      paths = getTaskList();
394      if (paths == null) {
395        LOG.warn("Could not get tasks, did someone remove " + watcher.getZNodePaths().splitLogZNode
396            + " ... worker thread exiting.");
397        return;
398      }
399      // shuffle the paths to prevent different split log worker start from the same log file after
400      // meta log (if any)
401      Collections.shuffle(paths);
402      // pick meta wal firstly
403      int offset = 0;
404      for (int i = 0; i < paths.size(); i++) {
405        if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
406          offset = i;
407          break;
408        }
409      }
410      int numTasks = paths.size();
411      boolean taskGrabbed = false;
412      for (int i = 0; i < numTasks; i++) {
413        while (!shouldStop) {
414          if (this.areSplittersAvailable()) {
415            if (LOG.isTraceEnabled()) {
416              LOG.trace("Current region server " + server.getServerName()
417                + " is ready to take more tasks, will get task list and try grab tasks again.");
418            }
419            int idx = (i + offset) % paths.size();
420            // don't call ZKSplitLog.getNodeName() because that will lead to
421            // double encoding of the path name
422            taskGrabbed |= grabTask(ZNodePaths.joinZNode(
423                watcher.getZNodePaths().splitLogZNode, paths.get(idx)));
424            break;
425          } else {
426            if (LOG.isTraceEnabled()) {
427              LOG.trace("Current region server " + server.getServerName() + " has "
428                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
429            }
430            Thread.sleep(100);
431          }
432        }
433        if (shouldStop) {
434          return;
435        }
436      }
437      if (!taskGrabbed && !shouldStop) {
438        // do not grab any tasks, sleep a little bit to reduce zk request.
439        Thread.sleep(1000);
440      }
441      SplitLogCounters.tot_wkr_task_grabing.increment();
442      synchronized (taskReadySeq) {
443        while (seq_start == taskReadySeq.get()) {
444          taskReadySeq.wait(checkInterval);
445        }
446      }
447    }
448  }
449
450  private List<String> getTaskList() throws InterruptedException {
451    List<String> childrenPaths = null;
452    long sleepTime = 1000;
453    // It will be in loop till it gets the list of children or
454    // it will come out if worker thread exited.
455    while (!shouldStop) {
456      try {
457        childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
458          watcher.getZNodePaths().splitLogZNode);
459        if (childrenPaths != null) {
460          return childrenPaths;
461        }
462      } catch (KeeperException e) {
463        LOG.warn("Could not get children of znode " + watcher.getZNodePaths().splitLogZNode, e);
464      }
465      LOG.debug("Retry listChildren of znode " + watcher.getZNodePaths().splitLogZNode
466          + " after sleep for " + sleepTime + "ms!");
467      Thread.sleep(sleepTime);
468    }
469    return childrenPaths;
470  }
471
472  @Override
473  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
474    ZKSplitLog.markCorrupted(rootDir, name, fs);
475  }
476
477  @Override
478  public boolean isReady() throws InterruptedException {
479    int result = -1;
480    try {
481      result = ZKUtil.checkExists(watcher, watcher.getZNodePaths().splitLogZNode);
482    } catch (KeeperException e) {
483      // ignore
484      LOG.warn("Exception when checking for " + watcher.getZNodePaths().splitLogZNode
485          + " ... retrying", e);
486    }
487    if (result == -1) {
488      LOG.info(watcher.getZNodePaths().splitLogZNode
489          + " znode does not exist, waiting for master to create");
490      Thread.sleep(1000);
491    }
492    return (result != -1);
493  }
494
495  @Override
496  public int getTaskReadySeq() {
497    return taskReadySeq.get();
498  }
499
500  @Override
501  public void registerListener() {
502    watcher.registerListener(this);
503  }
504
505  @Override
506  public void removeListener() {
507    watcher.unregisterListener(this);
508  }
509
510
511  @Override
512  public void stopProcessingTasks() {
513    this.shouldStop = true;
514
515  }
516
517  @Override
518  public boolean isStop() {
519    return shouldStop;
520  }
521
522  /**
523   * Asynchronous handler for zk get-data-set-watch on node results.
524   */
525  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
526    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);
527
528    @Override
529    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
530      SplitLogCounters.tot_wkr_get_data_result.increment();
531      if (rc != 0) {
532        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
533        getDataSetWatchFailure(path);
534        return;
535      }
536      data = ZKMetadata.removeMetaData(data);
537      getDataSetWatchSuccess(path, data);
538    }
539  }
540
541  /*
542   * Next part is related to WALSplitterHandler
543   */
544  /**
545   * endTask() can fail and the only way to recover out of it is for the
546   * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node.
547   * @param slt
548   * @param ctr
549   */
550  @Override
551  public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) {
552    ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
553    String task = zkDetails.getTaskNode();
554    int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
555    try {
556      if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
557        LOG.info("successfully transitioned task " + task + " to final state " + slt);
558        ctr.increment();
559        return;
560      }
561      LOG.warn("failed to transistion task " + task + " to end state " + slt
562          + " because of version mismatch ");
563    } catch (KeeperException.BadVersionException bve) {
564      LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
565        bve);
566    } catch (KeeperException.NoNodeException e) {
567      LOG.error(HBaseMarkers.FATAL,
568        "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
569    } catch (KeeperException e) {
570      LOG.warn("failed to end task, " + task + " " + slt, e);
571    }
572    SplitLogCounters.tot_wkr_final_transition_failed.increment();
573  }
574
575  /**
576   * When ZK-based implementation wants to complete the task, it needs to know task znode and
577   * current znode cversion (needed for subsequent update operation).
578   */
579  public static class ZkSplitTaskDetails implements SplitTaskDetails {
580    private String taskNode;
581    private MutableInt curTaskZKVersion;
582
583    public ZkSplitTaskDetails() {
584    }
585
586    public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
587      this.taskNode = taskNode;
588      this.curTaskZKVersion = curTaskZKVersion;
589    }
590
591    public String getTaskNode() {
592      return taskNode;
593    }
594
595    public void setTaskNode(String taskNode) {
596      this.taskNode = taskNode;
597    }
598
599    public MutableInt getCurTaskZKVersion() {
600      return curTaskZKVersion;
601    }
602
603    public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
604      this.curTaskZKVersion = curTaskZKVersion;
605    }
606
607    @Override
608    public String getWALFile() {
609      return ZKSplitLog.getFileName(taskNode);
610    }
611  }
612
613}