001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coordination; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022 023import java.util.Collections; 024import java.util.List; 025import java.util.concurrent.ThreadLocalRandom; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.atomic.LongAdder; 028import org.apache.commons.lang3.mutable.MutableInt; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.SplitLogCounters; 035import org.apache.hadoop.hbase.SplitLogTask; 036import org.apache.hadoop.hbase.exceptions.DeserializationException; 037import org.apache.hadoop.hbase.log.HBaseMarkers; 038import org.apache.hadoop.hbase.regionserver.RegionServerServices; 039import org.apache.hadoop.hbase.regionserver.SplitLogWorker; 040import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; 041import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler; 042import org.apache.hadoop.hbase.util.CancelableProgressable; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 045import org.apache.hadoop.hbase.zookeeper.ZKListener; 046import org.apache.hadoop.hbase.zookeeper.ZKMetadata; 047import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 048import org.apache.hadoop.hbase.zookeeper.ZKUtil; 049import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 050import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 051import org.apache.hadoop.util.StringUtils; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.apache.zookeeper.AsyncCallback; 054import org.apache.zookeeper.KeeperException; 055import org.apache.zookeeper.data.Stat; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059/** 060 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination} It listen for changes in 061 * ZooKeeper and 062 */ 063@InterfaceAudience.Private 064public class ZkSplitLogWorkerCoordination extends ZKListener implements SplitLogWorkerCoordination { 065 066 private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class); 067 068 private static final int checkInterval = 5000; // 5 seconds 069 private static final int FAILED_TO_OWN_TASK = -1; 070 071 private SplitLogWorker worker; 072 073 private TaskExecutor splitTaskExecutor; 074 075 private final AtomicInteger taskReadySeq = new AtomicInteger(0); 076 private volatile String currentTask = null; 077 private int currentVersion; 078 private volatile boolean shouldStop = false; 079 private final Object grabTaskLock = new Object(); 080 private boolean workerInGrabTask = false; 081 private int reportPeriod; 082 private RegionServerServices server = null; 083 protected final AtomicInteger tasksInProgress = new AtomicInteger(0); 084 private int maxConcurrentTasks = 0; 085 086 private final ServerName serverName; 087 088 public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) { 089 super(watcher); 090 this.serverName = serverName; 091 } 092 093 /** 094 * Override handler from {@link ZKListener} 095 */ 096 @Override 097 public void nodeChildrenChanged(String path) { 098 if (path.equals(watcher.getZNodePaths().splitLogZNode)) { 099 if (LOG.isTraceEnabled()) { 100 LOG.trace("tasks arrived or departed on " + path); 101 } 102 synchronized (taskReadySeq) { 103 this.taskReadySeq.incrementAndGet(); 104 taskReadySeq.notify(); 105 } 106 } 107 } 108 109 /** 110 * Override handler from {@link ZKListener} 111 */ 112 @Override 113 public void nodeDataChanged(String path) { 114 // there will be a self generated dataChanged event every time attemptToOwnTask() 115 // heartbeats the task znode by upping its version 116 synchronized (grabTaskLock) { 117 if (workerInGrabTask) { 118 // currentTask can change 119 String taskpath = currentTask; 120 if (taskpath != null && taskpath.equals(path)) { 121 getDataSetWatchAsync(); 122 } 123 } 124 } 125 } 126 127 /** 128 * Override setter from {@link SplitLogWorkerCoordination} 129 */ 130 @Override 131 public void init(RegionServerServices server, Configuration conf, TaskExecutor splitExecutor, 132 SplitLogWorker worker) { 133 this.server = server; 134 this.worker = worker; 135 this.splitTaskExecutor = splitExecutor; 136 maxConcurrentTasks = 137 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 138 reportPeriod = conf.getInt("hbase.splitlog.report.period", 139 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, 140 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3); 141 } 142 143 /* Support functions for ZooKeeper async callback */ 144 145 void getDataSetWatchFailure(String path) { 146 synchronized (grabTaskLock) { 147 if (workerInGrabTask) { 148 // currentTask can change but that's ok 149 String taskpath = currentTask; 150 if (taskpath != null && taskpath.equals(path)) { 151 LOG.info("retrying data watch on " + path); 152 SplitLogCounters.tot_wkr_get_data_retry.increment(); 153 getDataSetWatchAsync(); 154 } else { 155 // no point setting a watch on the task which this worker is not 156 // working upon anymore 157 } 158 } 159 } 160 } 161 162 public void getDataSetWatchAsync() { 163 watcher.getRecoverableZooKeeper().getZooKeeper().getData(currentTask, watcher, 164 new GetDataAsyncCallback(), null); 165 SplitLogCounters.tot_wkr_get_data_queued.increment(); 166 } 167 168 void getDataSetWatchSuccess(String path, byte[] data) { 169 SplitLogTask slt; 170 try { 171 slt = SplitLogTask.parseFrom(data); 172 } catch (DeserializationException e) { 173 LOG.warn("Failed parse", e); 174 return; 175 } 176 synchronized (grabTaskLock) { 177 if (workerInGrabTask) { 178 // currentTask can change but that's ok 179 String taskpath = currentTask; 180 if (taskpath != null && taskpath.equals(path)) { 181 // have to compare data. cannot compare version because then there 182 // will be race with attemptToOwnTask() 183 // cannot just check whether the node has been transitioned to 184 // UNASSIGNED because by the time this worker sets the data watch 185 // the node might have made two transitions - from owned by this 186 // worker to unassigned to owned by another worker 187 if ( 188 !slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName) 189 && !slt.isResigned(serverName) 190 ) { 191 LOG.info("task " + taskpath + " preempted from " + serverName 192 + ", current task state and owner=" + slt.toString()); 193 worker.stopTask(); 194 } 195 } 196 } 197 } 198 } 199 200 /** 201 * try to grab a 'lock' on the task zk node to own and execute the task. 202 * <p> 203 * @param path zk node for the task 204 * @return boolean value when grab a task success return true otherwise false 205 */ 206 private boolean grabTask(String path) { 207 Stat stat = new Stat(); 208 byte[] data; 209 synchronized (grabTaskLock) { 210 currentTask = path; 211 workerInGrabTask = true; 212 if (Thread.interrupted()) { 213 return false; 214 } 215 } 216 try { 217 try { 218 if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { 219 SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment(); 220 return false; 221 } 222 } catch (KeeperException e) { 223 LOG.warn("Failed to get data for znode " + path, e); 224 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); 225 return false; 226 } 227 SplitLogTask slt; 228 try { 229 slt = SplitLogTask.parseFrom(data); 230 } catch (DeserializationException e) { 231 LOG.warn("Failed parse data for znode " + path, e); 232 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); 233 return false; 234 } 235 if (!slt.isUnassigned()) { 236 SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment(); 237 return false; 238 } 239 240 currentVersion = 241 attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion()); 242 if (currentVersion < 0) { 243 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment(); 244 return false; 245 } 246 247 if (ZKSplitLog.isRescanNode(watcher, currentTask)) { 248 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = 249 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); 250 splitTaskDetails.setTaskNode(currentTask); 251 splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion)); 252 253 endTask(new SplitLogTask.Done(server.getServerName()), 254 SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); 255 return false; 256 } 257 258 LOG.info("worker " + server.getServerName() + " acquired task " + path); 259 SplitLogCounters.tot_wkr_task_acquired.increment(); 260 getDataSetWatchAsync(); 261 262 submitTask(path, currentVersion, reportPeriod); 263 264 // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks 265 try { 266 int sleepTime = ThreadLocalRandom.current().nextInt(500) + 500; 267 Thread.sleep(sleepTime); 268 } catch (InterruptedException e) { 269 LOG.warn("Interrupted while yielding for other region servers", e); 270 Thread.currentThread().interrupt(); 271 } 272 return true; 273 } finally { 274 synchronized (grabTaskLock) { 275 workerInGrabTask = false; 276 // clear the interrupt from stopTask() otherwise the next task will 277 // suffer 278 Thread.interrupted(); 279 } 280 } 281 } 282 283 /** 284 * Submit a log split task to executor service 285 * @param curTask task to submit 286 * @param curTaskZKVersion current version of task 287 */ 288 void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) { 289 final MutableInt zkVersion = new MutableInt(curTaskZKVersion); 290 291 CancelableProgressable reporter = new CancelableProgressable() { 292 private long last_report_at = 0; 293 294 @Override 295 public boolean progress() { 296 long t = EnvironmentEdgeManager.currentTime(); 297 if ((t - last_report_at) > reportPeriod) { 298 last_report_at = t; 299 int latestZKVersion = 300 attemptToOwnTask(false, watcher, server.getServerName(), curTask, zkVersion.intValue()); 301 if (latestZKVersion < 0) { 302 LOG.warn("Failed to heartbeat the task" + curTask); 303 return false; 304 } 305 zkVersion.setValue(latestZKVersion); 306 } 307 return true; 308 } 309 }; 310 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = 311 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); 312 splitTaskDetails.setTaskNode(curTask); 313 splitTaskDetails.setCurTaskZKVersion(zkVersion); 314 315 WALSplitterHandler hsh = new WALSplitterHandler(server, this, splitTaskDetails, reporter, 316 this.tasksInProgress, splitTaskExecutor); 317 server.getExecutorService().submit(hsh); 318 } 319 320 /** 321 * @return true if more splitters are available, otherwise false. 322 */ 323 private boolean areSplittersAvailable() { 324 return maxConcurrentTasks - tasksInProgress.get() > 0; 325 } 326 327 /** 328 * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED. 329 * <p> 330 * This method is also used to periodically heartbeat the task progress by transitioning the node 331 * from OWNED to OWNED. 332 * <p> 333 * @param isFirstTime shows whther it's the first attempt. 334 * @param zkw zk wathcer 335 * @param server name 336 * @param task to own 337 * @param taskZKVersion version of the task in zk 338 * @return non-negative integer value when task can be owned by current region server otherwise -1 339 */ 340 protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw, ServerName server, 341 String task, int taskZKVersion) { 342 int latestZKVersion = FAILED_TO_OWN_TASK; 343 try { 344 SplitLogTask slt = new SplitLogTask.Owned(server); 345 Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); 346 if (stat == null) { 347 LOG.warn("zk.setData() returned null for path " + task); 348 SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); 349 return FAILED_TO_OWN_TASK; 350 } 351 latestZKVersion = stat.getVersion(); 352 SplitLogCounters.tot_wkr_task_heartbeat.increment(); 353 return latestZKVersion; 354 } catch (KeeperException e) { 355 if (!isFirstTime) { 356 if (e.code().equals(KeeperException.Code.NONODE)) { 357 LOG.warn("NONODE failed to assert ownership for " + task, e); 358 } else if (e.code().equals(KeeperException.Code.BADVERSION)) { 359 LOG.warn("BADVERSION failed to assert ownership for " + task, e); 360 } else { 361 LOG.warn("failed to assert ownership for " + task, e); 362 } 363 } 364 } catch (InterruptedException e1) { 365 LOG.warn("Interrupted while trying to assert ownership of " + task + " " 366 + StringUtils.stringifyException(e1)); 367 Thread.currentThread().interrupt(); 368 } 369 SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); 370 return FAILED_TO_OWN_TASK; 371 } 372 373 /** 374 * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This 375 * policy puts an upper-limit on the number of simultaneous log splitting that could be happening 376 * in a cluster. 377 * <p> 378 * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task 379 * that has been put up n 380 */ 381 @Override 382 public void taskLoop() throws InterruptedException { 383 while (!shouldStop) { 384 int seq_start = taskReadySeq.get(); 385 List<String> paths; 386 paths = getTaskList(); 387 if (paths == null) { 388 LOG.warn("Could not get tasks, did someone remove " + watcher.getZNodePaths().splitLogZNode 389 + " ... worker thread exiting."); 390 return; 391 } 392 // shuffle the paths to prevent different split log worker start from the same log file after 393 // meta log (if any) 394 Collections.shuffle(paths); 395 // pick meta wal firstly 396 int offset = 0; 397 for (int i = 0; i < paths.size(); i++) { 398 if (AbstractFSWALProvider.isMetaFile(paths.get(i))) { 399 offset = i; 400 break; 401 } 402 } 403 int numTasks = paths.size(); 404 boolean taskGrabbed = false; 405 for (int i = 0; i < numTasks; i++) { 406 while (!shouldStop) { 407 if (this.areSplittersAvailable()) { 408 if (LOG.isTraceEnabled()) { 409 LOG.trace("Current region server " + server.getServerName() 410 + " is ready to take more tasks, will get task list and try grab tasks again."); 411 } 412 int idx = (i + offset) % paths.size(); 413 // don't call ZKSplitLog.getNodeName() because that will lead to 414 // double encoding of the path name 415 taskGrabbed |= 416 grabTask(ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, paths.get(idx))); 417 break; 418 } else { 419 if (LOG.isTraceEnabled()) { 420 LOG.trace("Current region server " + server.getServerName() + " has " 421 + this.tasksInProgress.get() + " tasks in progress and can't take more."); 422 } 423 Thread.sleep(100); 424 } 425 } 426 if (shouldStop) { 427 return; 428 } 429 } 430 if (!taskGrabbed && !shouldStop) { 431 // do not grab any tasks, sleep a little bit to reduce zk request. 432 Thread.sleep(1000); 433 } 434 SplitLogCounters.tot_wkr_task_grabing.increment(); 435 synchronized (taskReadySeq) { 436 while (seq_start == taskReadySeq.get()) { 437 taskReadySeq.wait(checkInterval); 438 } 439 } 440 } 441 } 442 443 private List<String> getTaskList() throws InterruptedException { 444 List<String> childrenPaths = null; 445 long sleepTime = 1000; 446 // It will be in loop till it gets the list of children or 447 // it will come out if worker thread exited. 448 while (!shouldStop) { 449 try { 450 childrenPaths = 451 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().splitLogZNode); 452 if (childrenPaths != null) { 453 return childrenPaths; 454 } 455 } catch (KeeperException e) { 456 LOG.warn("Could not get children of znode " + watcher.getZNodePaths().splitLogZNode, e); 457 } 458 LOG.debug("Retry listChildren of znode " + watcher.getZNodePaths().splitLogZNode 459 + " after sleep for " + sleepTime + "ms!"); 460 Thread.sleep(sleepTime); 461 } 462 return childrenPaths; 463 } 464 465 @Override 466 public void markCorrupted(Path rootDir, String name, FileSystem fs) { 467 ZKSplitLog.markCorrupted(rootDir, name, fs); 468 } 469 470 @Override 471 public boolean isReady() throws InterruptedException { 472 int result = -1; 473 try { 474 result = ZKUtil.checkExists(watcher, watcher.getZNodePaths().splitLogZNode); 475 } catch (KeeperException e) { 476 // ignore 477 LOG.warn( 478 "Exception when checking for " + watcher.getZNodePaths().splitLogZNode + " ... retrying", 479 e); 480 } 481 if (result == -1) { 482 LOG.info(watcher.getZNodePaths().splitLogZNode 483 + " znode does not exist, waiting for master to create"); 484 Thread.sleep(1000); 485 } 486 return (result != -1); 487 } 488 489 @Override 490 public int getTaskReadySeq() { 491 return taskReadySeq.get(); 492 } 493 494 @Override 495 public void registerListener() { 496 watcher.registerListener(this); 497 } 498 499 @Override 500 public void removeListener() { 501 watcher.unregisterListener(this); 502 } 503 504 @Override 505 public void stopProcessingTasks() { 506 this.shouldStop = true; 507 508 } 509 510 @Override 511 public boolean isStop() { 512 return shouldStop; 513 } 514 515 /** 516 * Asynchronous handler for zk get-data-set-watch on node results. 517 */ 518 class GetDataAsyncCallback implements AsyncCallback.DataCallback { 519 private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class); 520 521 @Override 522 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 523 SplitLogCounters.tot_wkr_get_data_result.increment(); 524 if (rc != 0) { 525 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); 526 getDataSetWatchFailure(path); 527 return; 528 } 529 data = ZKMetadata.removeMetaData(data); 530 getDataSetWatchSuccess(path, data); 531 } 532 } 533 534 /* 535 * Next part is related to WALSplitterHandler 536 */ 537 /** 538 * endTask() can fail and the only way to recover out of it is for the 539 * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node. nn 540 */ 541 @Override 542 public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) { 543 ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details; 544 String task = zkDetails.getTaskNode(); 545 int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue(); 546 try { 547 if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) { 548 LOG.info("successfully transitioned task " + task + " to final state " + slt); 549 ctr.increment(); 550 return; 551 } 552 LOG.warn("failed to transistion task " + task + " to end state " + slt 553 + " because of version mismatch "); 554 } catch (KeeperException.BadVersionException bve) { 555 LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch", 556 bve); 557 } catch (KeeperException.NoNodeException e) { 558 LOG.error(HBaseMarkers.FATAL, 559 "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e); 560 } catch (KeeperException e) { 561 LOG.warn("failed to end task, " + task + " " + slt, e); 562 } 563 SplitLogCounters.tot_wkr_final_transition_failed.increment(); 564 } 565 566 /** 567 * When ZK-based implementation wants to complete the task, it needs to know task znode and 568 * current znode cversion (needed for subsequent update operation). 569 */ 570 public static class ZkSplitTaskDetails implements SplitTaskDetails { 571 private String taskNode; 572 private MutableInt curTaskZKVersion; 573 574 public ZkSplitTaskDetails() { 575 } 576 577 public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) { 578 this.taskNode = taskNode; 579 this.curTaskZKVersion = curTaskZKVersion; 580 } 581 582 public String getTaskNode() { 583 return taskNode; 584 } 585 586 public void setTaskNode(String taskNode) { 587 this.taskNode = taskNode; 588 } 589 590 public MutableInt getCurTaskZKVersion() { 591 return curTaskZKVersion; 592 } 593 594 public void setCurTaskZKVersion(MutableInt curTaskZKVersion) { 595 this.curTaskZKVersion = curTaskZKVersion; 596 } 597 598 @Override 599 public String getWALFile() { 600 return ZKSplitLog.getFileName(taskNode); 601 } 602 } 603 604}