View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
23  
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.util.List;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.master.SplitLogManager;
35  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
36  import org.apache.hadoop.hbase.util.CancelableProgressable;
37  import org.apache.hadoop.hbase.util.FSUtils;
38  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
39  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43  import org.apache.hadoop.util.StringUtils;
44  import org.apache.zookeeper.AsyncCallback;
45  import org.apache.zookeeper.KeeperException;
46  import org.apache.zookeeper.data.Stat;
47  
48  /**
49   * This worker is spawned in every regionserver (should we also spawn one in
50   * the master?). The Worker waits for log splitting tasks to be put up by the
51   * {@link SplitLogManager} running in the master and races with other workers
52   * in other servers to acquire those tasks. The coordination is done via
53   * zookeeper. All the action takes place at /hbase/splitlog znode.
54   * <p>
55   * If a worker has successfully moved the task from state UNASSIGNED to
56   * OWNED then it owns the task. It keeps heart beating the manager by
57   * periodically moving the task from OWNED to OWNED state. On success it
58   * moves the task to TASK_DONE. On unrecoverable error it moves task state to
59   * ERR. If it cannot continue but wants the master to retry the task then it
60   * moves the task state to RESIGNED.
61   * <p>
62   * The manager can take a task away from a worker by moving the task from
63   * OWNED to UNASSIGNED. In the absence of a global lock there is an
64   * unavoidable race here - a worker might have just finished its task when it
65   * is stripped of its ownership. Here we rely on the idempotency of the log
66   * splitting task for correctness.
67   */
68  public class SplitLogWorker extends ZooKeeperListener implements Runnable {
69    private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
70  
71    Thread worker;
72    private final String serverName;
73    private final TaskExecutor splitTaskExecutor;
74    private long zkretries;
75  
76    private Object taskReadyLock = new Object();
77    volatile int taskReadySeq = 0;
78    private volatile String currentTask = null;
79    private int currentVersion;
80    private volatile boolean exitWorker;
81    private Object grabTaskLock = new Object();
82    private boolean workerInGrabTask = false;
83  
84  
85    public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
86        String serverName, TaskExecutor splitTaskExecutor) {
87      super(watcher);
88      this.serverName = serverName;
89      this.splitTaskExecutor = splitTaskExecutor;
90      this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3);
91    }
92  
93    public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf,
94        final String serverName) {
95      this(watcher, conf, serverName, new TaskExecutor () {
96        @Override
97        public Status exec(String filename, CancelableProgressable p) {
98          Path rootdir;
99          FileSystem fs;
100         try {
101           rootdir = FSUtils.getRootDir(conf);
102           fs = rootdir.getFileSystem(conf);
103         } catch (IOException e) {
104           LOG.warn("could not find root dir or fs", e);
105           return Status.RESIGNED;
106         }
107         // TODO have to correctly figure out when log splitting has been
108         // interrupted or has encountered a transient error and when it has
109         // encountered a bad non-retry-able persistent error.
110         try {          
111           String relativeLogPath = getRelativeLogPath(filename);
112           if (HLogSplitter.splitLogFile(rootdir,
113               fs.getFileStatus(new Path(rootdir, relativeLogPath)), fs, conf, p) == false) {
114             return Status.PREEMPTED;
115           }
116         } catch (InterruptedIOException iioe) {
117           LOG.warn("log splitting of " + filename + " interrupted, resigning",
118               iioe);
119           return Status.RESIGNED;
120         } catch (IOException e) {
121           Throwable cause = e.getCause();
122           if (cause instanceof InterruptedException) {
123             LOG.warn("log splitting of " + filename + " interrupted, resigning",
124                 e);
125             return Status.RESIGNED;
126           }
127           LOG.warn("log splitting of " + filename + " failed, returning error",
128               e);
129           return Status.ERR;
130         }
131         return Status.DONE;
132       }
133 
134       private String getRelativeLogPath(String logPath) {
135         StringBuilder sb = new StringBuilder();
136         String znodeDelimiter = Character.toString(Path.SEPARATOR_CHAR);
137         String[] filenameSplits = logPath.split(znodeDelimiter);
138         int len = filenameSplits.length;
139         String relativeLogPath = logPath;
140         if (len > 3) {
141           // the last three terms are .logs/server/log-file
142           relativeLogPath = sb.append(filenameSplits[len - 3]).append(znodeDelimiter)
143             .append(filenameSplits[len - 2]).append(znodeDelimiter)
144             .append(filenameSplits[len - 1]).toString();
145         }
146         return relativeLogPath;
147       }
148     });
149   }
150 
151   @Override
152   public void run() {
153    try {
154     LOG.info("SplitLogWorker " + this.serverName + " starting");
155     this.watcher.registerListener(this);
156     int res;
157     // wait for master to create the splitLogZnode
158     res = -1;
159     while (res == -1) {
160       try {
161         res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
162       } catch (KeeperException e) {
163         // ignore
164         LOG.warn("Exception when checking for " + watcher.splitLogZNode +
165             " ... retrying", e);
166       }
167       if (res == -1) {
168         try {
169           LOG.info(watcher.splitLogZNode + " znode does not exist," +
170               " waiting for master to create one");
171           Thread.sleep(1000);
172         } catch (InterruptedException e) {
173           LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode);
174           assert exitWorker == true;
175         }
176       }
177     }
178 
179     taskLoop();
180    } catch (Throwable t) {
181 	   // only a logical error can cause here. Printing it out 
182 	   // to make debugging easier
183 	   LOG.error("unexpected error ", t);
184    } finally {
185 	   LOG.info("SplitLogWorker " + this.serverName + " exiting");
186    }
187   }
188 
189   /**
190    * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task
191    * one at a time. This policy puts an upper-limit on the number of
192    * simultaneous log splitting that could be happening in a cluster.
193    * <p>
194    * Synchronization using {@link #task_ready_signal_seq} ensures that it will
195    * try to grab every task that has been put up
196    */
197   private void taskLoop() {
198     while (true) {
199       int seq_start = taskReadySeq;
200       List<String> paths = getTaskList();
201       if (paths == null) {
202         LOG.warn("Could not get tasks, did someone remove " +
203             this.watcher.splitLogZNode + " ... worker thread exiting.");
204         return;
205       }
206       int offset = (int)(Math.random() * paths.size());
207       for (int i = 0; i < paths.size(); i ++) {
208         int idx = (i + offset) % paths.size();
209         // don't call ZKSplitLog.getNodeName() because that will lead to
210         // double encoding of the path name
211         grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
212         if (exitWorker == true) {
213           return;
214         }
215       }
216       synchronized (taskReadyLock) {
217         while (seq_start == taskReadySeq) {
218           try {
219             taskReadyLock.wait();
220           } catch (InterruptedException e) {
221             LOG.info("SplitLogWorker interrupted while waiting for task," +
222               " exiting: " + e.toString());
223             assert exitWorker == true;
224             return;
225           }
226         }
227       }
228     }
229   }
230 
231   /**
232    * try to grab a 'lock' on the task zk node to own and execute the task.
233    * <p>
234    * @param path zk node for the task
235    */
236   private void grabTask(String path) {
237     Stat stat = new Stat();
238     long t = -1;
239     byte[] data;
240     synchronized (grabTaskLock) {
241       currentTask = path;
242       workerInGrabTask = true;
243       if (Thread.interrupted()) {
244         return;
245       }
246     }
247     try {
248       try {
249         if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
250           tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
251           return;
252         }
253       } catch (KeeperException e) {
254         LOG.warn("Failed to get data for znode " + path, e);
255         tot_wkr_failed_to_grab_task_exception.incrementAndGet();
256         return;
257       }
258       if (TaskState.TASK_UNASSIGNED.equals(data) == false) {
259         tot_wkr_failed_to_grab_task_owned.incrementAndGet();
260         return;
261       }
262 
263       currentVersion = stat.getVersion();
264       if (attemptToOwnTask(true) == false) {
265         tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
266         return;
267       }
268 
269       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
270         endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan);
271         return;
272       }
273       LOG.info("worker " + serverName + " acquired task " + path);
274       tot_wkr_task_acquired.incrementAndGet();
275       getDataSetWatchAsync();
276 
277       t = System.currentTimeMillis();
278       TaskExecutor.Status status;
279 
280       status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
281           new CancelableProgressable() {
282 
283         @Override
284         public boolean progress() {
285           if (attemptToOwnTask(false) == false) {
286             LOG.warn("Failed to heartbeat the task" + currentTask);
287             return false;
288           }
289           return true;
290         }
291       });
292       switch (status) {
293         case DONE:
294           endTask(TaskState.TASK_DONE, tot_wkr_task_done);
295           break;
296         case PREEMPTED:
297           tot_wkr_preempt_task.incrementAndGet();
298           LOG.warn("task execution prempted " + path);
299           break;
300         case ERR:
301           if (!exitWorker) {
302             endTask(TaskState.TASK_ERR, tot_wkr_task_err);
303             break;
304           }
305           // if the RS is exiting then there is probably a tons of stuff
306           // that can go wrong. Resign instead of signaling error.
307           //$FALL-THROUGH$
308         case RESIGNED:
309           if (exitWorker) {
310             LOG.info("task execution interrupted because worker is exiting " +
311                 path);
312             endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned);
313           } else {
314             tot_wkr_preempt_task.incrementAndGet();
315             LOG.info("task execution interrupted via zk by manager " +
316                 path);
317           }
318           break;
319       }
320     } finally {
321       if (t > 0) {
322         LOG.info("worker " + serverName + " done with task " + path +
323             " in " + (System.currentTimeMillis() - t) + "ms");
324       }
325       synchronized (grabTaskLock) {
326         workerInGrabTask = false;
327         // clear the interrupt from stopTask() otherwise the next task will
328         // suffer
329         Thread.interrupted();
330       }
331     }
332     return;
333   }
334 
335   /**
336    * Try to own the task by transitioning the zk node data from UNASSIGNED to
337    * OWNED.
338    * <p>
339    * This method is also used to periodically heartbeat the task progress by
340    * transitioning the node from OWNED to OWNED.
341    * <p>
342    * @return true if task path is successfully locked
343    */
344   private boolean attemptToOwnTask(boolean isFirstTime) {
345     try {
346       Stat stat = this.watcher.getRecoverableZooKeeper().setData(currentTask,
347           TaskState.TASK_OWNED.get(serverName), currentVersion);
348       if (stat == null) {
349         LOG.warn("zk.setData() returned null for path " + currentTask);
350         tot_wkr_task_heartbeat_failed.incrementAndGet();
351         return (false);
352       }
353       currentVersion = stat.getVersion();
354       tot_wkr_task_heartbeat.incrementAndGet();
355       return (true);
356     } catch (KeeperException e) {
357       if (!isFirstTime) {
358         if (e.code().equals(KeeperException.Code.NONODE)) {
359           LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
360         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
361           LOG.warn("BADVERSION failed to assert ownership for " +
362               currentTask, e);
363         } else {
364           LOG.warn("failed to assert ownership for " + currentTask, e);
365         }
366       }
367     } catch (InterruptedException e1) {
368       LOG.warn("Interrupted while trying to assert ownership of " +
369           currentTask + " " + StringUtils.stringifyException(e1));
370       Thread.currentThread().interrupt();
371     }
372     tot_wkr_task_heartbeat_failed.incrementAndGet();
373     return (false);
374   }
375 
376   /**
377    * endTask() can fail and the only way to recover out of it is for the
378    * {@link SplitLogManager} to timeout the task node.
379    * @param ts
380    * @param ctr
381    */
382   private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) {
383     String path = currentTask;
384     currentTask = null;
385     try {
386       if (ZKUtil.setData(this.watcher, path, ts.get(serverName),
387           currentVersion)) {
388         LOG.info("successfully transitioned task " + path +
389             " to final state " + ts);
390         ctr.incrementAndGet();
391         return;
392       }
393       LOG.warn("failed to transistion task " + path + " to end state " + ts +
394           " because of version mismatch ");
395     } catch (KeeperException.BadVersionException bve) {
396       LOG.warn("transisition task " + path + " to " + ts +
397           " failed because of version mismatch", bve);
398     } catch (KeeperException.NoNodeException e) {
399       LOG.fatal("logic error - end task " + path + " " + ts +
400           " failed because task doesn't exist", e);
401     } catch (KeeperException e) {
402       LOG.warn("failed to end task, " + path + " " + ts, e);
403     }
404     tot_wkr_final_transistion_failed.incrementAndGet();
405     return;
406   }
407 
408   void getDataSetWatchAsync() {
409     this.watcher.getRecoverableZooKeeper().getZooKeeper().
410       getData(currentTask, this.watcher,
411       new GetDataAsyncCallback(), null);
412     tot_wkr_get_data_queued.incrementAndGet();
413   }
414 
415   void getDataSetWatchSuccess(String path, byte[] data) {
416     synchronized (grabTaskLock) {
417       if (workerInGrabTask) {
418         // currentTask can change but that's ok
419         String taskpath = currentTask;
420         if (taskpath != null && taskpath.equals(path)) {
421           // have to compare data. cannot compare version because then there
422           // will be race with attemptToOwnTask()
423           // cannot just check whether the node has been transitioned to
424           // UNASSIGNED because by the time this worker sets the data watch
425           // the node might have made two transitions - from owned by this
426           // worker to unassigned to owned by another worker
427           if (! TaskState.TASK_OWNED.equals(data, serverName) &&
428               ! TaskState.TASK_DONE.equals(data, serverName) &&
429               ! TaskState.TASK_ERR.equals(data, serverName) &&
430               ! TaskState.TASK_RESIGNED.equals(data, serverName)) {
431             LOG.info("task " + taskpath + " preempted from " +
432                 serverName + ", current task state and owner=" +
433                 new String(data));
434             stopTask();
435           }
436         }
437       }
438     }
439   }
440 
441   void getDataSetWatchFailure(String path) {
442     synchronized (grabTaskLock) {
443       if (workerInGrabTask) {
444         // currentTask can change but that's ok
445         String taskpath = currentTask;
446         if (taskpath != null && taskpath.equals(path)) {
447           LOG.info("retrying data watch on " + path);
448           tot_wkr_get_data_retry.incrementAndGet();
449           getDataSetWatchAsync();
450         } else {
451           // no point setting a watch on the task which this worker is not
452           // working upon anymore
453         }
454       }
455     }
456   }
457 
458 
459 
460 
461   @Override
462   public void nodeDataChanged(String path) {
463     // there will be a self generated dataChanged event every time attemptToOwnTask()
464     // heartbeats the task znode by upping its version
465     synchronized (grabTaskLock) {
466       if (workerInGrabTask) {
467         // currentTask can change
468         String taskpath = currentTask;
469         if (taskpath!= null && taskpath.equals(path)) {
470           getDataSetWatchAsync();
471         }
472       }
473     }
474   }
475 
476 
477   private List<String> getTaskList() {
478     List<String> childrenPaths = null;
479     long sleepTime = 1000;
480     // It will be in loop till it gets the list of children or
481     // it will come out if worker thread exited.
482     while (!exitWorker) {
483       try {
484         childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
485             this.watcher.splitLogZNode);
486         if (childrenPaths != null) {
487           return childrenPaths;
488         }
489       } catch (KeeperException e) {
490         LOG.warn("Could not get children of znode "
491             + this.watcher.splitLogZNode, e);
492       }
493       try {
494         LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
495             + " after sleep for " + sleepTime + "ms!");
496         Thread.sleep(sleepTime);
497       } catch (InterruptedException e1) {
498         LOG.warn("Interrupted while trying to get task list ...", e1);
499         Thread.currentThread().interrupt();
500       }
501     }
502     return childrenPaths;
503   }
504 
505 
506   @Override
507   public void nodeChildrenChanged(String path) {
508     if(path.equals(watcher.splitLogZNode)) {
509       LOG.debug("tasks arrived or departed");
510       synchronized (taskReadyLock) {
511         taskReadySeq++;
512         taskReadyLock.notify();
513       }
514     }
515   }
516 
517   /**
518    * If the worker is doing a task i.e. splitting a log file then stop the task.
519    * It doesn't exit the worker thread.
520    */
521   void stopTask() {
522     LOG.info("Sending interrupt to stop the worker thread");
523     worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
524   }
525 
526 
527   /**
528    * start the SplitLogWorker thread
529    */
530   public void start() {
531     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
532     exitWorker = false;
533     worker.start();
534     return;
535   }
536 
537   /**
538    * stop the SplitLogWorker thread
539    */
540   public void stop() {
541     exitWorker = true;
542     stopTask();
543   }
544 
545   /**
546    * Asynchronous handler for zk get-data-set-watch on node results.
547    */
548   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
549     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
550 
551     @Override
552     public void processResult(int rc, String path, Object ctx, byte[] data,
553         Stat stat) {
554       tot_wkr_get_data_result.incrementAndGet();
555       if (rc != 0) {
556         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
557         getDataSetWatchFailure(path);
558         return;
559       }
560       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
561       getDataSetWatchSuccess(path, data);
562       return;
563     }
564   }
565 
566   /**
567    * Objects implementing this interface actually do the task that has been
568    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
569    * guarantee that two workers will not be executing the same task therefore it
570    * is better to have workers prepare the task and then have the
571    * {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher
572    */
573   static public interface TaskExecutor {
574     static public enum Status {
575       DONE(),
576       ERR(),
577       RESIGNED(),
578       PREEMPTED();
579     }
580     public Status exec(String name, CancelableProgressable p);
581   }
582 }