1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.ConnectException;
24 import java.net.SocketTimeoutException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.hadoop.hbase.SplitLogCounters;
38 import org.apache.hadoop.hbase.SplitLogTask;
39 import org.apache.hadoop.hbase.client.HConnectionManager;
40 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
41 import org.apache.hadoop.hbase.exceptions.DeserializationException;
42 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
43 import org.apache.hadoop.hbase.master.SplitLogManager;
44 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
45 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
46 import org.apache.hadoop.hbase.util.CancelableProgressable;
47 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48 import org.apache.hadoop.hbase.util.FSUtils;
49 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
50 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
51 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
52 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
53 import org.apache.hadoop.util.StringUtils;
54 import org.apache.zookeeper.AsyncCallback;
55 import org.apache.zookeeper.KeeperException;
56 import org.apache.zookeeper.data.Stat;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @InterfaceAudience.Private
79 public class SplitLogWorker extends ZooKeeperListener implements Runnable {
80 private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
81 private static final int checkInterval = 5000;
82
83 Thread worker;
84 private final ServerName serverName;
85 private final TaskExecutor splitTaskExecutor;
86
87 private final Object taskReadyLock = new Object();
88 volatile int taskReadySeq = 0;
89 private volatile String currentTask = null;
90 private int currentVersion;
91 private volatile boolean exitWorker;
92 private final Object grabTaskLock = new Object();
93 private boolean workerInGrabTask = false;
94 private final int report_period;
95 private RegionServerServices server = null;
96 private Configuration conf = null;
97
98 public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
99 RegionServerServices server, TaskExecutor splitTaskExecutor) {
100 super(watcher);
101 this.server = server;
102 this.serverName = server.getServerName();
103 this.splitTaskExecutor = splitTaskExecutor;
104 report_period = conf.getInt("hbase.splitlog.report.period",
105 conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
106 this.conf = conf;
107 }
108
109 public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
110 TaskExecutor splitTaskExecutor) {
111 super(watcher);
112 this.serverName = serverName;
113 this.splitTaskExecutor = splitTaskExecutor;
114 report_period = conf.getInt("hbase.splitlog.report.period",
115 conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
116 this.conf = conf;
117 }
118
119 public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
120 RegionServerServices server, final LastSequenceId sequenceIdChecker) {
121 this(watcher, conf, server, new TaskExecutor() {
122 @Override
123 public Status exec(String filename, CancelableProgressable p) {
124 Path rootdir;
125 FileSystem fs;
126 try {
127 rootdir = FSUtils.getRootDir(conf);
128 fs = rootdir.getFileSystem(conf);
129 } catch (IOException e) {
130 LOG.warn("could not find root dir or fs", e);
131 return Status.RESIGNED;
132 }
133
134
135
136 try {
137 if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
138 fs, conf, p, sequenceIdChecker, watcher)) {
139 return Status.PREEMPTED;
140 }
141 } catch (InterruptedIOException iioe) {
142 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
143 return Status.RESIGNED;
144 } catch (IOException e) {
145 Throwable cause = e.getCause();
146 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
147 || cause instanceof ConnectException
148 || cause instanceof SocketTimeoutException)) {
149 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
150 + "resigning", e);
151 return Status.RESIGNED;
152 } else if (cause instanceof InterruptedException) {
153 LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
154 return Status.RESIGNED;
155 } else if(cause instanceof KeeperException) {
156 LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
157 return Status.RESIGNED;
158 }
159 LOG.warn("log splitting of " + filename + " failed, returning error", e);
160 return Status.ERR;
161 }
162 return Status.DONE;
163 }
164 });
165 }
166
167 @Override
168 public void run() {
169 try {
170 LOG.info("SplitLogWorker " + this.serverName + " starting");
171 this.watcher.registerListener(this);
172
173 HConnectionManager.getConnection(conf);
174 int res;
175
176 res = -1;
177 while (res == -1 && !exitWorker) {
178 try {
179 res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
180 } catch (KeeperException e) {
181
182 LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
183 }
184 if (res == -1) {
185 try {
186 LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
187 Thread.sleep(1000);
188 } catch (InterruptedException e) {
189 LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
190 + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
191 "exiting anyway)"));
192 exitWorker = true;
193 break;
194 }
195 }
196 }
197
198 if (!exitWorker) {
199 taskLoop();
200 }
201 } catch (Throwable t) {
202
203
204 LOG.error("unexpected error ", t);
205 } finally {
206 LOG.info("SplitLogWorker " + this.serverName + " exiting");
207 }
208 }
209
210
211
212
213
214
215
216
217
218 private void taskLoop() {
219 while (!exitWorker) {
220 int seq_start = taskReadySeq;
221 List<String> paths = getTaskList();
222 if (paths == null) {
223 LOG.warn("Could not get tasks, did someone remove " +
224 this.watcher.splitLogZNode + " ... worker thread exiting.");
225 return;
226 }
227
228 int offset = (int) (Math.random() * paths.size());
229 for(int i = 0; i < paths.size(); i ++){
230 if(HLogUtil.isMetaFile(paths.get(i))) {
231 offset = i;
232 break;
233 }
234 }
235 for (int i = 0; i < paths.size(); i ++) {
236 int idx = (i + offset) % paths.size();
237
238
239 grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
240 if (exitWorker) {
241 return;
242 }
243 }
244 synchronized (taskReadyLock) {
245 while (seq_start == taskReadySeq) {
246 try {
247 taskReadyLock.wait(checkInterval);
248 if (this.server != null) {
249
250 Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
251 if (!recoveringRegions.isEmpty()) {
252
253
254 List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
255 for (String region : tmpCopy) {
256 String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
257 try {
258 if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
259 HRegion r = recoveringRegions.remove(region);
260 if (r != null) {
261 r.setRecovering(false);
262 }
263 LOG.debug("Mark recovering region:" + region + " up.");
264 } else {
265
266
267
268
269
270 break;
271 }
272 } catch (KeeperException e) {
273
274 LOG.debug("Got a zookeeper when trying to open a recovering region", e);
275 break;
276 }
277 }
278 }
279 }
280 } catch (InterruptedException e) {
281 LOG.info("SplitLogWorker interrupted while waiting for task," +
282 " exiting: " + e.toString() + (exitWorker ? "" :
283 " (ERROR: exitWorker is not set, exiting anyway)"));
284 exitWorker = true;
285 return;
286 }
287 }
288 }
289
290 }
291 }
292
293
294
295
296
297
298 private void grabTask(String path) {
299 Stat stat = new Stat();
300 long t = -1;
301 byte[] data;
302 synchronized (grabTaskLock) {
303 currentTask = path;
304 workerInGrabTask = true;
305 if (Thread.interrupted()) {
306 return;
307 }
308 }
309 try {
310 try {
311 if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
312 SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
313 return;
314 }
315 } catch (KeeperException e) {
316 LOG.warn("Failed to get data for znode " + path, e);
317 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
318 return;
319 }
320 SplitLogTask slt;
321 try {
322 slt = SplitLogTask.parseFrom(data);
323 } catch (DeserializationException e) {
324 LOG.warn("Failed parse data for znode " + path, e);
325 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
326 return;
327 }
328 if (!slt.isUnassigned()) {
329 SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
330 return;
331 }
332
333 currentVersion = stat.getVersion();
334 if (!attemptToOwnTask(true)) {
335 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
336 return;
337 }
338
339 if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
340 endTask(new SplitLogTask.Done(this.serverName),
341 SplitLogCounters.tot_wkr_task_acquired_rescan);
342 return;
343 }
344 LOG.info("worker " + serverName + " acquired task " + path);
345 SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
346 getDataSetWatchAsync();
347
348 t = System.currentTimeMillis();
349 TaskExecutor.Status status;
350
351 status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
352 new CancelableProgressable() {
353
354 private long last_report_at = 0;
355
356 @Override
357 public boolean progress() {
358 long t = EnvironmentEdgeManager.currentTimeMillis();
359 if ((t - last_report_at) > report_period) {
360 last_report_at = t;
361 if (!attemptToOwnTask(false)) {
362 LOG.warn("Failed to heartbeat the task" + currentTask);
363 return false;
364 }
365 }
366 return true;
367 }
368 });
369
370 switch (status) {
371 case DONE:
372 endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
373 break;
374 case PREEMPTED:
375 SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
376 LOG.warn("task execution prempted " + path);
377 break;
378 case ERR:
379 if (!exitWorker) {
380 endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
381 break;
382 }
383
384
385
386 case RESIGNED:
387 if (exitWorker) {
388 LOG.info("task execution interrupted because worker is exiting " + path);
389 endTask(new SplitLogTask.Resigned(this.serverName),
390 SplitLogCounters.tot_wkr_task_resigned);
391 } else {
392 SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
393 LOG.info("task execution interrupted via zk by manager " + path);
394 }
395 break;
396 }
397 } finally {
398 if (t > 0) {
399 LOG.info("worker " + serverName + " done with task " + path +
400 " in " + (System.currentTimeMillis() - t) + "ms");
401 }
402 synchronized (grabTaskLock) {
403 workerInGrabTask = false;
404
405
406 Thread.interrupted();
407 }
408 }
409 }
410
411
412
413
414
415
416
417
418
419
420 private boolean attemptToOwnTask(boolean isFirstTime) {
421 try {
422 SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
423 Stat stat =
424 this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
425 if (stat == null) {
426 LOG.warn("zk.setData() returned null for path " + currentTask);
427 SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
428 return (false);
429 }
430 currentVersion = stat.getVersion();
431 SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
432 return (true);
433 } catch (KeeperException e) {
434 if (!isFirstTime) {
435 if (e.code().equals(KeeperException.Code.NONODE)) {
436 LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
437 } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
438 LOG.warn("BADVERSION failed to assert ownership for " +
439 currentTask, e);
440 } else {
441 LOG.warn("failed to assert ownership for " + currentTask, e);
442 }
443 }
444 } catch (InterruptedException e1) {
445 LOG.warn("Interrupted while trying to assert ownership of " +
446 currentTask + " " + StringUtils.stringifyException(e1));
447 Thread.currentThread().interrupt();
448 }
449 SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
450 return (false);
451 }
452
453
454
455
456
457
458
459 private void endTask(SplitLogTask slt, AtomicLong ctr) {
460 String path = currentTask;
461 currentTask = null;
462 try {
463 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
464 currentVersion)) {
465 LOG.info("successfully transitioned task " + path + " to final state " + slt);
466 ctr.incrementAndGet();
467 return;
468 }
469 LOG.warn("failed to transistion task " + path + " to end state " + slt +
470 " because of version mismatch ");
471 } catch (KeeperException.BadVersionException bve) {
472 LOG.warn("transisition task " + path + " to " + slt +
473 " failed because of version mismatch", bve);
474 } catch (KeeperException.NoNodeException e) {
475 LOG.fatal("logic error - end task " + path + " " + slt +
476 " failed because task doesn't exist", e);
477 } catch (KeeperException e) {
478 LOG.warn("failed to end task, " + path + " " + slt, e);
479 }
480 SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
481 }
482
483 void getDataSetWatchAsync() {
484 this.watcher.getRecoverableZooKeeper().getZooKeeper().
485 getData(currentTask, this.watcher,
486 new GetDataAsyncCallback(), null);
487 SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
488 }
489
490 void getDataSetWatchSuccess(String path, byte[] data) {
491 SplitLogTask slt;
492 try {
493 slt = SplitLogTask.parseFrom(data);
494 } catch (DeserializationException e) {
495 LOG.warn("Failed parse", e);
496 return;
497 }
498 synchronized (grabTaskLock) {
499 if (workerInGrabTask) {
500
501 String taskpath = currentTask;
502 if (taskpath != null && taskpath.equals(path)) {
503
504
505
506
507
508
509 if (! slt.isOwned(this.serverName) &&
510 ! slt.isDone(this.serverName) &&
511 ! slt.isErr(this.serverName) &&
512 ! slt.isResigned(this.serverName)) {
513 LOG.info("task " + taskpath + " preempted from " +
514 serverName + ", current task state and owner=" + slt.toString());
515 stopTask();
516 }
517 }
518 }
519 }
520 }
521
522 void getDataSetWatchFailure(String path) {
523 synchronized (grabTaskLock) {
524 if (workerInGrabTask) {
525
526 String taskpath = currentTask;
527 if (taskpath != null && taskpath.equals(path)) {
528 LOG.info("retrying data watch on " + path);
529 SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
530 getDataSetWatchAsync();
531 } else {
532
533
534 }
535 }
536 }
537 }
538
539 @Override
540 public void nodeDataChanged(String path) {
541
542
543 synchronized (grabTaskLock) {
544 if (workerInGrabTask) {
545
546 String taskpath = currentTask;
547 if (taskpath!= null && taskpath.equals(path)) {
548 getDataSetWatchAsync();
549 }
550 }
551 }
552 }
553
554
555 private List<String> getTaskList() {
556 List<String> childrenPaths = null;
557 long sleepTime = 1000;
558
559
560 while (!exitWorker) {
561 try {
562 childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
563 this.watcher.splitLogZNode);
564 if (childrenPaths != null) {
565 return childrenPaths;
566 }
567 } catch (KeeperException e) {
568 LOG.warn("Could not get children of znode "
569 + this.watcher.splitLogZNode, e);
570 }
571 try {
572 LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
573 + " after sleep for " + sleepTime + "ms!");
574 Thread.sleep(sleepTime);
575 } catch (InterruptedException e1) {
576 LOG.warn("Interrupted while trying to get task list ...", e1);
577 Thread.currentThread().interrupt();
578 }
579 }
580 return childrenPaths;
581 }
582
583 @Override
584 public void nodeChildrenChanged(String path) {
585 if(path.equals(watcher.splitLogZNode)) {
586 LOG.debug("tasks arrived or departed");
587 synchronized (taskReadyLock) {
588 taskReadySeq++;
589 taskReadyLock.notify();
590 }
591 }
592 }
593
594
595
596
597
598 void stopTask() {
599 LOG.info("Sending interrupt to stop the worker thread");
600 worker.interrupt();
601 }
602
603
604
605
606
607 public void start() {
608 worker = new Thread(null, this, "SplitLogWorker-" + serverName);
609 exitWorker = false;
610 worker.start();
611 }
612
613
614
615
616 public void stop() {
617 exitWorker = true;
618 stopTask();
619 }
620
621
622
623
624 class GetDataAsyncCallback implements AsyncCallback.DataCallback {
625 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
626
627 @Override
628 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
629 SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
630 if (rc != 0) {
631 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
632 getDataSetWatchFailure(path);
633 return;
634 }
635 data = watcher.getRecoverableZooKeeper().removeMetaData(data);
636 getDataSetWatchSuccess(path, data);
637 }
638 }
639
640
641
642
643
644
645
646
647 static public interface TaskExecutor {
648 static public enum Status {
649 DONE(),
650 ERR(),
651 RESIGNED(),
652 PREEMPTED()
653 }
654 public Status exec(String name, CancelableProgressable p);
655 }
656 }