001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coordination; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 022 023import java.util.Collections; 024import java.util.List; 025import java.util.concurrent.ThreadLocalRandom; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.atomic.LongAdder; 028import org.apache.commons.lang3.mutable.MutableInt; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.SplitLogCounters; 035import org.apache.hadoop.hbase.SplitLogTask; 036import org.apache.hadoop.hbase.exceptions.DeserializationException; 037import org.apache.hadoop.hbase.log.HBaseMarkers; 038import org.apache.hadoop.hbase.regionserver.RegionServerServices; 039import org.apache.hadoop.hbase.regionserver.SplitLogWorker; 040import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; 041import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler; 042import org.apache.hadoop.hbase.util.CancelableProgressable; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 045import org.apache.hadoop.hbase.zookeeper.ZKListener; 046import org.apache.hadoop.hbase.zookeeper.ZKMetadata; 047import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 048import org.apache.hadoop.hbase.zookeeper.ZKUtil; 049import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 050import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 051import org.apache.hadoop.util.StringUtils; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.apache.zookeeper.AsyncCallback; 054import org.apache.zookeeper.KeeperException; 055import org.apache.zookeeper.data.Stat; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059/** 060 * ZooKeeper based implementation of {@link SplitLogWorkerCoordination} It listen for changes in 061 * ZooKeeper and 062 */ 063@InterfaceAudience.Private 064public class ZkSplitLogWorkerCoordination extends ZKListener implements SplitLogWorkerCoordination { 065 066 private static final Logger LOG = LoggerFactory.getLogger(ZkSplitLogWorkerCoordination.class); 067 068 private static final int checkInterval = 5000; // 5 seconds 069 private static final int FAILED_TO_OWN_TASK = -1; 070 071 private SplitLogWorker worker; 072 073 private TaskExecutor splitTaskExecutor; 074 075 private final AtomicInteger taskReadySeq = new AtomicInteger(0); 076 private volatile String currentTask = null; 077 private int currentVersion; 078 private volatile boolean shouldStop = false; 079 private final Object grabTaskLock = new Object(); 080 private boolean workerInGrabTask = false; 081 private int reportPeriod; 082 private RegionServerServices server = null; 083 protected final AtomicInteger tasksInProgress = new AtomicInteger(0); 084 private int maxConcurrentTasks = 0; 085 086 private final ServerName serverName; 087 088 public ZkSplitLogWorkerCoordination(ServerName serverName, ZKWatcher watcher) { 089 super(watcher); 090 this.serverName = serverName; 091 } 092 093 /** 094 * Override handler from {@link ZKListener} 095 */ 096 @Override 097 public void nodeChildrenChanged(String path) { 098 if (path.equals(watcher.getZNodePaths().splitLogZNode)) { 099 if (LOG.isTraceEnabled()) { 100 LOG.trace("tasks arrived or departed on " + path); 101 } 102 synchronized (taskReadySeq) { 103 this.taskReadySeq.incrementAndGet(); 104 taskReadySeq.notify(); 105 } 106 } 107 } 108 109 /** 110 * Override handler from {@link ZKListener} 111 */ 112 @Override 113 public void nodeDataChanged(String path) { 114 // there will be a self generated dataChanged event every time attemptToOwnTask() 115 // heartbeats the task znode by upping its version 116 synchronized (grabTaskLock) { 117 if (workerInGrabTask) { 118 // currentTask can change 119 String taskpath = currentTask; 120 if (taskpath != null && taskpath.equals(path)) { 121 getDataSetWatchAsync(); 122 } 123 } 124 } 125 } 126 127 /** 128 * Override setter from {@link SplitLogWorkerCoordination} 129 */ 130 @Override 131 public void init(RegionServerServices server, Configuration conf, TaskExecutor splitExecutor, 132 SplitLogWorker worker) { 133 this.server = server; 134 this.worker = worker; 135 this.splitTaskExecutor = splitExecutor; 136 maxConcurrentTasks = 137 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 138 reportPeriod = conf.getInt("hbase.splitlog.report.period", 139 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, 140 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3); 141 } 142 143 /* Support functions for ZooKeeper async callback */ 144 145 void getDataSetWatchFailure(String path) { 146 synchronized (grabTaskLock) { 147 if (workerInGrabTask) { 148 // currentTask can change but that's ok 149 String taskpath = currentTask; 150 if (taskpath != null && taskpath.equals(path)) { 151 LOG.info("retrying data watch on " + path); 152 SplitLogCounters.tot_wkr_get_data_retry.increment(); 153 getDataSetWatchAsync(); 154 } else { 155 // no point setting a watch on the task which this worker is not 156 // working upon anymore 157 } 158 } 159 } 160 } 161 162 public void getDataSetWatchAsync() { 163 watcher.getRecoverableZooKeeper().getZooKeeper().getData(currentTask, watcher, 164 new GetDataAsyncCallback(), null); 165 SplitLogCounters.tot_wkr_get_data_queued.increment(); 166 } 167 168 void getDataSetWatchSuccess(String path, byte[] data) { 169 SplitLogTask slt; 170 try { 171 slt = SplitLogTask.parseFrom(data); 172 } catch (DeserializationException e) { 173 LOG.warn("Failed parse", e); 174 return; 175 } 176 synchronized (grabTaskLock) { 177 if (workerInGrabTask) { 178 // currentTask can change but that's ok 179 String taskpath = currentTask; 180 if (taskpath != null && taskpath.equals(path)) { 181 // have to compare data. cannot compare version because then there 182 // will be race with attemptToOwnTask() 183 // cannot just check whether the node has been transitioned to 184 // UNASSIGNED because by the time this worker sets the data watch 185 // the node might have made two transitions - from owned by this 186 // worker to unassigned to owned by another worker 187 if ( 188 !slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName) 189 && !slt.isResigned(serverName) 190 ) { 191 LOG.info("task " + taskpath + " preempted from " + serverName 192 + ", current task state and owner=" + slt.toString()); 193 worker.stopTask(); 194 } 195 } 196 } 197 } 198 } 199 200 /** 201 * try to grab a 'lock' on the task zk node to own and execute the task. 202 * <p> 203 * @param path zk node for the task 204 * @return boolean value when grab a task success return true otherwise false 205 */ 206 private boolean grabTask(String path) { 207 Stat stat = new Stat(); 208 byte[] data; 209 synchronized (grabTaskLock) { 210 currentTask = path; 211 workerInGrabTask = true; 212 if (Thread.interrupted()) { 213 return false; 214 } 215 } 216 try { 217 try { 218 if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { 219 SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment(); 220 return false; 221 } 222 } catch (KeeperException e) { 223 LOG.warn("Failed to get data for znode " + path, e); 224 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); 225 return false; 226 } 227 SplitLogTask slt; 228 try { 229 slt = SplitLogTask.parseFrom(data); 230 } catch (DeserializationException e) { 231 LOG.warn("Failed parse data for znode " + path, e); 232 SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); 233 return false; 234 } 235 if (!slt.isUnassigned()) { 236 SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment(); 237 return false; 238 } 239 240 currentVersion = 241 attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion()); 242 if (currentVersion < 0) { 243 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment(); 244 return false; 245 } 246 247 if (ZKSplitLog.isRescanNode(watcher, currentTask)) { 248 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = 249 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); 250 splitTaskDetails.setTaskNode(currentTask); 251 splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion)); 252 253 endTask(new SplitLogTask.Done(server.getServerName()), 254 SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); 255 return false; 256 } 257 258 LOG.info("worker " + server.getServerName() + " acquired task " + path); 259 SplitLogCounters.tot_wkr_task_acquired.increment(); 260 getDataSetWatchAsync(); 261 262 submitTask(path, currentVersion, reportPeriod); 263 264 // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks 265 try { 266 int sleepTime = ThreadLocalRandom.current().nextInt(500) + 500; 267 Thread.sleep(sleepTime); 268 } catch (InterruptedException e) { 269 LOG.warn("Interrupted while yielding for other region servers", e); 270 Thread.currentThread().interrupt(); 271 } 272 return true; 273 } finally { 274 synchronized (grabTaskLock) { 275 workerInGrabTask = false; 276 // clear the interrupt from stopTask() otherwise the next task will 277 // suffer 278 Thread.interrupted(); 279 } 280 } 281 } 282 283 /** 284 * Submit a log split task to executor service 285 * @param curTask task to submit 286 * @param curTaskZKVersion current version of task 287 */ 288 void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) { 289 final MutableInt zkVersion = new MutableInt(curTaskZKVersion); 290 291 CancelableProgressable reporter = new CancelableProgressable() { 292 private long last_report_at = 0; 293 294 @Override 295 public boolean progress() { 296 long t = EnvironmentEdgeManager.currentTime(); 297 if ((t - last_report_at) > reportPeriod) { 298 last_report_at = t; 299 int latestZKVersion = 300 attemptToOwnTask(false, watcher, server.getServerName(), curTask, zkVersion.intValue()); 301 if (latestZKVersion < 0) { 302 LOG.warn("Failed to heartbeat the task" + curTask); 303 return false; 304 } 305 zkVersion.setValue(latestZKVersion); 306 } 307 return true; 308 } 309 }; 310 ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails = 311 new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails(); 312 splitTaskDetails.setTaskNode(curTask); 313 splitTaskDetails.setCurTaskZKVersion(zkVersion); 314 315 WALSplitterHandler hsh = new WALSplitterHandler(server, this, splitTaskDetails, reporter, 316 this.tasksInProgress, splitTaskExecutor); 317 server.getExecutorService().submit(hsh); 318 } 319 320 /** Returns true if more splitters are available, otherwise false. */ 321 private boolean areSplittersAvailable() { 322 return maxConcurrentTasks - tasksInProgress.get() > 0; 323 } 324 325 /** 326 * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED. 327 * <p> 328 * This method is also used to periodically heartbeat the task progress by transitioning the node 329 * from OWNED to OWNED. 330 * <p> 331 * @param isFirstTime shows whther it's the first attempt. 332 * @param zkw zk wathcer 333 * @param server name 334 * @param task to own 335 * @param taskZKVersion version of the task in zk 336 * @return non-negative integer value when task can be owned by current region server otherwise -1 337 */ 338 protected static int attemptToOwnTask(boolean isFirstTime, ZKWatcher zkw, ServerName server, 339 String task, int taskZKVersion) { 340 int latestZKVersion = FAILED_TO_OWN_TASK; 341 try { 342 SplitLogTask slt = new SplitLogTask.Owned(server); 343 Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); 344 if (stat == null) { 345 LOG.warn("zk.setData() returned null for path " + task); 346 SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); 347 return FAILED_TO_OWN_TASK; 348 } 349 latestZKVersion = stat.getVersion(); 350 SplitLogCounters.tot_wkr_task_heartbeat.increment(); 351 return latestZKVersion; 352 } catch (KeeperException e) { 353 if (!isFirstTime) { 354 if (e.code().equals(KeeperException.Code.NONODE)) { 355 LOG.warn("NONODE failed to assert ownership for " + task, e); 356 } else if (e.code().equals(KeeperException.Code.BADVERSION)) { 357 LOG.warn("BADVERSION failed to assert ownership for " + task, e); 358 } else { 359 LOG.warn("failed to assert ownership for " + task, e); 360 } 361 } 362 } catch (InterruptedException e1) { 363 LOG.warn("Interrupted while trying to assert ownership of " + task + " " 364 + StringUtils.stringifyException(e1)); 365 Thread.currentThread().interrupt(); 366 } 367 SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); 368 return FAILED_TO_OWN_TASK; 369 } 370 371 /** 372 * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This 373 * policy puts an upper-limit on the number of simultaneous log splitting that could be happening 374 * in a cluster. 375 * <p> 376 * Synchronization using <code>taskReadySeq</code> ensures that it will try to grab every task 377 * that has been put up 378 */ 379 @Override 380 public void taskLoop() throws InterruptedException { 381 while (!shouldStop) { 382 int seq_start = taskReadySeq.get(); 383 List<String> paths; 384 paths = getTaskList(); 385 if (paths == null) { 386 LOG.warn("Could not get tasks, did someone remove " + watcher.getZNodePaths().splitLogZNode 387 + " ... worker thread exiting."); 388 return; 389 } 390 // shuffle the paths to prevent different split log worker start from the same log file after 391 // meta log (if any) 392 Collections.shuffle(paths); 393 // pick meta wal firstly 394 int offset = 0; 395 for (int i = 0; i < paths.size(); i++) { 396 if (AbstractFSWALProvider.isMetaFile(paths.get(i))) { 397 offset = i; 398 break; 399 } 400 } 401 int numTasks = paths.size(); 402 boolean taskGrabbed = false; 403 for (int i = 0; i < numTasks; i++) { 404 while (!shouldStop) { 405 if (this.areSplittersAvailable()) { 406 if (LOG.isTraceEnabled()) { 407 LOG.trace("Current region server " + server.getServerName() 408 + " is ready to take more tasks, will get task list and try grab tasks again."); 409 } 410 int idx = (i + offset) % paths.size(); 411 // don't call ZKSplitLog.getNodeName() because that will lead to 412 // double encoding of the path name 413 taskGrabbed |= 414 grabTask(ZNodePaths.joinZNode(watcher.getZNodePaths().splitLogZNode, paths.get(idx))); 415 break; 416 } else { 417 if (LOG.isTraceEnabled()) { 418 LOG.trace("Current region server " + server.getServerName() + " has " 419 + this.tasksInProgress.get() + " tasks in progress and can't take more."); 420 } 421 Thread.sleep(100); 422 } 423 } 424 if (shouldStop) { 425 return; 426 } 427 } 428 if (!taskGrabbed && !shouldStop) { 429 // do not grab any tasks, sleep a little bit to reduce zk request. 430 Thread.sleep(1000); 431 } 432 SplitLogCounters.tot_wkr_task_grabing.increment(); 433 synchronized (taskReadySeq) { 434 while (seq_start == taskReadySeq.get()) { 435 taskReadySeq.wait(checkInterval); 436 } 437 } 438 } 439 } 440 441 private List<String> getTaskList() throws InterruptedException { 442 List<String> childrenPaths = null; 443 long sleepTime = 1000; 444 // It will be in loop till it gets the list of children or 445 // it will come out if worker thread exited. 446 while (!shouldStop) { 447 try { 448 childrenPaths = 449 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().splitLogZNode); 450 if (childrenPaths != null) { 451 return childrenPaths; 452 } 453 } catch (KeeperException e) { 454 LOG.warn("Could not get children of znode " + watcher.getZNodePaths().splitLogZNode, e); 455 } 456 LOG.debug("Retry listChildren of znode " + watcher.getZNodePaths().splitLogZNode 457 + " after sleep for " + sleepTime + "ms!"); 458 Thread.sleep(sleepTime); 459 } 460 return childrenPaths; 461 } 462 463 @Override 464 public void markCorrupted(Path rootDir, String name, FileSystem fs) { 465 ZKSplitLog.markCorrupted(rootDir, name, fs); 466 } 467 468 @Override 469 public boolean isReady() throws InterruptedException { 470 int result = -1; 471 try { 472 result = ZKUtil.checkExists(watcher, watcher.getZNodePaths().splitLogZNode); 473 } catch (KeeperException e) { 474 // ignore 475 LOG.warn( 476 "Exception when checking for " + watcher.getZNodePaths().splitLogZNode + " ... retrying", 477 e); 478 } 479 if (result == -1) { 480 LOG.info(watcher.getZNodePaths().splitLogZNode 481 + " znode does not exist, waiting for master to create"); 482 Thread.sleep(1000); 483 } 484 return (result != -1); 485 } 486 487 @Override 488 public int getTaskReadySeq() { 489 return taskReadySeq.get(); 490 } 491 492 @Override 493 public void registerListener() { 494 watcher.registerListener(this); 495 } 496 497 @Override 498 public void removeListener() { 499 watcher.unregisterListener(this); 500 } 501 502 @Override 503 public void stopProcessingTasks() { 504 this.shouldStop = true; 505 506 } 507 508 @Override 509 public boolean isStop() { 510 return shouldStop; 511 } 512 513 /** 514 * Asynchronous handler for zk get-data-set-watch on node results. 515 */ 516 class GetDataAsyncCallback implements AsyncCallback.DataCallback { 517 private final Logger LOG = LoggerFactory.getLogger(GetDataAsyncCallback.class); 518 519 @Override 520 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { 521 SplitLogCounters.tot_wkr_get_data_result.increment(); 522 if (rc != 0) { 523 LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); 524 getDataSetWatchFailure(path); 525 return; 526 } 527 data = ZKMetadata.removeMetaData(data); 528 getDataSetWatchSuccess(path, data); 529 } 530 } 531 532 /* 533 * Next part is related to WALSplitterHandler 534 */ 535 /** 536 * endTask() can fail and the only way to recover out of it is for the 537 * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node. 538 */ 539 @Override 540 public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) { 541 ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details; 542 String task = zkDetails.getTaskNode(); 543 int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue(); 544 try { 545 if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) { 546 LOG.info("successfully transitioned task " + task + " to final state " + slt); 547 ctr.increment(); 548 return; 549 } 550 LOG.warn("failed to transistion task " + task + " to end state " + slt 551 + " because of version mismatch "); 552 } catch (KeeperException.BadVersionException bve) { 553 LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch", 554 bve); 555 } catch (KeeperException.NoNodeException e) { 556 LOG.error(HBaseMarkers.FATAL, 557 "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e); 558 } catch (KeeperException e) { 559 LOG.warn("failed to end task, " + task + " " + slt, e); 560 } 561 SplitLogCounters.tot_wkr_final_transition_failed.increment(); 562 } 563 564 /** 565 * When ZK-based implementation wants to complete the task, it needs to know task znode and 566 * current znode cversion (needed for subsequent update operation). 567 */ 568 public static class ZkSplitTaskDetails implements SplitTaskDetails { 569 private String taskNode; 570 private MutableInt curTaskZKVersion; 571 572 public ZkSplitTaskDetails() { 573 } 574 575 public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) { 576 this.taskNode = taskNode; 577 this.curTaskZKVersion = curTaskZKVersion; 578 } 579 580 public String getTaskNode() { 581 return taskNode; 582 } 583 584 public void setTaskNode(String taskNode) { 585 this.taskNode = taskNode; 586 } 587 588 public MutableInt getCurTaskZKVersion() { 589 return curTaskZKVersion; 590 } 591 592 public void setCurTaskZKVersion(MutableInt curTaskZKVersion) { 593 this.curTaskZKVersion = curTaskZKVersion; 594 } 595 596 @Override 597 public String getWALFile() { 598 return ZKSplitLog.getFileName(taskNode); 599 } 600 } 601 602}