001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Comparator; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.LinkedList; 028import java.util.Set; 029import java.util.concurrent.LinkedTransferQueue; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.concurrent.atomic.AtomicReference; 034import java.util.concurrent.locks.Condition; 035import java.util.concurrent.locks.ReentrantLock; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataOutputStream; 038import org.apache.hadoop.fs.FileAlreadyExistsException; 039import org.apache.hadoop.fs.FileStatus; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.fs.PathFilter; 043import org.apache.hadoop.hbase.HBaseConfiguration; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.log.HBaseMarkers; 046import org.apache.hadoop.hbase.procedure2.Procedure; 047import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 049import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; 050import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; 051import org.apache.hadoop.hbase.procedure2.util.ByteSlot; 052import org.apache.hadoop.hbase.procedure2.util.StringUtils; 053import org.apache.hadoop.hbase.util.CommonFSUtils; 054import org.apache.hadoop.hbase.util.Threads; 055import org.apache.hadoop.ipc.RemoteException; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 061import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; 062 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; 064 065/** 066 * WAL implementation of the ProcedureStore. 067 * <p/> 068 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()}, 069 * then {@link #load(ProcedureLoader)}. 070 * <p/> 071 * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by 072 * calling recoverFileLease), and creating a new wal writer. And we will also get the list of all 073 * the old wal files. 074 * <p/> 075 * FIXME: notice that the current recover lease implementation is problematic, it can not deal with 076 * the races if there are two master both wants to acquire the lease... 077 * <p/> 078 * In {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the 079 * comments of this method for more details. 080 * <p/> 081 * The actual logging way is a bit like our FileSystem based WAL implementation as RS side. There is 082 * a {@link #slots}, which is more like the ring buffer, and in the insert, update and delete 083 * methods we will put thing into the {@link #slots} and wait. And there is a background sync 084 * thread(see the {@link #syncLoop()} method) which get data from the {@link #slots} and write them 085 * to the FileSystem, and notify the caller that we have finished. 086 * <p/> 087 * TODO: try using disruptor to increase performance and simplify the logic? 088 * <p/> 089 * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is 090 * also the one being written currently. And the deleted bits in it are for all the procedures, not 091 * only the ones in the newest wal file. And when rolling a log, we will first store it in the 092 * trailer of the current wal file, and then reset its modified bits, so that it can start to track 093 * the modified procedures for the new wal file. 094 * <p/> 095 * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal 096 * file. When there are log rolling and there are more than 1 wal files, we will make use of it. It 097 * will first be initialized to the oldest file's tracker(which is stored in the trailer), using the 098 * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merge it 099 * with the tracker of every newer wal files, using the 100 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. 101 * If we find out 102 * that all the modified procedures for the oldest wal file are modified or deleted in newer wal 103 * files, then we can delete it. This is because that, every time we call 104 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we will 105 * persist the full state of a Procedure, so the earlier wal records for this procedure can all be 106 * deleted. 107 * @see ProcedureWALPrettyPrinter for printing content of a single WAL. 108 * @see #main(String[]) to parse a directory of MasterWALProcs. 109 */ 110@InterfaceAudience.Private 111public class WALProcedureStore extends ProcedureStoreBase { 112 private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); 113 public static final String LOG_PREFIX = "pv2-"; 114 /** Used to construct the name of the log directory for master procedures */ 115 public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; 116 117 118 public interface LeaseRecovery { 119 void recoverFileLease(FileSystem fs, Path path) throws IOException; 120 } 121 122 public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY = 123 "hbase.procedure.store.wal.warn.threshold"; 124 private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10; 125 126 public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = 127 "hbase.procedure.store.wal.exec.cleanup.on.load"; 128 private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true; 129 130 public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = 131 "hbase.procedure.store.wal.max.retries.before.roll"; 132 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; 133 134 public static final String WAIT_BEFORE_ROLL_CONF_KEY = 135 "hbase.procedure.store.wal.wait.before.roll"; 136 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; 137 138 public static final String ROLL_RETRIES_CONF_KEY = 139 "hbase.procedure.store.wal.max.roll.retries"; 140 private static final int DEFAULT_ROLL_RETRIES = 3; 141 142 public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = 143 "hbase.procedure.store.wal.sync.failure.roll.max"; 144 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; 145 146 public static final String PERIODIC_ROLL_CONF_KEY = 147 "hbase.procedure.store.wal.periodic.roll.msec"; 148 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h 149 150 public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; 151 private static final int DEFAULT_SYNC_WAIT_MSEC = 100; 152 153 public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; 154 private static final boolean DEFAULT_USE_HSYNC = true; 155 156 public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; 157 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M 158 159 public static final String STORE_WAL_SYNC_STATS_COUNT = 160 "hbase.procedure.store.wal.sync.stats.count"; 161 private static final int DEFAULT_SYNC_STATS_COUNT = 10; 162 163 private final LinkedList<ProcedureWALFile> logs = new LinkedList<>(); 164 private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker(); 165 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); 166 private final ReentrantLock lock = new ReentrantLock(); 167 private final Condition waitCond = lock.newCondition(); 168 private final Condition slotCond = lock.newCondition(); 169 private final Condition syncCond = lock.newCondition(); 170 171 private final LeaseRecovery leaseRecovery; 172 private final Configuration conf; 173 private final FileSystem fs; 174 private final Path walDir; 175 private final Path walArchiveDir; 176 private final boolean enforceStreamCapability; 177 178 private final AtomicReference<Throwable> syncException = new AtomicReference<>(); 179 private final AtomicBoolean loading = new AtomicBoolean(true); 180 private final AtomicBoolean inSync = new AtomicBoolean(false); 181 private final AtomicLong totalSynced = new AtomicLong(0); 182 private final AtomicLong lastRollTs = new AtomicLong(0); 183 private final AtomicLong syncId = new AtomicLong(0); 184 185 private LinkedTransferQueue<ByteSlot> slotsCache = null; 186 private Set<ProcedureWALFile> corruptedLogs = null; 187 private FSDataOutputStream stream = null; 188 private int runningProcCount = 1; 189 private long flushLogId = 0; 190 private int syncMaxSlot = 1; 191 private int slotIndex = 0; 192 private Thread syncThread; 193 private ByteSlot[] slots; 194 195 private int walCountWarnThreshold; 196 private int maxRetriesBeforeRoll; 197 private int maxSyncFailureRoll; 198 private int waitBeforeRoll; 199 private int rollRetries; 200 private int periodicRollMsec; 201 private long rollThreshold; 202 private boolean useHsync; 203 private int syncWaitMsec; 204 205 // Variables used for UI display 206 private CircularFifoQueue<SyncMetrics> syncMetricsQueue; 207 208 public static class SyncMetrics { 209 private long timestamp; 210 private long syncWaitMs; 211 private long totalSyncedBytes; 212 private int syncedEntries; 213 private float syncedPerSec; 214 215 public long getTimestamp() { 216 return timestamp; 217 } 218 219 public long getSyncWaitMs() { 220 return syncWaitMs; 221 } 222 223 public long getTotalSyncedBytes() { 224 return totalSyncedBytes; 225 } 226 227 public long getSyncedEntries() { 228 return syncedEntries; 229 } 230 231 public float getSyncedPerSec() { 232 return syncedPerSec; 233 } 234 } 235 236 public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery) 237 throws IOException { 238 this(conf, 239 new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR), 240 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), 241 leaseRecovery); 242 } 243 244 @VisibleForTesting 245 public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir, 246 final LeaseRecovery leaseRecovery) throws IOException { 247 this.conf = conf; 248 this.leaseRecovery = leaseRecovery; 249 this.walDir = walDir; 250 this.walArchiveDir = walArchiveDir; 251 this.fs = CommonFSUtils.getWALFileSystem(conf); 252 this.enforceStreamCapability = conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, 253 true); 254 255 // Create the log directory for the procedure store 256 if (!fs.exists(walDir)) { 257 if (!fs.mkdirs(walDir)) { 258 throw new IOException("Unable to mkdir " + walDir); 259 } 260 } 261 // Now that it exists, set the log policy 262 String storagePolicy = 263 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 264 CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy); 265 266 // Create archive dir up front. Rename won't work w/o it up on HDFS. 267 if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { 268 if (this.fs.mkdirs(this.walArchiveDir)) { 269 LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); 270 } else { 271 LOG.warn("Failed create of {}", this.walArchiveDir); 272 } 273 } 274 } 275 276 @Override 277 public void start(int numSlots) throws IOException { 278 if (!setRunning(true)) { 279 return; 280 } 281 282 // Init buffer slots 283 loading.set(true); 284 runningProcCount = numSlots; 285 syncMaxSlot = numSlots; 286 slots = new ByteSlot[numSlots]; 287 slotsCache = new LinkedTransferQueue<>(); 288 while (slotsCache.size() < numSlots) { 289 slotsCache.offer(new ByteSlot()); 290 } 291 292 // Tunings 293 walCountWarnThreshold = 294 conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD); 295 maxRetriesBeforeRoll = 296 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); 297 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); 298 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); 299 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); 300 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); 301 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL); 302 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); 303 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); 304 305 // WebUI 306 syncMetricsQueue = new CircularFifoQueue<>( 307 conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); 308 309 // Init sync thread 310 syncThread = new Thread("WALProcedureStoreSyncThread") { 311 @Override 312 public void run() { 313 try { 314 syncLoop(); 315 } catch (Throwable e) { 316 LOG.error("Got an exception from the sync-loop", e); 317 if (!isSyncAborted()) { 318 sendAbortProcessSignal(); 319 } 320 } 321 } 322 }; 323 syncThread.start(); 324 } 325 326 @Override 327 public void stop(final boolean abort) { 328 if (!setRunning(false)) { 329 return; 330 } 331 332 LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort + 333 (isSyncAborted() ? " (self aborting)" : "")); 334 sendStopSignal(); 335 if (!isSyncAborted()) { 336 try { 337 while (syncThread.isAlive()) { 338 sendStopSignal(); 339 syncThread.join(250); 340 } 341 } catch (InterruptedException e) { 342 LOG.warn("join interrupted", e); 343 Thread.currentThread().interrupt(); 344 } 345 } 346 347 // Close the writer 348 closeCurrentLogStream(abort); 349 350 // Close the old logs 351 // they should be already closed, this is just in case the load fails 352 // and we call start() and then stop() 353 for (ProcedureWALFile log: logs) { 354 log.close(); 355 } 356 logs.clear(); 357 loading.set(true); 358 } 359 360 private void sendStopSignal() { 361 if (lock.tryLock()) { 362 try { 363 waitCond.signalAll(); 364 syncCond.signalAll(); 365 } finally { 366 lock.unlock(); 367 } 368 } 369 } 370 371 @Override 372 public int getNumThreads() { 373 return slots == null ? 0 : slots.length; 374 } 375 376 @Override 377 public int setRunningProcedureCount(final int count) { 378 this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length; 379 return this.runningProcCount; 380 } 381 382 public ProcedureStoreTracker getStoreTracker() { 383 return storeTracker; 384 } 385 386 public ArrayList<ProcedureWALFile> getActiveLogs() { 387 lock.lock(); 388 try { 389 return new ArrayList<>(logs); 390 } finally { 391 lock.unlock(); 392 } 393 } 394 395 public Set<ProcedureWALFile> getCorruptedLogs() { 396 return corruptedLogs; 397 } 398 399 @Override 400 public void recoverLease() throws IOException { 401 lock.lock(); 402 try { 403 LOG.debug("Starting WAL Procedure Store lease recovery"); 404 boolean afterFirstAttempt = false; 405 while (isRunning()) { 406 // Don't sleep before first attempt 407 if (afterFirstAttempt) { 408 LOG.trace("Sleep {} ms after first lease recovery attempt.", 409 waitBeforeRoll); 410 Threads.sleepWithoutInterrupt(waitBeforeRoll); 411 } else { 412 afterFirstAttempt = true; 413 } 414 FileStatus[] oldLogs = getLogFiles(); 415 // Get Log-MaxID and recover lease on old logs 416 try { 417 flushLogId = initOldLogs(oldLogs); 418 } catch (FileNotFoundException e) { 419 LOG.warn("Someone else is active and deleted logs. retrying.", e); 420 continue; 421 } 422 423 // Create new state-log 424 if (!rollWriter(flushLogId + 1)) { 425 // someone else has already created this log 426 LOG.debug("Someone else has already created log {}. Retrying.", flushLogId); 427 continue; 428 } 429 430 // We have the lease on the log 431 oldLogs = getLogFiles(); 432 if (getMaxLogId(oldLogs) > flushLogId) { 433 LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId); 434 logs.getLast().removeFile(this.walArchiveDir); 435 continue; 436 } 437 438 LOG.debug("Lease acquired for flushLogId={}", flushLogId); 439 break; 440 } 441 } finally { 442 lock.unlock(); 443 } 444 } 445 446 @Override 447 public void load(ProcedureLoader loader) throws IOException { 448 lock.lock(); 449 try { 450 if (logs.isEmpty()) { 451 throw new IllegalStateException("recoverLease() must be called before loading data"); 452 } 453 454 // Nothing to do, If we have only the current log. 455 if (logs.size() == 1) { 456 LOG.debug("No state logs to replay."); 457 loader.setMaxProcId(0); 458 loading.set(false); 459 return; 460 } 461 462 // Load the old logs 463 Iterator<ProcedureWALFile> it = logs.descendingIterator(); 464 it.next(); // Skip the current log 465 466 ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { 467 468 @Override 469 public void setMaxProcId(long maxProcId) { 470 loader.setMaxProcId(maxProcId); 471 } 472 473 @Override 474 public void load(ProcedureIterator procIter) throws IOException { 475 loader.load(procIter); 476 } 477 478 @Override 479 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 480 loader.handleCorrupted(procIter); 481 } 482 483 @Override 484 public void markCorruptedWAL(ProcedureWALFile log, IOException e) { 485 if (corruptedLogs == null) { 486 corruptedLogs = new HashSet<>(); 487 } 488 corruptedLogs.add(log); 489 // TODO: sideline corrupted log 490 } 491 }); 492 // if we fail when loading, we should prevent persisting the storeTracker later in the stop 493 // method. As it may happen that, we have finished constructing the modified and deleted bits, 494 // but before we call resetModified, we fail, then if we persist the storeTracker then when 495 // restarting, we will consider that all procedures have been included in this file and delete 496 // all the previous files. Obviously this not correct. So here we will only set loading to 497 // false when we successfully loaded all the procedures, and when closing we will skip 498 // persisting the store tracker. And also, this will prevent the sync thread to do 499 // periodicRoll, where we may also clean old logs. 500 loading.set(false); 501 // try to cleanup inactive wals and complete the operation 502 buildHoldingCleanupTracker(); 503 tryCleanupLogsOnLoad(); 504 } finally { 505 lock.unlock(); 506 } 507 } 508 509 private void tryCleanupLogsOnLoad() { 510 // nothing to cleanup. 511 if (logs.size() <= 1) { 512 return; 513 } 514 515 // the config says to not cleanup wals on load. 516 if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, 517 DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) { 518 LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs()); 519 return; 520 } 521 522 try { 523 periodicRoll(); 524 } catch (IOException e) { 525 LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e); 526 } 527 } 528 529 @Override 530 public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { 531 if (LOG.isTraceEnabled()) { 532 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); 533 } 534 535 ByteSlot slot = acquireSlot(); 536 try { 537 // Serialize the insert 538 long[] subProcIds = null; 539 if (subprocs != null) { 540 ProcedureWALFormat.writeInsert(slot, proc, subprocs); 541 subProcIds = new long[subprocs.length]; 542 for (int i = 0; i < subprocs.length; ++i) { 543 subProcIds[i] = subprocs[i].getProcId(); 544 } 545 } else { 546 assert !proc.hasParent(); 547 ProcedureWALFormat.writeInsert(slot, proc); 548 } 549 550 // Push the transaction data and wait until it is persisted 551 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); 552 } catch (IOException e) { 553 // We are not able to serialize the procedure. 554 // this is a code error, and we are not able to go on. 555 LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + 556 proc + ", subprocs=" + Arrays.toString(subprocs), e); 557 throw new RuntimeException(e); 558 } finally { 559 releaseSlot(slot); 560 } 561 } 562 563 @Override 564 public void insert(Procedure<?>[] procs) { 565 if (LOG.isTraceEnabled()) { 566 LOG.trace("Insert " + Arrays.toString(procs)); 567 } 568 569 ByteSlot slot = acquireSlot(); 570 try { 571 // Serialize the insert 572 long[] procIds = new long[procs.length]; 573 for (int i = 0; i < procs.length; ++i) { 574 assert !procs[i].hasParent(); 575 procIds[i] = procs[i].getProcId(); 576 ProcedureWALFormat.writeInsert(slot, procs[i]); 577 } 578 579 // Push the transaction data and wait until it is persisted 580 pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds); 581 } catch (IOException e) { 582 // We are not able to serialize the procedure. 583 // this is a code error, and we are not able to go on. 584 LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: " + 585 Arrays.toString(procs), e); 586 throw new RuntimeException(e); 587 } finally { 588 releaseSlot(slot); 589 } 590 } 591 592 @Override 593 public void update(Procedure<?> proc) { 594 if (LOG.isTraceEnabled()) { 595 LOG.trace("Update " + proc); 596 } 597 598 ByteSlot slot = acquireSlot(); 599 try { 600 // Serialize the update 601 ProcedureWALFormat.writeUpdate(slot, proc); 602 603 // Push the transaction data and wait until it is persisted 604 pushData(PushType.UPDATE, slot, proc.getProcId(), null); 605 } catch (IOException e) { 606 // We are not able to serialize the procedure. 607 // this is a code error, and we are not able to go on. 608 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 609 throw new RuntimeException(e); 610 } finally { 611 releaseSlot(slot); 612 } 613 } 614 615 @Override 616 public void delete(long procId) { 617 LOG.trace("Delete {}", procId); 618 ByteSlot slot = acquireSlot(); 619 try { 620 // Serialize the delete 621 ProcedureWALFormat.writeDelete(slot, procId); 622 623 // Push the transaction data and wait until it is persisted 624 pushData(PushType.DELETE, slot, procId, null); 625 } catch (IOException e) { 626 // We are not able to serialize the procedure. 627 // this is a code error, and we are not able to go on. 628 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e); 629 throw new RuntimeException(e); 630 } finally { 631 releaseSlot(slot); 632 } 633 } 634 635 @Override 636 public void delete(Procedure<?> proc, long[] subProcIds) { 637 assert proc != null : "expected a non-null procedure"; 638 assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; 639 if (LOG.isTraceEnabled()) { 640 LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); 641 } 642 643 ByteSlot slot = acquireSlot(); 644 try { 645 // Serialize the delete 646 ProcedureWALFormat.writeDelete(slot, proc, subProcIds); 647 648 // Push the transaction data and wait until it is persisted 649 pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds); 650 } catch (IOException e) { 651 // We are not able to serialize the procedure. 652 // this is a code error, and we are not able to go on. 653 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 654 throw new RuntimeException(e); 655 } finally { 656 releaseSlot(slot); 657 } 658 } 659 660 @Override 661 public void delete(final long[] procIds, final int offset, final int count) { 662 if (count == 0) { 663 return; 664 } 665 666 if (offset == 0 && count == procIds.length) { 667 delete(procIds); 668 } else if (count == 1) { 669 delete(procIds[offset]); 670 } else { 671 delete(Arrays.copyOfRange(procIds, offset, offset + count)); 672 } 673 } 674 675 private void delete(long[] procIds) { 676 if (LOG.isTraceEnabled()) { 677 LOG.trace("Delete " + Arrays.toString(procIds)); 678 } 679 680 final ByteSlot slot = acquireSlot(); 681 try { 682 // Serialize the delete 683 for (int i = 0; i < procIds.length; ++i) { 684 ProcedureWALFormat.writeDelete(slot, procIds[i]); 685 } 686 687 // Push the transaction data and wait until it is persisted 688 pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds); 689 } catch (IOException e) { 690 // We are not able to serialize the procedure. 691 // this is a code error, and we are not able to go on. 692 LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e); 693 throw new RuntimeException(e); 694 } finally { 695 releaseSlot(slot); 696 } 697 } 698 699 private ByteSlot acquireSlot() { 700 ByteSlot slot = slotsCache.poll(); 701 return slot != null ? slot : new ByteSlot(); 702 } 703 704 private void releaseSlot(final ByteSlot slot) { 705 slot.reset(); 706 slotsCache.offer(slot); 707 } 708 709 private enum PushType { INSERT, UPDATE, DELETE }; 710 711 private long pushData(final PushType type, final ByteSlot slot, 712 final long procId, final long[] subProcIds) { 713 if (!isRunning()) { 714 throw new RuntimeException("the store must be running before inserting data"); 715 } 716 if (logs.isEmpty()) { 717 throw new RuntimeException("recoverLease() must be called before inserting data"); 718 } 719 720 long logId = -1; 721 lock.lock(); 722 try { 723 // Wait for the sync to be completed 724 while (true) { 725 if (!isRunning()) { 726 throw new RuntimeException("store no longer running"); 727 } else if (isSyncAborted()) { 728 throw new RuntimeException("sync aborted", syncException.get()); 729 } else if (inSync.get()) { 730 syncCond.await(); 731 } else if (slotIndex >= syncMaxSlot) { 732 slotCond.signal(); 733 syncCond.await(); 734 } else { 735 break; 736 } 737 } 738 739 final long pushSyncId = syncId.get(); 740 updateStoreTracker(type, procId, subProcIds); 741 slots[slotIndex++] = slot; 742 logId = flushLogId; 743 744 // Notify that there is new data 745 if (slotIndex == 1) { 746 waitCond.signal(); 747 } 748 749 // Notify that the slots are full 750 if (slotIndex == syncMaxSlot) { 751 waitCond.signal(); 752 slotCond.signal(); 753 } 754 755 while (pushSyncId == syncId.get() && isRunning()) { 756 syncCond.await(); 757 } 758 } catch (InterruptedException e) { 759 Thread.currentThread().interrupt(); 760 sendAbortProcessSignal(); 761 throw new RuntimeException(e); 762 } finally { 763 lock.unlock(); 764 if (isSyncAborted()) { 765 throw new RuntimeException("sync aborted", syncException.get()); 766 } 767 } 768 return logId; 769 } 770 771 private void updateStoreTracker(final PushType type, 772 final long procId, final long[] subProcIds) { 773 switch (type) { 774 case INSERT: 775 if (subProcIds == null) { 776 storeTracker.insert(procId); 777 } else if (procId == Procedure.NO_PROC_ID) { 778 storeTracker.insert(subProcIds); 779 } else { 780 storeTracker.insert(procId, subProcIds); 781 holdingCleanupTracker.setDeletedIfModified(procId); 782 } 783 break; 784 case UPDATE: 785 storeTracker.update(procId); 786 holdingCleanupTracker.setDeletedIfModified(procId); 787 break; 788 case DELETE: 789 if (subProcIds != null && subProcIds.length > 0) { 790 storeTracker.delete(subProcIds); 791 holdingCleanupTracker.setDeletedIfModified(subProcIds); 792 } else { 793 storeTracker.delete(procId); 794 holdingCleanupTracker.setDeletedIfModified(procId); 795 } 796 break; 797 default: 798 throw new RuntimeException("invalid push type " + type); 799 } 800 } 801 802 private boolean isSyncAborted() { 803 return syncException.get() != null; 804 } 805 806 private void syncLoop() throws Throwable { 807 long totalSyncedToStore = 0; 808 inSync.set(false); 809 lock.lock(); 810 try { 811 while (isRunning()) { 812 try { 813 // Wait until new data is available 814 if (slotIndex == 0) { 815 if (!loading.get()) { 816 periodicRoll(); 817 } 818 819 if (LOG.isTraceEnabled()) { 820 float rollTsSec = getMillisFromLastRoll() / 1000.0f; 821 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", 822 StringUtils.humanSize(totalSynced.get()), 823 StringUtils.humanSize(totalSynced.get() / rollTsSec))); 824 } 825 826 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); 827 if (slotIndex == 0) { 828 // no data.. probably a stop() or a periodic roll 829 continue; 830 } 831 } 832 // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing 833 syncMaxSlot = runningProcCount; 834 assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot; 835 final long syncWaitSt = System.currentTimeMillis(); 836 if (slotIndex != syncMaxSlot) { 837 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); 838 } 839 840 final long currentTs = System.currentTimeMillis(); 841 final long syncWaitMs = currentTs - syncWaitSt; 842 final float rollSec = getMillisFromLastRoll() / 1000.0f; 843 final float syncedPerSec = totalSyncedToStore / rollSec; 844 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) { 845 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", 846 StringUtils.humanTimeDiff(syncWaitMs), slotIndex, 847 StringUtils.humanSize(totalSyncedToStore), 848 StringUtils.humanSize(syncedPerSec))); 849 } 850 851 // update webui circular buffers (TODO: get rid of allocations) 852 final SyncMetrics syncMetrics = new SyncMetrics(); 853 syncMetrics.timestamp = currentTs; 854 syncMetrics.syncWaitMs = syncWaitMs; 855 syncMetrics.syncedEntries = slotIndex; 856 syncMetrics.totalSyncedBytes = totalSyncedToStore; 857 syncMetrics.syncedPerSec = syncedPerSec; 858 syncMetricsQueue.add(syncMetrics); 859 860 // sync 861 inSync.set(true); 862 long slotSize = syncSlots(); 863 logs.getLast().addToSize(slotSize); 864 totalSyncedToStore = totalSynced.addAndGet(slotSize); 865 slotIndex = 0; 866 inSync.set(false); 867 syncId.incrementAndGet(); 868 } catch (InterruptedException e) { 869 Thread.currentThread().interrupt(); 870 syncException.compareAndSet(null, e); 871 sendAbortProcessSignal(); 872 throw e; 873 } catch (Throwable t) { 874 syncException.compareAndSet(null, t); 875 sendAbortProcessSignal(); 876 throw t; 877 } finally { 878 syncCond.signalAll(); 879 } 880 } 881 } finally { 882 lock.unlock(); 883 } 884 } 885 886 public ArrayList<SyncMetrics> getSyncMetrics() { 887 lock.lock(); 888 try { 889 return new ArrayList<>(syncMetricsQueue); 890 } finally { 891 lock.unlock(); 892 } 893 } 894 895 private long syncSlots() throws Throwable { 896 int retry = 0; 897 int logRolled = 0; 898 long totalSynced = 0; 899 do { 900 try { 901 totalSynced = syncSlots(stream, slots, 0, slotIndex); 902 break; 903 } catch (Throwable e) { 904 LOG.warn("unable to sync slots, retry=" + retry); 905 if (++retry >= maxRetriesBeforeRoll) { 906 if (logRolled >= maxSyncFailureRoll && isRunning()) { 907 LOG.error("Sync slots after log roll failed, abort.", e); 908 throw e; 909 } 910 911 if (!rollWriterWithRetries()) { 912 throw e; 913 } 914 915 logRolled++; 916 retry = 0; 917 } 918 } 919 } while (isRunning()); 920 return totalSynced; 921 } 922 923 protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, 924 final int offset, final int count) throws IOException { 925 long totalSynced = 0; 926 for (int i = 0; i < count; ++i) { 927 final ByteSlot data = slots[offset + i]; 928 data.writeTo(stream); 929 totalSynced += data.size(); 930 } 931 932 syncStream(stream); 933 sendPostSyncSignal(); 934 935 if (LOG.isTraceEnabled()) { 936 LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + 937 ", flushed=" + StringUtils.humanSize(totalSynced)); 938 } 939 return totalSynced; 940 } 941 942 protected void syncStream(final FSDataOutputStream stream) throws IOException { 943 if (useHsync) { 944 stream.hsync(); 945 } else { 946 stream.hflush(); 947 } 948 } 949 950 private boolean rollWriterWithRetries() { 951 for (int i = 0; i < rollRetries && isRunning(); ++i) { 952 if (i > 0) { 953 Threads.sleepWithoutInterrupt(waitBeforeRoll * i); 954 } 955 956 try { 957 if (rollWriter()) { 958 return true; 959 } 960 } catch (IOException e) { 961 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); 962 } 963 } 964 LOG.error(HBaseMarkers.FATAL, "Unable to roll the log"); 965 return false; 966 } 967 968 private boolean tryRollWriter() { 969 try { 970 return rollWriter(); 971 } catch (IOException e) { 972 LOG.warn("Unable to roll the log", e); 973 return false; 974 } 975 } 976 977 public long getMillisToNextPeriodicRoll() { 978 if (lastRollTs.get() > 0 && periodicRollMsec > 0) { 979 return periodicRollMsec - getMillisFromLastRoll(); 980 } 981 return Long.MAX_VALUE; 982 } 983 984 public long getMillisFromLastRoll() { 985 return (System.currentTimeMillis() - lastRollTs.get()); 986 } 987 988 @VisibleForTesting 989 void periodicRollForTesting() throws IOException { 990 lock.lock(); 991 try { 992 periodicRoll(); 993 } finally { 994 lock.unlock(); 995 } 996 } 997 998 @VisibleForTesting 999 public boolean rollWriterForTesting() throws IOException { 1000 lock.lock(); 1001 try { 1002 return rollWriter(); 1003 } finally { 1004 lock.unlock(); 1005 } 1006 } 1007 1008 @VisibleForTesting 1009 void removeInactiveLogsForTesting() throws Exception { 1010 lock.lock(); 1011 try { 1012 removeInactiveLogs(); 1013 } finally { 1014 lock.unlock(); 1015 } 1016 } 1017 1018 private void periodicRoll() throws IOException { 1019 if (storeTracker.isEmpty()) { 1020 LOG.trace("no active procedures"); 1021 tryRollWriter(); 1022 removeAllLogs(flushLogId - 1, "no active procedures"); 1023 } else { 1024 if (storeTracker.isAllModified()) { 1025 LOG.trace("all the active procedures are in the latest log"); 1026 removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log"); 1027 } 1028 1029 // if the log size has exceeded the roll threshold 1030 // or the periodic roll timeout is expired, try to roll the wal. 1031 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { 1032 tryRollWriter(); 1033 } 1034 1035 removeInactiveLogs(); 1036 } 1037 } 1038 1039 private boolean rollWriter() throws IOException { 1040 if (!isRunning()) { 1041 return false; 1042 } 1043 1044 // Create new state-log 1045 if (!rollWriter(flushLogId + 1)) { 1046 LOG.warn("someone else has already created log {}", flushLogId); 1047 return false; 1048 } 1049 1050 // We have the lease on the log, 1051 // but we should check if someone else has created new files 1052 if (getMaxLogId(getLogFiles()) > flushLogId) { 1053 LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); 1054 logs.getLast().removeFile(this.walArchiveDir); 1055 return false; 1056 } 1057 1058 // We have the lease on the log 1059 return true; 1060 } 1061 1062 @VisibleForTesting 1063 boolean rollWriter(long logId) throws IOException { 1064 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; 1065 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked(); 1066 1067 ProcedureWALHeader header = ProcedureWALHeader.newBuilder() 1068 .setVersion(ProcedureWALFormat.HEADER_VERSION) 1069 .setType(ProcedureWALFormat.LOG_TYPE_STREAM) 1070 .setMinProcId(storeTracker.getActiveMinProcId()) 1071 .setLogId(logId) 1072 .build(); 1073 1074 FSDataOutputStream newStream = null; 1075 Path newLogFile = null; 1076 long startPos = -1; 1077 newLogFile = getLogFilePath(logId); 1078 try { 1079 newStream = CommonFSUtils.createForWal(fs, newLogFile, false); 1080 } catch (FileAlreadyExistsException e) { 1081 LOG.error("Log file with id={} already exists", logId, e); 1082 return false; 1083 } catch (RemoteException re) { 1084 LOG.warn("failed to create log file with id={}", logId, re); 1085 return false; 1086 } 1087 // After we create the stream but before we attempt to use it at all 1088 // ensure that we can provide the level of data safety we're configured 1089 // to provide. 1090 final String durability = useHsync ? "hsync" : "hflush"; 1091 if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) { 1092 throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + 1093 " for proper operation during component failures, but the underlying filesystem does " + 1094 "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + 1095 "' to set the desired level of robustness and ensure the config value of '" + 1096 CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); 1097 } 1098 try { 1099 ProcedureWALFormat.writeHeader(newStream, header); 1100 startPos = newStream.getPos(); 1101 } catch (IOException ioe) { 1102 LOG.warn("Encountered exception writing header", ioe); 1103 newStream.close(); 1104 return false; 1105 } 1106 1107 closeCurrentLogStream(false); 1108 1109 storeTracker.resetModified(); 1110 stream = newStream; 1111 flushLogId = logId; 1112 totalSynced.set(0); 1113 long rollTs = System.currentTimeMillis(); 1114 lastRollTs.set(rollTs); 1115 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); 1116 1117 // if it's the first next WAL being added, build the holding cleanup tracker 1118 if (logs.size() == 2) { 1119 buildHoldingCleanupTracker(); 1120 } else if (logs.size() > walCountWarnThreshold) { 1121 LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" + 1122 " to see if something is stuck.", logs.size(), walCountWarnThreshold); 1123 // This is just like what we have done at RS side when there are too many wal files. For RS, 1124 // if there are too many wal files, we will find out the wal entries in the oldest file, and 1125 // tell the upper layer to flush these regions so the wal entries will be useless and then we 1126 // can delete the wal file. For WALProcedureStore, the assumption is that, if all the 1127 // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then 1128 // we are safe to delete it. So here if there are too many proc wal files, we will find out 1129 // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc 1130 // wal files, and tell upper layer to update the state of these procedures to the newest proc 1131 // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal 1132 // file. 1133 sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds()); 1134 } 1135 1136 LOG.info("Rolled new Procedure Store WAL, id={}", logId); 1137 return true; 1138 } 1139 1140 private void closeCurrentLogStream(boolean abort) { 1141 if (stream == null || logs.isEmpty()) { 1142 return; 1143 } 1144 1145 try { 1146 ProcedureWALFile log = logs.getLast(); 1147 // If the loading flag is true, it usually means that we fail when loading procedures, so we 1148 // should not persist the store tracker, as its state may not be correct. 1149 if (!loading.get()) { 1150 log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); 1151 log.updateLocalTracker(storeTracker); 1152 if (!abort) { 1153 long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); 1154 log.addToSize(trailerSize); 1155 } 1156 } 1157 } catch (IOException e) { 1158 LOG.warn("Unable to write the trailer", e); 1159 } 1160 try { 1161 stream.close(); 1162 } catch (IOException e) { 1163 LOG.error("Unable to close the stream", e); 1164 } 1165 stream = null; 1166 } 1167 1168 // ========================================================================== 1169 // Log Files cleaner helpers 1170 // ========================================================================== 1171 private void removeInactiveLogs() throws IOException { 1172 // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. 1173 // once there is nothing olding the oldest WAL we can remove it. 1174 while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { 1175 LOG.info("Remove the oldest log {}", logs.getFirst()); 1176 removeLogFile(logs.getFirst(), walArchiveDir); 1177 buildHoldingCleanupTracker(); 1178 } 1179 1180 // TODO: In case we are holding up a lot of logs for long time we should 1181 // rewrite old procedures (in theory parent procs) to the new WAL. 1182 } 1183 1184 private void buildHoldingCleanupTracker() { 1185 if (logs.size() <= 1) { 1186 // we only have one wal, so nothing to do 1187 holdingCleanupTracker.reset(); 1188 return; 1189 } 1190 1191 // compute the holding tracker. 1192 // - the first WAL is used for the 'updates' 1193 // - the global tracker will be used to determine whether a procedure has been deleted 1194 // - other trackers will be used to determine whether a procedure has been updated, as a deleted 1195 // procedure can always be detected by checking the global tracker, we can save the deleted 1196 // checks when applying other trackers 1197 holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); 1198 holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker); 1199 // the logs is a linked list, so avoid calling get(index) on it. 1200 Iterator<ProcedureWALFile> iter = logs.iterator(); 1201 // skip the tracker for the first file when creating the iterator. 1202 iter.next(); 1203 ProcedureStoreTracker tracker = iter.next().getTracker(); 1204 // testing iter.hasNext after calling iter.next to skip applying the tracker for last file, 1205 // which is just the storeTracker above. 1206 while (iter.hasNext()) { 1207 holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker); 1208 if (holdingCleanupTracker.isEmpty()) { 1209 break; 1210 } 1211 tracker = iter.next().getTracker(); 1212 } 1213 } 1214 1215 /** 1216 * Remove all logs with logId <= {@code lastLogId}. 1217 */ 1218 private void removeAllLogs(long lastLogId, String why) { 1219 if (logs.size() <= 1) { 1220 return; 1221 } 1222 1223 LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why); 1224 1225 boolean removed = false; 1226 while (logs.size() > 1) { 1227 ProcedureWALFile log = logs.getFirst(); 1228 if (lastLogId < log.getLogId()) { 1229 break; 1230 } 1231 removeLogFile(log, walArchiveDir); 1232 removed = true; 1233 } 1234 1235 if (removed) { 1236 buildHoldingCleanupTracker(); 1237 } 1238 } 1239 1240 private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { 1241 try { 1242 LOG.trace("Removing log={}", log); 1243 log.removeFile(walArchiveDir); 1244 logs.remove(log); 1245 LOG.debug("Removed log={}, activeLogs={}", log, logs); 1246 assert logs.size() > 0 : "expected at least one log"; 1247 } catch (IOException e) { 1248 LOG.error("Unable to remove log: " + log, e); 1249 return false; 1250 } 1251 return true; 1252 } 1253 1254 // ========================================================================== 1255 // FileSystem Log Files helpers 1256 // ========================================================================== 1257 public Path getWALDir() { 1258 return this.walDir; 1259 } 1260 1261 @VisibleForTesting 1262 Path getWalArchiveDir() { 1263 return this.walArchiveDir; 1264 } 1265 1266 public FileSystem getFileSystem() { 1267 return this.fs; 1268 } 1269 1270 protected Path getLogFilePath(final long logId) throws IOException { 1271 return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId)); 1272 } 1273 1274 private static long getLogIdFromName(final String name) { 1275 int end = name.lastIndexOf(".log"); 1276 int start = name.lastIndexOf('-') + 1; 1277 return Long.parseLong(name.substring(start, end)); 1278 } 1279 1280 private static final PathFilter WALS_PATH_FILTER = new PathFilter() { 1281 @Override 1282 public boolean accept(Path path) { 1283 String name = path.getName(); 1284 return name.startsWith(LOG_PREFIX) && name.endsWith(".log"); 1285 } 1286 }; 1287 1288 private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR = 1289 new Comparator<FileStatus>() { 1290 @Override 1291 public int compare(FileStatus a, FileStatus b) { 1292 final long aId = getLogIdFromName(a.getPath().getName()); 1293 final long bId = getLogIdFromName(b.getPath().getName()); 1294 return Long.compare(aId, bId); 1295 } 1296 }; 1297 1298 private FileStatus[] getLogFiles() throws IOException { 1299 try { 1300 FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER); 1301 Arrays.sort(files, FILE_STATUS_ID_COMPARATOR); 1302 return files; 1303 } catch (FileNotFoundException e) { 1304 LOG.warn("Log directory not found: " + e.getMessage()); 1305 return null; 1306 } 1307 } 1308 1309 /** 1310 * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort 1311 * the file set by log id. 1312 * @return Max-LogID of the specified log file set 1313 */ 1314 private static long getMaxLogId(FileStatus[] logFiles) { 1315 if (logFiles == null || logFiles.length == 0) { 1316 return 0L; 1317 } 1318 return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); 1319 } 1320 1321 /** 1322 * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort 1323 * the file set by log id. 1324 * @return Max-LogID of the specified log file set 1325 */ 1326 private long initOldLogs(FileStatus[] logFiles) throws IOException { 1327 if (logFiles == null || logFiles.length == 0) { 1328 return 0L; 1329 } 1330 long maxLogId = 0; 1331 for (int i = 0; i < logFiles.length; ++i) { 1332 final Path logPath = logFiles[i].getPath(); 1333 leaseRecovery.recoverFileLease(fs, logPath); 1334 if (!isRunning()) { 1335 throw new IOException("wal aborting"); 1336 } 1337 1338 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); 1339 ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); 1340 if (log != null) { 1341 this.logs.add(log); 1342 } 1343 } 1344 initTrackerFromOldLogs(); 1345 return maxLogId; 1346 } 1347 1348 /** 1349 * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker 1350 * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. 1351 */ 1352 private void initTrackerFromOldLogs() { 1353 if (logs.isEmpty() || !isRunning()) { 1354 return; 1355 } 1356 ProcedureWALFile log = logs.getLast(); 1357 if (!log.getTracker().isPartial()) { 1358 storeTracker.resetTo(log.getTracker()); 1359 } else { 1360 storeTracker.reset(); 1361 storeTracker.setPartialFlag(true); 1362 } 1363 } 1364 1365 /** 1366 * Loads given log file and it's tracker. 1367 */ 1368 private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) 1369 throws IOException { 1370 final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); 1371 if (logFile.getLen() == 0) { 1372 LOG.warn("Remove uninitialized log: {}", logFile); 1373 log.removeFile(walArchiveDir); 1374 return null; 1375 } 1376 LOG.debug("Opening Pv2 {}", logFile); 1377 try { 1378 log.open(); 1379 } catch (ProcedureWALFormat.InvalidWALDataException e) { 1380 LOG.warn("Remove uninitialized log: {}", logFile, e); 1381 log.removeFile(walArchiveDir); 1382 return null; 1383 } catch (IOException e) { 1384 String msg = "Unable to read state log: " + logFile; 1385 LOG.error(msg, e); 1386 throw new IOException(msg, e); 1387 } 1388 1389 try { 1390 log.readTracker(); 1391 } catch (IOException e) { 1392 log.getTracker().reset(); 1393 log.getTracker().setPartialFlag(true); 1394 LOG.warn("Unable to read tracker for {}", log, e); 1395 } 1396 1397 log.close(); 1398 return log; 1399 } 1400 1401 /** 1402 * Parses a directory of WALs building up ProcedureState. 1403 * For testing parse and profiling. 1404 * @param args Include pointer to directory of WAL files for a store instance to parse & load. 1405 */ 1406 public static void main(String [] args) throws IOException { 1407 Configuration conf = HBaseConfiguration.create(); 1408 if (args == null || args.length != 1) { 1409 System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR."); 1410 System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR"); 1411 System.exit(-1); 1412 } 1413 WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null, 1414 new WALProcedureStore.LeaseRecovery() { 1415 @Override 1416 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 1417 // no-op 1418 } 1419 }); 1420 try { 1421 store.start(16); 1422 ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store); 1423 pe.init(1, true); 1424 } finally { 1425 store.stop(true); 1426 } 1427 } 1428}