1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coordination;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.lang.math.RandomUtils;
30 import org.apache.commons.lang.mutable.MutableInt;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.SplitLogCounters;
39 import org.apache.hadoop.hbase.SplitLogTask;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.exceptions.DeserializationException;
42 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
43 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
44 import org.apache.hadoop.hbase.regionserver.Region;
45 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
46 import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
47 import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
48 import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
49 import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
50 import org.apache.hadoop.hbase.util.CancelableProgressable;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
53 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
54 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
56 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57 import org.apache.hadoop.util.StringUtils;
58 import org.apache.zookeeper.AsyncCallback;
59 import org.apache.zookeeper.KeeperException;
60 import org.apache.zookeeper.data.Stat;
61
62
63
64
65
66
67 @InterfaceAudience.Private
68 public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
69 SplitLogWorkerCoordination {
70
71 private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerCoordination.class);
72
73 private static final int checkInterval = 5000;
74 private static final int FAILED_TO_OWN_TASK = -1;
75
76 private SplitLogWorker worker;
77
78 private TaskExecutor splitTaskExecutor;
79
80 private final Object taskReadyLock = new Object();
81 private AtomicInteger taskReadySeq = new AtomicInteger(0);
82 private volatile String currentTask = null;
83 private int currentVersion;
84 private volatile boolean shouldStop = false;
85 private final Object grabTaskLock = new Object();
86 private boolean workerInGrabTask = false;
87 private int reportPeriod;
88 private RegionServerServices server = null;
89 protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
90 private int maxConcurrentTasks = 0;
91
92 private final ZkCoordinatedStateManager manager;
93
/**
 * Builds the coordination object.
 *
 * @param zkCoordinatedStateManager manager kept for later lookups of the hosting server
 * @param watcher ZooKeeper watcher handed to the {@link ZooKeeperListener} superclass
 */
