/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;

/**
 * WAL implementation of the ProcedureStore.
 * <p/>
 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
 * then {@link #load(ProcedureLoader)}.
 * <p/>
 * In {@link #recoverLease()}, we get the lease by closing all the existing wal files (by calling
 * recoverFileLease) and creating a new wal writer. We also collect the list of all the old wal
 * files.
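 * <p/>
 * A minimal usage sketch of that startup sequence (the {@code conf}, {@code leaseRecovery},
 * {@code loader} and {@code numSlots} values are assumed to be supplied by the caller):
 *
 * <pre>
 * WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
 * store.start(numSlots);   // allocate the slots and start the background sync thread
 * store.recoverLease();    // take ownership of the wal directory and roll a new wal
 * store.load(loader);      // replay the old wal files into the ProcedureLoader
 * </pre>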
 * <p/>
 * FIXME: notice that the current lease recovery implementation is problematic, as it cannot handle
 * the race where two masters both try to acquire the lease...
 * <p/>
 * In the {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the
 * comments of this method for more details.
 * <p/>
 * The actual logging is a bit like our FileSystem based WAL implementation on the RS side. There is
 * a {@link #slots} array, which works like a ring buffer; in the insert, update and delete methods
 * we put entries into the {@link #slots} and wait. A background sync thread (see the
 * {@link #syncLoop()} method) takes the data from the {@link #slots}, writes it to the FileSystem,
 * and notifies the callers that the data has been persisted.
 * <p/>
 * TODO: try using disruptor to increase performance and simplify the logic?
 * <p/>
 * The {@link #storeTracker} keeps track of the procedures modified in the newest wal file, which is
 * also the one being written currently. The deleted bits in it cover all procedures, not only the
 * ones in the newest wal file. When rolling a log, we first store the tracker in the trailer of the
 * current wal file, and then reset its modified bits, so that it can start tracking the procedures
 * modified in the new wal file.
 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. It comes into play after a log roll, when there is more than one wal file. It is first
 * initialized to the oldest file's tracker (which is stored in that file's trailer), using the
 * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged
 * with the tracker of every newer wal file, using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If we find that
 * all the procedures modified in the oldest wal file have been modified or deleted in newer wal
 * files, then we can delete it. This works because every time we call
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)}, we
 * persist the full state of a Procedure, so all the earlier wal records for this procedure can be
 * deleted.
 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
 * @see #main(String[]) to parse a directory of MasterWALProcs.
 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
 *             use the new region based procedure store.
112 */ 113@Deprecated 114@InterfaceAudience.Private 115public class WALProcedureStore extends ProcedureStoreBase { 116 private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); 117 public static final String LOG_PREFIX = "pv2-"; 118 /** Used to construct the name of the log directory for master procedures */ 119 public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; 120 121 public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY = 122 "hbase.procedure.store.wal.warn.threshold"; 123 private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10; 124 125 public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = 126 "hbase.procedure.store.wal.exec.cleanup.on.load"; 127 private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true; 128 129 public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = 130 "hbase.procedure.store.wal.max.retries.before.roll"; 131 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; 132 133 public static final String WAIT_BEFORE_ROLL_CONF_KEY = 134 "hbase.procedure.store.wal.wait.before.roll"; 135 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; 136 137 public static final String ROLL_RETRIES_CONF_KEY = "hbase.procedure.store.wal.max.roll.retries"; 138 private static final int DEFAULT_ROLL_RETRIES = 3; 139 140 public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = 141 "hbase.procedure.store.wal.sync.failure.roll.max"; 142 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; 143 144 public static final String PERIODIC_ROLL_CONF_KEY = 145 "hbase.procedure.store.wal.periodic.roll.msec"; 146 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h 147 148 public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; 149 private static final int DEFAULT_SYNC_WAIT_MSEC = 100; 150 151 public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; 152 private static final boolean DEFAULT_USE_HSYNC = true; 153 154 public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; 155 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M 156 157 public static final String STORE_WAL_SYNC_STATS_COUNT = 158 "hbase.procedure.store.wal.sync.stats.count"; 159 private static final int DEFAULT_SYNC_STATS_COUNT = 10; 160 161 private final LinkedList<ProcedureWALFile> logs = new LinkedList<>(); 162 private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker(); 163 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); 164 private final ReentrantLock lock = new ReentrantLock(); 165 private final Condition waitCond = lock.newCondition(); 166 private final Condition slotCond = lock.newCondition(); 167 private final Condition syncCond = lock.newCondition(); 168 169 private final LeaseRecovery leaseRecovery; 170 private final Configuration conf; 171 private final FileSystem fs; 172 private final Path walDir; 173 private final Path walArchiveDir; 174 private final boolean enforceStreamCapability; 175 176 private final AtomicReference<Throwable> syncException = new AtomicReference<>(); 177 private final AtomicBoolean loading = new AtomicBoolean(true); 178 private final AtomicBoolean inSync = new AtomicBoolean(false); 179 private final AtomicLong totalSynced = new AtomicLong(0); 180 private final AtomicLong lastRollTs = new AtomicLong(0); 181 private final AtomicLong syncId = new AtomicLong(0); 182 183 private 
LinkedTransferQueue<ByteSlot> slotsCache = null; 184 private Set<ProcedureWALFile> corruptedLogs = null; 185 private FSDataOutputStream stream = null; 186 private int runningProcCount = 1; 187 private long flushLogId = 0; 188 private int syncMaxSlot = 1; 189 private int slotIndex = 0; 190 private Thread syncThread; 191 private ByteSlot[] slots; 192 193 private int walCountWarnThreshold; 194 private int maxRetriesBeforeRoll; 195 private int maxSyncFailureRoll; 196 private int waitBeforeRoll; 197 private int rollRetries; 198 private int periodicRollMsec; 199 private long rollThreshold; 200 private boolean useHsync; 201 private int syncWaitMsec; 202 203 // Variables used for UI display 204 private CircularFifoQueue<SyncMetrics> syncMetricsQueue; 205 206 public static class SyncMetrics { 207 private long timestamp; 208 private long syncWaitMs; 209 private long totalSyncedBytes; 210 private int syncedEntries; 211 private float syncedPerSec; 212 213 public long getTimestamp() { 214 return timestamp; 215 } 216 217 public long getSyncWaitMs() { 218 return syncWaitMs; 219 } 220 221 public long getTotalSyncedBytes() { 222 return totalSyncedBytes; 223 } 224 225 public long getSyncedEntries() { 226 return syncedEntries; 227 } 228 229 public float getSyncedPerSec() { 230 return syncedPerSec; 231 } 232 } 233 234 public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException { 235 this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR), 236 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), 237 leaseRecovery); 238 } 239 240 public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir, 241 final LeaseRecovery leaseRecovery) throws IOException { 242 this.conf = conf; 243 this.leaseRecovery = leaseRecovery; 244 this.walDir = walDir; 245 this.walArchiveDir = walArchiveDir; 246 this.fs = CommonFSUtils.getWALFileSystem(conf); 247 this.enforceStreamCapability = 248 conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true); 249 250 // Create the log directory for the procedure store 251 if (!fs.exists(walDir)) { 252 if (!fs.mkdirs(walDir)) { 253 throw new IOException("Unable to mkdir " + walDir); 254 } 255 } 256 // Now that it exists, set the log policy 257 String storagePolicy = 258 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 259 CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy); 260 261 // Create archive dir up front. Rename won't work w/o it up on HDFS. 
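    // (The cleanup path hands this directory to ProcedureWALFile#removeFile, e.g. from
    // removeLogFile(), so old wal files can be archived here rather than deleted outright; on
    // HDFS that rename fails if the destination directory does not already exist.)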
262 if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { 263 if (this.fs.mkdirs(this.walArchiveDir)) { 264 LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); 265 } else { 266 LOG.warn("Failed create of {}", this.walArchiveDir); 267 } 268 } 269 } 270 271 @Override 272 public void start(int numSlots) throws IOException { 273 if (!setRunning(true)) { 274 return; 275 } 276 277 // Init buffer slots 278 loading.set(true); 279 runningProcCount = numSlots; 280 syncMaxSlot = numSlots; 281 slots = new ByteSlot[numSlots]; 282 slotsCache = new LinkedTransferQueue<>(); 283 while (slotsCache.size() < numSlots) { 284 slotsCache.offer(new ByteSlot()); 285 } 286 287 // Tunings 288 walCountWarnThreshold = 289 conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD); 290 maxRetriesBeforeRoll = 291 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); 292 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); 293 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); 294 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); 295 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); 296 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL); 297 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); 298 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); 299 300 // WebUI 301 syncMetricsQueue = 302 new CircularFifoQueue<>(conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); 303 304 // Init sync thread 305 syncThread = new Thread("WALProcedureStoreSyncThread") { 306 @Override 307 public void run() { 308 try { 309 syncLoop(); 310 } catch (Throwable e) { 311 LOG.error("Got an exception from the sync-loop", e); 312 if (!isSyncAborted()) { 313 sendAbortProcessSignal(); 314 } 315 } 316 } 317 }; 318 syncThread.start(); 319 } 320 321 @Override 322 public void stop(final boolean abort) { 323 if (!setRunning(false)) { 324 return; 325 } 326 327 LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort 328 + (isSyncAborted() ? " (self aborting)" : "")); 329 sendStopSignal(); 330 if (!isSyncAborted()) { 331 try { 332 while (syncThread.isAlive()) { 333 sendStopSignal(); 334 syncThread.join(250); 335 } 336 } catch (InterruptedException e) { 337 LOG.warn("join interrupted", e); 338 Thread.currentThread().interrupt(); 339 } 340 } 341 342 // Close the writer 343 closeCurrentLogStream(abort); 344 345 // Close the old logs 346 // they should be already closed, this is just in case the load fails 347 // and we call start() and then stop() 348 for (ProcedureWALFile log : logs) { 349 log.close(); 350 } 351 logs.clear(); 352 loading.set(true); 353 } 354 355 private void sendStopSignal() { 356 if (lock.tryLock()) { 357 try { 358 waitCond.signalAll(); 359 syncCond.signalAll(); 360 } finally { 361 lock.unlock(); 362 } 363 } 364 } 365 366 @Override 367 public int getNumThreads() { 368 return slots == null ? 0 : slots.length; 369 } 370 371 @Override 372 public int setRunningProcedureCount(final int count) { 373 this.runningProcCount = count > 0 ? 
      Math.min(count, slots.length) : slots.length;
    return this.runningProcCount;
  }

  public ProcedureStoreTracker getStoreTracker() {
    return storeTracker;
  }

  public ArrayList<ProcedureWALFile> getActiveLogs() {
    lock.lock();
    try {
      return new ArrayList<>(logs);
    } finally {
      lock.unlock();
    }
  }

  public Set<ProcedureWALFile> getCorruptedLogs() {
    return corruptedLogs;
  }

  @Override
  public void recoverLease() throws IOException {
    lock.lock();
    try {
      LOG.debug("Starting WAL Procedure Store lease recovery");
      boolean afterFirstAttempt = false;
      while (isRunning()) {
        // Don't sleep before the first attempt
        if (afterFirstAttempt) {
          LOG.trace("Sleep {} ms after first lease recovery attempt.", waitBeforeRoll);
          Threads.sleepWithoutInterrupt(waitBeforeRoll);
        } else {
          afterFirstAttempt = true;
        }
        FileStatus[] oldLogs = getLogFiles();
        // Get Log-MaxID and recover lease on old logs
        try {
          flushLogId = initOldLogs(oldLogs);
        } catch (FileNotFoundException e) {
          LOG.warn("Someone else is active and deleted logs. retrying.", e);
          continue;
        }

        // Create new state-log
        if (!rollWriter(flushLogId + 1)) {
          // someone else has already created this log
          LOG.debug("Someone else has already created log {}. Retrying.", flushLogId);
          continue;
        }

        // We have the lease on the log
        oldLogs = getLogFiles();
        if (getMaxLogId(oldLogs) > flushLogId) {
          LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId);
          logs.getLast().removeFile(this.walArchiveDir);
          continue;
        }

        LOG.debug("Lease acquired for flushLogId={}", flushLogId);
        break;
      }
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void load(ProcedureLoader loader) throws IOException {
    lock.lock();
    try {
      if (logs.isEmpty()) {
        throw new IllegalStateException("recoverLease() must be called before loading data");
      }

      // Nothing to do if we only have the current log.
      if (logs.size() == 1) {
        LOG.debug("No state logs to replay.");
        loader.setMaxProcId(0);
        loading.set(false);
        return;
      }

      // Load the old logs
      Iterator<ProcedureWALFile> it = logs.descendingIterator();
      it.next(); // Skip the current log

      ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {

        @Override
        public void setMaxProcId(long maxProcId) {
          loader.setMaxProcId(maxProcId);
        }

        @Override
        public void load(ProcedureIterator procIter) throws IOException {
          loader.load(procIter);
        }

        @Override
        public void handleCorrupted(ProcedureIterator procIter) throws IOException {
          loader.handleCorrupted(procIter);
        }

        @Override
        public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
          if (corruptedLogs == null) {
            corruptedLogs = new HashSet<>();
          }
          corruptedLogs.add(log);
          // TODO: sideline corrupted log
        }
      });
      // If we fail while loading, we should prevent persisting the storeTracker later in the stop
      // method. It may happen that we have finished constructing the modified and deleted bits,
      // but fail before calling resetModified; if we then persisted the storeTracker, on restart
      // we would consider that all procedures are included in this file and delete all the
      // previous files, which is obviously not correct. So here we only set loading to false
      // after we have successfully loaded all the procedures, and when closing we will skip
      // persisting the store tracker. This also prevents the sync thread from doing
      // periodicRoll, which may clean up old logs as well.
      loading.set(false);
      // try to cleanup inactive wals and complete the operation
      buildHoldingCleanupTracker();
      tryCleanupLogsOnLoad();
    } finally {
      lock.unlock();
    }
  }

  private void tryCleanupLogsOnLoad() {
    // nothing to cleanup.
    if (logs.size() <= 1) {
      return;
    }

    // the config says to not cleanup wals on load.
    if (
      !conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY)
    ) {
      LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs());
      return;
    }

    try {
      periodicRoll();
    } catch (IOException e) {
      LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e);
    }
  }

  @Override
  public void insert(Procedure<?> proc, Procedure<?>[] subprocs) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the insert
      long[] subProcIds = null;
      if (subprocs != null) {
        ProcedureWALFormat.writeInsert(slot, proc, subprocs);
        subProcIds = new long[subprocs.length];
        for (int i = 0; i < subprocs.length; ++i) {
          subProcIds[i] = subprocs[i].getProcId();
        }
      } else {
        assert !proc.hasParent();
        ProcedureWALFormat.writeInsert(slot, proc);
      }

      // Push the transaction data and wait until it is persisted
      pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + proc
        + ", subprocs=" + Arrays.toString(subprocs), e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void insert(Procedure<?>[] procs) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Insert " + Arrays.toString(procs));
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the insert
      long[] procIds = new long[procs.length];
      for (int i = 0; i < procs.length; ++i) {
        assert !procs[i].hasParent();
        procIds[i] = procs[i].getProcId();
        ProcedureWALFormat.writeInsert(slot, procs[i]);
      }

      // Push the transaction data and wait until it is persisted
      pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
      // this is a code error, and we are not able to go on.
      LOG.error(HBaseMarkers.FATAL,
        "Unable to serialize one of the procedure: " + Arrays.toString(procs), e);
      throw new RuntimeException(e);
    } finally {
      releaseSlot(slot);
    }
  }

  @Override
  public void update(Procedure<?> proc) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Update " + proc);
    }

    ByteSlot slot = acquireSlot();
    try {
      // Serialize the update
      ProcedureWALFormat.writeUpdate(slot, proc);

      // Push the transaction data and wait until it is persisted
      pushData(PushType.UPDATE, slot, proc.getProcId(), null);
    } catch (IOException e) {
      // We are not able to serialize the procedure.
602 // this is a code error, and we are not able to go on. 603 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 604 throw new RuntimeException(e); 605 } finally { 606 releaseSlot(slot); 607 } 608 } 609 610 @Override 611 public void delete(long procId) { 612 LOG.trace("Delete {}", procId); 613 ByteSlot slot = acquireSlot(); 614 try { 615 // Serialize the delete 616 ProcedureWALFormat.writeDelete(slot, procId); 617 618 // Push the transaction data and wait until it is persisted 619 pushData(PushType.DELETE, slot, procId, null); 620 } catch (IOException e) { 621 // We are not able to serialize the procedure. 622 // this is a code error, and we are not able to go on. 623 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e); 624 throw new RuntimeException(e); 625 } finally { 626 releaseSlot(slot); 627 } 628 } 629 630 @Override 631 public void delete(Procedure<?> proc, long[] subProcIds) { 632 assert proc != null : "expected a non-null procedure"; 633 assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; 634 if (LOG.isTraceEnabled()) { 635 LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); 636 } 637 638 ByteSlot slot = acquireSlot(); 639 try { 640 // Serialize the delete 641 ProcedureWALFormat.writeDelete(slot, proc, subProcIds); 642 643 // Push the transaction data and wait until it is persisted 644 pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds); 645 } catch (IOException e) { 646 // We are not able to serialize the procedure. 647 // this is a code error, and we are not able to go on. 648 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 649 throw new RuntimeException(e); 650 } finally { 651 releaseSlot(slot); 652 } 653 } 654 655 @Override 656 public void delete(final long[] procIds, final int offset, final int count) { 657 if (count == 0) { 658 return; 659 } 660 661 if (offset == 0 && count == procIds.length) { 662 delete(procIds); 663 } else if (count == 1) { 664 delete(procIds[offset]); 665 } else { 666 delete(Arrays.copyOfRange(procIds, offset, offset + count)); 667 } 668 } 669 670 private void delete(long[] procIds) { 671 if (LOG.isTraceEnabled()) { 672 LOG.trace("Delete " + Arrays.toString(procIds)); 673 } 674 675 final ByteSlot slot = acquireSlot(); 676 try { 677 // Serialize the delete 678 for (int i = 0; i < procIds.length; ++i) { 679 ProcedureWALFormat.writeDelete(slot, procIds[i]); 680 } 681 682 // Push the transaction data and wait until it is persisted 683 pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds); 684 } catch (IOException e) { 685 // We are not able to serialize the procedure. 686 // this is a code error, and we are not able to go on. 687 LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e); 688 throw new RuntimeException(e); 689 } finally { 690 releaseSlot(slot); 691 } 692 } 693 694 private ByteSlot acquireSlot() { 695 ByteSlot slot = slotsCache.poll(); 696 return slot != null ? 
slot : new ByteSlot(); 697 } 698 699 private void releaseSlot(final ByteSlot slot) { 700 slot.reset(); 701 slotsCache.offer(slot); 702 } 703 704 private enum PushType { 705 INSERT, 706 UPDATE, 707 DELETE 708 }; 709 710 private long pushData(final PushType type, final ByteSlot slot, final long procId, 711 final long[] subProcIds) { 712 if (!isRunning()) { 713 throw new RuntimeException("the store must be running before inserting data"); 714 } 715 if (logs.isEmpty()) { 716 throw new RuntimeException("recoverLease() must be called before inserting data"); 717 } 718 719 long logId = -1; 720 lock.lock(); 721 try { 722 // Wait for the sync to be completed 723 while (true) { 724 if (!isRunning()) { 725 throw new RuntimeException("store no longer running"); 726 } else if (isSyncAborted()) { 727 throw new RuntimeException("sync aborted", syncException.get()); 728 } else if (inSync.get()) { 729 syncCond.await(); 730 } else if (slotIndex >= syncMaxSlot) { 731 slotCond.signal(); 732 syncCond.await(); 733 } else { 734 break; 735 } 736 } 737 738 final long pushSyncId = syncId.get(); 739 updateStoreTracker(type, procId, subProcIds); 740 slots[slotIndex++] = slot; 741 logId = flushLogId; 742 743 // Notify that there is new data 744 if (slotIndex == 1) { 745 waitCond.signal(); 746 } 747 748 // Notify that the slots are full 749 if (slotIndex == syncMaxSlot) { 750 waitCond.signal(); 751 slotCond.signal(); 752 } 753 754 while (pushSyncId == syncId.get() && isRunning()) { 755 syncCond.await(); 756 } 757 } catch (InterruptedException e) { 758 Thread.currentThread().interrupt(); 759 sendAbortProcessSignal(); 760 throw new RuntimeException(e); 761 } finally { 762 lock.unlock(); 763 if (isSyncAborted()) { 764 throw new RuntimeException("sync aborted", syncException.get()); 765 } 766 } 767 return logId; 768 } 769 770 private void updateStoreTracker(final PushType type, final long procId, final long[] subProcIds) { 771 switch (type) { 772 case INSERT: 773 if (subProcIds == null) { 774 storeTracker.insert(procId); 775 } else if (procId == Procedure.NO_PROC_ID) { 776 storeTracker.insert(subProcIds); 777 } else { 778 storeTracker.insert(procId, subProcIds); 779 holdingCleanupTracker.setDeletedIfModified(procId); 780 } 781 break; 782 case UPDATE: 783 storeTracker.update(procId); 784 holdingCleanupTracker.setDeletedIfModified(procId); 785 break; 786 case DELETE: 787 if (subProcIds != null && subProcIds.length > 0) { 788 storeTracker.delete(subProcIds); 789 holdingCleanupTracker.setDeletedIfModified(subProcIds); 790 } else { 791 storeTracker.delete(procId); 792 holdingCleanupTracker.setDeletedIfModified(procId); 793 } 794 break; 795 default: 796 throw new RuntimeException("invalid push type " + type); 797 } 798 } 799 800 private boolean isSyncAborted() { 801 return syncException.get() != null; 802 } 803 804 private void syncLoop() throws Throwable { 805 long totalSyncedToStore = 0; 806 inSync.set(false); 807 lock.lock(); 808 try { 809 while (isRunning()) { 810 try { 811 // Wait until new data is available 812 if (slotIndex == 0) { 813 if (!loading.get()) { 814 periodicRoll(); 815 } 816 817 if (LOG.isTraceEnabled()) { 818 float rollTsSec = getMillisFromLastRoll() / 1000.0f; 819 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", 820 StringUtils.humanSize(totalSynced.get()), 821 StringUtils.humanSize(totalSynced.get() / rollTsSec))); 822 } 823 824 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); 825 if (slotIndex == 0) { 826 // no data.. 
probably a stop() or a periodic roll 827 continue; 828 } 829 } 830 // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing 831 syncMaxSlot = runningProcCount; 832 assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot; 833 final long syncWaitSt = EnvironmentEdgeManager.currentTime(); 834 if (slotIndex != syncMaxSlot) { 835 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); 836 } 837 838 final long currentTs = EnvironmentEdgeManager.currentTime(); 839 final long syncWaitMs = currentTs - syncWaitSt; 840 final float rollSec = getMillisFromLastRoll() / 1000.0f; 841 final float syncedPerSec = totalSyncedToStore / rollSec; 842 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) { 843 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", 844 StringUtils.humanTimeDiff(syncWaitMs), slotIndex, 845 StringUtils.humanSize(totalSyncedToStore), StringUtils.humanSize(syncedPerSec))); 846 } 847 848 // update webui circular buffers (TODO: get rid of allocations) 849 final SyncMetrics syncMetrics = new SyncMetrics(); 850 syncMetrics.timestamp = currentTs; 851 syncMetrics.syncWaitMs = syncWaitMs; 852 syncMetrics.syncedEntries = slotIndex; 853 syncMetrics.totalSyncedBytes = totalSyncedToStore; 854 syncMetrics.syncedPerSec = syncedPerSec; 855 syncMetricsQueue.add(syncMetrics); 856 857 // sync 858 inSync.set(true); 859 long slotSize = syncSlots(); 860 logs.getLast().addToSize(slotSize); 861 totalSyncedToStore = totalSynced.addAndGet(slotSize); 862 slotIndex = 0; 863 inSync.set(false); 864 syncId.incrementAndGet(); 865 } catch (InterruptedException e) { 866 Thread.currentThread().interrupt(); 867 syncException.compareAndSet(null, e); 868 sendAbortProcessSignal(); 869 throw e; 870 } catch (Throwable t) { 871 syncException.compareAndSet(null, t); 872 sendAbortProcessSignal(); 873 throw t; 874 } finally { 875 syncCond.signalAll(); 876 } 877 } 878 } finally { 879 lock.unlock(); 880 } 881 } 882 883 public ArrayList<SyncMetrics> getSyncMetrics() { 884 lock.lock(); 885 try { 886 return new ArrayList<>(syncMetricsQueue); 887 } finally { 888 lock.unlock(); 889 } 890 } 891 892 private long syncSlots() throws Throwable { 893 int retry = 0; 894 int logRolled = 0; 895 long totalSynced = 0; 896 do { 897 try { 898 totalSynced = syncSlots(stream, slots, 0, slotIndex); 899 break; 900 } catch (Throwable e) { 901 LOG.warn("unable to sync slots, retry=" + retry); 902 if (++retry >= maxRetriesBeforeRoll) { 903 if (logRolled >= maxSyncFailureRoll && isRunning()) { 904 LOG.error("Sync slots after log roll failed, abort.", e); 905 throw e; 906 } 907 908 if (!rollWriterWithRetries()) { 909 throw e; 910 } 911 912 logRolled++; 913 retry = 0; 914 } 915 } 916 } while (isRunning()); 917 return totalSynced; 918 } 919 920 protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, 921 final int offset, final int count) throws IOException { 922 long totalSynced = 0; 923 for (int i = 0; i < count; ++i) { 924 final ByteSlot data = slots[offset + i]; 925 data.writeTo(stream); 926 totalSynced += data.size(); 927 } 928 929 syncStream(stream); 930 sendPostSyncSignal(); 931 932 if (LOG.isTraceEnabled()) { 933 LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + ", flushed=" 934 + StringUtils.humanSize(totalSynced)); 935 } 936 return totalSynced; 937 } 938 939 protected void syncStream(final FSDataOutputStream stream) throws IOException { 940 if (useHsync) { 941 stream.hsync(); 942 } else { 943 stream.hflush(); 944 } 945 } 946 947 private boolean 
rollWriterWithRetries() { 948 for (int i = 0; i < rollRetries && isRunning(); ++i) { 949 if (i > 0) { 950 Threads.sleepWithoutInterrupt(waitBeforeRoll * i); 951 } 952 953 try { 954 if (rollWriter()) { 955 return true; 956 } 957 } catch (IOException e) { 958 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); 959 } 960 } 961 LOG.error(HBaseMarkers.FATAL, "Unable to roll the log"); 962 return false; 963 } 964 965 private boolean tryRollWriter() { 966 try { 967 return rollWriter(); 968 } catch (IOException e) { 969 LOG.warn("Unable to roll the log", e); 970 return false; 971 } 972 } 973 974 public long getMillisToNextPeriodicRoll() { 975 if (lastRollTs.get() > 0 && periodicRollMsec > 0) { 976 return periodicRollMsec - getMillisFromLastRoll(); 977 } 978 return Long.MAX_VALUE; 979 } 980 981 public long getMillisFromLastRoll() { 982 return (EnvironmentEdgeManager.currentTime() - lastRollTs.get()); 983 } 984 985 void periodicRollForTesting() throws IOException { 986 lock.lock(); 987 try { 988 periodicRoll(); 989 } finally { 990 lock.unlock(); 991 } 992 } 993 994 public boolean rollWriterForTesting() throws IOException { 995 lock.lock(); 996 try { 997 return rollWriter(); 998 } finally { 999 lock.unlock(); 1000 } 1001 } 1002 1003 void removeInactiveLogsForTesting() throws Exception { 1004 lock.lock(); 1005 try { 1006 removeInactiveLogs(); 1007 } finally { 1008 lock.unlock(); 1009 } 1010 } 1011 1012 private void periodicRoll() throws IOException { 1013 if (storeTracker.isEmpty()) { 1014 LOG.trace("no active procedures"); 1015 tryRollWriter(); 1016 removeAllLogs(flushLogId - 1, "no active procedures"); 1017 } else { 1018 if (storeTracker.isAllModified()) { 1019 LOG.trace("all the active procedures are in the latest log"); 1020 removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log"); 1021 } 1022 1023 // if the log size has exceeded the roll threshold 1024 // or the periodic roll timeout is expired, try to roll the wal. 1025 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { 1026 tryRollWriter(); 1027 } 1028 1029 removeInactiveLogs(); 1030 } 1031 } 1032 1033 private boolean rollWriter() throws IOException { 1034 if (!isRunning()) { 1035 return false; 1036 } 1037 1038 // Create new state-log 1039 if (!rollWriter(flushLogId + 1)) { 1040 LOG.warn("someone else has already created log {}", flushLogId); 1041 return false; 1042 } 1043 1044 // We have the lease on the log, 1045 // but we should check if someone else has created new files 1046 if (getMaxLogId(getLogFiles()) > flushLogId) { 1047 LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); 1048 logs.getLast().removeFile(this.walArchiveDir); 1049 return false; 1050 } 1051 1052 // We have the lease on the log 1053 return true; 1054 } 1055 1056 boolean rollWriter(long logId) throws IOException { 1057 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; 1058 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. 
" + lock.isLocked(); 1059 1060 ProcedureWALHeader header = ProcedureWALHeader.newBuilder() 1061 .setVersion(ProcedureWALFormat.HEADER_VERSION).setType(ProcedureWALFormat.LOG_TYPE_STREAM) 1062 .setMinProcId(storeTracker.getActiveMinProcId()).setLogId(logId).build(); 1063 1064 FSDataOutputStream newStream = null; 1065 Path newLogFile = null; 1066 long startPos = -1; 1067 newLogFile = getLogFilePath(logId); 1068 try { 1069 newStream = CommonFSUtils.createForWal(fs, newLogFile, false); 1070 } catch (FileAlreadyExistsException e) { 1071 LOG.error("Log file with id={} already exists", logId, e); 1072 return false; 1073 } catch (RemoteException re) { 1074 LOG.warn("failed to create log file with id={}", logId, re); 1075 return false; 1076 } 1077 // After we create the stream but before we attempt to use it at all 1078 // ensure that we can provide the level of data safety we're configured 1079 // to provide. 1080 final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH; 1081 if (enforceStreamCapability && !newStream.hasCapability(durability)) { 1082 throw new IllegalStateException("The procedure WAL relies on the ability to " + durability 1083 + " for proper operation during component failures, but the underlying filesystem does " 1084 + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY 1085 + "' to set the desired level of robustness and ensure the config value of '" 1086 + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); 1087 } 1088 try { 1089 ProcedureWALFormat.writeHeader(newStream, header); 1090 startPos = newStream.getPos(); 1091 } catch (IOException ioe) { 1092 LOG.warn("Encountered exception writing header", ioe); 1093 newStream.close(); 1094 return false; 1095 } 1096 1097 closeCurrentLogStream(false); 1098 1099 storeTracker.resetModified(); 1100 stream = newStream; 1101 flushLogId = logId; 1102 totalSynced.set(0); 1103 long rollTs = EnvironmentEdgeManager.currentTime(); 1104 lastRollTs.set(rollTs); 1105 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); 1106 1107 // if it's the first next WAL being added, build the holding cleanup tracker 1108 if (logs.size() == 2) { 1109 buildHoldingCleanupTracker(); 1110 } else if (logs.size() > walCountWarnThreshold) { 1111 LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" 1112 + " to see if something is stuck.", logs.size(), walCountWarnThreshold); 1113 // This is just like what we have done at RS side when there are too many wal files. For RS, 1114 // if there are too many wal files, we will find out the wal entries in the oldest file, and 1115 // tell the upper layer to flush these regions so the wal entries will be useless and then we 1116 // can delete the wal file. For WALProcedureStore, the assumption is that, if all the 1117 // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then 1118 // we are safe to delete it. So here if there are too many proc wal files, we will find out 1119 // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc 1120 // wal files, and tell upper layer to update the state of these procedures to the newest proc 1121 // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal 1122 // file. 
      sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds());
    }

    LOG.info("Rolled new Procedure Store WAL, id={}", logId);
    return true;
  }

  private void closeCurrentLogStream(boolean abort) {
    if (stream == null || logs.isEmpty()) {
      return;
    }

    try {
      ProcedureWALFile log = logs.getLast();
      // If the loading flag is true, it usually means that we failed when loading procedures, so
      // we should not persist the store tracker, as its state may not be correct.
      if (!loading.get()) {
        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
        log.updateLocalTracker(storeTracker);
        if (!abort) {
          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
          log.addToSize(trailerSize);
        }
      }
    } catch (IOException | FSError e) {
      LOG.warn("Unable to write the trailer", e);
    }
    try {
      stream.close();
    } catch (IOException | FSError e) {
      LOG.error("Unable to close the stream", e);
    }
    stream = null;
  }

  // ==========================================================================
  // Log Files cleaner helpers
  // ==========================================================================
  private void removeInactiveLogs() throws IOException {
    // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'.
    // Once there is nothing holding the oldest WAL, we can remove it.
    while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) {
      LOG.info("Remove the oldest log {}", logs.getFirst());
      removeLogFile(logs.getFirst(), walArchiveDir);
      buildHoldingCleanupTracker();
    }

    // TODO: In case we are holding up a lot of logs for a long time we should
    // rewrite old procedures (in theory parent procs) to the new WAL.
  }

  private void buildHoldingCleanupTracker() {
    if (logs.size() <= 1) {
      // we only have one wal, so nothing to do
      holdingCleanupTracker.reset();
      return;
    }

    // Compute the holding tracker.
    // - the first WAL is used for the 'updates'
    // - the global tracker is used to determine whether a procedure has been deleted
    // - the other trackers are used to determine whether a procedure has been updated; since a
    //   deleted procedure can always be detected by checking the global tracker, we can skip the
    //   deleted checks when applying the other trackers
    holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
    // logs is a linked list, so avoid calling get(index) on it.
    Iterator<ProcedureWALFile> iter = logs.iterator();
    // skip the tracker for the first file when creating the iterator.
    iter.next();
    ProcedureStoreTracker tracker = iter.next().getTracker();
    // Test iter.hasNext after calling iter.next so we skip applying the tracker of the last file,
    // which is just the storeTracker above.
    while (iter.hasNext()) {
      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
      if (holdingCleanupTracker.isEmpty()) {
        break;
      }
      tracker = iter.next().getTracker();
    }
  }
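
  // An illustrative example of the cleanup logic above: suppose the oldest wal recorded
  // modifications to procedures 10 and 11. If both of them have since been re-persisted (updated)
  // or deleted in newer wals, the setDeletedIfModifiedInBoth() merges leave holdingCleanupTracker
  // empty, and removeInactiveLogs() can archive the oldest file on its next pass.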

  /**
   * Remove all logs with logId <= {@code lastLogId}.
   */
  private void removeAllLogs(long lastLogId, String why) {
    if (logs.size() <= 1) {
      return;
    }

    LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why);

    boolean removed = false;
    while (logs.size() > 1) {
      ProcedureWALFile log = logs.getFirst();
      if (lastLogId < log.getLogId()) {
        break;
      }
      removeLogFile(log, walArchiveDir);
      removed = true;
    }

    if (removed) {
      buildHoldingCleanupTracker();
    }
  }

  private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) {
    try {
      LOG.trace("Removing log={}", log);
      log.removeFile(walArchiveDir);
      logs.remove(log);
      LOG.debug("Removed log={}, activeLogs={}", log, logs);
      assert logs.size() > 0 : "expected at least one log";
    } catch (IOException e) {
      LOG.error("Unable to remove log: " + log, e);
      return false;
    }
    return true;
  }

  // ==========================================================================
  // FileSystem Log Files helpers
  // ==========================================================================
  public Path getWALDir() {
    return this.walDir;
  }

  Path getWalArchiveDir() {
    return this.walArchiveDir;
  }

  public FileSystem getFileSystem() {
    return this.fs;
  }

  protected Path getLogFilePath(final long logId) throws IOException {
    return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId));
  }

  private static long getLogIdFromName(final String name) {
    int end = name.lastIndexOf(".log");
    int start = name.lastIndexOf('-') + 1;
    return Long.parseLong(name.substring(start, end));
  }

  private static final PathFilter WALS_PATH_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return name.startsWith(LOG_PREFIX) && name.endsWith(".log");
    }
  };

  private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR =
    new Comparator<FileStatus>() {
      @Override
      public int compare(FileStatus a, FileStatus b) {
        final long aId = getLogIdFromName(a.getPath().getName());
        final long bId = getLogIdFromName(b.getPath().getName());
        return Long.compare(aId, bId);
      }
    };

  private FileStatus[] getLogFiles() throws IOException {
    try {
      FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER);
      Arrays.sort(files, FILE_STATUS_ID_COMPARATOR);
      return files;
    } catch (FileNotFoundException e) {
      LOG.warn("Log directory not found: " + e.getMessage());
      return null;
    }
  }

  /**
   * The caller should make sure the file set is obtained by calling {@link #getLogFiles()}, which
   * sorts the file set by log id.
   * @return Max-LogID of the specified log file set
   */
  private static long getMaxLogId(FileStatus[] logFiles) {
    if (logFiles == null || logFiles.length == 0) {
      return 0L;
    }
    return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName());
  }
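
  // Illustrative example of the naming scheme above: wal files for ids 3 and 7 are named
  // pv2-00000000000000000003.log and pv2-00000000000000000007.log (the id is zero-padded to 20
  // digits). FILE_STATUS_ID_COMPARATOR sorts them by id, so getMaxLogId(...) returns 7 and the
  // next roll (rollWriter(flushLogId + 1)) will create the file for id 8.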

  /**
   * The caller should make sure the file set is obtained by calling {@link #getLogFiles()}, which
   * sorts the file set by log id.
   * @return Max-LogID of the specified log file set
   */
  private long initOldLogs(FileStatus[] logFiles) throws IOException {
    if (logFiles == null || logFiles.length == 0) {
      return 0L;
    }
    long maxLogId = 0;
    for (int i = 0; i < logFiles.length; ++i) {
      final Path logPath = logFiles[i].getPath();
      leaseRecovery.recoverFileLease(fs, logPath);
      if (!isRunning()) {
        throw new IOException("wal aborting");
      }

      maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName()));
      ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir);
      if (log != null) {
        this.logs.add(log);
      }
    }
    initTrackerFromOldLogs();
    return maxLogId;
  }

  /**
   * If the last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set
   * storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild it using the entries
   * in the log.
   */
  private void initTrackerFromOldLogs() {
    if (logs.isEmpty() || !isRunning()) {
      return;
    }
    ProcedureWALFile log = logs.getLast();
    if (!log.getTracker().isPartial()) {
      storeTracker.resetTo(log.getTracker());
    } else {
      storeTracker.reset();
      storeTracker.setPartialFlag(true);
    }
  }

  /**
   * Loads the given log file and its tracker.
   */
  private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir)
    throws IOException {
    final ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
    if (logFile.getLen() == 0) {
      LOG.warn("Remove uninitialized log: {}", logFile);
      log.removeFile(walArchiveDir);
      return null;
    }
    LOG.debug("Opening Pv2 {}", logFile);
    try {
      log.open();
    } catch (ProcedureWALFormat.InvalidWALDataException e) {
      LOG.warn("Remove uninitialized log: {}", logFile, e);
      log.removeFile(walArchiveDir);
      return null;
    } catch (IOException e) {
      String msg = "Unable to read state log: " + logFile;
      LOG.error(msg, e);
      throw new IOException(msg, e);
    }

    try {
      log.readTracker();
    } catch (IOException e) {
      log.getTracker().reset();
      log.getTracker().setPartialFlag(true);
      LOG.warn("Unable to read tracker for {}", log, e);
    }

    log.close();
    return log;
  }

  /**
   * Parses a directory of WALs, building up ProcedureState. For testing parse and profiling.
   * @param args Path to the directory of WAL files for a store instance to parse and load.
   */
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    if (args == null || args.length != 1) {
      System.out.println("ERROR: Empty arguments list; pass path to MASTERPROCWALS_DIR.");
      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
      System.exit(-1);
    }
    WALProcedureStore store =
      new WALProcedureStore(conf, new Path(args[0]), null, new LeaseRecovery() {
        @Override
        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
          // no-op
        }
      });
    try {
      store.start(16);
      ProcedureExecutor<?> pe =
        new ProcedureExecutor<>(conf, new Object()/* Pass anything */, store);
      pe.init(1, true);
    } finally {
      store.stop(true);
    }
  }
}