/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.coordination;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}. It listens for changes
 * under the split log znode and grabs and executes split log tasks as they become available.
 */
@InterfaceAudience.Private
public class ZkSplitLogWorkerCoordination extends ZKListener implements
    SplitLogWorkerCoordination {

  private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class);

  private static final int checkInterval = 5000; // 5 seconds
  private static final int FAILED_TO_OWN_TASK = -1;

  private SplitLogWorker worker;

  private TaskExecutor splitTaskExecutor;

  private final AtomicInteger taskReadySeq = new AtomicInteger(0);
  private volatile String currentTask = null;
  private int currentVersion;
  private volatile boolean shouldStop = false;
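  // grabTaskLock guards the currentTask/workerInGrabTask pair so that the ZooKeeper
  // event callbacks (nodeDataChanged, getDataSetWatch*) only act on the task this
  // worker is actively trying to grab or execute.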
  private final Object grabTaskLock = new Object();
  private boolean workerInGrabTask = false;
  private int reportPeriod;
  private RegionServerServices server = null;
  protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
  private int maxConcurrentTasks = 0;

  private final ServerName serverName;

  public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) {
    super(watcher);
    this.serverName = serverName;
  }

  /**
   * Override handler from {@link ZKListener}
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (path.equals(watcher.znodePaths.splitLogZNode)) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("tasks arrived or departed on " + path);
      }
      synchronized (taskReadySeq) {
        this.taskReadySeq.incrementAndGet();
        taskReadySeq.notify();
      }
    }
  }

  /**
   * Override handler from {@link ZKListener}
   */
  @Override
  public void nodeDataChanged(String path) {
    // there will be a self generated dataChanged event every time attemptToOwnTask()
    // heartbeats the task znode by upping its version
    synchronized (grabTaskLock) {
      if (workerInGrabTask) {
        // currentTask can change
        String taskpath = currentTask;
        if (taskpath != null && taskpath.equals(path)) {
          getDataSetWatchAsync();
        }
      }
    }
  }

  /**
   * Override initializer from {@link SplitLogWorkerCoordination}
   */
  @Override
  public void init(RegionServerServices server, Configuration conf,
      TaskExecutor splitExecutor, SplitLogWorker worker) {
    this.server = server;
    this.worker = worker;
    this.splitTaskExecutor = splitExecutor;
    maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
    reportPeriod =
        conf.getInt("hbase.splitlog.report.period",
          conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
            ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
  }

  /* Support functions for ZooKeeper async callback */

  void getDataSetWatchFailure(String path) {
    synchronized (grabTaskLock) {
      if (workerInGrabTask) {
        // currentTask can change but that's ok
        String taskpath = currentTask;
        if (taskpath != null && taskpath.equals(path)) {
          LOG.info("retrying data watch on " + path);
          SplitLogCounters.tot_wkr_get_data_retry.increment();
          getDataSetWatchAsync();
        } else {
          // no point setting a watch on the task which this worker is not
          // working upon anymore
        }
      }
    }
  }

  public void getDataSetWatchAsync() {
    watcher.getRecoverableZooKeeper().getZooKeeper()
        .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
    SplitLogCounters.tot_wkr_get_data_queued.increment();
  }

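  /**
   * Invoked when the async getData on the current task znode succeeds. If the task data
   * shows this server no longer holds the task in any state (owned/done/err/resigned),
   * the task has been preempted and the running split is stopped.
   */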
  void getDataSetWatchSuccess(String path, byte[] data) {
    SplitLogTask slt;
    try {
      slt = SplitLogTask.parseFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed parse", e);
      return;
    }
    synchronized (grabTaskLock) {
      if (workerInGrabTask) {
        // currentTask can change but that's ok
        String taskpath = currentTask;
        if (taskpath != null && taskpath.equals(path)) {
          // have to compare data. cannot compare version because then there
          // will be a race with attemptToOwnTask()
          // cannot just check whether the node has been transitioned to
          // UNASSIGNED because by the time this worker sets the data watch
          // the node might have made two transitions - from owned by this
          // worker to unassigned to owned by another worker
          if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
              && !slt.isResigned(serverName)) {
            LOG.info("task " + taskpath + " preempted from " + serverName
                + ", current task state and owner=" + slt.toString());
            worker.stopTask();
          }
        }
      }
    }
  }

  /**
   * Try to grab a 'lock' on the task zk node to own and execute the task.
   * <p>
   * @param path zk node for the task
   * @return true if the task was successfully grabbed; false otherwise
   */
  private boolean grabTask(String path) {
    Stat stat = new Stat();
    byte[] data;
    synchronized (grabTaskLock) {
      currentTask = path;
      workerInGrabTask = true;
      if (Thread.interrupted()) {
        return false;
      }
    }
    try {
      try {
        if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
          SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
          return false;
        }
      } catch (KeeperException e) {
        LOG.warn("Failed to get data for znode " + path, e);
        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
        return false;
      }
      SplitLogTask slt;
      try {
        slt = SplitLogTask.parseFrom(data);
      } catch (DeserializationException e) {
        LOG.warn("Failed parse data for znode " + path, e);
        SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
        return false;
      }
      if (!slt.isUnassigned()) {
        SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
        return false;
      }

      currentVersion =
          attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
      if (currentVersion < 0) {
        SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
        return false;
      }

      if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
        ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
            new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
        splitTaskDetails.setTaskNode(currentTask);
        splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));

        endTask(new SplitLogTask.Done(server.getServerName()),
          SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
        return false;
      }

      LOG.info("worker " + server.getServerName() + " acquired task " + path);
      SplitLogCounters.tot_wkr_task_acquired.increment();
      getDataSetWatchAsync();

      submitTask(path, currentVersion, reportPeriod);

      // after a successful submit, sleep a little bit to allow other RSs to grab the
      // remaining tasks
      try {
        int sleepTime = RandomUtils.nextInt(0, 500) + 500;
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while yielding for other region servers", e);
        Thread.currentThread().interrupt();
      }
      return true;
    } finally {
      synchronized (grabTaskLock) {
        workerInGrabTask = false;
        // clear the interrupt from stopTask() otherwise the next task will
        // suffer
        Thread.interrupted();
      }
    }
  }

  /**
   * Submit a log split task to executor service
   * @param curTask task to submit
   * @param curTaskZKVersion current version of task
   * @param reportPeriod period, in milliseconds, at which split progress is reported
   */
  void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
    final MutableInt zkVersion = new MutableInt(curTaskZKVersion);

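    // Each progress() call, at most once per reportPeriod, re-asserts ownership of the
    // task znode via attemptToOwnTask(); a failed heartbeat (version mismatch or missing
    // node) means the task was preempted, so progress() returns false to cancel the split.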
    CancelableProgressable reporter = new CancelableProgressable() {
      private long last_report_at = 0;

      @Override
      public boolean progress() {
        long t = EnvironmentEdgeManager.currentTime();
        if ((t - last_report_at) > reportPeriod) {
          last_report_at = t;
          int latestZKVersion =
              attemptToOwnTask(false, watcher, server.getServerName(), curTask,
                zkVersion.intValue());
          if (latestZKVersion < 0) {
            LOG.warn("Failed to heartbeat the task " + curTask);
            return false;
          }
          zkVersion.setValue(latestZKVersion);
        }
        return true;
      }
    };
    ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
        new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
    splitTaskDetails.setTaskNode(curTask);
    splitTaskDetails.setCurTaskZKVersion(zkVersion);

    WALSplitterHandler hsh =
        new WALSplitterHandler(server, this, splitTaskDetails, reporter,
            this.tasksInProgress, splitTaskExecutor);
    server.getExecutorService().submit(hsh);
  }

  /**
   * This function calculates how many splitters this RS should create based on the expected
   * average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by
   * configuration. <br>
   * At any given time, an RS is allowed to spawn MIN(expected tasks per RS, hard upper bound)
   * splitters.
   * @param numTasks total number of split tasks available
   * @return number of tasks this RS can grab
   */
  private int getNumExpectedTasksPerRS(int numTasks) {
    // at least one RS (itself) is available
    int availableRSs = 1;
    try {
      List<String> regionServers =
          ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.rsZNode);
      availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
    } catch (KeeperException e) {
      // do nothing
      LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
    }
    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
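    // e.g. 10 outstanding tasks across 4 live region servers -> ceil(10 / 4) = 3 tasks per RS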
    return Math.max(1, expectedTasksPerRS); // at least one
  }

  /**
   * @param expectedTasksPerRS Average number of tasks to be handled by each RS
   * @return true if more splitters are available, otherwise false.
   */
  private boolean areSplittersAvailable(int expectedTasksPerRS) {
    return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
        - this.tasksInProgress.get()) > 0;
  }

  /**
   * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
   * <p>
   * This method is also used to periodically heartbeat the task progress by transitioning the
   * node from OWNED to OWNED.
   * <p>
   * @param isFirstTime whether this is the first attempt to own the task
   * @param zkw the ZooKeeper watcher
   * @param server name of the current region server
   * @param task the task znode to own
   * @param taskZKVersion version of the task znode in zk
   * @return non-negative integer value when the task can be owned by the current region server,
   *         otherwise -1
   */
  protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw,
      ServerName server, String task, int taskZKVersion) {
    int latestZKVersion = FAILED_TO_OWN_TASK;
    try {
      SplitLogTask slt = new SplitLogTask.Owned(server);
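      // setData with an expected version acts as a compare-and-swap: it only succeeds
      // if nobody else has modified the task znode since it was last read.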
      Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
      if (stat == null) {
        LOG.warn("zk.setData() returned null for path " + task);
        SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
        return FAILED_TO_OWN_TASK;
      }
      latestZKVersion = stat.getVersion();
      SplitLogCounters.tot_wkr_task_heartbeat.increment();
      return latestZKVersion;
    } catch (KeeperException e) {
      if (!isFirstTime) {
        if (e.code().equals(KeeperException.Code.NONODE)) {
          LOG.warn("NONODE failed to assert ownership for " + task, e);
        } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
          LOG.warn("BADVERSION failed to assert ownership for " + task, e);
        } else {
          LOG.warn("failed to assert ownership for " + task, e);
        }
      }
    } catch (InterruptedException e1) {
      LOG.warn("Interrupted while trying to assert ownership of " + task + " "
          + StringUtils.stringifyException(e1));
      Thread.currentThread().interrupt();
    }
    SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
    return FAILED_TO_OWN_TASK;
  }

  /**
   * Wait for tasks to become available at the /hbase/splitlog znode. Grab a task one at a time.
   * This policy puts an upper-limit on the number of simultaneous log splits that could be
   * happening in a cluster.
   * <p>
   * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task
   * that has been put up.
   * @throws InterruptedException if interrupted while sleeping or waiting for tasks
   */
  @Override
  public void taskLoop() throws InterruptedException {
    while (!shouldStop) {
      int seq_start = taskReadySeq.get();
      List<String> paths;
      paths = getTaskList();
      if (paths == null) {
        LOG.warn("Could not get tasks, did someone remove " + watcher.znodePaths.splitLogZNode
            + " ... worker thread exiting.");
        return;
      }
      // shuffle the paths to prevent different split log workers from starting from the same
      // log file after the meta log (if any)
      Collections.shuffle(paths);
      // pick the meta WAL first
      int offset = 0;
      for (int i = 0; i < paths.size(); i++) {
        if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
          offset = i;
          break;
        }
      }
      int numTasks = paths.size();
      int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
      boolean taskGrabbed = false;
      for (int i = 0; i < numTasks; i++) {
        while (!shouldStop) {
          if (this.areSplittersAvailable(expectedTasksPerRS)) {
            LOG.debug("Current region server " + server.getServerName()
                + " is ready to take more tasks, will get task list and try grab tasks again.");
            int idx = (i + offset) % paths.size();
            // don't call ZKSplitLog.getNodeName() because that will lead to
            // double encoding of the path name
            taskGrabbed |= grabTask(ZNodePaths.joinZNode(
              watcher.znodePaths.splitLogZNode, paths.get(idx)));
            break;
          } else {
            LOG.debug("Current region server " + server.getServerName() + " has "
                + this.tasksInProgress.get() + " tasks in progress and can't take more.");
            Thread.sleep(100);
          }
        }
        if (shouldStop) {
          return;
        }
      }
      if (!taskGrabbed && !shouldStop) {
        // didn't grab any task; sleep a little bit to reduce the zk request load
        Thread.sleep(1000);
      }
      SplitLogCounters.tot_wkr_task_grabing.increment();
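      // Block until nodeChildrenChanged() bumps taskReadySeq, re-checking every
      // checkInterval ms in case a notification was missed.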
      synchronized (taskReadySeq) {
        while (seq_start == taskReadySeq.get()) {
          taskReadySeq.wait(checkInterval);
        }
      }
    }
  }

  private List<String> getTaskList() throws InterruptedException {
    List<String> childrenPaths = null;
    long sleepTime = 1000;
    // Loop until we get the list of children, or bail out if the worker
    // thread has been stopped.
    while (!shouldStop) {
      try {
        childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
          watcher.znodePaths.splitLogZNode);
        if (childrenPaths != null) {
          return childrenPaths;
        }
      } catch (KeeperException e) {
        LOG.warn("Could not get children of znode " + watcher.znodePaths.splitLogZNode, e);
      }
      LOG.debug("Retry listChildren of znode " + watcher.znodePaths.splitLogZNode
          + " after sleep for " + sleepTime + "ms!");
      Thread.sleep(sleepTime);
    }
    return childrenPaths;
  }

  @Override
  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
    ZKSplitLog.markCorrupted(rootDir, name, fs);
  }

  @Override
  public boolean isReady() throws InterruptedException {
    int result = -1;
    try {
      result = ZKUtil.checkExists(watcher, watcher.znodePaths.splitLogZNode);
    } catch (KeeperException e) {
      // ignore
      LOG.warn("Exception when checking for " + watcher.znodePaths.splitLogZNode
          + " ... retrying", e);
    }
    if (result == -1) {
      LOG.info(watcher.znodePaths.splitLogZNode
          + " znode does not exist, waiting for master to create");
      Thread.sleep(1000);
    }
    return (result != -1);
  }

  @Override
  public int getTaskReadySeq() {
    return taskReadySeq.get();
  }

  @Override
  public void registerListener() {
    watcher.registerListener(this);
  }

  @Override
  public void removeListener() {
    watcher.unregisterListener(this);
  }

  @Override
  public void stopProcessingTasks() {
    this.shouldStop = true;
  }

  @Override
  public boolean isStop() {
    return shouldStop;
  }

  /**
   * Asynchronous handler for zk get-data-set-watch on node results.
   */
  class GetDataAsyncCallback implements AsyncCallback.DataCallback {
    private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class);

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
      SplitLogCounters.tot_wkr_get_data_result.increment();
      if (rc != 0) {
        LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
        getDataSetWatchFailure(path);
        return;
      }
      data = ZKMetadata.removeMetaData(data);
      getDataSetWatchSuccess(path, data);
    }
  }

  /*
   * Next part is related to WALSplitterHandler
   */
  /**
   * endTask() can fail and the only way to recover out of it is for the
   * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node.
   * @param slt the final task state to transition the task node to
   * @param ctr counter to increment on a successful transition; may be null
   */
  @Override
  public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) {
    ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
    String task = zkDetails.getTaskNode();
    int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
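    // A failed conditional setData below means the task znode was re-assigned (version
    // mismatch) or deleted; recovery is left to the SplitLogManager's timeout handling.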
    try {
      if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
        LOG.info("successfully transitioned task " + task + " to final state " + slt);
        if (ctr != null) {
          ctr.increment();
        }
        return;
      }
      LOG.warn("failed to transition task " + task + " to end state " + slt
          + " because of version mismatch ");
    } catch (KeeperException.BadVersionException bve) {
      LOG.warn("transition task " + task + " to " + slt + " failed because of version mismatch",
        bve);
    } catch (KeeperException.NoNodeException e) {
      LOG.error(HBaseMarkers.FATAL,
        "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
    } catch (KeeperException e) {
      LOG.warn("failed to end task, " + task + " " + slt, e);
    }
    SplitLogCounters.tot_wkr_final_transition_failed.increment();
  }

  /**
   * When the ZK-based implementation wants to complete the task, it needs to know the task znode
   * and the current znode version (needed for the subsequent update operation).
   */
  public static class ZkSplitTaskDetails implements SplitTaskDetails {
    private String taskNode;
    private MutableInt curTaskZKVersion;

    public ZkSplitTaskDetails() {
    }

    public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
      this.taskNode = taskNode;
      this.curTaskZKVersion = curTaskZKVersion;
    }

    public String getTaskNode() {
      return taskNode;
    }

    public void setTaskNode(String taskNode) {
      this.taskNode = taskNode;
    }

    public MutableInt getCurTaskZKVersion() {
      return curTaskZKVersion;
    }

    public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
      this.curTaskZKVersion = curTaskZKVersion;
    }

    @Override
    public String getWALFile() {
      return ZKSplitLog.getFileName(taskNode);
    }
  }

}