001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Comparator; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.LinkedList; 028import java.util.Set; 029import java.util.concurrent.LinkedTransferQueue; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.concurrent.atomic.AtomicReference; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataOutputStream; 038import org.apache.hadoop.fs.FileAlreadyExistsException; 039import org.apache.hadoop.fs.FileStatus; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.fs.PathFilter; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.log.HBaseMarkers; 045import 
org.apache.hadoop.hbase.procedure2.Procedure; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 048import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 049import org.apache.hadoop.hbase.procedure2.util.ByteSlot; 050import org.apache.hadoop.hbase.procedure2.util.StringUtils; 051import org.apache.hadoop.hbase.util.CommonFSUtils; 052import org.apache.hadoop.hbase.util.Threads; 053import org.apache.hadoop.ipc.RemoteException; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 062 063/** 064 * WAL implementation of the ProcedureStore. 065 * <p/> 066 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()}, 067 * then {@link #load(ProcedureLoader)}. 068 * <p/> 069 * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by 070 * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all 071 * the old wal files. 072 * <p/> 073 * FIXME: notice that the current recover lease implementation is problematic, it can not deal with 074 * the races if there are two master both wants to acquire the lease... 075 * <p/> 076 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the 077 * comments of this method for more details. 078 * <p/> 079 * The actual logging way is a bit like our FileSystem based WAL implementation as RS side. 
There is a {@link #slots} array, which plays a role similar to a ring buffer: the insert, update
 * and delete methods put their serialized data into {@link #slots} and wait. A background sync
 * thread (see the {@link #syncLoop()} method) takes the data from {@link #slots}, writes it to the
 * FileSystem, and notifies the callers once the write has finished.
 * <p/>
 * TODO: try using disruptor to increase performance and simplify the logic?
 * <p/>
 * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
 * also the one currently being written. Its deleted bits, however, cover all procedures, not only
 * the ones in the newest wal file. When rolling a log, we first store the tracker in the trailer of
 * the current wal file and then reset its modified bits, so that it can start tracking the modified
 * procedures for the new wal file.
 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. It is used when a log is rolled and there is more than one wal file. It is first
 * initialized to the oldest file's tracker (which is stored in that file's trailer) using
 * {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}. It is then merged with the
 * tracker of every newer wal file using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}.
 * If we find that all the procedures modified in the oldest wal file have been modified again or
 * deleted in newer wal files, then the oldest file can be deleted. This works because every call
 * to {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}
 * persists the full state of the Procedure, so all earlier wal records for that procedure become
 * obsolete and can be deleted.
 * @see ProcedureWALPrettyPrinter for printing the content of a single WAL.
106 * @see #main(String[]) to parse a directory of MasterWALProcs. 107 */ 108@InterfaceAudience.Private 109public class WALProcedureStore extends ProcedureStoreBase { 110 private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); 111 public static final String LOG_PREFIX = "pv2-"; 112 /** Used to construct the name of the log directory for master procedures */ 113 public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; 114 115 116 public interface LeaseRecovery { 117 void recoverFileLease(FileSystem fs, Path path) throws IOException; 118 } 119 120 public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY = 121 "hbase.procedure.store.wal.warn.threshold"; 122 private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10; 123 124 public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = 125 "hbase.procedure.store.wal.exec.cleanup.on.load"; 126 private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true; 127 128 public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = 129 "hbase.procedure.store.wal.max.retries.before.roll"; 130 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; 131 132 public static final String WAIT_BEFORE_ROLL_CONF_KEY = 133 "hbase.procedure.store.wal.wait.before.roll"; 134 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; 135 136 public static final String ROLL_RETRIES_CONF_KEY = 137 "hbase.procedure.store.wal.max.roll.retries"; 138 private static final int DEFAULT_ROLL_RETRIES = 3; 139 140 public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = 141 "hbase.procedure.store.wal.sync.failure.roll.max"; 142 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; 143 144 public static final String PERIODIC_ROLL_CONF_KEY = 145 "hbase.procedure.store.wal.periodic.roll.msec"; 146 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h 147 148 public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; 149 
private static final int DEFAULT_SYNC_WAIT_MSEC = 100; 150 151 public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; 152 private static final boolean DEFAULT_USE_HSYNC = true; 153 154 public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; 155 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M 156 157 public static final String STORE_WAL_SYNC_STATS_COUNT = 158 "hbase.procedure.store.wal.sync.stats.count"; 159 private static final int DEFAULT_SYNC_STATS_COUNT = 10; 160 161 private final LinkedList<ProcedureWALFile> logs = new LinkedList<>(); 162 private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker(); 163 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); 164 private final ReentrantLock lock = new ReentrantLock(); 165 private final Condition waitCond = lock.newCondition(); 166 private final Condition slotCond = lock.newCondition(); 167 private final Condition syncCond = lock.newCondition(); 168 169 private final LeaseRecovery leaseRecovery; 170 private final Configuration conf; 171 private final FileSystem fs; 172 private final Path walDir; 173 private final Path walArchiveDir; 174 private final boolean enforceStreamCapability; 175 176 private final AtomicReference<Throwable> syncException = new AtomicReference<>(); 177 private final AtomicBoolean loading = new AtomicBoolean(true); 178 private final AtomicBoolean inSync = new AtomicBoolean(false); 179 private final AtomicLong totalSynced = new AtomicLong(0); 180 private final AtomicLong lastRollTs = new AtomicLong(0); 181 private final AtomicLong syncId = new AtomicLong(0); 182 183 private LinkedTransferQueue<ByteSlot> slotsCache = null; 184 private Set<ProcedureWALFile> corruptedLogs = null; 185 private FSDataOutputStream stream = null; 186 private int runningProcCount = 1; 187 private long flushLogId = 0; 188 private int syncMaxSlot = 1; 189 private int 
slotIndex = 0; 190 private Thread syncThread; 191 private ByteSlot[] slots; 192 193 private int walCountWarnThreshold; 194 private int maxRetriesBeforeRoll; 195 private int maxSyncFailureRoll; 196 private int waitBeforeRoll; 197 private int rollRetries; 198 private int periodicRollMsec; 199 private long rollThreshold; 200 private boolean useHsync; 201 private int syncWaitMsec; 202 203 // Variables used for UI display 204 private CircularFifoQueue<SyncMetrics> syncMetricsQueue; 205 206 public static class SyncMetrics { 207 private long timestamp; 208 private long syncWaitMs; 209 private long totalSyncedBytes; 210 private int syncedEntries; 211 private float syncedPerSec; 212 213 public long getTimestamp() { 214 return timestamp; 215 } 216 217 public long getSyncWaitMs() { 218 return syncWaitMs; 219 } 220 221 public long getTotalSyncedBytes() { 222 return totalSyncedBytes; 223 } 224 225 public long getSyncedEntries() { 226 return syncedEntries; 227 } 228 229 public float getSyncedPerSec() { 230 return syncedPerSec; 231 } 232 } 233 234 public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery) 235 throws IOException { 236 this(conf, 237 new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR), 238 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), 239 leaseRecovery); 240 } 241 242 @VisibleForTesting 243 public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir, 244 final LeaseRecovery leaseRecovery) throws IOException { 245 this.conf = conf; 246 this.leaseRecovery = leaseRecovery; 247 this.walDir = walDir; 248 this.walArchiveDir = walArchiveDir; 249 this.fs = walDir.getFileSystem(conf); 250 this.enforceStreamCapability = conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true); 251 252 // Create the log directory for the procedure store 253 if (!fs.exists(walDir)) { 254 if (!fs.mkdirs(walDir)) { 255 throw new IOException("Unable to mkdir " + walDir); 
256 } 257 } 258 // Now that it exists, set the log policy 259 String storagePolicy = 260 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 261 CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy); 262 263 // Create archive dir up front. Rename won't work w/o it up on HDFS. 264 if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { 265 if (this.fs.mkdirs(this.walArchiveDir)) { 266 LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); 267 } else { 268 LOG.warn("Failed create of {}", this.walArchiveDir); 269 } 270 } 271 } 272 273 @Override 274 public void start(int numSlots) throws IOException { 275 if (!setRunning(true)) { 276 return; 277 } 278 279 // Init buffer slots 280 loading.set(true); 281 runningProcCount = numSlots; 282 syncMaxSlot = numSlots; 283 slots = new ByteSlot[numSlots]; 284 slotsCache = new LinkedTransferQueue<>(); 285 while (slotsCache.size() < numSlots) { 286 slotsCache.offer(new ByteSlot()); 287 } 288 289 // Tunings 290 walCountWarnThreshold = 291 conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD); 292 maxRetriesBeforeRoll = 293 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); 294 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); 295 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); 296 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); 297 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); 298 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL); 299 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); 300 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); 301 302 // WebUI 303 syncMetricsQueue = new CircularFifoQueue<>( 304 conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); 305 306 // Init sync thread 307 
syncThread = new Thread("WALProcedureStoreSyncThread") { 308 @Override 309 public void run() { 310 try { 311 syncLoop(); 312 } catch (Throwable e) { 313 LOG.error("Got an exception from the sync-loop", e); 314 if (!isSyncAborted()) { 315 sendAbortProcessSignal(); 316 } 317 } 318 } 319 }; 320 syncThread.start(); 321 } 322 323 @Override 324 public void stop(final boolean abort) { 325 if (!setRunning(false)) { 326 return; 327 } 328 329 LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort + 330 (isSyncAborted() ? " (self aborting)" : "")); 331 sendStopSignal(); 332 if (!isSyncAborted()) { 333 try { 334 while (syncThread.isAlive()) { 335 sendStopSignal(); 336 syncThread.join(250); 337 } 338 } catch (InterruptedException e) { 339 LOG.warn("join interrupted", e); 340 Thread.currentThread().interrupt(); 341 } 342 } 343 344 // Close the writer 345 closeCurrentLogStream(abort); 346 347 // Close the old logs 348 // they should be already closed, this is just in case the load fails 349 // and we call start() and then stop() 350 for (ProcedureWALFile log: logs) { 351 log.close(); 352 } 353 logs.clear(); 354 loading.set(true); 355 } 356 357 private void sendStopSignal() { 358 if (lock.tryLock()) { 359 try { 360 waitCond.signalAll(); 361 syncCond.signalAll(); 362 } finally { 363 lock.unlock(); 364 } 365 } 366 } 367 368 @Override 369 public int getNumThreads() { 370 return slots == null ? 0 : slots.length; 371 } 372 373 @Override 374 public int setRunningProcedureCount(final int count) { 375 this.runningProcCount = count > 0 ? 
Math.min(count, slots.length) : slots.length; 376 return this.runningProcCount; 377 } 378 379 public ProcedureStoreTracker getStoreTracker() { 380 return storeTracker; 381 } 382 383 public ArrayList<ProcedureWALFile> getActiveLogs() { 384 lock.lock(); 385 try { 386 return new ArrayList<>(logs); 387 } finally { 388 lock.unlock(); 389 } 390 } 391 392 public Set<ProcedureWALFile> getCorruptedLogs() { 393 return corruptedLogs; 394 } 395 396 @Override 397 public void recoverLease() throws IOException { 398 lock.lock(); 399 try { 400 LOG.debug("Starting WAL Procedure Store lease recovery"); 401 boolean afterFirstAttempt = false; 402 while (isRunning()) { 403 // Don't sleep before first attempt 404 if (afterFirstAttempt) { 405 LOG.trace("Sleep {} ms after first lease recovery attempt.", 406 waitBeforeRoll); 407 Threads.sleepWithoutInterrupt(waitBeforeRoll); 408 } else { 409 afterFirstAttempt = true; 410 } 411 FileStatus[] oldLogs = getLogFiles(); 412 // Get Log-MaxID and recover lease on old logs 413 try { 414 flushLogId = initOldLogs(oldLogs); 415 } catch (FileNotFoundException e) { 416 LOG.warn("Someone else is active and deleted logs. retrying.", e); 417 continue; 418 } 419 420 // Create new state-log 421 if (!rollWriter(flushLogId + 1)) { 422 // someone else has already created this log 423 LOG.debug("Someone else has already created log {}. Retrying.", flushLogId); 424 continue; 425 } 426 427 // We have the lease on the log 428 oldLogs = getLogFiles(); 429 if (getMaxLogId(oldLogs) > flushLogId) { 430 LOG.debug("Someone else created new logs. 
Expected maxLogId < {}", flushLogId); 431 logs.getLast().removeFile(this.walArchiveDir); 432 continue; 433 } 434 435 LOG.debug("Lease acquired for flushLogId={}", flushLogId); 436 break; 437 } 438 } finally { 439 lock.unlock(); 440 } 441 } 442 443 @Override 444 public void load(ProcedureLoader loader) throws IOException { 445 lock.lock(); 446 try { 447 if (logs.isEmpty()) { 448 throw new IllegalStateException("recoverLease() must be called before loading data"); 449 } 450 451 // Nothing to do, If we have only the current log. 452 if (logs.size() == 1) { 453 LOG.debug("No state logs to replay."); 454 loader.setMaxProcId(0); 455 loading.set(false); 456 return; 457 } 458 459 // Load the old logs 460 Iterator<ProcedureWALFile> it = logs.descendingIterator(); 461 it.next(); // Skip the current log 462 463 ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { 464 @Override 465 public void setMaxProcId(long maxProcId) { 466 loader.setMaxProcId(maxProcId); 467 } 468 469 @Override 470 public void load(ProcedureIterator procIter) throws IOException { 471 loader.load(procIter); 472 } 473 474 @Override 475 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 476 loader.handleCorrupted(procIter); 477 } 478 479 @Override 480 public void markCorruptedWAL(ProcedureWALFile log, IOException e) { 481 if (corruptedLogs == null) { 482 corruptedLogs = new HashSet<>(); 483 } 484 corruptedLogs.add(log); 485 // TODO: sideline corrupted log 486 } 487 }); 488 // if we fail when loading, we should prevent persisting the storeTracker later in the stop 489 // method. As it may happen that, we have finished constructing the modified and deleted bits, 490 // but before we call resetModified, we fail, then if we persist the storeTracker then when 491 // restarting, we will consider that all procedures have been included in this file and delete 492 // all the previous files. Obviously this not correct. 
So here we will only set loading to 493 // false when we successfully loaded all the procedures, and when closing we will skip 494 // persisting the store tracker. And also, this will prevent the sync thread to do 495 // periodicRoll, where we may also clean old logs. 496 loading.set(false); 497 // try to cleanup inactive wals and complete the operation 498 buildHoldingCleanupTracker(); 499 tryCleanupLogsOnLoad(); 500 } finally { 501 lock.unlock(); 502 } 503 } 504 505 private void tryCleanupLogsOnLoad() { 506 // nothing to cleanup. 507 if (logs.size() <= 1) { 508 return; 509 } 510 511 // the config says to not cleanup wals on load. 512 if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, 513 DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) { 514 LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs()); 515 return; 516 } 517 518 try { 519 periodicRoll(); 520 } catch (IOException e) { 521 LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e); 522 } 523 } 524 525 @Override 526 public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { 527 if (LOG.isTraceEnabled()) { 528 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); 529 } 530 531 ByteSlot slot = acquireSlot(); 532 try { 533 // Serialize the insert 534 long[] subProcIds = null; 535 if (subprocs != null) { 536 ProcedureWALFormat.writeInsert(slot, proc, subprocs); 537 subProcIds = new long[subprocs.length]; 538 for (int i = 0; i < subprocs.length; ++i) { 539 subProcIds[i] = subprocs[i].getProcId(); 540 } 541 } else { 542 assert !proc.hasParent(); 543 ProcedureWALFormat.writeInsert(slot, proc); 544 } 545 546 // Push the transaction data and wait until it is persisted 547 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); 548 } catch (IOException e) { 549 // We are not able to serialize the procedure. 550 // this is a code error, and we are not able to go on. 
551 LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + 552 proc + ", subprocs=" + Arrays.toString(subprocs), e); 553 throw new RuntimeException(e); 554 } finally { 555 releaseSlot(slot); 556 } 557 } 558 559 @Override 560 public void insert(Procedure<?>[] procs) { 561 if (LOG.isTraceEnabled()) { 562 LOG.trace("Insert " + Arrays.toString(procs)); 563 } 564 565 ByteSlot slot = acquireSlot(); 566 try { 567 // Serialize the insert 568 long[] procIds = new long[procs.length]; 569 for (int i = 0; i < procs.length; ++i) { 570 assert !procs[i].hasParent(); 571 procIds[i] = procs[i].getProcId(); 572 ProcedureWALFormat.writeInsert(slot, procs[i]); 573 } 574 575 // Push the transaction data and wait until it is persisted 576 pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds); 577 } catch (IOException e) { 578 // We are not able to serialize the procedure. 579 // this is a code error, and we are not able to go on. 580 LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: " + 581 Arrays.toString(procs), e); 582 throw new RuntimeException(e); 583 } finally { 584 releaseSlot(slot); 585 } 586 } 587 588 @Override 589 public void update(Procedure<?> proc) { 590 if (LOG.isTraceEnabled()) { 591 LOG.trace("Update " + proc); 592 } 593 594 ByteSlot slot = acquireSlot(); 595 try { 596 // Serialize the update 597 ProcedureWALFormat.writeUpdate(slot, proc); 598 599 // Push the transaction data and wait until it is persisted 600 pushData(PushType.UPDATE, slot, proc.getProcId(), null); 601 } catch (IOException e) { 602 // We are not able to serialize the procedure. 603 // this is a code error, and we are not able to go on. 
604 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 605 throw new RuntimeException(e); 606 } finally { 607 releaseSlot(slot); 608 } 609 } 610 611 @Override 612 public void delete(long procId) { 613 LOG.trace("Delete {}", procId); 614 ByteSlot slot = acquireSlot(); 615 try { 616 // Serialize the delete 617 ProcedureWALFormat.writeDelete(slot, procId); 618 619 // Push the transaction data and wait until it is persisted 620 pushData(PushType.DELETE, slot, procId, null); 621 } catch (IOException e) { 622 // We are not able to serialize the procedure. 623 // this is a code error, and we are not able to go on. 624 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e); 625 throw new RuntimeException(e); 626 } finally { 627 releaseSlot(slot); 628 } 629 } 630 631 @Override 632 public void delete(Procedure<?> proc, long[] subProcIds) { 633 assert proc != null : "expected a non-null procedure"; 634 assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; 635 if (LOG.isTraceEnabled()) { 636 LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); 637 } 638 639 ByteSlot slot = acquireSlot(); 640 try { 641 // Serialize the delete 642 ProcedureWALFormat.writeDelete(slot, proc, subProcIds); 643 644 // Push the transaction data and wait until it is persisted 645 pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds); 646 } catch (IOException e) { 647 // We are not able to serialize the procedure. 648 // this is a code error, and we are not able to go on. 
649 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 650 throw new RuntimeException(e); 651 } finally { 652 releaseSlot(slot); 653 } 654 } 655 656 @Override 657 public void delete(final long[] procIds, final int offset, final int count) { 658 if (count == 0) return; 659 if (offset == 0 && count == procIds.length) { 660 delete(procIds); 661 } else if (count == 1) { 662 delete(procIds[offset]); 663 } else { 664 delete(Arrays.copyOfRange(procIds, offset, offset + count)); 665 } 666 } 667 668 private void delete(long[] procIds) { 669 if (LOG.isTraceEnabled()) { 670 LOG.trace("Delete " + Arrays.toString(procIds)); 671 } 672 673 final ByteSlot slot = acquireSlot(); 674 try { 675 // Serialize the delete 676 for (int i = 0; i < procIds.length; ++i) { 677 ProcedureWALFormat.writeDelete(slot, procIds[i]); 678 } 679 680 // Push the transaction data and wait until it is persisted 681 pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds); 682 } catch (IOException e) { 683 // We are not able to serialize the procedure. 684 // this is a code error, and we are not able to go on. 685 LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e); 686 throw new RuntimeException(e); 687 } finally { 688 releaseSlot(slot); 689 } 690 } 691 692 private ByteSlot acquireSlot() { 693 ByteSlot slot = slotsCache.poll(); 694 return slot != null ? 
slot : new ByteSlot(); 695 } 696 697 private void releaseSlot(final ByteSlot slot) { 698 slot.reset(); 699 slotsCache.offer(slot); 700 } 701 702 private enum PushType { INSERT, UPDATE, DELETE }; 703 704 private long pushData(final PushType type, final ByteSlot slot, 705 final long procId, final long[] subProcIds) { 706 if (!isRunning()) { 707 throw new RuntimeException("the store must be running before inserting data"); 708 } 709 if (logs.isEmpty()) { 710 throw new RuntimeException("recoverLease() must be called before inserting data"); 711 } 712 713 long logId = -1; 714 lock.lock(); 715 try { 716 // Wait for the sync to be completed 717 while (true) { 718 if (!isRunning()) { 719 throw new RuntimeException("store no longer running"); 720 } else if (isSyncAborted()) { 721 throw new RuntimeException("sync aborted", syncException.get()); 722 } else if (inSync.get()) { 723 syncCond.await(); 724 } else if (slotIndex >= syncMaxSlot) { 725 slotCond.signal(); 726 syncCond.await(); 727 } else { 728 break; 729 } 730 } 731 732 final long pushSyncId = syncId.get(); 733 updateStoreTracker(type, procId, subProcIds); 734 slots[slotIndex++] = slot; 735 logId = flushLogId; 736 737 // Notify that there is new data 738 if (slotIndex == 1) { 739 waitCond.signal(); 740 } 741 742 // Notify that the slots are full 743 if (slotIndex == syncMaxSlot) { 744 waitCond.signal(); 745 slotCond.signal(); 746 } 747 748 while (pushSyncId == syncId.get() && isRunning()) { 749 syncCond.await(); 750 } 751 } catch (InterruptedException e) { 752 Thread.currentThread().interrupt(); 753 sendAbortProcessSignal(); 754 throw new RuntimeException(e); 755 } finally { 756 lock.unlock(); 757 if (isSyncAborted()) { 758 throw new RuntimeException("sync aborted", syncException.get()); 759 } 760 } 761 return logId; 762 } 763 764 private void updateStoreTracker(final PushType type, 765 final long procId, final long[] subProcIds) { 766 switch (type) { 767 case INSERT: 768 if (subProcIds == null) { 769 
storeTracker.insert(procId); 770 } else if (procId == Procedure.NO_PROC_ID) { 771 storeTracker.insert(subProcIds); 772 } else { 773 storeTracker.insert(procId, subProcIds); 774 holdingCleanupTracker.setDeletedIfModified(procId); 775 } 776 break; 777 case UPDATE: 778 storeTracker.update(procId); 779 holdingCleanupTracker.setDeletedIfModified(procId); 780 break; 781 case DELETE: 782 if (subProcIds != null && subProcIds.length > 0) { 783 storeTracker.delete(subProcIds); 784 holdingCleanupTracker.setDeletedIfModified(subProcIds); 785 } else { 786 storeTracker.delete(procId); 787 holdingCleanupTracker.setDeletedIfModified(procId); 788 } 789 break; 790 default: 791 throw new RuntimeException("invalid push type " + type); 792 } 793 } 794 795 private boolean isSyncAborted() { 796 return syncException.get() != null; 797 } 798 799 private void syncLoop() throws Throwable { 800 long totalSyncedToStore = 0; 801 inSync.set(false); 802 lock.lock(); 803 try { 804 while (isRunning()) { 805 try { 806 // Wait until new data is available 807 if (slotIndex == 0) { 808 if (!loading.get()) { 809 periodicRoll(); 810 } 811 812 if (LOG.isTraceEnabled()) { 813 float rollTsSec = getMillisFromLastRoll() / 1000.0f; 814 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", 815 StringUtils.humanSize(totalSynced.get()), 816 StringUtils.humanSize(totalSynced.get() / rollTsSec))); 817 } 818 819 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); 820 if (slotIndex == 0) { 821 // no data.. 
probably a stop() or a periodic roll 822 continue; 823 } 824 } 825 // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing 826 syncMaxSlot = runningProcCount; 827 assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot; 828 final long syncWaitSt = System.currentTimeMillis(); 829 if (slotIndex != syncMaxSlot) { 830 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); 831 } 832 833 final long currentTs = System.currentTimeMillis(); 834 final long syncWaitMs = currentTs - syncWaitSt; 835 final float rollSec = getMillisFromLastRoll() / 1000.0f; 836 final float syncedPerSec = totalSyncedToStore / rollSec; 837 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) { 838 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", 839 StringUtils.humanTimeDiff(syncWaitMs), slotIndex, 840 StringUtils.humanSize(totalSyncedToStore), 841 StringUtils.humanSize(syncedPerSec))); 842 } 843 844 // update webui circular buffers (TODO: get rid of allocations) 845 final SyncMetrics syncMetrics = new SyncMetrics(); 846 syncMetrics.timestamp = currentTs; 847 syncMetrics.syncWaitMs = syncWaitMs; 848 syncMetrics.syncedEntries = slotIndex; 849 syncMetrics.totalSyncedBytes = totalSyncedToStore; 850 syncMetrics.syncedPerSec = syncedPerSec; 851 syncMetricsQueue.add(syncMetrics); 852 853 // sync 854 inSync.set(true); 855 long slotSize = syncSlots(); 856 logs.getLast().addToSize(slotSize); 857 totalSyncedToStore = totalSynced.addAndGet(slotSize); 858 slotIndex = 0; 859 inSync.set(false); 860 syncId.incrementAndGet(); 861 } catch (InterruptedException e) { 862 Thread.currentThread().interrupt(); 863 syncException.compareAndSet(null, e); 864 sendAbortProcessSignal(); 865 throw e; 866 } catch (Throwable t) { 867 syncException.compareAndSet(null, t); 868 sendAbortProcessSignal(); 869 throw t; 870 } finally { 871 syncCond.signalAll(); 872 } 873 } 874 } finally { 875 lock.unlock(); 876 } 877 } 878 879 public ArrayList<SyncMetrics> 
getSyncMetrics() { 880 lock.lock(); 881 try { 882 return new ArrayList<>(syncMetricsQueue); 883 } finally { 884 lock.unlock(); 885 } 886 } 887 888 private long syncSlots() throws Throwable { 889 int retry = 0; 890 int logRolled = 0; 891 long totalSynced = 0; 892 do { 893 try { 894 totalSynced = syncSlots(stream, slots, 0, slotIndex); 895 break; 896 } catch (Throwable e) { 897 LOG.warn("unable to sync slots, retry=" + retry); 898 if (++retry >= maxRetriesBeforeRoll) { 899 if (logRolled >= maxSyncFailureRoll && isRunning()) { 900 LOG.error("Sync slots after log roll failed, abort.", e); 901 throw e; 902 } 903 904 if (!rollWriterWithRetries()) { 905 throw e; 906 } 907 908 logRolled++; 909 retry = 0; 910 } 911 } 912 } while (isRunning()); 913 return totalSynced; 914 } 915 916 protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, 917 final int offset, final int count) throws IOException { 918 long totalSynced = 0; 919 for (int i = 0; i < count; ++i) { 920 final ByteSlot data = slots[offset + i]; 921 data.writeTo(stream); 922 totalSynced += data.size(); 923 } 924 925 syncStream(stream); 926 sendPostSyncSignal(); 927 928 if (LOG.isTraceEnabled()) { 929 LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + 930 ", flushed=" + StringUtils.humanSize(totalSynced)); 931 } 932 return totalSynced; 933 } 934 935 protected void syncStream(final FSDataOutputStream stream) throws IOException { 936 if (useHsync) { 937 stream.hsync(); 938 } else { 939 stream.hflush(); 940 } 941 } 942 943 private boolean rollWriterWithRetries() { 944 for (int i = 0; i < rollRetries && isRunning(); ++i) { 945 if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); 946 947 try { 948 if (rollWriter()) { 949 return true; 950 } 951 } catch (IOException e) { 952 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); 953 } 954 } 955 LOG.error(HBaseMarkers.FATAL, "Unable to roll the log"); 956 return false; 957 } 958 959 private boolean tryRollWriter() { 960 try { 961 
return rollWriter(); 962 } catch (IOException e) { 963 LOG.warn("Unable to roll the log", e); 964 return false; 965 } 966 } 967 968 public long getMillisToNextPeriodicRoll() { 969 if (lastRollTs.get() > 0 && periodicRollMsec > 0) { 970 return periodicRollMsec - getMillisFromLastRoll(); 971 } 972 return Long.MAX_VALUE; 973 } 974 975 public long getMillisFromLastRoll() { 976 return (System.currentTimeMillis() - lastRollTs.get()); 977 } 978 979 @VisibleForTesting 980 void periodicRollForTesting() throws IOException { 981 lock.lock(); 982 try { 983 periodicRoll(); 984 } finally { 985 lock.unlock(); 986 } 987 } 988 989 @VisibleForTesting 990 public boolean rollWriterForTesting() throws IOException { 991 lock.lock(); 992 try { 993 return rollWriter(); 994 } finally { 995 lock.unlock(); 996 } 997 } 998 999 @VisibleForTesting 1000 void removeInactiveLogsForTesting() throws Exception { 1001 lock.lock(); 1002 try { 1003 removeInactiveLogs(); 1004 } finally { 1005 lock.unlock(); 1006 } 1007 } 1008 1009 private void periodicRoll() throws IOException { 1010 if (storeTracker.isEmpty()) { 1011 LOG.trace("no active procedures"); 1012 tryRollWriter(); 1013 removeAllLogs(flushLogId - 1, "no active procedures"); 1014 } else { 1015 if (storeTracker.isAllModified()) { 1016 LOG.trace("all the active procedures are in the latest log"); 1017 removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log"); 1018 } 1019 1020 // if the log size has exceeded the roll threshold 1021 // or the periodic roll timeout is expired, try to roll the wal. 
1022 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { 1023 tryRollWriter(); 1024 } 1025 1026 removeInactiveLogs(); 1027 } 1028 } 1029 1030 private boolean rollWriter() throws IOException { 1031 if (!isRunning()) { 1032 return false; 1033 } 1034 1035 // Create new state-log 1036 if (!rollWriter(flushLogId + 1)) { 1037 LOG.warn("someone else has already created log {}", flushLogId); 1038 return false; 1039 } 1040 1041 // We have the lease on the log, 1042 // but we should check if someone else has created new files 1043 if (getMaxLogId(getLogFiles()) > flushLogId) { 1044 LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); 1045 logs.getLast().removeFile(this.walArchiveDir); 1046 return false; 1047 } 1048 1049 // We have the lease on the log 1050 return true; 1051 } 1052 1053 @VisibleForTesting 1054 boolean rollWriter(long logId) throws IOException { 1055 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; 1056 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. 
" + lock.isLocked(); 1057 1058 ProcedureWALHeader header = ProcedureWALHeader.newBuilder() 1059 .setVersion(ProcedureWALFormat.HEADER_VERSION) 1060 .setType(ProcedureWALFormat.LOG_TYPE_STREAM) 1061 .setMinProcId(storeTracker.getActiveMinProcId()) 1062 .setLogId(logId) 1063 .build(); 1064 1065 FSDataOutputStream newStream = null; 1066 Path newLogFile = null; 1067 long startPos = -1; 1068 newLogFile = getLogFilePath(logId); 1069 try { 1070 newStream = CommonFSUtils.createForWal(fs, newLogFile, false); 1071 } catch (FileAlreadyExistsException e) { 1072 LOG.error("Log file with id={} already exists", logId, e); 1073 return false; 1074 } catch (RemoteException re) { 1075 LOG.warn("failed to create log file with id={}", logId, re); 1076 return false; 1077 } 1078 // After we create the stream but before we attempt to use it at all 1079 // ensure that we can provide the level of data safety we're configured 1080 // to provide. 1081 final String durability = useHsync ? "hsync" : "hflush"; 1082 if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) { 1083 throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + 1084 " for proper operation during component failures, but the underlying filesystem does " + 1085 "not support doing so. 
Please check the config value of '" + USE_HSYNC_CONF_KEY + 1086 "' to set the desired level of robustness and ensure the config value of '" + 1087 CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); 1088 } 1089 try { 1090 ProcedureWALFormat.writeHeader(newStream, header); 1091 startPos = newStream.getPos(); 1092 } catch (IOException ioe) { 1093 LOG.warn("Encountered exception writing header", ioe); 1094 newStream.close(); 1095 return false; 1096 } 1097 1098 closeCurrentLogStream(false); 1099 1100 storeTracker.resetModified(); 1101 stream = newStream; 1102 flushLogId = logId; 1103 totalSynced.set(0); 1104 long rollTs = System.currentTimeMillis(); 1105 lastRollTs.set(rollTs); 1106 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); 1107 1108 // if it's the first next WAL being added, build the holding cleanup tracker 1109 if (logs.size() == 2) { 1110 buildHoldingCleanupTracker(); 1111 } else if (logs.size() > walCountWarnThreshold) { 1112 LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" + 1113 " to see if something is stuck.", logs.size(), walCountWarnThreshold); 1114 // This is just like what we have done at RS side when there are too many wal files. For RS, 1115 // if there are too many wal files, we will find out the wal entries in the oldest file, and 1116 // tell the upper layer to flush these regions so the wal entries will be useless and then we 1117 // can delete the wal file. For WALProcedureStore, the assumption is that, if all the 1118 // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then 1119 // we are safe to delete it. 
So here if there are too many proc wal files, we will find out 1120 // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc 1121 // wal files, and tell upper layer to update the state of these procedures to the newest proc 1122 // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal 1123 // file. 1124 sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds()); 1125 } 1126 1127 LOG.info("Rolled new Procedure Store WAL, id={}", logId); 1128 return true; 1129 } 1130 1131 private void closeCurrentLogStream(boolean abort) { 1132 if (stream == null || logs.isEmpty()) { 1133 return; 1134 } 1135 1136 try { 1137 ProcedureWALFile log = logs.getLast(); 1138 // If the loading flag is true, it usually means that we fail when loading procedures, so we 1139 // should not persist the store tracker, as its state may not be correct. 1140 if (!loading.get()) { 1141 log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); 1142 log.updateLocalTracker(storeTracker); 1143 if (!abort) { 1144 long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); 1145 log.addToSize(trailerSize); 1146 } 1147 } 1148 } catch (IOException e) { 1149 LOG.warn("Unable to write the trailer", e); 1150 } 1151 try { 1152 stream.close(); 1153 } catch (IOException e) { 1154 LOG.error("Unable to close the stream", e); 1155 } 1156 stream = null; 1157 } 1158 1159 // ========================================================================== 1160 // Log Files cleaner helpers 1161 // ========================================================================== 1162 private void removeInactiveLogs() throws IOException { 1163 // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. 1164 // once there is nothing olding the oldest WAL we can remove it. 
1165 while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { 1166 LOG.info("Remove the oldest log {}", logs.getFirst()); 1167 removeLogFile(logs.getFirst(), walArchiveDir); 1168 buildHoldingCleanupTracker(); 1169 } 1170 1171 // TODO: In case we are holding up a lot of logs for long time we should 1172 // rewrite old procedures (in theory parent procs) to the new WAL. 1173 } 1174 1175 private void buildHoldingCleanupTracker() { 1176 if (logs.size() <= 1) { 1177 // we only have one wal, so nothing to do 1178 holdingCleanupTracker.reset(); 1179 return; 1180 } 1181 1182 // compute the holding tracker. 1183 // - the first WAL is used for the 'updates' 1184 // - the global tracker will be used to determine whether a procedure has been deleted 1185 // - other trackers will be used to determine whether a procedure has been updated, as a deleted 1186 // procedure can always be detected by checking the global tracker, we can save the deleted 1187 // checks when applying other trackers 1188 holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); 1189 holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker); 1190 // the logs is a linked list, so avoid calling get(index) on it. 1191 Iterator<ProcedureWALFile> iter = logs.iterator(); 1192 // skip the tracker for the first file when creating the iterator. 1193 iter.next(); 1194 ProcedureStoreTracker tracker = iter.next().getTracker(); 1195 // testing iter.hasNext after calling iter.next to skip applying the tracker for last file, 1196 // which is just the storeTracker above. 1197 while (iter.hasNext()) { 1198 holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker); 1199 if (holdingCleanupTracker.isEmpty()) { 1200 break; 1201 } 1202 tracker = iter.next().getTracker(); 1203 } 1204 } 1205 1206 /** 1207 * Remove all logs with logId <= {@code lastLogId}. 
1208 */ 1209 private void removeAllLogs(long lastLogId, String why) { 1210 if (logs.size() <= 1) { 1211 return; 1212 } 1213 1214 LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why); 1215 1216 boolean removed = false; 1217 while (logs.size() > 1) { 1218 ProcedureWALFile log = logs.getFirst(); 1219 if (lastLogId < log.getLogId()) { 1220 break; 1221 } 1222 removeLogFile(log, walArchiveDir); 1223 removed = true; 1224 } 1225 1226 if (removed) { 1227 buildHoldingCleanupTracker(); 1228 } 1229 } 1230 1231 private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { 1232 try { 1233 LOG.trace("Removing log={}", log); 1234 log.removeFile(walArchiveDir); 1235 logs.remove(log); 1236 LOG.debug("Removed log={}, activeLogs={}", log, logs); 1237 assert logs.size() > 0 : "expected at least one log"; 1238 } catch (IOException e) { 1239 LOG.error("Unable to remove log: " + log, e); 1240 return false; 1241 } 1242 return true; 1243 } 1244 1245 // ========================================================================== 1246 // FileSystem Log Files helpers 1247 // ========================================================================== 1248 public Path getWALDir() { 1249 return this.walDir; 1250 } 1251 1252 @VisibleForTesting 1253 Path getWalArchiveDir() { 1254 return this.walArchiveDir; 1255 } 1256 1257 public FileSystem getFileSystem() { 1258 return this.fs; 1259 } 1260 1261 protected Path getLogFilePath(final long logId) throws IOException { 1262 return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId)); 1263 } 1264 1265 private static long getLogIdFromName(final String name) { 1266 int end = name.lastIndexOf(".log"); 1267 int start = name.lastIndexOf('-') + 1; 1268 return Long.parseLong(name.substring(start, end)); 1269 } 1270 1271 private static final PathFilter WALS_PATH_FILTER = new PathFilter() { 1272 @Override 1273 public boolean accept(Path path) { 1274 String name = path.getName(); 1275 return 
name.startsWith(LOG_PREFIX) && name.endsWith(".log"); 1276 } 1277 }; 1278 1279 private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR = 1280 new Comparator<FileStatus>() { 1281 @Override 1282 public int compare(FileStatus a, FileStatus b) { 1283 final long aId = getLogIdFromName(a.getPath().getName()); 1284 final long bId = getLogIdFromName(b.getPath().getName()); 1285 return Long.compare(aId, bId); 1286 } 1287 }; 1288 1289 private FileStatus[] getLogFiles() throws IOException { 1290 try { 1291 FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER); 1292 Arrays.sort(files, FILE_STATUS_ID_COMPARATOR); 1293 return files; 1294 } catch (FileNotFoundException e) { 1295 LOG.warn("Log directory not found: " + e.getMessage()); 1296 return null; 1297 } 1298 } 1299 1300 /** 1301 * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort 1302 * the file set by log id. 1303 * @return Max-LogID of the specified log file set 1304 */ 1305 private static long getMaxLogId(FileStatus[] logFiles) { 1306 if (logFiles == null || logFiles.length == 0) { 1307 return 0L; 1308 } 1309 return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); 1310 } 1311 1312 /** 1313 * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort 1314 * the file set by log id. 
1315 * @return Max-LogID of the specified log file set 1316 */ 1317 private long initOldLogs(FileStatus[] logFiles) throws IOException { 1318 if (logFiles == null || logFiles.length == 0) { 1319 return 0L; 1320 } 1321 long maxLogId = 0; 1322 for (int i = 0; i < logFiles.length; ++i) { 1323 final Path logPath = logFiles[i].getPath(); 1324 leaseRecovery.recoverFileLease(fs, logPath); 1325 if (!isRunning()) { 1326 throw new IOException("wal aborting"); 1327 } 1328 1329 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); 1330 ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); 1331 if (log != null) { 1332 this.logs.add(log); 1333 } 1334 } 1335 initTrackerFromOldLogs(); 1336 return maxLogId; 1337 } 1338 1339 /** 1340 * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker 1341 * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. 1342 */ 1343 private void initTrackerFromOldLogs() { 1344 if (logs.isEmpty() || !isRunning()) { 1345 return; 1346 } 1347 ProcedureWALFile log = logs.getLast(); 1348 if (!log.getTracker().isPartial()) { 1349 storeTracker.resetTo(log.getTracker()); 1350 } else { 1351 storeTracker.reset(); 1352 storeTracker.setPartialFlag(true); 1353 } 1354 } 1355 1356 /** 1357 * Loads given log file and it's tracker. 
1358 */ 1359 private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) 1360 throws IOException { 1361 final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); 1362 if (logFile.getLen() == 0) { 1363 LOG.warn("Remove uninitialized log: {}", logFile); 1364 log.removeFile(walArchiveDir); 1365 return null; 1366 } 1367 LOG.debug("Opening Pv2 {}", logFile); 1368 try { 1369 log.open(); 1370 } catch (ProcedureWALFormat.InvalidWALDataException e) { 1371 LOG.warn("Remove uninitialized log: {}", logFile, e); 1372 log.removeFile(walArchiveDir); 1373 return null; 1374 } catch (IOException e) { 1375 String msg = "Unable to read state log: " + logFile; 1376 LOG.error(msg, e); 1377 throw new IOException(msg, e); 1378 } 1379 1380 try { 1381 log.readTracker(); 1382 } catch (IOException e) { 1383 log.getTracker().reset(); 1384 log.getTracker().setPartialFlag(true); 1385 LOG.warn("Unable to read tracker for {}", log, e); 1386 } 1387 1388 log.close(); 1389 return log; 1390 } 1391}