1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.ConnectException;
24 import java.net.SocketTimeoutException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.atomic.AtomicLong;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.hadoop.hbase.SplitLogCounters;
38 import org.apache.hadoop.hbase.SplitLogTask;
39 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
40 import org.apache.hadoop.hbase.exceptions.DeserializationException;
41 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
42 import org.apache.hadoop.hbase.master.SplitLogManager;
43 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
44 import org.apache.hadoop.hbase.util.CancelableProgressable;
45 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46 import org.apache.hadoop.hbase.util.FSUtils;
47 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
48 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
50 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
51 import org.apache.hadoop.util.StringUtils;
52 import org.apache.zookeeper.AsyncCallback;
53 import org.apache.zookeeper.KeeperException;
54 import org.apache.zookeeper.data.Stat;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 @InterfaceAudience.Private
77 public class SplitLogWorker extends ZooKeeperListener implements Runnable {
78 private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
79 private static final int checkInterval = 5000;
80
81 Thread worker;
82 private final ServerName serverName;
83 private final TaskExecutor splitTaskExecutor;
84
85 private final Object taskReadyLock = new Object();
86 volatile int taskReadySeq = 0;
87 private volatile String currentTask = null;
88 private int currentVersion;
89 private volatile boolean exitWorker;
90 private final Object grabTaskLock = new Object();
91 private boolean workerInGrabTask = false;
92 private final int report_period;
93 private RegionServerServices server = null;
94
95 public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
96 RegionServerServices server, TaskExecutor splitTaskExecutor) {
97 super(watcher);
98 this.server = server;
99 this.serverName = server.getServerName();
100 this.splitTaskExecutor = splitTaskExecutor;
101 report_period = conf.getInt("hbase.splitlog.report.period",
102 conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
103 }
104
105 public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
106 TaskExecutor splitTaskExecutor) {
107 super(watcher);
108 this.serverName = serverName;
109 this.splitTaskExecutor = splitTaskExecutor;
110 report_period = conf.getInt("hbase.splitlog.report.period",
111 conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
112 }
113
114 public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
115 RegionServerServices server, final LastSequenceId sequenceIdChecker) {
116 this(watcher, conf, server, new TaskExecutor() {
117 @Override
118 public Status exec(String filename, CancelableProgressable p) {
119 Path rootdir;
120 FileSystem fs;
121 try {
122 rootdir = FSUtils.getRootDir(conf);
123 fs = rootdir.getFileSystem(conf);
124 } catch (IOException e) {
125 LOG.warn("could not find root dir or fs", e);
126 return Status.RESIGNED;
127 }
128
129
130
131 try {
132 if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
133 fs, conf, p, sequenceIdChecker, watcher)) {
134 return Status.PREEMPTED;
135 }
136 } catch (InterruptedIOException iioe) {
137 LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
138 return Status.RESIGNED;
139 } catch (IOException e) {
140 Throwable cause = e.getCause();
141 if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
142 || cause instanceof ConnectException
143 || cause instanceof SocketTimeoutException)) {
144 LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
145 + "resigning", e);
146 return Status.RESIGNED;
147 } else if (cause instanceof InterruptedException) {
148 LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
149 return Status.RESIGNED;
150 } else if(cause instanceof KeeperException) {
151 LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
152 return Status.RESIGNED;
153 }
154 LOG.warn("log splitting of " + filename + " failed, returning error", e);
155 return Status.ERR;
156 }
157 return Status.DONE;
158 }
159 });
160 }
161
162 @Override
163 public void run() {
164 try {
165 LOG.info("SplitLogWorker " + this.serverName + " starting");
166 this.watcher.registerListener(this);
167 int res;
168
169 res = -1;
170 while (res == -1 && !exitWorker) {
171 try {
172 res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
173 } catch (KeeperException e) {
174
175 LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
176 }
177 if (res == -1) {
178 try {
179 LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
180 Thread.sleep(1000);
181 } catch (InterruptedException e) {
182 LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
183 + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
184 "exiting anyway)"));
185 exitWorker = true;
186 break;
187 }
188 }
189 }
190
191 if (!exitWorker) {
192 taskLoop();
193 }
194 } catch (Throwable t) {
195
196
197 LOG.error("unexpected error ", t);
198 } finally {
199 LOG.info("SplitLogWorker " + this.serverName + " exiting");
200 }
201 }
202
203
204
205
206
207
208
209
210
211 private void taskLoop() {
212 while (!exitWorker) {
213 int seq_start = taskReadySeq;
214 List<String> paths = getTaskList();
215 if (paths == null) {
216 LOG.warn("Could not get tasks, did someone remove " +
217 this.watcher.splitLogZNode + " ... worker thread exiting.");
218 return;
219 }
220 int offset = (int)(Math.random() * paths.size());
221 for (int i = 0; i < paths.size(); i ++) {
222 int idx = (i + offset) % paths.size();
223
224
225 grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
226 if (exitWorker) {
227 return;
228 }
229 }
230 synchronized (taskReadyLock) {
231 while (seq_start == taskReadySeq) {
232 try {
233 taskReadyLock.wait(checkInterval);
234 if (this.server != null) {
235
236 Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
237 if (!recoveringRegions.isEmpty()) {
238
239
240 List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
241 for (String region : tmpCopy) {
242 String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
243 try {
244 if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
245 HRegion r = recoveringRegions.remove(region);
246 if (r != null) {
247 r.setRecovering(false);
248 }
249 LOG.debug("Mark recovering region:" + region + " up.");
250 } else {
251
252
253
254
255
256 break;
257 }
258 } catch (KeeperException e) {
259
260 LOG.debug("Got a zookeeper when trying to open a recovering region", e);
261 break;
262 }
263 }
264 }
265 }
266 } catch (InterruptedException e) {
267 LOG.info("SplitLogWorker interrupted while waiting for task," +
268 " exiting: " + e.toString() + (exitWorker ? "" :
269 " (ERROR: exitWorker is not set, exiting anyway)"));
270 exitWorker = true;
271 return;
272 }
273 }
274 }
275
276 }
277 }
278
279
280
281
282
283
284 private void grabTask(String path) {
285 Stat stat = new Stat();
286 long t = -1;
287 byte[] data;
288 synchronized (grabTaskLock) {
289 currentTask = path;
290 workerInGrabTask = true;
291 if (Thread.interrupted()) {
292 return;
293 }
294 }
295 try {
296 try {
297 if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
298 SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
299 return;
300 }
301 } catch (KeeperException e) {
302 LOG.warn("Failed to get data for znode " + path, e);
303 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
304 return;
305 }
306 SplitLogTask slt;
307 try {
308 slt = SplitLogTask.parseFrom(data);
309 } catch (DeserializationException e) {
310 LOG.warn("Failed parse data for znode " + path, e);
311 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
312 return;
313 }
314 if (!slt.isUnassigned()) {
315 SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
316 return;
317 }
318
319 currentVersion = stat.getVersion();
320 if (!attemptToOwnTask(true)) {
321 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
322 return;
323 }
324
325 if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
326 endTask(new SplitLogTask.Done(this.serverName),
327 SplitLogCounters.tot_wkr_task_acquired_rescan);
328 return;
329 }
330 LOG.info("worker " + serverName + " acquired task " + path);
331 SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
332 getDataSetWatchAsync();
333
334 t = System.currentTimeMillis();
335 TaskExecutor.Status status;
336
337 status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
338 new CancelableProgressable() {
339
340 private long last_report_at = 0;
341
342 @Override
343 public boolean progress() {
344 long t = EnvironmentEdgeManager.currentTimeMillis();
345 if ((t - last_report_at) > report_period) {
346 last_report_at = t;
347 if (!attemptToOwnTask(false)) {
348 LOG.warn("Failed to heartbeat the task" + currentTask);
349 return false;
350 }
351 }
352 return true;
353 }
354 });
355
356 switch (status) {
357 case DONE:
358 endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
359 break;
360 case PREEMPTED:
361 SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
362 LOG.warn("task execution prempted " + path);
363 break;
364 case ERR:
365 if (!exitWorker) {
366 endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
367 break;
368 }
369
370
371
372 case RESIGNED:
373 if (exitWorker) {
374 LOG.info("task execution interrupted because worker is exiting " + path);
375 endTask(new SplitLogTask.Resigned(this.serverName),
376 SplitLogCounters.tot_wkr_task_resigned);
377 } else {
378 SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
379 LOG.info("task execution interrupted via zk by manager " + path);
380 }
381 break;
382 }
383 } finally {
384 if (t > 0) {
385 LOG.info("worker " + serverName + " done with task " + path +
386 " in " + (System.currentTimeMillis() - t) + "ms");
387 }
388 synchronized (grabTaskLock) {
389 workerInGrabTask = false;
390
391
392 Thread.interrupted();
393 }
394 }
395 }
396
397
398
399
400
401
402
403
404
405
406 private boolean attemptToOwnTask(boolean isFirstTime) {
407 try {
408 SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
409 Stat stat =
410 this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
411 if (stat == null) {
412 LOG.warn("zk.setData() returned null for path " + currentTask);
413 SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
414 return (false);
415 }
416 currentVersion = stat.getVersion();
417 SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
418 return (true);
419 } catch (KeeperException e) {
420 if (!isFirstTime) {
421 if (e.code().equals(KeeperException.Code.NONODE)) {
422 LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
423 } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
424 LOG.warn("BADVERSION failed to assert ownership for " +
425 currentTask, e);
426 } else {
427 LOG.warn("failed to assert ownership for " + currentTask, e);
428 }
429 }
430 } catch (InterruptedException e1) {
431 LOG.warn("Interrupted while trying to assert ownership of " +
432 currentTask + " " + StringUtils.stringifyException(e1));
433 Thread.currentThread().interrupt();
434 }
435 SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
436 return (false);
437 }
438
439
440
441
442
443
444
445 private void endTask(SplitLogTask slt, AtomicLong ctr) {
446 String path = currentTask;
447 currentTask = null;
448 try {
449 if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
450 currentVersion)) {
451 LOG.info("successfully transitioned task " + path + " to final state " + slt);
452 ctr.incrementAndGet();
453 return;
454 }
455 LOG.warn("failed to transistion task " + path + " to end state " + slt +
456 " because of version mismatch ");
457 } catch (KeeperException.BadVersionException bve) {
458 LOG.warn("transisition task " + path + " to " + slt +
459 " failed because of version mismatch", bve);
460 } catch (KeeperException.NoNodeException e) {
461 LOG.fatal("logic error - end task " + path + " " + slt +
462 " failed because task doesn't exist", e);
463 } catch (KeeperException e) {
464 LOG.warn("failed to end task, " + path + " " + slt, e);
465 }
466 SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
467 }
468
469 void getDataSetWatchAsync() {
470 this.watcher.getRecoverableZooKeeper().getZooKeeper().
471 getData(currentTask, this.watcher,
472 new GetDataAsyncCallback(), null);
473 SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
474 }
475
476 void getDataSetWatchSuccess(String path, byte[] data) {
477 SplitLogTask slt;
478 try {
479 slt = SplitLogTask.parseFrom(data);
480 } catch (DeserializationException e) {
481 LOG.warn("Failed parse", e);
482 return;
483 }
484 synchronized (grabTaskLock) {
485 if (workerInGrabTask) {
486
487 String taskpath = currentTask;
488 if (taskpath != null && taskpath.equals(path)) {
489
490
491
492
493
494
495 if (! slt.isOwned(this.serverName) &&
496 ! slt.isDone(this.serverName) &&
497 ! slt.isErr(this.serverName) &&
498 ! slt.isResigned(this.serverName)) {
499 LOG.info("task " + taskpath + " preempted from " +
500 serverName + ", current task state and owner=" + slt.toString());
501 stopTask();
502 }
503 }
504 }
505 }
506 }
507
508 void getDataSetWatchFailure(String path) {
509 synchronized (grabTaskLock) {
510 if (workerInGrabTask) {
511
512 String taskpath = currentTask;
513 if (taskpath != null && taskpath.equals(path)) {
514 LOG.info("retrying data watch on " + path);
515 SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
516 getDataSetWatchAsync();
517 } else {
518
519
520 }
521 }
522 }
523 }
524
525 @Override
526 public void nodeDataChanged(String path) {
527
528
529 synchronized (grabTaskLock) {
530 if (workerInGrabTask) {
531
532 String taskpath = currentTask;
533 if (taskpath!= null && taskpath.equals(path)) {
534 getDataSetWatchAsync();
535 }
536 }
537 }
538 }
539
540
541 private List<String> getTaskList() {
542 List<String> childrenPaths = null;
543 long sleepTime = 1000;
544
545
546 while (!exitWorker) {
547 try {
548 childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
549 this.watcher.splitLogZNode);
550 if (childrenPaths != null) {
551 return childrenPaths;
552 }
553 } catch (KeeperException e) {
554 LOG.warn("Could not get children of znode "
555 + this.watcher.splitLogZNode, e);
556 }
557 try {
558 LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
559 + " after sleep for " + sleepTime + "ms!");
560 Thread.sleep(sleepTime);
561 } catch (InterruptedException e1) {
562 LOG.warn("Interrupted while trying to get task list ...", e1);
563 Thread.currentThread().interrupt();
564 }
565 }
566 return childrenPaths;
567 }
568
569 @Override
570 public void nodeChildrenChanged(String path) {
571 if(path.equals(watcher.splitLogZNode)) {
572 LOG.debug("tasks arrived or departed");
573 synchronized (taskReadyLock) {
574 taskReadySeq++;
575 taskReadyLock.notify();
576 }
577 }
578 }
579
580
581
582
583
584 void stopTask() {
585 LOG.info("Sending interrupt to stop the worker thread");
586 worker.interrupt();
587 }
588
589
590
591
592
593 public void start() {
594 worker = new Thread(null, this, "SplitLogWorker-" + serverName);
595 exitWorker = false;
596 worker.start();
597 }
598
599
600
601
602 public void stop() {
603 exitWorker = true;
604 stopTask();
605 }
606
607
608
609
610 class GetDataAsyncCallback implements AsyncCallback.DataCallback {
611 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
612
613 @Override
614 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
615 SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
616 if (rc != 0) {
617 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
618 getDataSetWatchFailure(path);
619 return;
620 }
621 data = watcher.getRecoverableZooKeeper().removeMetaData(data);
622 getDataSetWatchSuccess(path, data);
623 }
624 }
625
626
627
628
629
630
631
632
633 static public interface TaskExecutor {
634 static public enum Status {
635 DONE(),
636 ERR(),
637 RESIGNED(),
638 PREEMPTED()
639 }
640 public Status exec(String name, CancelableProgressable p);
641 }
642 }