001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; 020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; 021import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; 022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; 023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; 024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentMap; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.fs.PathFilter; 041import org.apache.hadoop.hbase.ChoreService; 042import org.apache.hadoop.hbase.ScheduledChore; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.SplitLogCounters; 045import org.apache.hadoop.hbase.Stoppable; 046import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; 047import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; 048import org.apache.hadoop.hbase.log.HBaseMarkers; 049import org.apache.hadoop.hbase.monitoring.MonitoredTask; 050import org.apache.hadoop.hbase.monitoring.TaskMonitor; 051import org.apache.hadoop.hbase.procedure2.util.StringUtils; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059 060/** 061 * Distributes the task of log splitting to the available region servers. 062 * Coordination happens via coordination engine. For every log file that has to be split a 063 * task is created. SplitLogWorkers race to grab a task. 064 * 065 * <p>SplitLogManager monitors the tasks that it creates using the 066 * timeoutMonitor thread. If a task's progress is slow then 067 * {@link SplitLogManagerCoordination#checkTasks} will take away the 068 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} 069 * and the task will be up for grabs again. When the task is done then it is 070 * deleted by SplitLogManager. 071 * 072 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's 073 * log files. The caller thread waits in this method until all the log files 074 * have been split. 075 * 076 * <p>All the coordination calls made by this class are asynchronous. This is mainly 077 * to help reduce response time seen by the callers. 078 * 079 * <p>There is race in this design between the SplitLogManager and the 080 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality 081 * already been completed by a SplitLogWorker. We rely on the idempotency of 082 * the log splitting task for correctness. 083 * 084 * <p>It is also assumed that every log splitting task is unique and once 085 * completed (either with success or with error) it will be not be submitted 086 * again. If a task is resubmitted then there is a risk that old "delete task" 087 * can delete the re-submission. 088 * @see SplitWALManager for an alternate implementation based on Procedures. 089 */ 090@InterfaceAudience.Private 091public class SplitLogManager { 092 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); 093 094 private final MasterServices server; 095 096 private final Configuration conf; 097 private final ChoreService choreService; 098 099 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min 100 101 private long unassignedTimeout; 102 private long lastTaskCreateTime = Long.MAX_VALUE; 103 104 @VisibleForTesting 105 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>(); 106 private TimeoutMonitor timeoutMonitor; 107 108 private volatile Set<ServerName> deadWorkers = null; 109 private final Object deadWorkersLock = new Object(); 110 111 /** 112 * Its OK to construct this object even when region-servers are not online. It does lookup the 113 * orphan tasks in coordination engine but it doesn't block waiting for them to be done. 114 * @param master the master services 115 * @param conf the HBase configuration 116 * @throws IOException 117 */ 118 public SplitLogManager(MasterServices master, Configuration conf) 119 throws IOException { 120 this.server = master; 121 this.conf = conf; 122 // If no CoordinatedStateManager, skip registering as a chore service (The 123 // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL 124 // splitting. It is null if we are configured to use procedure-based distributed WAL 125 // splitting. 126 if (server.getCoordinatedStateManager() != null) { 127 this.choreService = 128 new ChoreService(master.getServerName().toShortString() + ".splitLogManager."); 129 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); 130 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); 131 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); 132 coordination.setDetails(details); 133 coordination.init(); 134 this.unassignedTimeout = 135 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); 136 this.timeoutMonitor = 137 new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), 138 master); 139 this.choreService.scheduleChore(timeoutMonitor); 140 } else { 141 this.choreService = null; 142 this.timeoutMonitor = null; 143 } 144 } 145 146 private SplitLogManagerCoordination getSplitLogManagerCoordination() { 147 return server.getCoordinatedStateManager().getSplitLogManagerCoordination(); 148 } 149 150 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { 151 return getFileList(conf, logDirs, filter); 152 } 153 154 /** 155 * Get a list of paths that need to be split given a set of server-specific directories and 156 * optionally a filter. 157 * 158 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory 159 * layout. 160 * 161 * Should be package-private, but is needed by 162 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, 163 * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests. 164 */ 165 @VisibleForTesting 166 public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs, 167 final PathFilter filter) 168 throws IOException { 169 List<FileStatus> fileStatus = new ArrayList<>(); 170 for (Path logDir : logDirs) { 171 final FileSystem fs = logDir.getFileSystem(conf); 172 if (!fs.exists(logDir)) { 173 LOG.warn(logDir + " doesn't exist. Nothing to do!"); 174 continue; 175 } 176 FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter); 177 if (logfiles == null || logfiles.length == 0) { 178 LOG.info("{} dir is empty, no logs to split.", logDir); 179 } else { 180 Collections.addAll(fileStatus, logfiles); 181 } 182 } 183 FileStatus[] a = new FileStatus[fileStatus.size()]; 184 return fileStatus.toArray(a); 185 } 186 187 /** 188 * @param logDir one region sever wal dir path in .logs 189 * @throws IOException if there was an error while splitting any log file 190 * @return cumulative size of the logfiles split 191 * @throws IOException 192 */ 193 public long splitLogDistributed(final Path logDir) throws IOException { 194 List<Path> logDirs = new ArrayList<>(); 195 logDirs.add(logDir); 196 return splitLogDistributed(logDirs); 197 } 198 199 /** 200 * The caller will block until all the log files of the given region server have been processed - 201 * successfully split or an error is encountered - by an available worker region server. This 202 * method must only be called after the region servers have been brought online. 203 * @param logDirs List of log dirs to split 204 * @throws IOException If there was an error while splitting any log file 205 * @return cumulative size of the logfiles split 206 */ 207 public long splitLogDistributed(final List<Path> logDirs) throws IOException { 208 if (logDirs.isEmpty()) { 209 return 0; 210 } 211 Set<ServerName> serverNames = new HashSet<>(); 212 for (Path logDir : logDirs) { 213 try { 214 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir); 215 if (serverName != null) { 216 serverNames.add(serverName); 217 } 218 } catch (IllegalArgumentException e) { 219 // ignore invalid format error. 220 LOG.warn("Cannot parse server name from " + logDir); 221 } 222 } 223 return splitLogDistributed(serverNames, logDirs, null); 224 } 225 226 /** 227 * The caller will block until all the hbase:meta log files of the given region server have been 228 * processed - successfully split or an error is encountered - by an available worker region 229 * server. This method must only be called after the region servers have been brought online. 230 * @param logDirs List of log dirs to split 231 * @param filter the Path filter to select specific files for considering 232 * @throws IOException If there was an error while splitting any log file 233 * @return cumulative size of the logfiles split 234 */ 235 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, 236 PathFilter filter) throws IOException { 237 MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " + 238 logDirs + " for serverName=" + serverNames); 239 long totalSize = 0; 240 TaskBatch batch = null; 241 long startTime = 0; 242 FileStatus[] logfiles = getFileList(logDirs, filter); 243 if (logfiles.length != 0) { 244 status.setStatus("Checking directory contents..."); 245 SplitLogCounters.tot_mgr_log_split_batch_start.increment(); 246 LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + 247 " for " + serverNames); 248 startTime = EnvironmentEdgeManager.currentTime(); 249 batch = new TaskBatch(); 250 for (FileStatus lf : logfiles) { 251 // TODO If the log file is still being written to - which is most likely 252 // the case for the last log file - then its length will show up here 253 // as zero. The size of such a file can only be retrieved after 254 // recover-lease is done. totalSize will be under in most cases and the 255 // metrics that it drives will also be under-reported. 256 totalSize += lf.getLen(); 257 String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf); 258 if (!enqueueSplitTask(pathToLog, batch)) { 259 throw new IOException("duplicate log split scheduled for " + lf.getPath()); 260 } 261 } 262 waitForSplittingCompletion(batch, status); 263 } 264 265 if (batch != null && batch.done != batch.installed) { 266 batch.isDead = true; 267 SplitLogCounters.tot_mgr_log_split_batch_err.increment(); 268 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed 269 + " but only " + batch.done + " done"); 270 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; 271 status.abort(msg); 272 throw new IOException(msg); 273 } 274 for (Path logDir : logDirs) { 275 status.setStatus("Cleaning up log directory..."); 276 final FileSystem fs = logDir.getFileSystem(conf); 277 try { 278 if (fs.exists(logDir) && !fs.delete(logDir, false)) { 279 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); 280 } 281 } catch (IOException ioe) { 282 FileStatus[] files = fs.listStatus(logDir); 283 if (files != null && files.length > 0) { 284 LOG.warn("Returning success without actually splitting and " 285 + "deleting all the log files in path " + logDir + ": " 286 + Arrays.toString(files), ioe); 287 } else { 288 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); 289 } 290 } 291 SplitLogCounters.tot_mgr_log_split_batch_success.increment(); 292 } 293 String msg = 294 "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) + " (" + 295 totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) + " log files in " + 296 logDirs + " in " + 297 ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) + 298 "ms"; 299 status.markComplete(msg); 300 LOG.info(msg); 301 return totalSize; 302 } 303 304 /** 305 * Add a task entry to coordination if it is not already there. 306 * @param taskname the path of the log to be split 307 * @param batch the batch this task belongs to 308 * @return true if a new entry is created, false if it is already there. 309 */ 310 boolean enqueueSplitTask(String taskname, TaskBatch batch) { 311 lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); 312 String task = getSplitLogManagerCoordination().prepareTask(taskname); 313 Task oldtask = createTaskIfAbsent(task, batch); 314 if (oldtask == null) { 315 // publish the task in the coordination engine 316 getSplitLogManagerCoordination().submitTask(task); 317 return true; 318 } 319 return false; 320 } 321 322 /** 323 * Get the amount of time in milliseconds to wait till next check. 324 * Check less frequently if a bunch of work to do still. At a max, check every minute. 325 * At a minimum, check every 100ms. This is to alleviate case where perhaps there are a bunch of 326 * threads waiting on a completion. For example, if the zk-based implementation, we will scan the 327 * '/hbase/splitWAL' dir every time through this loop. If there are lots of WALs to 328 * split -- could be tens of thousands if big cluster -- then it will take a while. If 329 * the Master has many SCPs waiting on wal splitting -- could be up to 10 x the configured 330 * PE thread count (default would be 160) -- then the Master will be putting up a bunch of 331 * load on zk. 332 */ 333 static int getBatchWaitTimeMillis(int remainingTasks) { 334 return remainingTasks < 10? 100: remainingTasks < 100? 1000: 60_000; 335 } 336 337 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) { 338 synchronized (batch) { 339 while ((batch.done + batch.error) != batch.installed) { 340 try { 341 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" 342 + batch.installed + " done=" + batch.done + " error=" + batch.error); 343 int remaining = batch.installed - (batch.done + batch.error); 344 int actual = activeTasks(batch); 345 if (remaining != actual) { 346 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); 347 } 348 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); 349 if (remainingTasks >= 0 && actual > remainingTasks) { 350 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " 351 + remainingTasks); 352 } 353 if (remainingTasks == 0 || actual == 0) { 354 LOG.warn("No more task remaining, splitting " 355 + "should have completed. Remaining tasks is " + remainingTasks 356 + ", active tasks in map " + actual); 357 if (remainingTasks == 0 && actual == 0) { 358 return; 359 } 360 } 361 batch.wait(getBatchWaitTimeMillis(remainingTasks)); 362 if (server.isStopped()) { 363 LOG.warn("Stopped while waiting for log splits to be completed"); 364 return; 365 } 366 } catch (InterruptedException e) { 367 LOG.warn("Interrupted while waiting for log splits to be completed"); 368 Thread.currentThread().interrupt(); 369 return; 370 } 371 } 372 } 373 } 374 375 @VisibleForTesting 376 ConcurrentMap<String, Task> getTasks() { 377 return tasks; 378 } 379 380 private int activeTasks(final TaskBatch batch) { 381 int count = 0; 382 for (Task t : tasks.values()) { 383 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { 384 count++; 385 } 386 } 387 return count; 388 389 } 390 391 /** 392 * @param path 393 * @param batch 394 * @return null on success, existing task on error 395 */ 396 private Task createTaskIfAbsent(String path, TaskBatch batch) { 397 Task oldtask; 398 // batch.installed is only changed via this function and 399 // a single thread touches batch.installed. 400 Task newtask = new Task(); 401 newtask.batch = batch; 402 oldtask = tasks.putIfAbsent(path, newtask); 403 if (oldtask == null) { 404 batch.installed++; 405 return null; 406 } 407 // new task was not used. 408 synchronized (oldtask) { 409 if (oldtask.isOrphan()) { 410 if (oldtask.status == SUCCESS) { 411 // The task is already done. Do not install the batch for this 412 // task because it might be too late for setDone() to update 413 // batch.done. There is no need for the batch creator to wait for 414 // this task to complete. 415 return (null); 416 } 417 if (oldtask.status == IN_PROGRESS) { 418 oldtask.batch = batch; 419 batch.installed++; 420 LOG.debug("Previously orphan task " + path + " is now being waited upon"); 421 return null; 422 } 423 while (oldtask.status == FAILURE) { 424 LOG.debug("wait for status of task " + path + " to change to DELETED"); 425 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); 426 try { 427 oldtask.wait(); 428 } catch (InterruptedException e) { 429 Thread.currentThread().interrupt(); 430 LOG.warn("Interrupted when waiting for znode delete callback"); 431 // fall through to return failure 432 break; 433 } 434 } 435 if (oldtask.status != DELETED) { 436 LOG.warn("Failure because previously failed task" 437 + " state still present. Waiting for znode delete callback" + " path=" + path); 438 return oldtask; 439 } 440 // reinsert the newTask and it must succeed this time 441 Task t = tasks.putIfAbsent(path, newtask); 442 if (t == null) { 443 batch.installed++; 444 return null; 445 } 446 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map"); 447 assert false : "Deleted task still present in tasks map"; 448 return t; 449 } 450 LOG.warn("Failure because two threads can't wait for the same task; path=" + path); 451 return oldtask; 452 } 453 } 454 455 public void stop() { 456 if (choreService != null) { 457 choreService.shutdown(); 458 } 459 if (timeoutMonitor != null) { 460 timeoutMonitor.cancel(true); 461 } 462 } 463 464 void handleDeadWorker(ServerName workerName) { 465 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier 466 // to reason about concurrency. Makes it easier to retry. 467 synchronized (deadWorkersLock) { 468 if (deadWorkers == null) { 469 deadWorkers = new HashSet<>(100); 470 } 471 deadWorkers.add(workerName); 472 } 473 LOG.info("Dead splitlog worker {}", workerName); 474 } 475 476 void handleDeadWorkers(Set<ServerName> serverNames) { 477 synchronized (deadWorkersLock) { 478 if (deadWorkers == null) { 479 deadWorkers = new HashSet<>(100); 480 } 481 deadWorkers.addAll(serverNames); 482 } 483 LOG.info("dead splitlog workers " + serverNames); 484 } 485 486 /** 487 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). 488 * Clients threads use this object to wait for all their tasks to be done. 489 * <p> 490 * All access is synchronized. 491 */ 492 @InterfaceAudience.Private 493 public static class TaskBatch { 494 public int installed = 0; 495 public int done = 0; 496 public int error = 0; 497 public volatile boolean isDead = false; 498 499 @Override 500 public String toString() { 501 return ("installed = " + installed + " done = " + done + " error = " + error); 502 } 503 } 504 505 /** 506 * in memory state of an active task. 507 */ 508 @InterfaceAudience.Private 509 public static class Task { 510 public volatile long last_update; 511 public volatile int last_version; 512 public volatile ServerName cur_worker_name; 513 public volatile TaskBatch batch; 514 public volatile TerminationStatus status; 515 public volatile AtomicInteger incarnation = new AtomicInteger(0); 516 public final AtomicInteger unforcedResubmits = new AtomicInteger(); 517 public volatile boolean resubmitThresholdReached; 518 519 @Override 520 public String toString() { 521 return ("last_update = " + last_update + " last_version = " + last_version 522 + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " 523 + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); 524 } 525 526 public Task() { 527 last_version = -1; 528 status = IN_PROGRESS; 529 setUnassigned(); 530 } 531 532 public boolean isOrphan() { 533 return (batch == null || batch.isDead); 534 } 535 536 public boolean isUnassigned() { 537 return (cur_worker_name == null); 538 } 539 540 public void heartbeatNoDetails(long time) { 541 last_update = time; 542 } 543 544 public void heartbeat(long time, int version, ServerName worker) { 545 last_version = version; 546 last_update = time; 547 cur_worker_name = worker; 548 } 549 550 public void setUnassigned() { 551 cur_worker_name = null; 552 last_update = -1; 553 } 554 } 555 556 /** 557 * Periodically checks all active tasks and resubmits the ones that have timed out 558 */ 559 private class TimeoutMonitor extends ScheduledChore { 560 private long lastLog = 0; 561 562 public TimeoutMonitor(final int period, Stoppable stopper) { 563 super("SplitLogManager Timeout Monitor", stopper, period); 564 } 565 566 @Override 567 protected void chore() { 568 if (server.getCoordinatedStateManager() == null) { 569 return; 570 } 571 572 int resubmitted = 0; 573 int unassigned = 0; 574 int tot = 0; 575 boolean found_assigned_task = false; 576 Set<ServerName> localDeadWorkers; 577 578 synchronized (deadWorkersLock) { 579 localDeadWorkers = deadWorkers; 580 deadWorkers = null; 581 } 582 583 for (Map.Entry<String, Task> e : tasks.entrySet()) { 584 String path = e.getKey(); 585 Task task = e.getValue(); 586 ServerName cur_worker = task.cur_worker_name; 587 tot++; 588 // don't easily resubmit a task which hasn't been picked up yet. It 589 // might be a long while before a SplitLogWorker is free to pick up a 590 // task. This is because a SplitLogWorker picks up a task one at a 591 // time. If we want progress when there are no region servers then we 592 // will have to run a SplitLogWorker thread in the Master. 593 if (task.isUnassigned()) { 594 unassigned++; 595 continue; 596 } 597 found_assigned_task = true; 598 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { 599 SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment(); 600 if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { 601 resubmitted++; 602 } else { 603 handleDeadWorker(cur_worker); 604 LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker 605 + ", will retry."); 606 } 607 } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { 608 resubmitted++; 609 } 610 } 611 if (tot > 0) { 612 long now = EnvironmentEdgeManager.currentTime(); 613 if (now > lastLog + 5000) { 614 lastLog = now; 615 LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks); 616 } 617 } 618 if (resubmitted > 0) { 619 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks"); 620 } 621 // If there are pending tasks and all of them have been unassigned for 622 // some time then put up a RESCAN node to ping the workers. 623 // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes 624 // because a. it is very unlikely that every worker had a 625 // transient error when trying to grab the task b. if there are no 626 // workers then all tasks wills stay unassigned indefinitely and the 627 // manager will be indefinitely creating RESCAN nodes. TODO may be the 628 // master should spawn both a manager and a worker thread to guarantee 629 // that there is always one worker in the system 630 if (tot > 0 631 && !found_assigned_task 632 && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) { 633 for (Map.Entry<String, Task> e : tasks.entrySet()) { 634 String key = e.getKey(); 635 Task task = e.getValue(); 636 // we have to do task.isUnassigned() check again because tasks might 637 // have been asynchronously assigned. There is no locking required 638 // for these checks ... it is OK even if tryGetDataSetWatch() is 639 // called unnecessarily for a taskpath 640 if (task.isUnassigned() && (task.status != FAILURE)) { 641 // We just touch the znode to make sure its still there 642 getSplitLogManagerCoordination().checkTaskStillAvailable(key); 643 } 644 } 645 getSplitLogManagerCoordination().checkTasks(); 646 SplitLogCounters.tot_mgr_resubmit_unassigned.increment(); 647 LOG.debug("resubmitting unassigned task(s) after timeout"); 648 } 649 Set<String> failedDeletions = 650 getSplitLogManagerCoordination().getDetails().getFailedDeletions(); 651 // Retry previously failed deletes 652 if (failedDeletions.size() > 0) { 653 List<String> tmpPaths = new ArrayList<>(failedDeletions); 654 for (String tmpPath : tmpPaths) { 655 // deleteNode is an async call 656 getSplitLogManagerCoordination().deleteTask(tmpPath); 657 } 658 failedDeletions.removeAll(tmpPaths); 659 } 660 } 661 } 662 663 public enum ResubmitDirective { 664 CHECK(), FORCE() 665 } 666 667 public enum TerminationStatus { 668 IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted"); 669 670 final String statusMsg; 671 672 TerminationStatus(String msg) { 673 statusMsg = msg; 674 } 675 676 @Override 677 public String toString() { 678 return statusMsg; 679 } 680 } 681}