001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; 021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; 022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; 023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; 024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; 025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.concurrent.ConcurrentHashMap; 036import java.util.concurrent.ConcurrentMap; 037import java.util.concurrent.atomic.AtomicInteger; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FileStatus; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.fs.PathFilter; 043import org.apache.hadoop.hbase.ChoreService; 044import org.apache.hadoop.hbase.ScheduledChore; 045import org.apache.hadoop.hbase.ServerName; 046import org.apache.hadoop.hbase.SplitLogCounters; 047import org.apache.hadoop.hbase.Stoppable; 048import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; 049import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; 050import org.apache.hadoop.hbase.log.HBaseMarkers; 051import org.apache.hadoop.hbase.monitoring.MonitoredTask; 052import org.apache.hadoop.hbase.monitoring.TaskMonitor; 053import org.apache.hadoop.hbase.procedure2.util.StringUtils; 054import org.apache.hadoop.hbase.util.CommonFSUtils; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061/** 062 * Distributes the task of log splitting to the available region servers. Coordination happens via 063 * coordination engine. For every log file that has to be split a task is created. SplitLogWorkers 064 * race to grab a task. 065 * <p> 066 * SplitLogManager monitors the tasks that it creates using the timeoutMonitor thread. If a task's 067 * progress is slow then {@link SplitLogManagerCoordination#checkTasks} will take away the task from 068 * the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and the task will be up for 069 * grabs again. When the task is done then it is deleted by SplitLogManager. 070 * <p> 071 * Clients call {@link #splitLogDistributed(Path)} to split a region server's log files. The caller 072 * thread waits in this method until all the log files have been split. 073 * <p> 074 * All the coordination calls made by this class are asynchronous. This is mainly to help reduce 075 * response time seen by the callers. 076 * <p> 077 * There is race in this design between the SplitLogManager and the SplitLogWorker. SplitLogManager 078 * might re-queue a task that has in reality already been completed by a SplitLogWorker. We rely on 079 * the idempotency of the log splitting task for correctness. 080 * <p> 081 * It is also assumed that every log splitting task is unique and once completed (either with 082 * success or with error) it will be not be submitted again. If a task is resubmitted then there is 083 * a risk that old "delete task" can delete the re-submission. 084 * @see SplitWALManager for an alternate implementation based on Procedures. 085 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based 086 * distributed WAL splitter, see SplitWALManager. 087 */ 088@Deprecated 089@InterfaceAudience.Private 090public class SplitLogManager { 091 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); 092 093 private final MasterServices server; 094 095 private final Configuration conf; 096 private final ChoreService choreService; 097 098 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min 099 100 private long unassignedTimeout; 101 private long lastTaskCreateTime = Long.MAX_VALUE; 102 103 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>(); 104 private TimeoutMonitor timeoutMonitor; 105 106 private volatile Set<ServerName> deadWorkers = null; 107 private final Object deadWorkersLock = new Object(); 108 109 /** 110 * Its OK to construct this object even when region-servers are not online. It does lookup the 111 * orphan tasks in coordination engine but it doesn't block waiting for them to be done. 112 * @param master the master services 113 * @param conf the HBase configuration n 114 */ 115 public SplitLogManager(MasterServices master, Configuration conf) throws IOException { 116 this.server = master; 117 this.conf = conf; 118 // If no CoordinatedStateManager, skip registering as a chore service (The 119 // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL 120 // splitting. It is null if we are configured to use procedure-based distributed WAL 121 // splitting. 122 if (server.getCoordinatedStateManager() != null) { 123 this.choreService = 124 new ChoreService(master.getServerName().toShortString() + ".splitLogManager."); 125 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); 126 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); 127 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); 128 coordination.setDetails(details); 129 coordination.init(); 130 this.unassignedTimeout = 131 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); 132 this.timeoutMonitor = new TimeoutMonitor( 133 conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), master); 134 this.choreService.scheduleChore(timeoutMonitor); 135 } else { 136 this.choreService = null; 137 this.timeoutMonitor = null; 138 } 139 } 140 141 private SplitLogManagerCoordination getSplitLogManagerCoordination() { 142 return server.getCoordinatedStateManager().getSplitLogManagerCoordination(); 143 } 144 145 private List<FileStatus> getFileList(List<Path> logDirs, PathFilter filter) throws IOException { 146 return getFileList(conf, logDirs, filter); 147 } 148 149 /** 150 * Get a list of paths that need to be split given a set of server-specific directories and 151 * optionally a filter. 152 * <p/> 153 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory 154 * layout. 155 * <p/> 156 * Should be package-private, but is needed by 157 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, Configuration, org.apache.hadoop.hbase.wal.WALFactory)} 158 * for tests. 159 */ 160 public static List<FileStatus> getFileList(final Configuration conf, final List<Path> logDirs, 161 final PathFilter filter) throws IOException { 162 List<FileStatus> fileStatus = new ArrayList<>(); 163 for (Path logDir : logDirs) { 164 final FileSystem fs = logDir.getFileSystem(conf); 165 if (!fs.exists(logDir)) { 166 LOG.warn(logDir + " doesn't exist. Nothing to do!"); 167 continue; 168 } 169 FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter); 170 if (logfiles == null || logfiles.length == 0) { 171 LOG.info("{} dir is empty, no logs to split.", logDir); 172 } else { 173 Collections.addAll(fileStatus, logfiles); 174 } 175 } 176 177 return fileStatus; 178 } 179 180 /** 181 * @param logDir one region sever wal dir path in .logs 182 * @throws IOException if there was an error while splitting any log file 183 * @return cumulative size of the logfiles split n 184 */ 185 public long splitLogDistributed(final Path logDir) throws IOException { 186 List<Path> logDirs = new ArrayList<>(); 187 logDirs.add(logDir); 188 return splitLogDistributed(logDirs); 189 } 190 191 /** 192 * The caller will block until all the log files of the given region server have been processed - 193 * successfully split or an error is encountered - by an available worker region server. This 194 * method must only be called after the region servers have been brought online. 195 * @param logDirs List of log dirs to split 196 * @throws IOException If there was an error while splitting any log file 197 * @return cumulative size of the logfiles split 198 */ 199 public long splitLogDistributed(final List<Path> logDirs) throws IOException { 200 if (logDirs.isEmpty()) { 201 return 0; 202 } 203 Set<ServerName> serverNames = new HashSet<>(); 204 for (Path logDir : logDirs) { 205 try { 206 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir); 207 if (serverName != null) { 208 serverNames.add(serverName); 209 } 210 } catch (IllegalArgumentException e) { 211 // ignore invalid format error. 212 LOG.warn("Cannot parse server name from " + logDir); 213 } 214 } 215 return splitLogDistributed(serverNames, logDirs, null); 216 } 217 218 /** 219 * The caller will block until all the hbase:meta log files of the given region server have been 220 * processed - successfully split or an error is encountered - by an available worker region 221 * server. This method must only be called after the region servers have been brought online. 222 * @param logDirs List of log dirs to split 223 * @param filter the Path filter to select specific files for considering 224 * @throws IOException If there was an error while splitting any log file 225 * @return cumulative size of the logfiles split 226 */ 227 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, 228 PathFilter filter) throws IOException { 229 MonitoredTask status = TaskMonitor.get() 230 .createStatus("Doing distributed log split in " + logDirs + " for serverName=" + serverNames); 231 long totalSize = 0; 232 TaskBatch batch = null; 233 long startTime = 0; 234 List<FileStatus> logfiles = getFileList(logDirs, filter); 235 if (!logfiles.isEmpty()) { 236 status.setStatus("Checking directory contents..."); 237 SplitLogCounters.tot_mgr_log_split_batch_start.increment(); 238 LOG.info( 239 "Started splitting " + logfiles.size() + " logs in " + logDirs + " for " + serverNames); 240 startTime = EnvironmentEdgeManager.currentTime(); 241 batch = new TaskBatch(); 242 for (FileStatus lf : logfiles) { 243 // TODO If the log file is still being written to - which is most likely 244 // the case for the last log file - then its length will show up here 245 // as zero. The size of such a file can only be retrieved after 246 // recover-lease is done. totalSize will be under in most cases and the 247 // metrics that it drives will also be under-reported. 248 totalSize += lf.getLen(); 249 String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf); 250 if (!enqueueSplitTask(pathToLog, batch)) { 251 throw new IOException("duplicate log split scheduled for " + lf.getPath()); 252 } 253 } 254 waitForSplittingCompletion(batch, status); 255 } 256 257 if (batch != null && batch.done != batch.installed) { 258 batch.isDead = true; 259 SplitLogCounters.tot_mgr_log_split_batch_err.increment(); 260 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed 261 + " but only " + batch.done + " done"); 262 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; 263 status.abort(msg); 264 throw new IOException(msg); 265 } 266 for (Path logDir : logDirs) { 267 status.setStatus("Cleaning up log directory..."); 268 final FileSystem fs = logDir.getFileSystem(conf); 269 try { 270 if (fs.exists(logDir) && !fs.delete(logDir, false)) { 271 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); 272 } 273 } catch (IOException ioe) { 274 FileStatus[] files = fs.listStatus(logDir); 275 if (files != null && files.length > 0) { 276 LOG.warn( 277 "Returning success without actually splitting and " 278 + "deleting all the log files in path " + logDir + ": " + Arrays.toString(files), 279 ioe); 280 } else { 281 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); 282 } 283 } 284 SplitLogCounters.tot_mgr_log_split_batch_success.increment(); 285 } 286 String msg = "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) 287 + " (" + totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) 288 + " log files in " + logDirs + " in " 289 + ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) + "ms"; 290 status.markComplete(msg); 291 LOG.info(msg); 292 return totalSize; 293 } 294 295 /** 296 * Add a task entry to coordination if it is not already there. 297 * @param taskname the path of the log to be split 298 * @param batch the batch this task belongs to 299 * @return true if a new entry is created, false if it is already there. 300 */ 301 boolean enqueueSplitTask(String taskname, TaskBatch batch) { 302 lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); 303 String task = getSplitLogManagerCoordination().prepareTask(taskname); 304 Task oldtask = createTaskIfAbsent(task, batch); 305 if (oldtask == null) { 306 // publish the task in the coordination engine 307 getSplitLogManagerCoordination().submitTask(task); 308 return true; 309 } 310 return false; 311 } 312 313 /** 314 * Get the amount of time in milliseconds to wait till next check. Check less frequently if a 315 * bunch of work to do still. At a max, check every minute. At a minimum, check every 100ms. This 316 * is to alleviate case where perhaps there are a bunch of threads waiting on a completion. For 317 * example, if the zk-based implementation, we will scan the '/hbase/splitWAL' dir every time 318 * through this loop. If there are lots of WALs to split -- could be tens of thousands if big 319 * cluster -- then it will take a while. If the Master has many SCPs waiting on wal splitting -- 320 * could be up to 10 x the configured PE thread count (default would be 160) -- then the Master 321 * will be putting up a bunch of load on zk. 322 */ 323 static int getBatchWaitTimeMillis(int remainingTasks) { 324 return remainingTasks < 10 ? 100 : remainingTasks < 100 ? 1000 : 60_000; 325 } 326 327 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) { 328 synchronized (batch) { 329 while ((batch.done + batch.error) != batch.installed) { 330 try { 331 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" 332 + batch.installed + " done=" + batch.done + " error=" + batch.error); 333 int remaining = batch.installed - (batch.done + batch.error); 334 int actual = activeTasks(batch); 335 if (remaining != actual) { 336 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); 337 } 338 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); 339 if (remainingTasks >= 0 && actual > remainingTasks) { 340 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " 341 + remainingTasks); 342 } 343 if (remainingTasks == 0 || actual == 0) { 344 LOG.warn( 345 "No more task remaining, splitting " + "should have completed. Remaining tasks is " 346 + remainingTasks + ", active tasks in map " + actual); 347 if (remainingTasks == 0 && actual == 0) { 348 return; 349 } 350 } 351 batch.wait(getBatchWaitTimeMillis(remainingTasks)); 352 if (server.isStopped()) { 353 LOG.warn("Stopped while waiting for log splits to be completed"); 354 return; 355 } 356 } catch (InterruptedException e) { 357 LOG.warn("Interrupted while waiting for log splits to be completed"); 358 Thread.currentThread().interrupt(); 359 return; 360 } 361 } 362 } 363 } 364 365 ConcurrentMap<String, Task> getTasks() { 366 return tasks; 367 } 368 369 private int activeTasks(final TaskBatch batch) { 370 int count = 0; 371 for (Task t : tasks.values()) { 372 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { 373 count++; 374 } 375 } 376 return count; 377 378 } 379 380 /** 381 * nn * @return null on success, existing task on error 382 */ 383 private Task createTaskIfAbsent(String path, TaskBatch batch) { 384 Task oldtask; 385 // batch.installed is only changed via this function and 386 // a single thread touches batch.installed. 387 Task newtask = new Task(); 388 newtask.batch = batch; 389 oldtask = tasks.putIfAbsent(path, newtask); 390 if (oldtask == null) { 391 batch.installed++; 392 return null; 393 } 394 // new task was not used. 395 synchronized (oldtask) { 396 if (oldtask.isOrphan()) { 397 if (oldtask.status == SUCCESS) { 398 // The task is already done. Do not install the batch for this 399 // task because it might be too late for setDone() to update 400 // batch.done. There is no need for the batch creator to wait for 401 // this task to complete. 402 return (null); 403 } 404 if (oldtask.status == IN_PROGRESS) { 405 oldtask.batch = batch; 406 batch.installed++; 407 LOG.debug("Previously orphan task " + path + " is now being waited upon"); 408 return null; 409 } 410 while (oldtask.status == FAILURE) { 411 LOG.debug("wait for status of task " + path + " to change to DELETED"); 412 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); 413 try { 414 oldtask.wait(); 415 } catch (InterruptedException e) { 416 Thread.currentThread().interrupt(); 417 LOG.warn("Interrupted when waiting for znode delete callback"); 418 // fall through to return failure 419 break; 420 } 421 } 422 if (oldtask.status != DELETED) { 423 LOG.warn("Failure because previously failed task" 424 + " state still present. Waiting for znode delete callback" + " path=" + path); 425 return oldtask; 426 } 427 // reinsert the newTask and it must succeed this time 428 Task t = tasks.putIfAbsent(path, newtask); 429 if (t == null) { 430 batch.installed++; 431 return null; 432 } 433 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map"); 434 assert false : "Deleted task still present in tasks map"; 435 return t; 436 } 437 LOG.warn("Failure because two threads can't wait for the same task; path=" + path); 438 return oldtask; 439 } 440 } 441 442 public void stop() { 443 if (choreService != null) { 444 choreService.shutdown(); 445 } 446 if (timeoutMonitor != null) { 447 timeoutMonitor.shutdown(true); 448 } 449 } 450 451 void handleDeadWorker(ServerName workerName) { 452 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier 453 // to reason about concurrency. Makes it easier to retry. 454 synchronized (deadWorkersLock) { 455 if (deadWorkers == null) { 456 deadWorkers = new HashSet<>(100); 457 } 458 deadWorkers.add(workerName); 459 } 460 LOG.info("Dead splitlog worker {}", workerName); 461 } 462 463 void handleDeadWorkers(Set<ServerName> serverNames) { 464 synchronized (deadWorkersLock) { 465 if (deadWorkers == null) { 466 deadWorkers = new HashSet<>(100); 467 } 468 deadWorkers.addAll(serverNames); 469 } 470 LOG.info("dead splitlog workers " + serverNames); 471 } 472 473 /** 474 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). 475 * Clients threads use this object to wait for all their tasks to be done. 476 * <p> 477 * All access is synchronized. 478 */ 479 @InterfaceAudience.Private 480 public static class TaskBatch { 481 public int installed = 0; 482 public int done = 0; 483 public int error = 0; 484 public volatile boolean isDead = false; 485 486 @Override 487 public String toString() { 488 return ("installed = " + installed + " done = " + done + " error = " + error); 489 } 490 } 491 492 /** 493 * in memory state of an active task. 494 */ 495 @InterfaceAudience.Private 496 public static class Task { 497 public volatile long last_update; 498 public volatile int last_version; 499 public volatile ServerName cur_worker_name; 500 public volatile TaskBatch batch; 501 public volatile TerminationStatus status; 502 public volatile AtomicInteger incarnation = new AtomicInteger(0); 503 public final AtomicInteger unforcedResubmits = new AtomicInteger(); 504 public volatile boolean resubmitThresholdReached; 505 506 @Override 507 public String toString() { 508 return ("last_update = " + last_update + " last_version = " + last_version 509 + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " 510 + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); 511 } 512 513 public Task() { 514 last_version = -1; 515 status = IN_PROGRESS; 516 setUnassigned(); 517 } 518 519 public boolean isOrphan() { 520 return (batch == null || batch.isDead); 521 } 522 523 public boolean isUnassigned() { 524 return (cur_worker_name == null); 525 } 526 527 public void heartbeatNoDetails(long time) { 528 last_update = time; 529 } 530 531 public void heartbeat(long time, int version, ServerName worker) { 532 last_version = version; 533 last_update = time; 534 cur_worker_name = worker; 535 } 536 537 public void setUnassigned() { 538 cur_worker_name = null; 539 last_update = -1; 540 } 541 } 542 543 /** 544 * Periodically checks all active tasks and resubmits the ones that have timed out 545 */ 546 private class TimeoutMonitor extends ScheduledChore { 547 private long lastLog = 0; 548 549 public TimeoutMonitor(final int period, Stoppable stopper) { 550 super("SplitLogManager Timeout Monitor", stopper, period); 551 } 552 553 @Override 554 protected void chore() { 555 if (server.getCoordinatedStateManager() == null) { 556 return; 557 } 558 559 int resubmitted = 0; 560 int unassigned = 0; 561 int tot = 0; 562 boolean found_assigned_task = false; 563 Set<ServerName> localDeadWorkers; 564 565 synchronized (deadWorkersLock) { 566 localDeadWorkers = deadWorkers; 567 deadWorkers = null; 568 } 569 570 for (Map.Entry<String, Task> e : tasks.entrySet()) { 571 String path = e.getKey(); 572 Task task = e.getValue(); 573 ServerName cur_worker = task.cur_worker_name; 574 tot++; 575 // don't easily resubmit a task which hasn't been picked up yet. It 576 // might be a long while before a SplitLogWorker is free to pick up a 577 // task. This is because a SplitLogWorker picks up a task one at a 578 // time. If we want progress when there are no region servers then we 579 // will have to run a SplitLogWorker thread in the Master. 580 if (task.isUnassigned()) { 581 unassigned++; 582 continue; 583 } 584 found_assigned_task = true; 585 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { 586 SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment(); 587 if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { 588 resubmitted++; 589 } else { 590 handleDeadWorker(cur_worker); 591 LOG.warn( 592 "Failed to resubmit task " + path + " owned by dead " + cur_worker + ", will retry."); 593 } 594 } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { 595 resubmitted++; 596 } 597 } 598 if (tot > 0) { 599 long now = EnvironmentEdgeManager.currentTime(); 600 if (now > lastLog + 5000) { 601 lastLog = now; 602 LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks); 603 } 604 } 605 if (resubmitted > 0) { 606 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks"); 607 } 608 // If there are pending tasks and all of them have been unassigned for 609 // some time then put up a RESCAN node to ping the workers. 610 // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes 611 // because a. it is very unlikely that every worker had a 612 // transient error when trying to grab the task b. if there are no 613 // workers then all tasks wills stay unassigned indefinitely and the 614 // manager will be indefinitely creating RESCAN nodes. TODO may be the 615 // master should spawn both a manager and a worker thread to guarantee 616 // that there is always one worker in the system 617 if ( 618 tot > 0 && !found_assigned_task 619 && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout) 620 ) { 621 for (Map.Entry<String, Task> e : tasks.entrySet()) { 622 String key = e.getKey(); 623 Task task = e.getValue(); 624 // we have to do task.isUnassigned() check again because tasks might 625 // have been asynchronously assigned. There is no locking required 626 // for these checks ... it is OK even if tryGetDataSetWatch() is 627 // called unnecessarily for a taskpath 628 if (task.isUnassigned() && (task.status != FAILURE)) { 629 // We just touch the znode to make sure its still there 630 getSplitLogManagerCoordination().checkTaskStillAvailable(key); 631 } 632 } 633 getSplitLogManagerCoordination().checkTasks(); 634 SplitLogCounters.tot_mgr_resubmit_unassigned.increment(); 635 LOG.debug("resubmitting unassigned task(s) after timeout"); 636 } 637 Set<String> failedDeletions = 638 getSplitLogManagerCoordination().getDetails().getFailedDeletions(); 639 // Retry previously failed deletes 640 if (failedDeletions.size() > 0) { 641 List<String> tmpPaths = new ArrayList<>(failedDeletions); 642 for (String tmpPath : tmpPaths) { 643 // deleteNode is an async call 644 getSplitLogManagerCoordination().deleteTask(tmpPath); 645 } 646 failedDeletions.removeAll(tmpPaths); 647 } 648 } 649 } 650 651 public enum ResubmitDirective { 652 CHECK(), 653 FORCE() 654 } 655 656 public enum TerminationStatus { 657 IN_PROGRESS("in_progress"), 658 SUCCESS("success"), 659 FAILURE("failure"), 660 DELETED("deleted"); 661 662 final String statusMsg; 663 664 TerminationStatus(String msg) { 665 statusMsg = msg; 666 } 667 668 @Override 669 public String toString() { 670 return statusMsg; 671 } 672 } 673}