public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
    ZooKeeperWatcher watcher) {
  super(watcher);
  this.manager = zkCoordinatedStateManager;
}
100
101
102
103
104 @Override
105 public void nodeChildrenChanged(String path) {
106 if (path.equals(watcher.splitLogZNode)) {
107 if (LOG.isTraceEnabled()) LOG.trace("tasks arrived or departed on " + path);
108 synchronized (taskReadyLock) {
109 this.taskReadySeq.incrementAndGet();
110 taskReadyLock.notify();
111 }
112 }
113 }
114
115
116
117
118 @Override
119 public void nodeDataChanged(String path) {
120
121
122 synchronized (grabTaskLock) {
123 if (workerInGrabTask) {
124
125 String taskpath = currentTask;
126 if (taskpath != null && taskpath.equals(path)) {
127 getDataSetWatchAsync();
128 }
129 }
130 }
131 }
132
133
134
135
136 @Override
137 public void init(RegionServerServices server, Configuration conf,
138 TaskExecutor splitExecutor, SplitLogWorker worker) {
139 this.server = server;
140 this.worker = worker;
141 this.splitTaskExecutor = splitExecutor;
142 maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
143 reportPeriod =
144 conf.getInt("hbase.splitlog.report.period",
145 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
146 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
147 }
148
149
150
151 void getDataSetWatchFailure(String path) {
152 synchronized (grabTaskLock) {
153 if (workerInGrabTask) {
154
155 String taskpath = currentTask;
156 if (taskpath != null && taskpath.equals(path)) {
157 LOG.info("retrying data watch on " + path);
158 SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
159 getDataSetWatchAsync();
160 } else {
161
162
163 }
164 }
165 }
166 }
167
168 public void getDataSetWatchAsync() {
169 watcher.getRecoverableZooKeeper().getZooKeeper()
170 .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
171 SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
172 }
173
174 void getDataSetWatchSuccess(String path, byte[] data) {
175 SplitLogTask slt;
176 try {
177 slt = SplitLogTask.parseFrom(data);
178 } catch (DeserializationException e) {
179 LOG.warn("Failed parse", e);
180 return;
181 }
182 synchronized (grabTaskLock) {
183 if (workerInGrabTask) {
184
185 String taskpath = currentTask;
186 if (taskpath != null && taskpath.equals(path)) {
187 ServerName serverName = manager.getServer().getServerName();
188
189
190
191
192
193
194 if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
195 && !slt.isResigned(serverName)) {
196 LOG.info("task " + taskpath + " preempted from " + serverName
197 + ", current task state and owner=" + slt.toString());
198 worker.stopTask();
199 }
200 }
201 }
202 }
203 }
204
205
206
207
208
209
210 private void grabTask(String path) {
211 Stat stat = new Stat();
212 byte[] data;
213 synchronized (grabTaskLock) {
214 currentTask = path;
215 workerInGrabTask = true;
216 if (Thread.interrupted()) {
217 return;
218 }
219 }
220 try {
221 try {
222 if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
223 SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
224 return;
225 }
226 } catch (KeeperException e) {
227 LOG.warn("Failed to get data for znode " + path, e);
228 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
229 return;
230 }
231 SplitLogTask slt;
232 try {
233 slt = SplitLogTask.parseFrom(data);
234 } catch (DeserializationException e) {
235 LOG.warn("Failed parse data for znode " + path, e);
236 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
237 return;
238 }
239 if (!slt.isUnassigned()) {
240 SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
241 return;
242 }
243
244 currentVersion =
245 attemptToOwnTask(true, watcher, server.getServerName(), path,
246 slt.getMode(), stat.getVersion());
247 if (currentVersion < 0) {
248 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
249 return;
250 }
251
252 if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
253 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
254 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
255 splitTaskDetails.setTaskNode(currentTask);
256 splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
257
258 endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
259 SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
260 return;
261 }
262
263 LOG.info("worker " + server.getServerName() + " acquired task " + path);
264 SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
265 getDataSetWatchAsync();
266
267 submitTask(path, slt.getMode(), currentVersion, reportPeriod);
268
269
270 try {
271 int sleepTime = RandomUtils.nextInt(500) + 500;
272 Thread.sleep(sleepTime);
273 } catch (InterruptedException e) {
274 LOG.warn("Interrupted while yielding for other region servers", e);
275 Thread.currentThread().interrupt();
276 }
277 } finally {
278 synchronized (grabTaskLock) {
279 workerInGrabTask = false;
280
281
282 Thread.interrupted();
283 }
284 }
285 }
286
287
288
289
290
291
292 void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
293 final int reportPeriod) {
294 final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
295
296 CancelableProgressable reporter = new CancelableProgressable() {
297 private long last_report_at = 0;
298
299 @Override
300 public boolean progress() {
301 long t = EnvironmentEdgeManager.currentTime();
302 if ((t - last_report_at) > reportPeriod) {
303 last_report_at = t;
304 int latestZKVersion =
305 attemptToOwnTask(false, watcher, server.getServerName(), curTask,
306 mode, zkVersion.intValue());
307 if (latestZKVersion < 0) {
308 LOG.warn("Failed to heartbeat the task" + curTask);
309 return false;
310 }
311 zkVersion.setValue(latestZKVersion);
312 }
313 return true;
314 }
315 };
316 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
317 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
318 splitTaskDetails.setTaskNode(curTask);
319 splitTaskDetails.setCurTaskZKVersion(zkVersion);
320
321 WALSplitterHandler hsh =
322 new WALSplitterHandler(server, this, splitTaskDetails, reporter,
323 this.tasksInProgress, splitTaskExecutor, mode);
324 server.getExecutorService().submit(hsh);
325 }
326
327
328
329
330
331
332
333 private int calculateAvailableSplitters(int numTasks) {
334
335 int availableRSs = 1;
336 try {
337 List<String> regionServers =
338 ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
339 availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
340 } catch (KeeperException e) {
341
342 LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
343 }
344
345 int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
346 expectedTasksPerRS = Math.max(1, expectedTasksPerRS);
347
348 return Math.min(expectedTasksPerRS, maxConcurrentTasks)
349 - this.tasksInProgress.get();
350 }
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365 protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
366 ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
367 int latestZKVersion = FAILED_TO_OWN_TASK;
368 try {
369 SplitLogTask slt = new SplitLogTask.Owned(server, mode);
370 Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
371 if (stat == null) {
372 LOG.warn("zk.setData() returned null for path " + task);
373 SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
374 return FAILED_TO_OWN_TASK;
375 }
376 latestZKVersion = stat.getVersion();
377 SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
378 return latestZKVersion;
379 } catch (KeeperException e) {
380 if (!isFirstTime) {
381 if (e.code().equals(KeeperException.Code.NONODE)) {
382 LOG.warn("NONODE failed to assert ownership for " + task, e);
383 } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
384 LOG.warn("BADVERSION failed to assert ownership for " + task, e);
385 } else {
386 LOG.warn("failed to assert ownership for " + task, e);
387 }
388 }
389 } catch (InterruptedException e1) {
390 LOG.warn("Interrupted while trying to assert ownership of " + task + " "
391 + StringUtils.stringifyException(e1));
392 Thread.currentThread().interrupt();
393 }
394 SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
395 return FAILED_TO_OWN_TASK;
396 }
397
398
399
400
401
402
403
404
405
406
407 @Override
408 public void taskLoop() throws InterruptedException {
409 while (!shouldStop) {
410 int seq_start = taskReadySeq.get();
411 List<String> paths = null;
412 paths = getTaskList();
413 if (paths == null) {
414 LOG.warn("Could not get tasks, did someone remove " + watcher.splitLogZNode
415 + " ... worker thread exiting.");
416 return;
417 }
418
419 int offset = (int) (Math.random() * paths.size());
420 for (int i = 0; i < paths.size(); i++) {
421 if (DefaultWALProvider.isMetaFile(paths.get(i))) {
422 offset = i;
423 break;
424 }
425 }
426 int numTasks = paths.size();
427 for (int i = 0; i < numTasks; i++) {
428 int idx = (i + offset) % paths.size();
429
430
431 if (this.calculateAvailableSplitters(numTasks) > 0) {
432 grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
433 } else {
434 LOG.debug("Current region server " + server.getServerName() + " has "
435 + this.tasksInProgress.get() + " tasks in progress and can't take more.");
436 break;
437 }
438 if (shouldStop) {
439 return;
440 }
441 }
442 SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
443 synchronized (taskReadyLock) {
444 while (seq_start == taskReadySeq.get()) {
445 taskReadyLock.wait(checkInterval);
446 if (server != null) {
447
448 Map<String, Region> recoveringRegions = server.getRecoveringRegions();
449 if (!recoveringRegions.isEmpty()) {
450
451
452 List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
453 int listSize = tmpCopy.size();
454 for (int i = 0; i < listSize; i++) {
455 String region = tmpCopy.get(i);
456 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
457 try {
458 if (ZKUtil.checkExists(watcher, nodePath) == -1) {
459 server.getExecutorService().submit(
460 new FinishRegionRecoveringHandler(server, region, nodePath));
461 } else {
462
463
464
465
466
467 break;
468 }
469 } catch (KeeperException e) {
470
471 LOG.debug("Got a zookeeper when trying to open a recovering region", e);
472 break;
473 }
474 }
475 }
476 }
477 }
478 }
479 }
480 }
481
482 private List<String> getTaskList() throws InterruptedException {
483 List<String> childrenPaths = null;
484 long sleepTime = 1000;
485
486
487 while (!shouldStop) {
488 try {
489 childrenPaths =
490 ZKUtil.listChildrenAndWatchForNewChildren(watcher,
491 watcher.splitLogZNode);
492 if (childrenPaths != null) {
493 return childrenPaths;
494 }
495 } catch (KeeperException e) {
496 LOG.warn("Could not get children of znode " + watcher.splitLogZNode, e);
497 }
498 LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode
499 + " after sleep for " + sleepTime + "ms!");
500 Thread.sleep(sleepTime);
501 }
502 return childrenPaths;
503 }
504
505 @Override
506 public void markCorrupted(Path rootDir, String name, FileSystem fs) {
507 ZKSplitLog.markCorrupted(rootDir, name, fs);
508 }
509
510 @Override
511 public boolean isReady() throws InterruptedException {
512 int result = -1;
513 try {
514 result = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
515 } catch (KeeperException e) {
516
517 LOG.warn("Exception when checking for " + watcher.splitLogZNode
518 + " ... retrying", e);
519 }
520 if (result == -1) {
521 LOG.info(watcher.splitLogZNode
522 + " znode does not exist, waiting for master to create");
523 Thread.sleep(1000);
524 }
525 return (result != -1);
526 }
527
528 @Override
529 public int getTaskReadySeq() {
530 return taskReadySeq.get();
531 }
532
533 @Override
534 public void registerListener() {
535 watcher.registerListener(this);
536 }
537
538 @Override
539 public void removeListener() {
540 watcher.unregisterListener(this);
541 }
542
543
544 @Override
545 public void stopProcessingTasks() {
546 this.shouldStop = true;
547
548 }
549
550 @Override
551 public boolean isStop() {
552 return shouldStop;
553 }
554
555 @Override
556 public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
557 throws IOException {
558 return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key);
559 }
560
561
562
563
564 class GetDataAsyncCallback implements AsyncCallback.DataCallback {
565 private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
566
567 @Override
568 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
569 SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
570 if (rc != 0) {
571 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
572 getDataSetWatchFailure(path);
573 return;
574 }
575 data = watcher.getRecoverableZooKeeper().removeMetaData(data);
576 getDataSetWatchSuccess(path, data);
577 }
578 }
579
580
581
582
583
584
585
586
587
588
589 @Override
590 public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
591 ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
592 String task = zkDetails.getTaskNode();
593 int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
594 try {
595 if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
596 LOG.info("successfully transitioned task " + task + " to final state " + slt);
597 ctr.incrementAndGet();
598 return;
599 }
600 LOG.warn("failed to transistion task " + task + " to end state " + slt
601 + " because of version mismatch ");
602 } catch (KeeperException.BadVersionException bve) {
603 LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
604 bve);
605 } catch (KeeperException.NoNodeException e) {
606 LOG.fatal(
607 "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
608 } catch (KeeperException e) {
609 LOG.warn("failed to end task, " + task + " " + slt, e);
610 }
611 SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
612 }
613
614
615
616
617
618 public static class ZkSplitTaskDetails implements SplitTaskDetails {
619 private String taskNode;
620 private MutableInt curTaskZKVersion;
621
622 public ZkSplitTaskDetails() {
623 }
624
625 public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
626 this.taskNode = taskNode;
627 this.curTaskZKVersion = curTaskZKVersion;
628 }
629
630 public String getTaskNode() {
631 return taskNode;
632 }
633
634 public void setTaskNode(String taskNode) {
635 this.taskNode = taskNode;
636 }
637
638 public MutableInt getCurTaskZKVersion() {
639 return curTaskZKVersion;
640 }
641
642 public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
643 this.curTaskZKVersion = curTaskZKVersion;
644 }
645
646 @Override
647 public String getWALFile() {
648 return ZKSplitLog.getFileName(taskNode);
649 }
650 }
651
652 }