001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.coordination; 021 022import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 023import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 024 025import java.util.Collections; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import java.util.concurrent.atomic.LongAdder; 029 030import org.apache.commons.lang3.RandomUtils; 031import org.apache.commons.lang3.mutable.MutableInt; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.SplitLogCounters; 038import org.apache.hadoop.hbase.SplitLogTask; 039import org.apache.hadoop.hbase.exceptions.DeserializationException; 040import org.apache.hadoop.hbase.log.HBaseMarkers; 041import org.apache.hadoop.hbase.regionserver.RegionServerServices; 042import org.apache.hadoop.hbase.regionserver.SplitLogWorker; 043import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; 044import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler; 045import org.apache.hadoop.hbase.util.CancelableProgressable; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 048import org.apache.hadoop.hbase.zookeeper.ZKListener; 049import org.apache.hadoop.hbase.zookeeper.ZKMetadata; 050import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 051import org.apache.hadoop.hbase.zookeeper.ZKUtil; 052import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 053import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 054import org.apache.hadoop.util.StringUtils; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.apache.zookeeper.AsyncCallback; 057import org.apache.zookeeper.KeeperException; 058import org.apache.zookeeper.data.Stat; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination} 064 * It listen for changes in ZooKeeper and 065 * 066 */ 067@InterfaceAudience.Private 068public class ZkSplitLogWorkerCoordination extends ZKListener implements 069 SplitLogWorkerCoordination { 070 071 private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class); 072 073 private static final int checkInterval = 5000; // 5 seconds 074 private static final int FAILED_TO_OWN_TASK = -1; 075 076 private SplitLogWorker worker; 077 078 private TaskExecutor splitTaskExecutor; 079 080 private final AtomicInteger taskReadySeq = new AtomicInteger(0); 081 private volatile String currentTask = null; 082 private int currentVersion; 083 private volatile boolean shouldStop = false; 084 private final Object grabTaskLock = new Object(); 085 private boolean workerInGrabTask = false; 086 private int reportPeriod; 087 private RegionServerServices server = null; 088 protected final AtomicInteger tasksInProgress = new AtomicInteger(0); 089 private int maxConcurrentTasks = 0; 090 091 private final ServerName serverName; 092 093 public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) { 094 super(watcher); 095 this.serverName = serverName; 096 } 097 098 /** 099 * Override handler from {@link ZKListener} 100 */ 101 @Override 102 public void nodeChildrenChanged(String path) { 103 if (path.equals(watcher.getZNodePaths().splitLogZNode)) { 104 if (LOG.isTraceEnabled()) { 105 LOG.trace("tasks arrived or departed on " + path); 106 } 107 synchronized (taskReadySeq) { 108 this.taskReadySeq.incrementAndGet(); 109 taskReadySeq.notify(); 110 } 111 } 112 } 113 114 /** 115 * Override handler from {@link ZKListener} 116 */ 117 @Override 118 public void nodeDataChanged(String path) { 119 // there will be a self generated dataChanged event every time attemptToOwnTask() 120 // heartbeats the task znode by upping its version 121 synchronized (grabTaskLock) { 122 if (workerInGrabTask) { 123 // currentTask can change 124 String taskpath = currentTask; 125 if (taskpath != null && taskpath.equals(path)) { 126 getDataSetWatchAsync(); 127 } 128 } 129 } 130 } 131 132 /** 133 * Override setter from {@link SplitLogWorkerCoordination} 134 */ 135 @Override 136 public void init(RegionServerServices server, Configuration conf, 137 TaskExecutor splitExecutor, SplitLogWorker worker) { 138 this.server = server; 139 this.worker = worker; 140 this.splitTaskExecutor = splitExecutor; 141 maxConcurrentTasks = 142 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 143 reportPeriod = 144 conf.getInt("hbase.splitlog.report.period", 145 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, 146 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3); 147 } 148 149 /* Support functions for ZooKeeper async callback */ 150 151 void getDataSetWatchFailure(String path) { 152 synchronized (grabTaskLock) { 153 if (workerInGrabTask) { 154 // currentTask can change but that's ok 155 String taskpath = currentTask; 156 if (taskpath != null && taskpath.equals(path)) { 157 LOG.info("retrying data watch on " + path); 158 SplitLogCounters.tot_wkr_get_data_retry.increment(); 159 getDataSetWatchAsync(); 160 } else { 161 // no point setting a watch on the task which this worker is not 162 // working upon anymore 163 } 164 } 165 } 166 } 167 168 public void getDataSetWatchAsync() { 169 watcher.getRecoverableZooKeeper().getZooKeeper() 170 .getData(currentTask, watcher, new GetDataAsyncCallback(), null); 171 SplitLogCounters.tot_wkr_get_data_queued.increment(); 172 } 173 174 void getDataSetWatchSuccess(String path, byte[] data) { 175 SplitLogTask slt; 176 try { 177 slt = SplitLogTask.parseFrom(data); 178 } catch (DeserializationException e) { 179 LOG.warn("Failed parse", e); 180 return; 181 } 182 synchronized (grabTaskLock) { 183 if (workerInGrabTask) { 184 // currentTask can change but that's ok 185 String taskpath = currentTask; 186 if (taskpath != null && taskpath.equals(path)) { 187 // have to compare data. cannot compare version because then there 188 // will be race with attemptToOwnTask() 189 // cannot just check whether the node has been transitioned to 190 // UNASSIGNED because by the time this worker sets the data watch 191 // the node might have made two transitions - from owned by this 192 // worker to unassigned to owned by another worker 193 if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName) 194 && !slt.isResigned(serverName)) { 195 LOG.info("task " + taskpath + " preempted from " + serverName 196 + ", current task state and owner=" + slt.toString()); 197 worker.stopTask(); 198 } 199 } 200 } 201 } 202 } 203 204 /** 205 * try to grab a 'lock' on the task zk node to own and execute the task. 206 * <p> 207 * @param path zk node for the task 208 * @return boolean value when grab a task success return true otherwise false 209 */ 210 private boolean grabTask(String path) { 211 Stat stat = new Stat(); 212 byte[] data; 213 synchronized (grabTaskLock) { 214 currentTask = path; 215 workerInGrabTask = true; 216 if (Thread.interrupted()) { 217 return false; 218 } 219 } 220 try { 221 try { 222 if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { 223 SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment(); 224 return false; 225 } 226 } catch (KeeperException e) { 227 LOG.warn("Failed to get data for znode " + path, e); 228 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); 229 return false; 230 } 231 SplitLogTask slt; 232 try { 233 slt = SplitLogTask.parseFrom(data); 234 } catch (DeserializationException e) { 235 LOG.warn("Failed parse data for znode " + path, e); 236 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); 237 return false; 238 } 239 if (!slt.isUnassigned()) { 240 SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment(); 241 return false; 242 } 243 244 currentVersion = 245 attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion()); 246 if (currentVersion < 0) { 247 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment(); 248 return false; 249 } 250 251 if (ZKSplitLog.isRescanNode(watcher, currentTask)) { 252 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = 253 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); 254 splitTaskDetails.setTaskNode(currentTask); 255 splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion)); 256 257 endTask(new SplitLogTask.Done(server.getServerName()), 258 SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); 259 return false; 260 } 261 262 LOG.info("worker " + server.getServerName() + " acquired task " + path); 263 SplitLogCounters.tot_wkr_task_acquired.increment(); 264 getDataSetWatchAsync(); 265 266 submitTask(path, currentVersion, reportPeriod); 267 268 // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks 269 try { 270 int sleepTime = RandomUtils.nextInt(0, 500) + 500; 271 Thread.sleep(sleepTime); 272 } catch (InterruptedException e) { 273 LOG.warn("Interrupted while yielding for other region servers", e); 274 Thread.currentThread().interrupt(); 275 } 276 return true; 277 } finally { 278 synchronized (grabTaskLock) { 279 workerInGrabTask = false; 280 // clear the interrupt from stopTask() otherwise the next task will 281 // suffer 282 Thread.interrupted(); 283 } 284 } 285 } 286 287 /** 288 * Submit a log split task to executor service 289 * @param curTask task to submit 290 * @param curTaskZKVersion current version of task 291 */ 292 void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) { 293 final MutableInt zkVersion = new MutableInt(curTaskZKVersion); 294 295 CancelableProgressable reporter = new CancelableProgressable() { 296 private long last_report_at = 0; 297 298 @Override 299 public boolean progress() { 300 long t = EnvironmentEdgeManager.currentTime(); 301 if ((t - last_report_at) > reportPeriod) { 302 last_report_at = t; 303 int latestZKVersion = 304 attemptToOwnTask(false, watcher, server.getServerName(), curTask, 305 zkVersion.intValue()); 306 if (latestZKVersion < 0) { 307 LOG.warn("Failed to heartbeat the task" + curTask); 308 return false; 309 } 310 zkVersion.setValue(latestZKVersion); 311 } 312 return true; 313 } 314 }; 315 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = 316 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); 317 splitTaskDetails.setTaskNode(curTask); 318 splitTaskDetails.setCurTaskZKVersion(zkVersion); 319 320 WALSplitterHandler hsh = 321 new WALSplitterHandler(server, this, splitTaskDetails, reporter, 322 this.tasksInProgress, splitTaskExecutor); 323 server.getExecutorService().submit(hsh); 324 } 325 326 /** 327 * @return true if more splitters are available, otherwise false. 328 */ 329 private boolean areSplittersAvailable() { 330 return maxConcurrentTasks - tasksInProgress.get() > 0; 331 } 332 333 /** 334 * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED. 335 * <p> 336 * This method is also used to periodically heartbeat the task progress by transitioning the node 337 * from OWNED to OWNED. 338 * <p> 339 * @param isFirstTime shows whther it's the first attempt. 340 * @param zkw zk wathcer 341 * @param server name 342 * @param task to own 343 * @param taskZKVersion version of the task in zk 344 * @return non-negative integer value when task can be owned by current region server otherwise -1 345 */ 346 protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw, 347 ServerName server, String task, int taskZKVersion) { 348 int latestZKVersion = FAILED_TO_OWN_TASK; 349 try { 350 SplitLogTask slt = new SplitLogTask.Owned(server); 351 Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); 352 if (stat == null) { 353 LOG.warn("zk.setData() returned null for path " + task); 354 SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); 355 return FAILED_TO_OWN_TASK; 356 } 357 latestZKVersion = stat.getVersion(); 358 SplitLogCounters.tot_wkr_task_heartbeat.increment(); 359 return latestZKVersion; 360 } catch (KeeperException e) { 361 if (!isFirstTime) { 362 if (e.code().equals(KeeperException.Code.NONODE)) { 363 LOG.warn("NONODE failed to assert ownership for " + task, e); 364 } else if (e.code().equals(KeeperException.Code.BADVERSION)) { 365 LOG.warn("BADVERSION failed to assert ownership for " + task, e); 366 } else { 367 LOG.warn("failed to assert ownership for " + task, e); 368 } 369 } 370 } catch (InterruptedException e1) { 371 LOG.warn("Interrupted while trying to assert ownership of " + task + " " 372 + StringUtils.stringifyException(e1)); 373 Thread.currentThread().interrupt(); 374 } 375 SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); 376 return FAILED_TO_OWN_TASK; 377 } 378 379 /** 380 * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This 381 * policy puts an upper-limit on the number of simultaneous log splitting that could be happening 382 * in a cluster. 383 * <p> 384 * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task 385 * that has been put up 386 * @throws InterruptedException 387 */ 388 @Override 389 public void taskLoop() throws InterruptedException { 390 while (!shouldStop) { 391 int seq_start = taskReadySeq.get(); 392 List<String> paths; 393 paths = getTaskList(); 394 if (paths == null) { 395 LOG.warn("Could not get tasks, did someone remove " + watcher.getZNodePaths().splitLogZNode 396 + " ... worker thread exiting."); 397 return; 398 } 399 // shuffle the paths to prevent different split log worker start from the same log file after 400 // meta log (if any) 401 Collections.shuffle(paths); 402 // pick meta wal firstly 403 int offset = 0; 404 for (int i = 0; i < paths.size(); i++) { 405 if (AbstractFSWALProvider.isMetaFile(paths.get(i))) { 406 offset = i; 407 break; 408 } 409 } 410 int numTasks = paths.size(); 411 boolean taskGrabbed = false; 412 for (int i = 0; i < numTasks; i++) { 413 while (!shouldStop) { 414 if (this.areSplittersAvailable()) { 415 if (LOG.isTraceEnabled()) { 416 LOG.trace("Current region server " + server.getServerName() 417 + " is ready to take more tasks, will get task list and try grab tasks again."); 418 } 419 int idx = (i + offset) % paths.size(); 420 // don't call ZKSplitLog.getNodeName() because that will lead to 421 // double encoding of the path name 422 taskGrabbed |= grabTask(ZNodePaths.joinZNode( 423 watcher.getZNodePaths().splitLogZNode, paths.get(idx))); 424 break; 425 } else { 426 if (LOG.isTraceEnabled()) { 427 LOG.trace("Current region server " + server.getServerName() + " has " 428 + this.tasksInProgress.get() + " tasks in progress and can't take more."); 429 } 430 Thread.sleep(100); 431 } 432 } 433 if (shouldStop) { 434 return; 435 } 436 } 437 if (!taskGrabbed && !shouldStop) { 438 // do not grab any tasks, sleep a little bit to reduce zk request. 439 Thread.sleep(1000); 440 } 441 SplitLogCounters.tot_wkr_task_grabing.increment(); 442 synchronized (taskReadySeq) { 443 while (seq_start == taskReadySeq.get()) { 444 taskReadySeq.wait(checkInterval); 445 } 446 } 447 } 448 } 449 450 private List<String> getTaskList() throws InterruptedException { 451 List<String> childrenPaths = null; 452 long sleepTime = 1000; 453 // It will be in loop till it gets the list of children or 454 // it will come out if worker thread exited. 455 while (!shouldStop) { 456 try { 457 childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(watcher, 458 watcher.getZNodePaths().splitLogZNode); 459 if (childrenPaths != null) { 460 return childrenPaths; 461 } 462 } catch (KeeperException e) { 463 LOG.warn("Could not get children of znode " + watcher.getZNodePaths().splitLogZNode, e); 464 } 465 LOG.debug("Retry listChildren of znode " + watcher.getZNodePaths().splitLogZNode 466 + " after sleep for " + sleepTime + "ms!"); 467 Thread.sleep(sleepTime); 468 } 469 return childrenPaths; 470 } 471 472 @Override 473 public void markCorrupted(Path rootDir, String name, FileSystem fs) { 474 ZKSplitLog.markCorrupted(rootDir, name, fs); 475 } 476 477 @Override 478 public boolean isReady() throws InterruptedException { 479 int result = -1; 480 try { 481 result = ZKUtil.checkExists(watcher, watcher.getZNodePaths().splitLogZNode); 482 } catch (KeeperException e) { 483 // ignore 484 LOG.warn("Exception when checking for " + watcher.getZNodePaths().splitLogZNode 485 + " ... retrying", e); 486 } 487 if (result == -1) { 488 LOG.info(watcher.getZNodePaths().splitLogZNode 489 + " znode does not exist, waiting for master to create"); 490 Thread.sleep(1000); 491 } 492 return (result != -1); 493 } 494 495 @Override 496 public int getTaskReadySeq() { 497 return taskReadySeq.get(); 498 } 499 500 @Override 501 public void registerListener() { 502 watcher.registerListener(this); 503 } 504 505 @Override 506 public void removeListener() { 507 watcher.unregisterListener(this); 508 } 509 510 511 @Override 512 public void stopProcessingTasks() { 513 this.shouldStop = true; 514 515 } 516 517 @Override 518 public boolean isStop() { 519 return shouldStop; 520 } 521 522 /** 523 * Asynchronous handler for zk get-data-set-watch on node results. 524 */ 525 class GetDataAsyncCallback implements AsyncCallback.DataCallback { 526 private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class); 527 528 @Override 529 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 530 SplitLogCounters.tot_wkr_get_data_result.increment(); 531 if (rc != 0) { 532 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); 533 getDataSetWatchFailure(path); 534 return; 535 } 536 data = ZKMetadata.removeMetaData(data); 537 getDataSetWatchSuccess(path, data); 538 } 539 } 540 541 /* 542 * Next part is related to WALSplitterHandler 543 */ 544 /** 545 * endTask() can fail and the only way to recover out of it is for the 546 * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node. 547 * @param slt 548 * @param ctr 549 */ 550 @Override 551 public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) { 552 ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details; 553 String task = zkDetails.getTaskNode(); 554 int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue(); 555 try { 556 if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) { 557 LOG.info("successfully transitioned task " + task + " to final state " + slt); 558 ctr.increment(); 559 return; 560 } 561 LOG.warn("failed to transistion task " + task + " to end state " + slt 562 + " because of version mismatch "); 563 } catch (KeeperException.BadVersionException bve) { 564 LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch", 565 bve); 566 } catch (KeeperException.NoNodeException e) { 567 LOG.error(HBaseMarkers.FATAL, 568 "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e); 569 } catch (KeeperException e) { 570 LOG.warn("failed to end task, " + task + " " + slt, e); 571 } 572 SplitLogCounters.tot_wkr_final_transition_failed.increment(); 573 } 574 575 /** 576 * When ZK-based implementation wants to complete the task, it needs to know task znode and 577 * current znode cversion (needed for subsequent update operation). 578 */ 579 public static class ZkSplitTaskDetails implements SplitTaskDetails { 580 private String taskNode; 581 private MutableInt curTaskZKVersion; 582 583 public ZkSplitTaskDetails() { 584 } 585 586 public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) { 587 this.taskNode = taskNode; 588 this.curTaskZKVersion = curTaskZKVersion; 589 } 590 591 public String getTaskNode() { 592 return taskNode; 593 } 594 595 public void setTaskNode(String taskNode) { 596 this.taskNode = taskNode; 597 } 598 599 public MutableInt getCurTaskZKVersion() { 600 return curTaskZKVersion; 601 } 602 603 public void setCurTaskZKVersion(MutableInt curTaskZKVersion) { 604 this.curTaskZKVersion = curTaskZKVersion; 605 } 606 607 @Override 608 public String getWALFile() { 609 return ZKSplitLog.getFileName(taskNode); 610 } 611 } 612 613}