1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.ConnectException;
24  import java.net.SocketTimeoutException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.SplitLogCounters;
38  import org.apache.hadoop.hbase.SplitLogTask;
39  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
40  import org.apache.hadoop.hbase.exceptions.DeserializationException;
41  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
42  import org.apache.hadoop.hbase.master.SplitLogManager;
43  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
44  import org.apache.hadoop.hbase.util.CancelableProgressable;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  import org.apache.hadoop.hbase.util.FSUtils;
47  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
48  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
49  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
50  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
51  import org.apache.hadoop.util.StringUtils;
52  import org.apache.zookeeper.AsyncCallback;
53  import org.apache.zookeeper.KeeperException;
54  import org.apache.zookeeper.data.Stat;
55  
56  /**
57   * This worker is spawned in every regionserver (should we also spawn one in
58   * the master?). The Worker waits for log splitting tasks to be put up by the
59   * {@link SplitLogManager} running in the master and races with other workers
60   * in other serves to acquire those tasks. The coordination is done via
61   * zookeeper. All the action takes place at /hbase/splitlog znode.
62   * <p>
63   * If a worker has successfully moved the task from state UNASSIGNED to
64   * OWNED then it owns the task. It keeps heart beating the manager by
65   * periodically moving the task from UNASSIGNED to OWNED state. On success it
66   * moves the task to TASK_DONE. On unrecoverable error it moves task state to
67   * ERR. If it cannot continue but wants the master to retry the task then it
68   * moves the task state to RESIGNED.
69   * <p>
70   * The manager can take a task away from a worker by moving the task from
71   * OWNED to UNASSIGNED. In the absence of a global lock there is a
72   * unavoidable race here - a worker might have just finished its task when it
73   * is stripped of its ownership. Here we rely on the idempotency of the log
74   * splitting task for correctness
75   */
76  @InterfaceAudience.Private
77  public class SplitLogWorker extends ZooKeeperListener implements Runnable {
78    private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
79    private static final int checkInterval = 5000; // 5 seconds
80  
81    Thread worker;
82    private final ServerName serverName;
83    private final TaskExecutor splitTaskExecutor;
84  
85    private final Object taskReadyLock = new Object();
86    volatile int taskReadySeq = 0;
87    private volatile String currentTask = null;
88    private int currentVersion;
89    private volatile boolean exitWorker;
90    private final Object grabTaskLock = new Object();
91    private boolean workerInGrabTask = false;
92    private final int report_period;
93    private RegionServerServices server = null;
94  
95    public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
96        RegionServerServices server, TaskExecutor splitTaskExecutor) {
97      super(watcher);
98      this.server = server;
99      this.serverName = server.getServerName();
100     this.splitTaskExecutor = splitTaskExecutor;
101     report_period = conf.getInt("hbase.splitlog.report.period",
102       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
103   }
104 
105   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
106       TaskExecutor splitTaskExecutor) {
107     super(watcher);
108     this.serverName = serverName;
109     this.splitTaskExecutor = splitTaskExecutor;
110     report_period = conf.getInt("hbase.splitlog.report.period",
111       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
112   }
113 
114   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
115       RegionServerServices server, final LastSequenceId sequenceIdChecker) {
116     this(watcher, conf, server, new TaskExecutor() {
117       @Override
118       public Status exec(String filename, CancelableProgressable p) {
119         Path rootdir;
120         FileSystem fs;
121         try {
122           rootdir = FSUtils.getRootDir(conf);
123           fs = rootdir.getFileSystem(conf);
124         } catch (IOException e) {
125           LOG.warn("could not find root dir or fs", e);
126           return Status.RESIGNED;
127         }
128         // TODO have to correctly figure out when log splitting has been
129         // interrupted or has encountered a transient error and when it has
130         // encountered a bad non-retry-able persistent error.
131         try {
132           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
133             fs, conf, p, sequenceIdChecker, watcher)) {
134             return Status.PREEMPTED;
135           }
136         } catch (InterruptedIOException iioe) {
137           LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
138           return Status.RESIGNED;
139         } catch (IOException e) {
140           Throwable cause = e.getCause();
141           if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException 
142                   || cause instanceof ConnectException 
143                   || cause instanceof SocketTimeoutException)) {
144             LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
145             		+ "resigning", e);
146             return Status.RESIGNED;
147           } else if (cause instanceof InterruptedException) {
148             LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
149             return Status.RESIGNED;
150           } else if(cause instanceof KeeperException) {
151             LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
152             return Status.RESIGNED;
153           }
154           LOG.warn("log splitting of " + filename + " failed, returning error", e);
155           return Status.ERR;
156         }
157         return Status.DONE;
158       }
159     });
160   }
161 
162   @Override
163   public void run() {
164     try {
165       LOG.info("SplitLogWorker " + this.serverName + " starting");
166       this.watcher.registerListener(this);
167       int res;
168       // wait for master to create the splitLogZnode
169       res = -1;
170       while (res == -1 && !exitWorker) {
171         try {
172           res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
173         } catch (KeeperException e) {
174           // ignore
175           LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
176         }
177         if (res == -1) {
178           try {
179             LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
180             Thread.sleep(1000);
181           } catch (InterruptedException e) {
182             LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
183                 + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
184                 "exiting anyway)"));
185             exitWorker = true;
186             break;
187           }
188         }
189       }
190 
191       if (!exitWorker) {
192         taskLoop();
193       }
194     } catch (Throwable t) {
195       // only a logical error can cause here. Printing it out
196       // to make debugging easier
197       LOG.error("unexpected error ", t);
198     } finally {
199       LOG.info("SplitLogWorker " + this.serverName + " exiting");
200     }
201   }
202 
203   /**
204    * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task
205    * one at a time. This policy puts an upper-limit on the number of
206    * simultaneous log splitting that could be happening in a cluster.
207    * <p>
208    * Synchronization using {@link #taskReadyLock} ensures that it will
209    * try to grab every task that has been put up
210    */
211   private void taskLoop() {
212     while (!exitWorker) {
213       int seq_start = taskReadySeq;
214       List<String> paths = getTaskList();
215       if (paths == null) {
216         LOG.warn("Could not get tasks, did someone remove " +
217             this.watcher.splitLogZNode + " ... worker thread exiting.");
218         return;
219       }
220       int offset = (int)(Math.random() * paths.size());
221       for (int i = 0; i < paths.size(); i ++) {
222         int idx = (i + offset) % paths.size();
223         // don't call ZKSplitLog.getNodeName() because that will lead to
224         // double encoding of the path name
225         grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
226         if (exitWorker) {
227           return;
228         }
229       }
230       synchronized (taskReadyLock) {
231         while (seq_start == taskReadySeq) {
232           try {
233             taskReadyLock.wait(checkInterval);
234             if (this.server != null) {
235               // check to see if we have stale recovering regions in our internal memory state
236               Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
237               if (!recoveringRegions.isEmpty()) {
238                 // Make a local copy to prevent ConcurrentModificationException when other threads
239                 // modify recoveringRegions
240                 List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
241                 for (String region : tmpCopy) {
242                   String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
243                   try {
244                     if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
245                       HRegion r = recoveringRegions.remove(region);
246                       if (r != null) {
247                         r.setRecovering(false);
248                       }
249                       LOG.debug("Mark recovering region:" + region + " up.");
250                     } else {
251                       // current check is a defensive(or redundant) mechanism to prevent us from
252                       // having stale recovering regions in our internal RS memory state while
253                       // zookeeper(source of truth) says differently. We stop at the first good one
254                       // because we should not have a single instance such as this in normal case so
255                       // check the first one is good enough.
256                       break;
257                     }
258                   } catch (KeeperException e) {
259                     // ignore zookeeper error
260                     LOG.debug("Got a zookeeper when trying to open a recovering region", e);
261                     break;
262                   }
263                 }
264               }
265             }
266           } catch (InterruptedException e) {
267             LOG.info("SplitLogWorker interrupted while waiting for task," +
268                 " exiting: " + e.toString() + (exitWorker ? "" :
269                 " (ERROR: exitWorker is not set, exiting anyway)"));
270             exitWorker = true;
271             return;
272           }
273         }
274       }
275 
276     }
277   }
278 
279   /**
280    * try to grab a 'lock' on the task zk node to own and execute the task.
281    * <p>
282    * @param path zk node for the task
283    */
284   private void grabTask(String path) {
285     Stat stat = new Stat();
286     long t = -1;
287     byte[] data;
288     synchronized (grabTaskLock) {
289       currentTask = path;
290       workerInGrabTask = true;
291       if (Thread.interrupted()) {
292         return;
293       }
294     }
295     try {
296       try {
297         if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
298           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
299           return;
300         }
301       } catch (KeeperException e) {
302         LOG.warn("Failed to get data for znode " + path, e);
303         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
304         return;
305       }
306       SplitLogTask slt;
307       try {
308         slt = SplitLogTask.parseFrom(data);
309       } catch (DeserializationException e) {
310         LOG.warn("Failed parse data for znode " + path, e);
311         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
312         return;
313       }
314       if (!slt.isUnassigned()) {
315         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
316         return;
317       }
318 
319       currentVersion = stat.getVersion();
320       if (!attemptToOwnTask(true)) {
321         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
322         return;
323       }
324 
325       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
326         endTask(new SplitLogTask.Done(this.serverName),
327           SplitLogCounters.tot_wkr_task_acquired_rescan);
328         return;
329       }
330       LOG.info("worker " + serverName + " acquired task " + path);
331       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
332       getDataSetWatchAsync();
333 
334       t = System.currentTimeMillis();
335       TaskExecutor.Status status;
336 
337       status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
338           new CancelableProgressable() {
339 
340         private long last_report_at = 0;
341 
342         @Override
343         public boolean progress() {
344           long t = EnvironmentEdgeManager.currentTimeMillis();
345           if ((t - last_report_at) > report_period) {
346             last_report_at = t;
347             if (!attemptToOwnTask(false)) {
348               LOG.warn("Failed to heartbeat the task" + currentTask);
349               return false;
350             }
351           }
352           return true;
353         }
354       });
355 
356       switch (status) {
357         case DONE:
358           endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
359           break;
360         case PREEMPTED:
361           SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
362           LOG.warn("task execution prempted " + path);
363           break;
364         case ERR:
365           if (!exitWorker) {
366             endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
367             break;
368           }
369           // if the RS is exiting then there is probably a tons of stuff
370           // that can go wrong. Resign instead of signaling error.
371           //$FALL-THROUGH$
372         case RESIGNED:
373           if (exitWorker) {
374             LOG.info("task execution interrupted because worker is exiting " + path);
375             endTask(new SplitLogTask.Resigned(this.serverName),
376               SplitLogCounters.tot_wkr_task_resigned);
377           } else {
378             SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
379             LOG.info("task execution interrupted via zk by manager " + path);
380           }
381           break;
382       }
383     } finally {
384       if (t > 0) {
385         LOG.info("worker " + serverName + " done with task " + path +
386             " in " + (System.currentTimeMillis() - t) + "ms");
387       }
388       synchronized (grabTaskLock) {
389         workerInGrabTask = false;
390         // clear the interrupt from stopTask() otherwise the next task will
391         // suffer
392         Thread.interrupted();
393       }
394     }
395   }
396 
397   /**
398    * Try to own the task by transitioning the zk node data from UNASSIGNED to
399    * OWNED.
400    * <p>
401    * This method is also used to periodically heartbeat the task progress by
402    * transitioning the node from OWNED to OWNED.
403    * <p>
404    * @return true if task path is successfully locked
405    */
406   private boolean attemptToOwnTask(boolean isFirstTime) {
407     try {
408       SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
409       Stat stat =
410         this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
411       if (stat == null) {
412         LOG.warn("zk.setData() returned null for path " + currentTask);
413         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
414         return (false);
415       }
416       currentVersion = stat.getVersion();
417       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
418       return (true);
419     } catch (KeeperException e) {
420       if (!isFirstTime) {
421         if (e.code().equals(KeeperException.Code.NONODE)) {
422           LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
423         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
424           LOG.warn("BADVERSION failed to assert ownership for " +
425               currentTask, e);
426         } else {
427           LOG.warn("failed to assert ownership for " + currentTask, e);
428         }
429       }
430     } catch (InterruptedException e1) {
431       LOG.warn("Interrupted while trying to assert ownership of " +
432           currentTask + " " + StringUtils.stringifyException(e1));
433       Thread.currentThread().interrupt();
434     }
435     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
436     return (false);
437   }
438 
439   /**
440    * endTask() can fail and the only way to recover out of it is for the
441    * {@link SplitLogManager} to timeout the task node.
442    * @param slt
443    * @param ctr
444    */
445   private void endTask(SplitLogTask slt, AtomicLong ctr) {
446     String path = currentTask;
447     currentTask = null;
448     try {
449       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
450           currentVersion)) {
451         LOG.info("successfully transitioned task " + path + " to final state " + slt);
452         ctr.incrementAndGet();
453         return;
454       }
455       LOG.warn("failed to transistion task " + path + " to end state " + slt +
456           " because of version mismatch ");
457     } catch (KeeperException.BadVersionException bve) {
458       LOG.warn("transisition task " + path + " to " + slt +
459           " failed because of version mismatch", bve);
460     } catch (KeeperException.NoNodeException e) {
461       LOG.fatal("logic error - end task " + path + " " + slt +
462           " failed because task doesn't exist", e);
463     } catch (KeeperException e) {
464       LOG.warn("failed to end task, " + path + " " + slt, e);
465     }
466     SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
467   }
468 
469   void getDataSetWatchAsync() {
470     this.watcher.getRecoverableZooKeeper().getZooKeeper().
471       getData(currentTask, this.watcher,
472       new GetDataAsyncCallback(), null);
473     SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
474   }
475 
476   void getDataSetWatchSuccess(String path, byte[] data) {
477     SplitLogTask slt;
478     try {
479       slt = SplitLogTask.parseFrom(data);
480     } catch (DeserializationException e) {
481       LOG.warn("Failed parse", e);
482       return;
483     }
484     synchronized (grabTaskLock) {
485       if (workerInGrabTask) {
486         // currentTask can change but that's ok
487         String taskpath = currentTask;
488         if (taskpath != null && taskpath.equals(path)) {
489           // have to compare data. cannot compare version because then there
490           // will be race with attemptToOwnTask()
491           // cannot just check whether the node has been transitioned to
492           // UNASSIGNED because by the time this worker sets the data watch
493           // the node might have made two transitions - from owned by this
494           // worker to unassigned to owned by another worker
495           if (! slt.isOwned(this.serverName) &&
496               ! slt.isDone(this.serverName) &&
497               ! slt.isErr(this.serverName) &&
498               ! slt.isResigned(this.serverName)) {
499             LOG.info("task " + taskpath + " preempted from " +
500                 serverName + ", current task state and owner=" + slt.toString());
501             stopTask();
502           }
503         }
504       }
505     }
506   }
507 
508   void getDataSetWatchFailure(String path) {
509     synchronized (grabTaskLock) {
510       if (workerInGrabTask) {
511         // currentTask can change but that's ok
512         String taskpath = currentTask;
513         if (taskpath != null && taskpath.equals(path)) {
514           LOG.info("retrying data watch on " + path);
515           SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
516           getDataSetWatchAsync();
517         } else {
518           // no point setting a watch on the task which this worker is not
519           // working upon anymore
520         }
521       }
522     }
523   }
524 
525   @Override
526   public void nodeDataChanged(String path) {
527     // there will be a self generated dataChanged event every time attemptToOwnTask()
528     // heartbeats the task znode by upping its version
529     synchronized (grabTaskLock) {
530       if (workerInGrabTask) {
531         // currentTask can change
532         String taskpath = currentTask;
533         if (taskpath!= null && taskpath.equals(path)) {
534           getDataSetWatchAsync();
535         }
536       }
537     }
538   }
539 
540 
541   private List<String> getTaskList() {
542     List<String> childrenPaths = null;
543     long sleepTime = 1000;
544     // It will be in loop till it gets the list of children or
545     // it will come out if worker thread exited.
546     while (!exitWorker) {
547       try {
548         childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
549             this.watcher.splitLogZNode);
550         if (childrenPaths != null) {
551           return childrenPaths;
552         }
553       } catch (KeeperException e) {
554         LOG.warn("Could not get children of znode "
555             + this.watcher.splitLogZNode, e);
556       }
557       try {
558         LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
559             + " after sleep for " + sleepTime + "ms!");
560         Thread.sleep(sleepTime);
561       } catch (InterruptedException e1) {
562         LOG.warn("Interrupted while trying to get task list ...", e1);
563         Thread.currentThread().interrupt();
564       }
565     }
566     return childrenPaths;
567   }
568 
569   @Override
570   public void nodeChildrenChanged(String path) {
571     if(path.equals(watcher.splitLogZNode)) {
572       LOG.debug("tasks arrived or departed");
573       synchronized (taskReadyLock) {
574         taskReadySeq++;
575         taskReadyLock.notify();
576       }
577     }
578   }
579 
580   /**
581    * If the worker is doing a task i.e. splitting a log file then stop the task.
582    * It doesn't exit the worker thread.
583    */
584   void stopTask() {
585     LOG.info("Sending interrupt to stop the worker thread");
586     worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
587   }
588 
589 
590   /**
591    * start the SplitLogWorker thread
592    */
593   public void start() {
594     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
595     exitWorker = false;
596     worker.start();
597   }
598 
599   /**
600    * stop the SplitLogWorker thread
601    */
602   public void stop() {
603     exitWorker = true;
604     stopTask();
605   }
606 
607   /**
608    * Asynchronous handler for zk get-data-set-watch on node results.
609    */
610   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
611     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
612 
613     @Override
614     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
615       SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
616       if (rc != 0) {
617         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
618         getDataSetWatchFailure(path);
619         return;
620       }
621       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
622       getDataSetWatchSuccess(path, data);
623     }
624   }
625 
626   /**
627    * Objects implementing this interface actually do the task that has been
628    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
629    * guarantee that two workers will not be executing the same task therefore it
630    * is better to have workers prepare the task and then have the
631    * {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher
632    */
633   static public interface TaskExecutor {
634     static public enum Status {
635       DONE(),
636       ERR(),
637       RESIGNED(),
638       PREEMPTED()
639     }
640     public Status exec(String name, CancelableProgressable p);
641   }
642 }