/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;

/**
 * WAL implementation of the ProcedureStore.
 * <p/>
 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
 * then {@link #load(ProcedureLoader)}.
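 * <p/>
 * A rough usage sketch (the {@code conf}, {@code leaseRecovery} and {@code loader} instances are
 * assumed to be supplied by the caller, as the upper layer normally does):
 * <pre>{@code
 * WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
 * store.start(numSlots);
 * store.recoverLease();
 * store.load(loader);
 * // ... insert/update/delete procedures ...
 * store.stop(false);
 * }</pre>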
 * <p/>
 * In {@link #recoverLease()}, we get the lease by closing all the existing wal files (by calling
 * recoverFileLease) and creating a new wal writer. We also collect the list of all the old wal
 * files.
 * <p/>
 * FIXME: notice that the current recover lease implementation is problematic: it cannot deal with
 * the race where two masters both want to acquire the lease...
 * <p/>
 * In the {@link #load(ProcedureLoader)} method, we load all the active procedures. See the
 * comments of this method for more details.
 * <p/>
 * The actual logging works much like our FileSystem based WAL implementation on the RS side. There
 * is a {@link #slots}, which acts like a ring buffer, and in the insert, update and delete methods
 * we put things into the {@link #slots} and wait. There is also a background sync thread (see the
 * {@link #syncLoop()} method) which takes data from the {@link #slots}, writes it to the
 * FileSystem, and notifies the callers that we have finished.
 * <p/>
 * TODO: try using disruptor to increase performance and simplify the logic?
 * <p/>
 * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is
 * also the one being written currently. The deleted bits in it are for all the procedures, not only
 * the ones in the newest wal file. When rolling a log, we first store it in the trailer of the
 * current wal file, and then reset its modified bits, so that it can start to track the modified
 * procedures for the new wal file.
 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal
 * file. When log rolling happens and there is more than one wal file, we make use of it. It will
 * first be initialized to the oldest file's tracker (which is stored in the trailer), using the
 * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged
 * with the tracker of every newer wal file, using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find out
 * that all the modified procedures for the oldest wal file are modified or deleted in newer wal
 * files, then we can delete it. This is because, every time we call
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we
 * persist the full state of a Procedure, so the earlier wal records for this procedure can all be
 * deleted.
 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
 * @see #main(String[]) to parse a directory of MasterWALProcs.
 * @deprecated Since 2.3.0, will be removed in 4.0.0. Kept here only for rolling upgrade; now we
 *             use the new region based procedure store.
 */
@Deprecated
@InterfaceAudience.Private
public class WALProcedureStore extends ProcedureStoreBase {
  private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
  public static final String LOG_PREFIX = "pv2-";
  /** Used to construct the name of the log directory for master procedures */
  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";

  public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
    "hbase.procedure.store.wal.warn.threshold";
  private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;

  public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY =
    "hbase.procedure.store.wal.exec.cleanup.on.load";
  private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true;

  public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.max.retries.before.roll";
  private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;

  public static final String WAIT_BEFORE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.wait.before.roll";
  private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;

  public static final String ROLL_RETRIES_CONF_KEY =
    "hbase.procedure.store.wal.max.roll.retries";
  private static final int DEFAULT_ROLL_RETRIES = 3;

  public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.sync.failure.roll.max";
  private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;

  public static final String PERIODIC_ROLL_CONF_KEY =
    "hbase.procedure.store.wal.periodic.roll.msec";
  private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h

  public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
  private static final int DEFAULT_SYNC_WAIT_MSEC = 100;

  public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
  private static final boolean DEFAULT_USE_HSYNC = true;

  public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
  private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M

  public static final String STORE_WAL_SYNC_STATS_COUNT =
    "hbase.procedure.store.wal.sync.stats.count";
  private static final int DEFAULT_SYNC_STATS_COUNT = 10;

  private final LinkedList<ProcedureWALFile> logs = new LinkedList<>();
  private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker();
  private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition waitCond = lock.newCondition();
  private final Condition slotCond = lock.newCondition();
  private final Condition syncCond = lock.newCondition();

  private final LeaseRecovery leaseRecovery;
  private final Configuration conf;
  private final FileSystem fs;
  private final Path walDir;
  private final Path walArchiveDir;
  private final boolean enforceStreamCapability;

  private final AtomicReference<Throwable> syncException = new AtomicReference<>();
  private final AtomicBoolean loading = new AtomicBoolean(true);
  private final AtomicBoolean inSync = new AtomicBoolean(false);
  private final AtomicLong totalSynced = new AtomicLong(0);
  private final AtomicLong lastRollTs = new AtomicLong(0);
  private final AtomicLong syncId = new AtomicLong(0);

  private LinkedTransferQueue<ByteSlot> slotsCache = null;
  private Set<ProcedureWALFile> corruptedLogs = null;
  private FSDataOutputStream stream = null;
  private int runningProcCount = 1;
  private long flushLogId = 0;
  private int syncMaxSlot = 1;
  private int slotIndex = 0;
  private Thread syncThread;
  private ByteSlot[] slots;

  private int walCountWarnThreshold;
  private int maxRetriesBeforeRoll;
  private int maxSyncFailureRoll;
  private int waitBeforeRoll;
  private int rollRetries;
  private int periodicRollMsec;
  private long rollThreshold;
  private boolean useHsync;
  private int syncWaitMsec;

  // Variables used for UI display
  private CircularFifoQueue<SyncMetrics> syncMetricsQueue;

  public static class SyncMetrics {
    private long timestamp;
    private long syncWaitMs;
    private long totalSyncedBytes;
    private int syncedEntries;
    private float syncedPerSec;

    public long getTimestamp() {
      return timestamp;
    }

    public long getSyncWaitMs() {
      return syncWaitMs;
    }

    public long getTotalSyncedBytes() {
      return totalSyncedBytes;
    }

    public long getSyncedEntries() {
      return syncedEntries;
    }

    public float getSyncedPerSec() {
      return syncedPerSec;
    }
  }

  public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
    this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
      leaseRecovery);
  }

  @VisibleForTesting
  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
      final LeaseRecovery leaseRecovery) throws IOException {
    this.conf = conf;
    this.leaseRecovery = leaseRecovery;
    this.walDir = walDir;
    this.walArchiveDir = walArchiveDir;
    this.fs = CommonFSUtils.getWALFileSystem(conf);
    this.enforceStreamCapability =
      conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);

    // Create the log directory for the procedure store
    if (!fs.exists(walDir)) {
      if (!fs.mkdirs(walDir)) {
        throw new IOException("Unable to mkdir " + walDir);
      }
    }
    // Now that it exists, set the log policy
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy);

    // Create archive dir up front. Rename won't work w/o it up on HDFS.
    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
      if (this.fs.mkdirs(this.walArchiveDir)) {
        LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir);
      } else {
        LOG.warn("Failed create of {}", this.walArchiveDir);
      }
    }
  }

  @Override
  public void start(int numSlots) throws IOException {
    if (!setRunning(true)) {
      return;
    }

    // Init buffer slots
    loading.set(true);
    runningProcCount = numSlots;
    syncMaxSlot = numSlots;
    slots = new ByteSlot[numSlots];
    slotsCache = new LinkedTransferQueue<>();
    while (slotsCache.size() < numSlots) {
      slotsCache.offer(new ByteSlot());
    }

    // Tunings
    walCountWarnThreshold =
      conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD);
    maxRetriesBeforeRoll =
      conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL);
    maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL);
    waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL);
    rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES);
    rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD);
    periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL);
    syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC);
    useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC);

    // WebUI
    syncMetricsQueue = new CircularFifoQueue<>(
      conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT));

    // Init sync thread
    syncThread = new Thread("WALProcedureStoreSyncThread") {
      @Override
      public void run() {
        try {
          syncLoop();
        } catch (Throwable e) {
          LOG.error("Got an exception from the sync-loop", e);
          if (!isSyncAborted()) {
            sendAbortProcessSignal();
          }
        }
      }
    };
    syncThread.start();
  }

  @Override
  public void stop(final boolean abort) {
    if (!setRunning(false)) {
      return;
    }

    LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort +
      (isSyncAborted() ? " (self aborting)" : ""));
    sendStopSignal();
    if (!isSyncAborted()) {
      try {
        while (syncThread.isAlive()) {
          sendStopSignal();
          syncThread.join(250);
        }
      } catch (InterruptedException e) {
        LOG.warn("join interrupted", e);
        Thread.currentThread().interrupt();
      }
    }

    // Close the writer
    closeCurrentLogStream(abort);

    // Close the old logs
    // they should be already closed, this is just in case the load fails
    // and we call start() and then stop()
    for (ProcedureWALFile log: logs) {
      log.close();
    }
    logs.clear();
    loading.set(true);
  }

  private void sendStopSignal() {
    if (lock.tryLock()) {
      try {
        waitCond.signalAll();
        syncCond.signalAll();
      } finally {
        lock.unlock();
      }
    }
  }

  @Override
  public int getNumThreads() {
    return slots == null ? 0 : slots.length;
  }

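  /**
   * Sets how many slots the sync loop will wait to fill before flushing; the value is capped by
   * the number of slots allocated in {@link #start(int)}, and a non-positive {@code count} resets
   * it to that maximum.
   * @return the value actually applied
   */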
  @Override
  public int setRunningProcedureCount(final int count) {
    this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length;
    return this.runningProcCount;
  }

  public ProcedureStoreTracker getStoreTracker() {
    return storeTracker;
  }

  public ArrayList<ProcedureWALFile> getActiveLogs() {
    lock.lock();
    try {
      return new ArrayList<>(logs);
    } finally {
      lock.unlock();
    }
  }

  public Set<ProcedureWALFile> getCorruptedLogs() {
    return corruptedLogs;
  }

  @Override
  public void recoverLease() throws IOException {
    lock.lock();
    try {
      LOG.debug("Starting WAL Procedure Store lease recovery");
      boolean afterFirstAttempt = false;
      while (isRunning()) {
        // Don't sleep before the first attempt
        if (afterFirstAttempt) {
          LOG.trace("Sleep {} ms after first lease recovery attempt.",
            waitBeforeRoll);
          Threads.sleepWithoutInterrupt(waitBeforeRoll);
        } else {
          afterFirstAttempt = true;
        }
        FileStatus[] oldLogs = getLogFiles();
        // Get Log-MaxID and recover lease on old logs
        try {
          flushLogId = initOldLogs(oldLogs);
        } catch (FileNotFoundException e) {
          LOG.warn("Someone else is active and deleted logs. retrying.", e);
          continue;
        }

        // Create new state-log
        if (!rollWriter(flushLogId + 1)) {
          // someone else has already created this log
          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId);
          continue;
        }

        // We have the lease on the log
        oldLogs = getLogFiles();
        if (getMaxLogId(oldLogs) > flushLogId) {
          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
          logs.getLast().removeFile(this.walArchiveDir);
          continue;
        }

        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
        break;
      }
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void load(ProcedureLoader loader) throws IOException {
    lock.lock();
    try {
      if (logs.isEmpty()) {
        throw new IllegalStateException("recoverLease() must be called before loading data");
      }

      // Nothing to do, if we have only the current log.
      if (logs.size() == 1) {
        LOG.debug("No state logs to replay.");
        loader.setMaxProcId(0);
        loading.set(false);
        return;
      }

      // Load the old logs
      Iterator<ProcedureWALFile> it = logs.descendingIterator();
      it.next(); // Skip the current log

      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {

        @Override
        public void setMaxProcId(long maxProcId) {
          loader.setMaxProcId(maxProcId);
        }

        @Override
        public void load(ProcedureIterator procIter) throws IOException {
          loader.load(procIter);
        }

        @Override
        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
          loader.handleCorrupted(procIter);
        }

        @Override
        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
          if (corruptedLogs == null) {
            corruptedLogs = new HashSet<>();
          }
          corruptedLogs.add(log);
          // TODO: sideline corrupted log
        }
      });
      // If we fail when loading, we should prevent persisting the storeTracker later in the stop
      // method. It may happen that we have finished constructing the modified and deleted bits,
      // but fail before we call resetModified; if we then persisted the storeTracker, on restart
      // we would consider that all procedures have been included in this file and delete all the
      // previous files. Obviously this is not correct.
      // So we only set loading to false once we have successfully loaded all the procedures, and
      // when closing we skip persisting the store tracker. This also prevents the sync thread
      // from doing a periodicRoll, where we may also clean old logs.
      loading.set(false);
      // try to cleanup inactive wals and complete the operation
      buildHoldingCleanupTracker();
      tryCleanupLogsOnLoad();
    } finally {
      lock.unlock();
    }
  }

  private void tryCleanupLogsOnLoad() {
    // nothing to cleanup.
    if (logs.size() <= 1) {
      return;
    }

    // the config says to not cleanup wals on load.
    if (!conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
      DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)) {
      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
      return;
    }

    try {
      periodicRoll();
    } catch (IOException e) {
      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
    }
  }

  @Override
  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the insert
      long[] subProcIds = null;
      if (subprocs != null) {
        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
        subProcIds = new long[subprocs.length];
        for (int i = 0; i < subprocs.length; ++i) {
          subProcIds[i] = subprocs[i].getProcId();
        }
      } else {
        assert !proc.hasParent();
        ProcedureWALFormat.writeInsert(slot, proc);
      }

      // Push the transaction data and wait until it is persisted
      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedures: proc=" +
        proc + ", subprocs=" + Arrays.toString(subprocs), e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void insert(Procedure<?>[] procs) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Insert " + Arrays.toString(procs));
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the insert
      long[] procIds = new long[procs.length];
      for (int i = 0; i < procs.length; ++i) {
        assert !procs[i].hasParent();
        procIds[i] = procs[i].getProcId();
        ProcedureWALFormat.writeInsert(slot, procs[i]);
      }

      // Push the transaction data and wait until it is persisted
      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedures: " +
        Arrays.toString(procs), e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void update(Procedure<?> proc) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update " + proc);
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the update
      ProcedureWALFormat.writeUpdate(slot, proc);

      // Push the transaction data and wait until it is persisted
      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void delete(long procId) {
    LOG.trace("Delete {}", procId);
    ByteSlot slot = acquireSlot();
    try {
      // Serialize the delete
      ProcedureWALFormat.writeDelete(slot, procId);

      // Push the transaction data and wait until it is persisted
      pushData(PushType.DELETE, slot, procId, null);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void delete(Procedure<?> proc, long[] subProcIds) {
    assert proc != null : "expected a non-null procedure";
    assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds";
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the delete
      ProcedureWALFormat.writeDelete(slot, proc, subProcIds);

      // Push the transaction data and wait until it is persisted
      pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void delete(final long[] procIds, final int offset, final int count) {
    if (count == 0) {
      return;
    }

    if (offset == 0 && count == procIds.length) {
      delete(procIds);
    } else if (count == 1) {
      delete(procIds[offset]);
    } else {
      delete(Arrays.copyOfRange(procIds, offset, offset + count));
    }
  }

  private void delete(long[] procIds) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Delete " + Arrays.toString(procIds));
    }

    final ByteSlot slot = acquireSlot();
    try {
      // Serialize the delete
      for (int i = 0; i < procIds.length; ++i) {
        ProcedureWALFormat.writeDelete(slot, procIds[i]);
      }

      // Push the transaction data and wait until it is persisted
      pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  private ByteSlot acquireSlot() {
    ByteSlot slot = slotsCache.poll();
    return slot != null ? slot : new ByteSlot();
  }

  private void releaseSlot(final ByteSlot slot) {
    slot.reset();
    slotsCache.offer(slot);
  }

  private enum PushType { INSERT, UPDATE, DELETE };

  private long pushData(final PushType type, final ByteSlot slot,
      final long procId, final long[] subProcIds) {
    if (!isRunning()) {
      throw new RuntimeException("the store must be running before inserting data");
    }
    if (logs.isEmpty()) {
      throw new RuntimeException("recoverLease() must be called before inserting data");
    }

    long logId = -1;
    lock.lock();
    try {
      // Wait for the sync to be completed
      while (true) {
        if (!isRunning()) {
          throw new RuntimeException("store no longer running");
        } else if (isSyncAborted()) {
          throw new RuntimeException("sync aborted", syncException.get());
        } else if (inSync.get()) {
          syncCond.await();
        } else if (slotIndex >= syncMaxSlot) {
          slotCond.signal();
          syncCond.await();
        } else {
          break;
        }
      }

      final long pushSyncId = syncId.get();
      updateStoreTracker(type, procId, subProcIds);
      slots[slotIndex++] = slot;
      logId = flushLogId;

      // Notify that there is new data
      if (slotIndex == 1) {
        waitCond.signal();
      }

      // Notify that the slots are full
      if (slotIndex == syncMaxSlot) {
        waitCond.signal();
        slotCond.signal();
      }

      while (pushSyncId == syncId.get() && isRunning()) {
        syncCond.await();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      sendAbortProcessSignal();
      throw new RuntimeException(e);
    } finally {
      lock.unlock();
      if (isSyncAborted()) {
        throw new RuntimeException("sync aborted", syncException.get());
      }
    }
    return logId;
  }

  private void updateStoreTracker(final PushType type,
      final long procId, final long[] subProcIds) {
    switch (type) {
      case INSERT:
        if (subProcIds == null) {
          storeTracker.insert(procId);
        } else if (procId == Procedure.NO_PROC_ID) {
          storeTracker.insert(subProcIds);
        } else {
          storeTracker.insert(procId, subProcIds);
          holdingCleanupTracker.setDeletedIfModified(procId);
        }
        break;
      case UPDATE:
        storeTracker.update(procId);
        holdingCleanupTracker.setDeletedIfModified(procId);
        break;
      case DELETE:
        if (subProcIds != null && subProcIds.length > 0) {
          storeTracker.delete(subProcIds);
          holdingCleanupTracker.setDeletedIfModified(subProcIds);
        } else {
          storeTracker.delete(procId);
          holdingCleanupTracker.setDeletedIfModified(procId);
        }
        break;
      default:
        throw new RuntimeException("invalid push type " + type);
    }
  }

  private boolean isSyncAborted() {
    return syncException.get() != null;
  }

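  /**
   * Body of the background sync thread started in {@link #start(int)}: it waits for slots to be
   * filled (or for the sync wait timeout), flushes them to the current wal file, and then wakes up
   * the producers blocked in {@link #pushData(PushType, ByteSlot, long, long[])}. When idle it
   * also performs the periodic roll.
   */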
  private void syncLoop() throws Throwable {
    long totalSyncedToStore = 0;
    inSync.set(false);
    lock.lock();
    try {
      while (isRunning()) {
        try {
          // Wait until new data is available
          if (slotIndex == 0) {
            if (!loading.get()) {
              periodicRoll();
            }

            if (LOG.isTraceEnabled()) {
              float rollTsSec = getMillisFromLastRoll() / 1000.0f;
              LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)",
                StringUtils.humanSize(totalSynced.get()),
                StringUtils.humanSize(totalSynced.get() / rollTsSec)));
            }

            waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
            if (slotIndex == 0) {
              // no data.. probably a stop() or a periodic roll
              continue;
            }
          }
          // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing
          syncMaxSlot = runningProcCount;
          assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot;
          final long syncWaitSt = System.currentTimeMillis();
          if (slotIndex != syncMaxSlot) {
            slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS);
          }

          final long currentTs = System.currentTimeMillis();
          final long syncWaitMs = currentTs - syncWaitSt;
          final float rollSec = getMillisFromLastRoll() / 1000.0f;
          final float syncedPerSec = totalSyncedToStore / rollSec;
          if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) {
            LOG.trace(String.format("Sync wait %s, slotIndex=%s, totalSynced=%s (%s/sec)",
              StringUtils.humanTimeDiff(syncWaitMs), slotIndex,
              StringUtils.humanSize(totalSyncedToStore),
              StringUtils.humanSize(syncedPerSec)));
          }

          // update webui circular buffers (TODO: get rid of allocations)
          final SyncMetrics syncMetrics = new SyncMetrics();
          syncMetrics.timestamp = currentTs;
          syncMetrics.syncWaitMs = syncWaitMs;
          syncMetrics.syncedEntries = slotIndex;
          syncMetrics.totalSyncedBytes = totalSyncedToStore;
          syncMetrics.syncedPerSec = syncedPerSec;
          syncMetricsQueue.add(syncMetrics);

          // sync
          inSync.set(true);
          long slotSize = syncSlots();
          logs.getLast().addToSize(slotSize);
          totalSyncedToStore = totalSynced.addAndGet(slotSize);
          slotIndex = 0;
          inSync.set(false);
          syncId.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          syncException.compareAndSet(null, e);
          sendAbortProcessSignal();
          throw e;
        } catch (Throwable t) {
          syncException.compareAndSet(null, t);
          sendAbortProcessSignal();
          throw t;
        } finally {
          syncCond.signalAll();
        }
      }
    } finally {
      lock.unlock();
    }
  }

  public ArrayList<SyncMetrics> getSyncMetrics() {
    lock.lock();
    try {
      return new ArrayList<>(syncMetricsQueue);
    } finally {
      lock.unlock();
    }
  }

  private long syncSlots() throws Throwable {
    int retry = 0;
    int logRolled = 0;
    long totalSynced = 0;
    do {
      try {
        totalSynced = syncSlots(stream, slots, 0, slotIndex);
        break;
      } catch (Throwable e) {
        LOG.warn("unable to sync slots, retry=" + retry);
        if (++retry >= maxRetriesBeforeRoll) {
          if (logRolled >= maxSyncFailureRoll && isRunning()) {
            LOG.error("Sync slots after log roll failed, abort.", e);
            throw e;
          }

          if (!rollWriterWithRetries()) {
            throw e;
          }

          logRolled++;
          retry = 0;
        }
      }
    } while (isRunning());
    return totalSynced;
  }

  protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots,
      final int offset, final int count) throws IOException {
    long totalSynced = 0;
    for (int i = 0; i < count; ++i) {
      final ByteSlot data = slots[offset + i];
      data.writeTo(stream);
      totalSynced += data.size();
    }

    syncStream(stream);
    sendPostSyncSignal();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Sync slots=" + count + '/' + syncMaxSlot +
        ", flushed=" + StringUtils.humanSize(totalSynced));
    }
    return totalSynced;
  }

  protected void syncStream(final FSDataOutputStream stream) throws IOException {
    if (useHsync) {
      stream.hsync();
    } else {
      stream.hflush();
    }
  }

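  /**
   * Tries to roll the writer up to {@code rollRetries} times, sleeping {@code waitBeforeRoll}
   * milliseconds (scaled by the attempt number) between attempts. Used by the sync loop when a
   * sync keeps failing.
   * @return false if all attempts fail
   */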
  private boolean rollWriterWithRetries() {
    for (int i = 0; i < rollRetries && isRunning(); ++i) {
      if (i > 0) {
        Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
      }

      try {
        if (rollWriter()) {
          return true;
        }
      } catch (IOException e) {
        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
      }
    }
    LOG.error(HBaseMarkers.FATAL, "Unable to roll the log");
    return false;
  }

  private boolean tryRollWriter() {
    try {
      return rollWriter();
    } catch (IOException e) {
      LOG.warn("Unable to roll the log", e);
      return false;
    }
  }

  public long getMillisToNextPeriodicRoll() {
    if (lastRollTs.get() > 0 && periodicRollMsec > 0) {
      return periodicRollMsec - getMillisFromLastRoll();
    }
    return Long.MAX_VALUE;
  }

  public long getMillisFromLastRoll() {
    return (System.currentTimeMillis() - lastRollTs.get());
  }

  @VisibleForTesting
  void periodicRollForTesting() throws IOException {
    lock.lock();
    try {
      periodicRoll();
    } finally {
      lock.unlock();
    }
  }

  @VisibleForTesting
  public boolean rollWriterForTesting() throws IOException {
    lock.lock();
    try {
      return rollWriter();
    } finally {
      lock.unlock();
    }
  }

  @VisibleForTesting
  void removeInactiveLogsForTesting() throws Exception {
    lock.lock();
    try {
      removeInactiveLogs();
    } finally {
      lock.unlock();
    }
  }

  private void periodicRoll() throws IOException {
    if (storeTracker.isEmpty()) {
      LOG.trace("no active procedures");
      tryRollWriter();
      removeAllLogs(flushLogId - 1, "no active procedures");
    } else {
      if (storeTracker.isAllModified()) {
        LOG.trace("all the active procedures are in the latest log");
        removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log");
      }

      // if the log size has exceeded the roll threshold
      // or the periodic roll timeout has expired, try to roll the wal.
      if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) {
        tryRollWriter();
      }

      removeInactiveLogs();
    }
  }

  private boolean rollWriter() throws IOException {
    if (!isRunning()) {
      return false;
    }

    // Create new state-log
    if (!rollWriter(flushLogId + 1)) {
      LOG.warn("someone else has already created log {}", flushLogId);
      return false;
    }

    // We have the lease on the log,
    // but we should check if someone else has created new files
    if (getMaxLogId(getLogFiles()) > flushLogId) {
      LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId);
      logs.getLast().removeFile(this.walArchiveDir);
      return false;
    }

    // We have the lease on the log
    return true;
  }

  @VisibleForTesting
  boolean rollWriter(long logId) throws IOException {
    assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId;
    assert lock.isHeldByCurrentThread() : "expected to be the lock owner. " + lock.isLocked();

    ProcedureWALHeader header = ProcedureWALHeader.newBuilder()
      .setVersion(ProcedureWALFormat.HEADER_VERSION)
      .setType(ProcedureWALFormat.LOG_TYPE_STREAM)
      .setMinProcId(storeTracker.getActiveMinProcId())
      .setLogId(logId)
      .build();

    FSDataOutputStream newStream = null;
    Path newLogFile = null;
    long startPos = -1;
    newLogFile = getLogFilePath(logId);
    try {
      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
    } catch (FileAlreadyExistsException e) {
      LOG.error("Log file with id={} already exists", logId, e);
      return false;
    } catch (RemoteException re) {
      LOG.warn("failed to create log file with id={}", logId, re);
      return false;
    }
    // After we create the stream but before we attempt to use it at all
    // ensure that we can provide the level of data safety we're configured
    // to provide.
    final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
    if (enforceStreamCapability && !newStream.hasCapability(durability)) {
      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
        " for proper operation during component failures, but the underlying filesystem does " +
        "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
        "' to set the desired level of robustness and ensure the config value of '" +
        CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
    }
    try {
      ProcedureWALFormat.writeHeader(newStream, header);
      startPos = newStream.getPos();
    } catch (IOException ioe) {
      LOG.warn("Encountered exception writing header", ioe);
      newStream.close();
      return false;
    }

    closeCurrentLogStream(false);

    storeTracker.resetModified();
    stream = newStream;
    flushLogId = logId;
    totalSynced.set(0);
    long rollTs = System.currentTimeMillis();
    lastRollTs.set(rollTs);
    logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs));

    // if it's the first next WAL being added, build the holding cleanup tracker
    if (logs.size() == 2) {
      buildHoldingCleanupTracker();
    } else if (logs.size() > walCountWarnThreshold) {
      LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" +
        " to see if something is stuck.", logs.size(), walCountWarnThreshold);
      // This is just like what we have done at RS side when there are too many wal files. For RS,
      // if there are too many wal files, we will find out the wal entries in the oldest file, and
      // tell the upper layer to flush these regions so the wal entries will be useless and then we
      // can delete the wal file. For WALProcedureStore, the assumption is that, if all the
      // procedures recorded in a proc wal file are modified or deleted in a new proc wal file,
      // then we are safe to delete it. So here if there are too many proc wal files, we will find
      // out the procedure ids in the oldest file, which are neither modified nor deleted in newer
      // proc wal files, and tell the upper layer to update the state of these procedures to the
      // newest proc wal file (by calling ProcedureStore.update), then we are safe to delete the
      // oldest proc wal file.
      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
    }

    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
    return true;
  }

  private void closeCurrentLogStream(boolean abort) {
    if (stream == null || logs.isEmpty()) {
      return;
    }

    try {
      ProcedureWALFile log = logs.getLast();
      // If the loading flag is true, it usually means that we fail when loading procedures, so we
      // should not persist the store tracker, as its state may not be correct.
      if (!loading.get()) {
        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
        log.updateLocalTracker(storeTracker);
        if (!abort) {
          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
          log.addToSize(trailerSize);
        }
      }
    } catch (IOException | FSError e) {
      LOG.warn("Unable to write the trailer", e);
    }
    try {
      stream.close();
    } catch (IOException | FSError e) {
      LOG.error("Unable to close the stream", e);
    }
    stream = null;
  }

  // ==========================================================================
  //  Log Files cleaner helpers
  // ==========================================================================
  private void removeInactiveLogs() throws IOException {
    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
    // Once there is nothing holding the oldest WAL we can remove it.
    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
      LOG.info("Remove the oldest log {}", logs.getFirst());
      removeLogFile(logs.getFirst(), walArchiveDir);
      buildHoldingCleanupTracker();
    }

    // TODO: In case we are holding up a lot of logs for a long time we should
    // rewrite old procedures (in theory parent procs) to the new WAL.
  }

  private void buildHoldingCleanupTracker() {
    if (logs.size() <= 1) {
      // we only have one wal, so nothing to do
      holdingCleanupTracker.reset();
      return;
    }

    // Compute the holding tracker.
    // - the first WAL is used for the 'updates'
    // - the global tracker will be used to determine whether a procedure has been deleted
    // - other trackers will be used to determine whether a procedure has been updated; as a
    //   deleted procedure can always be detected by checking the global tracker, we can skip the
    //   deleted checks when applying other trackers
    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // logs is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the tracker for the first file when creating the iterator.
    iter.next();
    ProcedureStoreTracker tracker = iter.next().getTracker();
    // testing iter.hasNext after calling iter.next to skip applying the tracker for the last file,
    // which is just the storeTracker above.
    while (iter.hasNext()) {
      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
      if (holdingCleanupTracker.isEmpty()) {
        break;
      }
      tracker = iter.next().getTracker();
    }
  }

  /**
   * Remove all logs with logId <= {@code lastLogId}.
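   * @param lastLogId the highest log id (inclusive) that may be removed
   * @param why human readable reason for the removal, used only for logging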
   */
  private void removeAllLogs(long lastLogId, String why) {
    if (logs.size() <= 1) {
      return;
    }

    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);

    boolean removed = false;
    while (logs.size() > 1) {
      ProcedureWALFile log = logs.getFirst();
      if (lastLogId < log.getLogId()) {
        break;
      }
      removeLogFile(log, walArchiveDir);
      removed = true;
    }

    if (removed) {
      buildHoldingCleanupTracker();
    }
  }

  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
    try {
      LOG.trace("Removing log={}", log);
      log.removeFile(walArchiveDir);
      logs.remove(log);
      LOG.debug("Removed log={}, activeLogs={}", log, logs);
      assert logs.size() > 0 : "expected at least one log";
    } catch (IOException e) {
      LOG.error("Unable to remove log: " + log, e);
      return false;
    }
    return true;
  }

  // ==========================================================================
  //  FileSystem Log Files helpers
  // ==========================================================================
  public Path getWALDir() {
    return this.walDir;
  }

  @VisibleForTesting
  Path getWalArchiveDir() {
    return this.walArchiveDir;
  }

  public FileSystem getFileSystem() {
    return this.fs;
  }

  protected Path getLogFilePath(final long logId) throws IOException {
    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
  }

  private static long getLogIdFromName(final String name) {
    int end = name.lastIndexOf(".log");
    int start = name.lastIndexOf('-') + 1;
    return Long.parseLong(name.substring(start, end));
  }

  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
    }
  };

  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus a, FileStatus b) {
        final long aId = getLogIdFromName(a.getPath().getName());
        final long bId = getLogIdFromName(b.getPath().getName());
        return Long.compare(aId, bId);
      }
    };

  private FileStatus[] getLogFiles() throws IOException {
    try {
      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
      return files;
    } catch (FileNotFoundException e) {
      LOG.warn("Log directory not found: " + e.getMessage());
      return null;
    }
  }

  /**
   * Make sure that the file set is obtained by calling {@link #getLogFiles()}, which sorts the
   * file set by log id.
   * @return Max-LogID of the specified log file set
   */
  private static long getMaxLogId(FileStatus[] logFiles) {
    if (logFiles == null || logFiles.length == 0) {
      return 0L;
    }
    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
  }

  /**
   * Make sure that the file set is obtained by calling {@link #getLogFiles()}, which sorts the
   * file set by log id.
   * @return Max-LogID of the specified log file set
   */
  private long initOldLogs(FileStatus[] logFiles) throws IOException {
    if (logFiles == null || logFiles.length == 0) {
      return 0L;
    }
    long maxLogId = 0;
    for (int i = 0; i < logFiles.length; ++i) {
      final Path logPath = logFiles[i].getPath();
      leaseRecovery.recoverFileLease(fs, logPath);
      if (!isRunning()) {
        throw new IOException("wal aborting");
      }

      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
      if (log != null) {
        this.logs.add(log);
      }
    }
    initTrackerFromOldLogs();
    return maxLogId;
  }

  /**
   * If the last log's tracker is not null, use it as the {@link #storeTracker}. Otherwise, mark
   * the storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries
   * in the log.
   */
  private void initTrackerFromOldLogs() {
    if (logs.isEmpty() || !isRunning()) {
      return;
    }
    ProcedureWALFile log = logs.getLast();
    if (!log.getTracker().isPartial()) {
      storeTracker.resetTo(log.getTracker());
    } else {
      storeTracker.reset();
      storeTracker.setPartialFlag(true);
    }
  }

  /**
   * Loads the given log file and its tracker.
   */
  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
      throws IOException {
    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
    if (logFile.getLen() == 0) {
      LOG.warn("Remove uninitialized log: {}", logFile);
      log.removeFile(walArchiveDir);
      return null;
    }
    LOG.debug("Opening Pv2 {}", logFile);
    try {
      log.open();
    } catch (ProcedureWALFormat.InvalidWALDataException e) {
      LOG.warn("Remove uninitialized log: {}", logFile, e);
      log.removeFile(walArchiveDir);
      return null;
    } catch (IOException e) {
      String msg = "Unable to read state log: " + logFile;
      LOG.error(msg, e);
      throw new IOException(msg, e);
    }

    try {
      log.readTracker();
    } catch (IOException e) {
      log.getTracker().reset();
      log.getTracker().setPartialFlag(true);
      LOG.warn("Unable to read tracker for {}", log, e);
    }

    log.close();
    return log;
  }

  /**
   * Parses a directory of WALs building up ProcedureState.
   * For testing parse and profiling.
   * @param args Include pointer to directory of WAL files for a store instance to parse & load.
   */
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    if (args == null || args.length != 1) {
      System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
      System.exit(-1);
    }
    WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
      new LeaseRecovery() {
        @Override
        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
          // no-op
        }
      });
    try {
      store.start(16);
      ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store);
      pe.init(1, true);
    } finally {
      store.stop(true);
    }
  }
}