001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; 020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; 021import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; 022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; 023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; 024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentMap; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.fs.PathFilter; 041import org.apache.hadoop.hbase.ChoreService; 042import org.apache.hadoop.hbase.ScheduledChore; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.SplitLogCounters; 045import org.apache.hadoop.hbase.Stoppable; 046import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; 047import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; 048import org.apache.hadoop.hbase.log.HBaseMarkers; 049import org.apache.hadoop.hbase.monitoring.MonitoredTask; 050import org.apache.hadoop.hbase.monitoring.TaskMonitor; 051import org.apache.hadoop.hbase.procedure2.util.StringUtils; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059/** 060 * Distributes the task of log splitting to the available region servers. 061 * Coordination happens via coordination engine. For every log file that has to be split a 062 * task is created. SplitLogWorkers race to grab a task. 063 * 064 * <p>SplitLogManager monitors the tasks that it creates using the 065 * timeoutMonitor thread. If a task's progress is slow then 066 * {@link SplitLogManagerCoordination#checkTasks} will take away the 067 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} 068 * and the task will be up for grabs again. When the task is done then it is 069 * deleted by SplitLogManager. 070 * 071 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's 072 * log files. The caller thread waits in this method until all the log files 073 * have been split. 074 * 075 * <p>All the coordination calls made by this class are asynchronous. This is mainly 076 * to help reduce response time seen by the callers. 077 * 078 * <p>There is race in this design between the SplitLogManager and the 079 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality 080 * already been completed by a SplitLogWorker. We rely on the idempotency of 081 * the log splitting task for correctness. 082 * 083 * <p>It is also assumed that every log splitting task is unique and once 084 * completed (either with success or with error) it will be not be submitted 085 * again. If a task is resubmitted then there is a risk that old "delete task" 086 * can delete the re-submission. 087 * @see SplitWALManager for an alternate implementation based on Procedures. 088 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based 089 * distributed WAL splitter, see SplitWALManager. 090 */ 091@Deprecated 092@InterfaceAudience.Private 093public class SplitLogManager { 094 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); 095 096 private final MasterServices server; 097 098 private final Configuration conf; 099 private final ChoreService choreService; 100 101 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min 102 103 private long unassignedTimeout; 104 private long lastTaskCreateTime = Long.MAX_VALUE; 105 106 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>(); 107 private TimeoutMonitor timeoutMonitor; 108 109 private volatile Set<ServerName> deadWorkers = null; 110 private final Object deadWorkersLock = new Object(); 111 112 /** 113 * Its OK to construct this object even when region-servers are not online. It does lookup the 114 * orphan tasks in coordination engine but it doesn't block waiting for them to be done. 115 * @param master the master services 116 * @param conf the HBase configuration 117 * @throws IOException 118 */ 119 public SplitLogManager(MasterServices master, Configuration conf) 120 throws IOException { 121 this.server = master; 122 this.conf = conf; 123 // If no CoordinatedStateManager, skip registering as a chore service (The 124 // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL 125 // splitting. It is null if we are configured to use procedure-based distributed WAL 126 // splitting. 127 if (server.getCoordinatedStateManager() != null) { 128 this.choreService = 129 new ChoreService(master.getServerName().toShortString() + ".splitLogManager."); 130 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); 131 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); 132 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); 133 coordination.setDetails(details); 134 coordination.init(); 135 this.unassignedTimeout = 136 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); 137 this.timeoutMonitor = 138 new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), 139 master); 140 this.choreService.scheduleChore(timeoutMonitor); 141 } else { 142 this.choreService = null; 143 this.timeoutMonitor = null; 144 } 145 } 146 147 private SplitLogManagerCoordination getSplitLogManagerCoordination() { 148 return server.getCoordinatedStateManager().getSplitLogManagerCoordination(); 149 } 150 151 private List<FileStatus> getFileList(List<Path> logDirs, PathFilter filter) throws IOException { 152 return getFileList(conf, logDirs, filter); 153 } 154 155 /** 156 * Get a list of paths that need to be split given a set of server-specific directories and 157 * optionally a filter. 158 * 159 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory 160 * layout. 161 * 162 * Should be package-private, but is needed by 163 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, 164 * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests. 165 */ 166 public static List<FileStatus> getFileList(final Configuration conf, final List<Path> logDirs, 167 final PathFilter filter) 168 throws IOException { 169 List<FileStatus> fileStatus = new ArrayList<>(); 170 for (Path logDir : logDirs) { 171 final FileSystem fs = logDir.getFileSystem(conf); 172 if (!fs.exists(logDir)) { 173 LOG.warn(logDir + " doesn't exist. Nothing to do!"); 174 continue; 175 } 176 FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter); 177 if (logfiles == null || logfiles.length == 0) { 178 LOG.info("{} dir is empty, no logs to split.", logDir); 179 } else { 180 Collections.addAll(fileStatus, logfiles); 181 } 182 } 183 184 return fileStatus; 185 } 186 187 /** 188 * @param logDir one region sever wal dir path in .logs 189 * @throws IOException if there was an error while splitting any log file 190 * @return cumulative size of the logfiles split 191 * @throws IOException 192 */ 193 public long splitLogDistributed(final Path logDir) throws IOException { 194 List<Path> logDirs = new ArrayList<>(); 195 logDirs.add(logDir); 196 return splitLogDistributed(logDirs); 197 } 198 199 /** 200 * The caller will block until all the log files of the given region server have been processed - 201 * successfully split or an error is encountered - by an available worker region server. This 202 * method must only be called after the region servers have been brought online. 203 * @param logDirs List of log dirs to split 204 * @throws IOException If there was an error while splitting any log file 205 * @return cumulative size of the logfiles split 206 */ 207 public long splitLogDistributed(final List<Path> logDirs) throws IOException { 208 if (logDirs.isEmpty()) { 209 return 0; 210 } 211 Set<ServerName> serverNames = new HashSet<>(); 212 for (Path logDir : logDirs) { 213 try { 214 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir); 215 if (serverName != null) { 216 serverNames.add(serverName); 217 } 218 } catch (IllegalArgumentException e) { 219 // ignore invalid format error. 220 LOG.warn("Cannot parse server name from " + logDir); 221 } 222 } 223 return splitLogDistributed(serverNames, logDirs, null); 224 } 225 226 /** 227 * The caller will block until all the hbase:meta log files of the given region server have been 228 * processed - successfully split or an error is encountered - by an available worker region 229 * server. This method must only be called after the region servers have been brought online. 230 * @param logDirs List of log dirs to split 231 * @param filter the Path filter to select specific files for considering 232 * @throws IOException If there was an error while splitting any log file 233 * @return cumulative size of the logfiles split 234 */ 235 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, 236 PathFilter filter) throws IOException { 237 MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " + 238 logDirs + " for serverName=" + serverNames); 239 long totalSize = 0; 240 TaskBatch batch = null; 241 long startTime = 0; 242 List<FileStatus> logfiles = getFileList(logDirs, filter); 243 if (!logfiles.isEmpty()) { 244 status.setStatus("Checking directory contents..."); 245 SplitLogCounters.tot_mgr_log_split_batch_start.increment(); 246 LOG.info("Started splitting " + logfiles.size() + " logs in " + logDirs + 247 " for " + serverNames); 248 startTime = EnvironmentEdgeManager.currentTime(); 249 batch = new TaskBatch(); 250 for (FileStatus lf : logfiles) { 251 // TODO If the log file is still being written to - which is most likely 252 // the case for the last log file - then its length will show up here 253 // as zero. The size of such a file can only be retrieved after 254 // recover-lease is done. totalSize will be under in most cases and the 255 // metrics that it drives will also be under-reported. 256 totalSize += lf.getLen(); 257 String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf); 258 if (!enqueueSplitTask(pathToLog, batch)) { 259 throw new IOException("duplicate log split scheduled for " + lf.getPath()); 260 } 261 } 262 waitForSplittingCompletion(batch, status); 263 } 264 265 if (batch != null && batch.done != batch.installed) { 266 batch.isDead = true; 267 SplitLogCounters.tot_mgr_log_split_batch_err.increment(); 268 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed 269 + " but only " + batch.done + " done"); 270 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; 271 status.abort(msg); 272 throw new IOException(msg); 273 } 274 for (Path logDir : logDirs) { 275 status.setStatus("Cleaning up log directory..."); 276 final FileSystem fs = logDir.getFileSystem(conf); 277 try { 278 if (fs.exists(logDir) && !fs.delete(logDir, false)) { 279 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); 280 } 281 } catch (IOException ioe) { 282 FileStatus[] files = fs.listStatus(logDir); 283 if (files != null && files.length > 0) { 284 LOG.warn("Returning success without actually splitting and " 285 + "deleting all the log files in path " + logDir + ": " 286 + Arrays.toString(files), ioe); 287 } else { 288 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); 289 } 290 } 291 SplitLogCounters.tot_mgr_log_split_batch_success.increment(); 292 } 293 String msg = 294 "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) + " (" + 295 totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) + " log files in " + 296 logDirs + " in " + 297 ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) + 298 "ms"; 299 status.markComplete(msg); 300 LOG.info(msg); 301 return totalSize; 302 } 303 304 /** 305 * Add a task entry to coordination if it is not already there. 306 * @param taskname the path of the log to be split 307 * @param batch the batch this task belongs to 308 * @return true if a new entry is created, false if it is already there. 309 */ 310 boolean enqueueSplitTask(String taskname, TaskBatch batch) { 311 lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); 312 String task = getSplitLogManagerCoordination().prepareTask(taskname); 313 Task oldtask = createTaskIfAbsent(task, batch); 314 if (oldtask == null) { 315 // publish the task in the coordination engine 316 getSplitLogManagerCoordination().submitTask(task); 317 return true; 318 } 319 return false; 320 } 321 322 /** 323 * Get the amount of time in milliseconds to wait till next check. 324 * Check less frequently if a bunch of work to do still. At a max, check every minute. 325 * At a minimum, check every 100ms. This is to alleviate case where perhaps there are a bunch of 326 * threads waiting on a completion. For example, if the zk-based implementation, we will scan the 327 * '/hbase/splitWAL' dir every time through this loop. If there are lots of WALs to 328 * split -- could be tens of thousands if big cluster -- then it will take a while. If 329 * the Master has many SCPs waiting on wal splitting -- could be up to 10 x the configured 330 * PE thread count (default would be 160) -- then the Master will be putting up a bunch of 331 * load on zk. 332 */ 333 static int getBatchWaitTimeMillis(int remainingTasks) { 334 return remainingTasks < 10? 100: remainingTasks < 100? 1000: 60_000; 335 } 336 337 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) { 338 synchronized (batch) { 339 while ((batch.done + batch.error) != batch.installed) { 340 try { 341 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" 342 + batch.installed + " done=" + batch.done + " error=" + batch.error); 343 int remaining = batch.installed - (batch.done + batch.error); 344 int actual = activeTasks(batch); 345 if (remaining != actual) { 346 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); 347 } 348 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); 349 if (remainingTasks >= 0 && actual > remainingTasks) { 350 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " 351 + remainingTasks); 352 } 353 if (remainingTasks == 0 || actual == 0) { 354 LOG.warn("No more task remaining, splitting " 355 + "should have completed. Remaining tasks is " + remainingTasks 356 + ", active tasks in map " + actual); 357 if (remainingTasks == 0 && actual == 0) { 358 return; 359 } 360 } 361 batch.wait(getBatchWaitTimeMillis(remainingTasks)); 362 if (server.isStopped()) { 363 LOG.warn("Stopped while waiting for log splits to be completed"); 364 return; 365 } 366 } catch (InterruptedException e) { 367 LOG.warn("Interrupted while waiting for log splits to be completed"); 368 Thread.currentThread().interrupt(); 369 return; 370 } 371 } 372 } 373 } 374 375 ConcurrentMap<String, Task> getTasks() { 376 return tasks; 377 } 378 379 private int activeTasks(final TaskBatch batch) { 380 int count = 0; 381 for (Task t : tasks.values()) { 382 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { 383 count++; 384 } 385 } 386 return count; 387 388 } 389 390 /** 391 * @param path 392 * @param batch 393 * @return null on success, existing task on error 394 */ 395 private Task createTaskIfAbsent(String path, TaskBatch batch) { 396 Task oldtask; 397 // batch.installed is only changed via this function and 398 // a single thread touches batch.installed. 399 Task newtask = new Task(); 400 newtask.batch = batch; 401 oldtask = tasks.putIfAbsent(path, newtask); 402 if (oldtask == null) { 403 batch.installed++; 404 return null; 405 } 406 // new task was not used. 407 synchronized (oldtask) { 408 if (oldtask.isOrphan()) { 409 if (oldtask.status == SUCCESS) { 410 // The task is already done. Do not install the batch for this 411 // task because it might be too late for setDone() to update 412 // batch.done. There is no need for the batch creator to wait for 413 // this task to complete. 414 return (null); 415 } 416 if (oldtask.status == IN_PROGRESS) { 417 oldtask.batch = batch; 418 batch.installed++; 419 LOG.debug("Previously orphan task " + path + " is now being waited upon"); 420 return null; 421 } 422 while (oldtask.status == FAILURE) { 423 LOG.debug("wait for status of task " + path + " to change to DELETED"); 424 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); 425 try { 426 oldtask.wait(); 427 } catch (InterruptedException e) { 428 Thread.currentThread().interrupt(); 429 LOG.warn("Interrupted when waiting for znode delete callback"); 430 // fall through to return failure 431 break; 432 } 433 } 434 if (oldtask.status != DELETED) { 435 LOG.warn("Failure because previously failed task" 436 + " state still present. Waiting for znode delete callback" + " path=" + path); 437 return oldtask; 438 } 439 // reinsert the newTask and it must succeed this time 440 Task t = tasks.putIfAbsent(path, newtask); 441 if (t == null) { 442 batch.installed++; 443 return null; 444 } 445 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map"); 446 assert false : "Deleted task still present in tasks map"; 447 return t; 448 } 449 LOG.warn("Failure because two threads can't wait for the same task; path=" + path); 450 return oldtask; 451 } 452 } 453 454 public void stop() { 455 if (choreService != null) { 456 choreService.shutdown(); 457 } 458 if (timeoutMonitor != null) { 459 timeoutMonitor.cancel(true); 460 } 461 } 462 463 void handleDeadWorker(ServerName workerName) { 464 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier 465 // to reason about concurrency. Makes it easier to retry. 466 synchronized (deadWorkersLock) { 467 if (deadWorkers == null) { 468 deadWorkers = new HashSet<>(100); 469 } 470 deadWorkers.add(workerName); 471 } 472 LOG.info("Dead splitlog worker {}", workerName); 473 } 474 475 void handleDeadWorkers(Set<ServerName> serverNames) { 476 synchronized (deadWorkersLock) { 477 if (deadWorkers == null) { 478 deadWorkers = new HashSet<>(100); 479 } 480 deadWorkers.addAll(serverNames); 481 } 482 LOG.info("dead splitlog workers " + serverNames); 483 } 484 485 /** 486 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). 487 * Clients threads use this object to wait for all their tasks to be done. 488 * <p> 489 * All access is synchronized. 490 */ 491 @InterfaceAudience.Private 492 public static class TaskBatch { 493 public int installed = 0; 494 public int done = 0; 495 public int error = 0; 496 public volatile boolean isDead = false; 497 498 @Override 499 public String toString() { 500 return ("installed = " + installed + " done = " + done + " error = " + error); 501 } 502 } 503 504 /** 505 * in memory state of an active task. 506 */ 507 @InterfaceAudience.Private 508 public static class Task { 509 public volatile long last_update; 510 public volatile int last_version; 511 public volatile ServerName cur_worker_name; 512 public volatile TaskBatch batch; 513 public volatile TerminationStatus status; 514 public volatile AtomicInteger incarnation = new AtomicInteger(0); 515 public final AtomicInteger unforcedResubmits = new AtomicInteger(); 516 public volatile boolean resubmitThresholdReached; 517 518 @Override 519 public String toString() { 520 return ("last_update = " + last_update + " last_version = " + last_version 521 + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " 522 + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); 523 } 524 525 public Task() { 526 last_version = -1; 527 status = IN_PROGRESS; 528 setUnassigned(); 529 } 530 531 public boolean isOrphan() { 532 return (batch == null || batch.isDead); 533 } 534 535 public boolean isUnassigned() { 536 return (cur_worker_name == null); 537 } 538 539 public void heartbeatNoDetails(long time) { 540 last_update = time; 541 } 542 543 public void heartbeat(long time, int version, ServerName worker) { 544 last_version = version; 545 last_update = time; 546 cur_worker_name = worker; 547 } 548 549 public void setUnassigned() { 550 cur_worker_name = null; 551 last_update = -1; 552 } 553 } 554 555 /** 556 * Periodically checks all active tasks and resubmits the ones that have timed out 557 */ 558 private class TimeoutMonitor extends ScheduledChore { 559 private long lastLog = 0; 560 561 public TimeoutMonitor(final int period, Stoppable stopper) { 562 super("SplitLogManager Timeout Monitor", stopper, period); 563 } 564 565 @Override 566 protected void chore() { 567 if (server.getCoordinatedStateManager() == null) { 568 return; 569 } 570 571 int resubmitted = 0; 572 int unassigned = 0; 573 int tot = 0; 574 boolean found_assigned_task = false; 575 Set<ServerName> localDeadWorkers; 576 577 synchronized (deadWorkersLock) { 578 localDeadWorkers = deadWorkers; 579 deadWorkers = null; 580 } 581 582 for (Map.Entry<String, Task> e : tasks.entrySet()) { 583 String path = e.getKey(); 584 Task task = e.getValue(); 585 ServerName cur_worker = task.cur_worker_name; 586 tot++; 587 // don't easily resubmit a task which hasn't been picked up yet. It 588 // might be a long while before a SplitLogWorker is free to pick up a 589 // task. This is because a SplitLogWorker picks up a task one at a 590 // time. If we want progress when there are no region servers then we 591 // will have to run a SplitLogWorker thread in the Master. 592 if (task.isUnassigned()) { 593 unassigned++; 594 continue; 595 } 596 found_assigned_task = true; 597 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { 598 SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment(); 599 if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { 600 resubmitted++; 601 } else { 602 handleDeadWorker(cur_worker); 603 LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker 604 + ", will retry."); 605 } 606 } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { 607 resubmitted++; 608 } 609 } 610 if (tot > 0) { 611 long now = EnvironmentEdgeManager.currentTime(); 612 if (now > lastLog + 5000) { 613 lastLog = now; 614 LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks); 615 } 616 } 617 if (resubmitted > 0) { 618 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks"); 619 } 620 // If there are pending tasks and all of them have been unassigned for 621 // some time then put up a RESCAN node to ping the workers. 622 // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes 623 // because a. it is very unlikely that every worker had a 624 // transient error when trying to grab the task b. if there are no 625 // workers then all tasks wills stay unassigned indefinitely and the 626 // manager will be indefinitely creating RESCAN nodes. TODO may be the 627 // master should spawn both a manager and a worker thread to guarantee 628 // that there is always one worker in the system 629 if (tot > 0 630 && !found_assigned_task 631 && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) { 632 for (Map.Entry<String, Task> e : tasks.entrySet()) { 633 String key = e.getKey(); 634 Task task = e.getValue(); 635 // we have to do task.isUnassigned() check again because tasks might 636 // have been asynchronously assigned. There is no locking required 637 // for these checks ... it is OK even if tryGetDataSetWatch() is 638 // called unnecessarily for a taskpath 639 if (task.isUnassigned() && (task.status != FAILURE)) { 640 // We just touch the znode to make sure its still there 641 getSplitLogManagerCoordination().checkTaskStillAvailable(key); 642 } 643 } 644 getSplitLogManagerCoordination().checkTasks(); 645 SplitLogCounters.tot_mgr_resubmit_unassigned.increment(); 646 LOG.debug("resubmitting unassigned task(s) after timeout"); 647 } 648 Set<String> failedDeletions = 649 getSplitLogManagerCoordination().getDetails().getFailedDeletions(); 650 // Retry previously failed deletes 651 if (failedDeletions.size() > 0) { 652 List<String> tmpPaths = new ArrayList<>(failedDeletions); 653 for (String tmpPath : tmpPaths) { 654 // deleteNode is an async call 655 getSplitLogManagerCoordination().deleteTask(tmpPath); 656 } 657 failedDeletions.removeAll(tmpPaths); 658 } 659 } 660 } 661 662 public enum ResubmitDirective { 663 CHECK(), FORCE() 664 } 665 666 public enum TerminationStatus { 667 IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted"); 668 669 final String statusMsg; 670 671 TerminationStatus(String msg) { 672 statusMsg = msg; 673 } 674 675 @Override 676 public String toString() { 677 return statusMsg; 678 } 679 } 680}