/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;

/**
 * WAL implementation of the ProcedureStore.
 * <p/>
 * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()},
 * then {@link #load(ProcedureLoader)}.
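 * <p/>
 * A minimal startup sketch (illustrative only; the {@code leaseRecovery} and {@code loader}
 * instances here are assumed to be supplied by the caller and are not part of this class):
 *
 * <pre>
 * WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
 * store.start(numSlots);
 * store.recoverLease();
 * store.load(loader);
 * </pre>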
 * <p/>
 * In {@link #recoverLease()}, we acquire the lease by closing all the existing wal files (by
 * calling recoverFileLease) and creating a new wal writer. We also collect the list of all the
 * old wal files.
 * <p/>
 * FIXME: the current lease recovery implementation is problematic, as it cannot handle the race
 * where two masters both want to acquire the lease...
 * <p/>
 * In the {@link #load(ProcedureLoader)} method, we load all the active procedures. See the
 * comments of that method for more details.
 * <p/>
 * The actual logging works much like our FileSystem based WAL implementation on the RS side.
 * There is a {@link #slots} array, which acts like a ring buffer: the insert, update and delete
 * methods put their data into {@link #slots} and wait, while a background sync thread (see the
 * {@link #syncLoop()} method) takes the data from {@link #slots}, writes it to the FileSystem,
 * and notifies the callers that the data has been persisted.
 * <p/>
 * TODO: try using a disruptor to increase performance and simplify the logic?
 * <p/>
 * The {@link #storeTracker} keeps track of the procedures modified in the newest wal file, which
 * is also the one currently being written. The deleted bits in it, however, cover all procedures,
 * not only the ones in the newest wal file. When rolling a log, we first store the tracker in the
 * trailer of the current wal file, and then reset its modified bits so that it can start tracking
 * the procedures modified in the new wal file.
 * <p/>
 * The {@link #holdingCleanupTracker} is used to test whether it is safe to delete the oldest wal
 * file. It is used once logs have rolled and there is more than one wal file. It is first
 * initialized to the oldest file's tracker (which is stored in the trailer), using
 * {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged with the
 * tracker of every newer wal file, using
 * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. If all the
 * procedures modified in the oldest wal file turn out to be modified or deleted in newer wal
 * files, then the oldest file can be deleted. This works because every call to
 * {@link ProcedureStore#insert(Procedure[])} or {@link ProcedureStore#update(Procedure)} persists
 * the full state of a Procedure, so the earlier wal records for that procedure can all be
 * deleted.
 * @see ProcedureWALPrettyPrinter for printing content of a single WAL.
 * @see #main(String[]) to parse a directory of MasterWALProcs.
 * @deprecated Since 2.3.0, will be removed in 4.0.0. Kept here only for rolling upgrade; now we
 *             use the new region based procedure store.
114 */ 115@Deprecated 116@InterfaceAudience.Private 117public class WALProcedureStore extends ProcedureStoreBase { 118 private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); 119 public static final String LOG_PREFIX = "pv2-"; 120 /** Used to construct the name of the log directory for master procedures */ 121 public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; 122 123 public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY = 124 "hbase.procedure.store.wal.warn.threshold"; 125 private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10; 126 127 public static final String EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = 128 "hbase.procedure.store.wal.exec.cleanup.on.load"; 129 private static final boolean DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY = true; 130 131 public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = 132 "hbase.procedure.store.wal.max.retries.before.roll"; 133 private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3; 134 135 public static final String WAIT_BEFORE_ROLL_CONF_KEY = 136 "hbase.procedure.store.wal.wait.before.roll"; 137 private static final int DEFAULT_WAIT_BEFORE_ROLL = 500; 138 139 public static final String ROLL_RETRIES_CONF_KEY = "hbase.procedure.store.wal.max.roll.retries"; 140 private static final int DEFAULT_ROLL_RETRIES = 3; 141 142 public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = 143 "hbase.procedure.store.wal.sync.failure.roll.max"; 144 private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3; 145 146 public static final String PERIODIC_ROLL_CONF_KEY = 147 "hbase.procedure.store.wal.periodic.roll.msec"; 148 private static final int DEFAULT_PERIODIC_ROLL = 60 * 60 * 1000; // 1h 149 150 public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; 151 private static final int DEFAULT_SYNC_WAIT_MSEC = 100; 152 153 public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; 154 private static final boolean DEFAULT_USE_HSYNC = true; 155 156 public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; 157 private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M 158 159 public static final String STORE_WAL_SYNC_STATS_COUNT = 160 "hbase.procedure.store.wal.sync.stats.count"; 161 private static final int DEFAULT_SYNC_STATS_COUNT = 10; 162 163 private final LinkedList<ProcedureWALFile> logs = new LinkedList<>(); 164 private final ProcedureStoreTracker holdingCleanupTracker = new ProcedureStoreTracker(); 165 private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); 166 private final ReentrantLock lock = new ReentrantLock(); 167 private final Condition waitCond = lock.newCondition(); 168 private final Condition slotCond = lock.newCondition(); 169 private final Condition syncCond = lock.newCondition(); 170 171 private final LeaseRecovery leaseRecovery; 172 private final Configuration conf; 173 private final FileSystem fs; 174 private final Path walDir; 175 private final Path walArchiveDir; 176 private final boolean enforceStreamCapability; 177 178 private final AtomicReference<Throwable> syncException = new AtomicReference<>(); 179 private final AtomicBoolean loading = new AtomicBoolean(true); 180 private final AtomicBoolean inSync = new AtomicBoolean(false); 181 private final AtomicLong totalSynced = new AtomicLong(0); 182 private final AtomicLong lastRollTs = new AtomicLong(0); 183 private final AtomicLong syncId = new AtomicLong(0); 184 185 private 
LinkedTransferQueue<ByteSlot> slotsCache = null; 186 private Set<ProcedureWALFile> corruptedLogs = null; 187 private FSDataOutputStream stream = null; 188 private int runningProcCount = 1; 189 private long flushLogId = 0; 190 private int syncMaxSlot = 1; 191 private int slotIndex = 0; 192 private Thread syncThread; 193 private ByteSlot[] slots; 194 195 private int walCountWarnThreshold; 196 private int maxRetriesBeforeRoll; 197 private int maxSyncFailureRoll; 198 private int waitBeforeRoll; 199 private int rollRetries; 200 private int periodicRollMsec; 201 private long rollThreshold; 202 private boolean useHsync; 203 private int syncWaitMsec; 204 205 // Variables used for UI display 206 private CircularFifoQueue<SyncMetrics> syncMetricsQueue; 207 208 public static class SyncMetrics { 209 private long timestamp; 210 private long syncWaitMs; 211 private long totalSyncedBytes; 212 private int syncedEntries; 213 private float syncedPerSec; 214 215 public long getTimestamp() { 216 return timestamp; 217 } 218 219 public long getSyncWaitMs() { 220 return syncWaitMs; 221 } 222 223 public long getTotalSyncedBytes() { 224 return totalSyncedBytes; 225 } 226 227 public long getSyncedEntries() { 228 return syncedEntries; 229 } 230 231 public float getSyncedPerSec() { 232 return syncedPerSec; 233 } 234 } 235 236 public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException { 237 this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR), 238 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), 239 leaseRecovery); 240 } 241 242 public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir, 243 final LeaseRecovery leaseRecovery) throws IOException { 244 this.conf = conf; 245 this.leaseRecovery = leaseRecovery; 246 this.walDir = walDir; 247 this.walArchiveDir = walArchiveDir; 248 this.fs = CommonFSUtils.getWALFileSystem(conf); 249 this.enforceStreamCapability = 250 conf.getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true); 251 252 // Create the log directory for the procedure store 253 if (!fs.exists(walDir)) { 254 if (!fs.mkdirs(walDir)) { 255 throw new IOException("Unable to mkdir " + walDir); 256 } 257 } 258 // Now that it exists, set the log policy 259 String storagePolicy = 260 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 261 CommonFSUtils.setStoragePolicy(fs, walDir, storagePolicy); 262 263 // Create archive dir up front. Rename won't work w/o it up on HDFS. 
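    // (Removed procedure WALs are moved into this archive dir, by default the shared "oldWALs"
    // directory, rather than deleted in place; see removeLogFile and ProcedureWALFile.removeFile.)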
264 if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { 265 if (this.fs.mkdirs(this.walArchiveDir)) { 266 LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); 267 } else { 268 LOG.warn("Failed create of {}", this.walArchiveDir); 269 } 270 } 271 } 272 273 @Override 274 public void start(int numSlots) throws IOException { 275 if (!setRunning(true)) { 276 return; 277 } 278 279 // Init buffer slots 280 loading.set(true); 281 runningProcCount = numSlots; 282 syncMaxSlot = numSlots; 283 slots = new ByteSlot[numSlots]; 284 slotsCache = new LinkedTransferQueue<>(); 285 while (slotsCache.size() < numSlots) { 286 slotsCache.offer(new ByteSlot()); 287 } 288 289 // Tunings 290 walCountWarnThreshold = 291 conf.getInt(WAL_COUNT_WARN_THRESHOLD_CONF_KEY, DEFAULT_WAL_COUNT_WARN_THRESHOLD); 292 maxRetriesBeforeRoll = 293 conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, DEFAULT_MAX_RETRIES_BEFORE_ROLL); 294 maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, DEFAULT_MAX_SYNC_FAILURE_ROLL); 295 waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, DEFAULT_WAIT_BEFORE_ROLL); 296 rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, DEFAULT_ROLL_RETRIES); 297 rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); 298 periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, DEFAULT_PERIODIC_ROLL); 299 syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); 300 useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); 301 302 // WebUI 303 syncMetricsQueue = 304 new CircularFifoQueue<>(conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); 305 306 // Init sync thread 307 syncThread = new Thread("WALProcedureStoreSyncThread") { 308 @Override 309 public void run() { 310 try { 311 syncLoop(); 312 } catch (Throwable e) { 313 LOG.error("Got an exception from the sync-loop", e); 314 if (!isSyncAborted()) { 315 sendAbortProcessSignal(); 316 } 317 } 318 } 319 }; 320 syncThread.start(); 321 } 322 323 @Override 324 public void stop(final boolean abort) { 325 if (!setRunning(false)) { 326 return; 327 } 328 329 LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort 330 + (isSyncAborted() ? " (self aborting)" : "")); 331 sendStopSignal(); 332 if (!isSyncAborted()) { 333 try { 334 while (syncThread.isAlive()) { 335 sendStopSignal(); 336 syncThread.join(250); 337 } 338 } catch (InterruptedException e) { 339 LOG.warn("join interrupted", e); 340 Thread.currentThread().interrupt(); 341 } 342 } 343 344 // Close the writer 345 closeCurrentLogStream(abort); 346 347 // Close the old logs 348 // they should be already closed, this is just in case the load fails 349 // and we call start() and then stop() 350 for (ProcedureWALFile log : logs) { 351 log.close(); 352 } 353 logs.clear(); 354 loading.set(true); 355 } 356 357 private void sendStopSignal() { 358 if (lock.tryLock()) { 359 try { 360 waitCond.signalAll(); 361 syncCond.signalAll(); 362 } finally { 363 lock.unlock(); 364 } 365 } 366 } 367 368 @Override 369 public int getNumThreads() { 370 return slots == null ? 0 : slots.length; 371 } 372 373 @Override 374 public int setRunningProcedureCount(final int count) { 375 this.runningProcCount = count > 0 ? 
Math.min(count, slots.length) : slots.length; 376 return this.runningProcCount; 377 } 378 379 public ProcedureStoreTracker getStoreTracker() { 380 return storeTracker; 381 } 382 383 public ArrayList<ProcedureWALFile> getActiveLogs() { 384 lock.lock(); 385 try { 386 return new ArrayList<>(logs); 387 } finally { 388 lock.unlock(); 389 } 390 } 391 392 public Set<ProcedureWALFile> getCorruptedLogs() { 393 return corruptedLogs; 394 } 395 396 @Override 397 public void recoverLease() throws IOException { 398 lock.lock(); 399 try { 400 LOG.debug("Starting WAL Procedure Store lease recovery"); 401 boolean afterFirstAttempt = false; 402 while (isRunning()) { 403 // Don't sleep before first attempt 404 if (afterFirstAttempt) { 405 LOG.trace("Sleep {} ms after first lease recovery attempt.", waitBeforeRoll); 406 Threads.sleepWithoutInterrupt(waitBeforeRoll); 407 } else { 408 afterFirstAttempt = true; 409 } 410 FileStatus[] oldLogs = getLogFiles(); 411 // Get Log-MaxID and recover lease on old logs 412 try { 413 flushLogId = initOldLogs(oldLogs); 414 } catch (FileNotFoundException e) { 415 LOG.warn("Someone else is active and deleted logs. retrying.", e); 416 continue; 417 } 418 419 // Create new state-log 420 if (!rollWriter(flushLogId + 1)) { 421 // someone else has already created this log 422 LOG.debug("Someone else has already created log {}. Retrying.", flushLogId); 423 continue; 424 } 425 426 // We have the lease on the log 427 oldLogs = getLogFiles(); 428 if (getMaxLogId(oldLogs) > flushLogId) { 429 LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId); 430 logs.getLast().removeFile(this.walArchiveDir); 431 continue; 432 } 433 434 LOG.debug("Lease acquired for flushLogId={}", flushLogId); 435 break; 436 } 437 } finally { 438 lock.unlock(); 439 } 440 } 441 442 @Override 443 public void load(ProcedureLoader loader) throws IOException { 444 lock.lock(); 445 try { 446 if (logs.isEmpty()) { 447 throw new IllegalStateException("recoverLease() must be called before loading data"); 448 } 449 450 // Nothing to do, If we have only the current log. 451 if (logs.size() == 1) { 452 LOG.debug("No state logs to replay."); 453 loader.setMaxProcId(0); 454 loading.set(false); 455 return; 456 } 457 458 // Load the old logs 459 Iterator<ProcedureWALFile> it = logs.descendingIterator(); 460 it.next(); // Skip the current log 461 462 ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { 463 464 @Override 465 public void setMaxProcId(long maxProcId) { 466 loader.setMaxProcId(maxProcId); 467 } 468 469 @Override 470 public void load(ProcedureIterator procIter) throws IOException { 471 loader.load(procIter); 472 } 473 474 @Override 475 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 476 loader.handleCorrupted(procIter); 477 } 478 479 @Override 480 public void markCorruptedWAL(ProcedureWALFile log, IOException e) { 481 if (corruptedLogs == null) { 482 corruptedLogs = new HashSet<>(); 483 } 484 corruptedLogs.add(log); 485 // TODO: sideline corrupted log 486 } 487 }); 488 // if we fail when loading, we should prevent persisting the storeTracker later in the stop 489 // method. As it may happen that, we have finished constructing the modified and deleted bits, 490 // but before we call resetModified, we fail, then if we persist the storeTracker then when 491 // restarting, we will consider that all procedures have been included in this file and delete 492 // all the previous files. Obviously this not correct. 
So here we will only set loading to 493 // false when we successfully loaded all the procedures, and when closing we will skip 494 // persisting the store tracker. And also, this will prevent the sync thread to do 495 // periodicRoll, where we may also clean old logs. 496 loading.set(false); 497 // try to cleanup inactive wals and complete the operation 498 buildHoldingCleanupTracker(); 499 tryCleanupLogsOnLoad(); 500 } finally { 501 lock.unlock(); 502 } 503 } 504 505 private void tryCleanupLogsOnLoad() { 506 // nothing to cleanup. 507 if (logs.size() <= 1) { 508 return; 509 } 510 511 // the config says to not cleanup wals on load. 512 if ( 513 !conf.getBoolean(EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY, DEFAULT_EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY) 514 ) { 515 LOG.debug("WALs cleanup on load is not enabled: " + getActiveLogs()); 516 return; 517 } 518 519 try { 520 periodicRoll(); 521 } catch (IOException e) { 522 LOG.warn("Unable to cleanup logs on load: " + e.getMessage(), e); 523 } 524 } 525 526 @Override 527 public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { 528 if (LOG.isTraceEnabled()) { 529 LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); 530 } 531 532 ByteSlot slot = acquireSlot(); 533 try { 534 // Serialize the insert 535 long[] subProcIds = null; 536 if (subprocs != null) { 537 ProcedureWALFormat.writeInsert(slot, proc, subprocs); 538 subProcIds = new long[subprocs.length]; 539 for (int i = 0; i < subprocs.length; ++i) { 540 subProcIds[i] = subprocs[i].getProcId(); 541 } 542 } else { 543 assert !proc.hasParent(); 544 ProcedureWALFormat.writeInsert(slot, proc); 545 } 546 547 // Push the transaction data and wait until it is persisted 548 pushData(PushType.INSERT, slot, proc.getProcId(), subProcIds); 549 } catch (IOException e) { 550 // We are not able to serialize the procedure. 551 // this is a code error, and we are not able to go on. 552 LOG.error(HBaseMarkers.FATAL, "Unable to serialize one of the procedure: proc=" + proc 553 + ", subprocs=" + Arrays.toString(subprocs), e); 554 throw new RuntimeException(e); 555 } finally { 556 releaseSlot(slot); 557 } 558 } 559 560 @Override 561 public void insert(Procedure<?>[] procs) { 562 if (LOG.isTraceEnabled()) { 563 LOG.trace("Insert " + Arrays.toString(procs)); 564 } 565 566 ByteSlot slot = acquireSlot(); 567 try { 568 // Serialize the insert 569 long[] procIds = new long[procs.length]; 570 for (int i = 0; i < procs.length; ++i) { 571 assert !procs[i].hasParent(); 572 procIds[i] = procs[i].getProcId(); 573 ProcedureWALFormat.writeInsert(slot, procs[i]); 574 } 575 576 // Push the transaction data and wait until it is persisted 577 pushData(PushType.INSERT, slot, Procedure.NO_PROC_ID, procIds); 578 } catch (IOException e) { 579 // We are not able to serialize the procedure. 580 // this is a code error, and we are not able to go on. 581 LOG.error(HBaseMarkers.FATAL, 582 "Unable to serialize one of the procedure: " + Arrays.toString(procs), e); 583 throw new RuntimeException(e); 584 } finally { 585 releaseSlot(slot); 586 } 587 } 588 589 @Override 590 public void update(Procedure<?> proc) { 591 if (LOG.isTraceEnabled()) { 592 LOG.trace("Update " + proc); 593 } 594 595 ByteSlot slot = acquireSlot(); 596 try { 597 // Serialize the update 598 ProcedureWALFormat.writeUpdate(slot, proc); 599 600 // Push the transaction data and wait until it is persisted 601 pushData(PushType.UPDATE, slot, proc.getProcId(), null); 602 } catch (IOException e) { 603 // We are not able to serialize the procedure. 
604 // this is a code error, and we are not able to go on. 605 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 606 throw new RuntimeException(e); 607 } finally { 608 releaseSlot(slot); 609 } 610 } 611 612 @Override 613 public void delete(long procId) { 614 LOG.trace("Delete {}", procId); 615 ByteSlot slot = acquireSlot(); 616 try { 617 // Serialize the delete 618 ProcedureWALFormat.writeDelete(slot, procId); 619 620 // Push the transaction data and wait until it is persisted 621 pushData(PushType.DELETE, slot, procId, null); 622 } catch (IOException e) { 623 // We are not able to serialize the procedure. 624 // this is a code error, and we are not able to go on. 625 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + procId, e); 626 throw new RuntimeException(e); 627 } finally { 628 releaseSlot(slot); 629 } 630 } 631 632 @Override 633 public void delete(Procedure<?> proc, long[] subProcIds) { 634 assert proc != null : "expected a non-null procedure"; 635 assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; 636 if (LOG.isTraceEnabled()) { 637 LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds)); 638 } 639 640 ByteSlot slot = acquireSlot(); 641 try { 642 // Serialize the delete 643 ProcedureWALFormat.writeDelete(slot, proc, subProcIds); 644 645 // Push the transaction data and wait until it is persisted 646 pushData(PushType.DELETE, slot, proc.getProcId(), subProcIds); 647 } catch (IOException e) { 648 // We are not able to serialize the procedure. 649 // this is a code error, and we are not able to go on. 650 LOG.error(HBaseMarkers.FATAL, "Unable to serialize the procedure: " + proc, e); 651 throw new RuntimeException(e); 652 } finally { 653 releaseSlot(slot); 654 } 655 } 656 657 @Override 658 public void delete(final long[] procIds, final int offset, final int count) { 659 if (count == 0) { 660 return; 661 } 662 663 if (offset == 0 && count == procIds.length) { 664 delete(procIds); 665 } else if (count == 1) { 666 delete(procIds[offset]); 667 } else { 668 delete(Arrays.copyOfRange(procIds, offset, offset + count)); 669 } 670 } 671 672 private void delete(long[] procIds) { 673 if (LOG.isTraceEnabled()) { 674 LOG.trace("Delete " + Arrays.toString(procIds)); 675 } 676 677 final ByteSlot slot = acquireSlot(); 678 try { 679 // Serialize the delete 680 for (int i = 0; i < procIds.length; ++i) { 681 ProcedureWALFormat.writeDelete(slot, procIds[i]); 682 } 683 684 // Push the transaction data and wait until it is persisted 685 pushData(PushType.DELETE, slot, Procedure.NO_PROC_ID, procIds); 686 } catch (IOException e) { 687 // We are not able to serialize the procedure. 688 // this is a code error, and we are not able to go on. 689 LOG.error("Unable to serialize the procedures: " + Arrays.toString(procIds), e); 690 throw new RuntimeException(e); 691 } finally { 692 releaseSlot(slot); 693 } 694 } 695 696 private ByteSlot acquireSlot() { 697 ByteSlot slot = slotsCache.poll(); 698 return slot != null ? 
slot : new ByteSlot(); 699 } 700 701 private void releaseSlot(final ByteSlot slot) { 702 slot.reset(); 703 slotsCache.offer(slot); 704 } 705 706 private enum PushType { 707 INSERT, 708 UPDATE, 709 DELETE 710 } 711 712 private long pushData(final PushType type, final ByteSlot slot, final long procId, 713 final long[] subProcIds) { 714 if (!isRunning()) { 715 throw new RuntimeException("the store must be running before inserting data"); 716 } 717 if (logs.isEmpty()) { 718 throw new RuntimeException("recoverLease() must be called before inserting data"); 719 } 720 721 long logId = -1; 722 lock.lock(); 723 try { 724 // Wait for the sync to be completed 725 while (true) { 726 if (!isRunning()) { 727 throw new RuntimeException("store no longer running"); 728 } else if (isSyncAborted()) { 729 throw new RuntimeException("sync aborted", syncException.get()); 730 } else if (inSync.get()) { 731 syncCond.await(); 732 } else if (slotIndex >= syncMaxSlot) { 733 slotCond.signal(); 734 syncCond.await(); 735 } else { 736 break; 737 } 738 } 739 740 final long pushSyncId = syncId.get(); 741 updateStoreTracker(type, procId, subProcIds); 742 slots[slotIndex++] = slot; 743 logId = flushLogId; 744 745 // Notify that there is new data 746 if (slotIndex == 1) { 747 waitCond.signal(); 748 } 749 750 // Notify that the slots are full 751 if (slotIndex == syncMaxSlot) { 752 waitCond.signal(); 753 slotCond.signal(); 754 } 755 756 while (pushSyncId == syncId.get() && isRunning()) { 757 syncCond.await(); 758 } 759 } catch (InterruptedException e) { 760 Thread.currentThread().interrupt(); 761 sendAbortProcessSignal(); 762 throw new RuntimeException(e); 763 } finally { 764 lock.unlock(); 765 if (isSyncAborted()) { 766 throw new RuntimeException("sync aborted", syncException.get()); 767 } 768 } 769 return logId; 770 } 771 772 private void updateStoreTracker(final PushType type, final long procId, final long[] subProcIds) { 773 switch (type) { 774 case INSERT: 775 if (subProcIds == null) { 776 storeTracker.insert(procId); 777 } else if (procId == Procedure.NO_PROC_ID) { 778 storeTracker.insert(subProcIds); 779 } else { 780 storeTracker.insert(procId, subProcIds); 781 holdingCleanupTracker.setDeletedIfModified(procId); 782 } 783 break; 784 case UPDATE: 785 storeTracker.update(procId); 786 holdingCleanupTracker.setDeletedIfModified(procId); 787 break; 788 case DELETE: 789 if (subProcIds != null && subProcIds.length > 0) { 790 storeTracker.delete(subProcIds); 791 holdingCleanupTracker.setDeletedIfModified(subProcIds); 792 } else { 793 storeTracker.delete(procId); 794 holdingCleanupTracker.setDeletedIfModified(procId); 795 } 796 break; 797 default: 798 throw new RuntimeException("invalid push type " + type); 799 } 800 } 801 802 private boolean isSyncAborted() { 803 return syncException.get() != null; 804 } 805 806 private void syncLoop() throws Throwable { 807 long totalSyncedToStore = 0; 808 inSync.set(false); 809 lock.lock(); 810 try { 811 while (isRunning()) { 812 try { 813 // Wait until new data is available 814 if (slotIndex == 0) { 815 if (!loading.get()) { 816 periodicRoll(); 817 } 818 819 if (LOG.isTraceEnabled()) { 820 float rollTsSec = getMillisFromLastRoll() / 1000.0f; 821 LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", 822 StringUtils.humanSize(totalSynced.get()), 823 StringUtils.humanSize(totalSynced.get() / rollTsSec))); 824 } 825 826 waitCond.await(getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS); 827 if (slotIndex == 0) { 828 // no data.. 
probably a stop() or a periodic roll 829 continue; 830 } 831 } 832 // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing 833 syncMaxSlot = runningProcCount; 834 assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot; 835 final long syncWaitSt = EnvironmentEdgeManager.currentTime(); 836 if (slotIndex != syncMaxSlot) { 837 slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); 838 } 839 840 final long currentTs = EnvironmentEdgeManager.currentTime(); 841 final long syncWaitMs = currentTs - syncWaitSt; 842 final float rollSec = getMillisFromLastRoll() / 1000.0f; 843 final float syncedPerSec = totalSyncedToStore / rollSec; 844 if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) { 845 LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", 846 StringUtils.humanTimeDiff(syncWaitMs), slotIndex, 847 StringUtils.humanSize(totalSyncedToStore), StringUtils.humanSize(syncedPerSec))); 848 } 849 850 // update webui circular buffers (TODO: get rid of allocations) 851 final SyncMetrics syncMetrics = new SyncMetrics(); 852 syncMetrics.timestamp = currentTs; 853 syncMetrics.syncWaitMs = syncWaitMs; 854 syncMetrics.syncedEntries = slotIndex; 855 syncMetrics.totalSyncedBytes = totalSyncedToStore; 856 syncMetrics.syncedPerSec = syncedPerSec; 857 syncMetricsQueue.add(syncMetrics); 858 859 // sync 860 inSync.set(true); 861 long slotSize = syncSlots(); 862 logs.getLast().addToSize(slotSize); 863 totalSyncedToStore = totalSynced.addAndGet(slotSize); 864 slotIndex = 0; 865 inSync.set(false); 866 syncId.incrementAndGet(); 867 } catch (InterruptedException e) { 868 Thread.currentThread().interrupt(); 869 syncException.compareAndSet(null, e); 870 sendAbortProcessSignal(); 871 throw e; 872 } catch (Throwable t) { 873 syncException.compareAndSet(null, t); 874 sendAbortProcessSignal(); 875 throw t; 876 } finally { 877 syncCond.signalAll(); 878 } 879 } 880 } finally { 881 lock.unlock(); 882 } 883 } 884 885 public ArrayList<SyncMetrics> getSyncMetrics() { 886 lock.lock(); 887 try { 888 return new ArrayList<>(syncMetricsQueue); 889 } finally { 890 lock.unlock(); 891 } 892 } 893 894 private long syncSlots() throws Throwable { 895 int retry = 0; 896 int logRolled = 0; 897 long totalSynced = 0; 898 do { 899 try { 900 totalSynced = syncSlots(stream, slots, 0, slotIndex); 901 break; 902 } catch (Throwable e) { 903 LOG.warn("unable to sync slots, retry=" + retry); 904 if (++retry >= maxRetriesBeforeRoll) { 905 if (logRolled >= maxSyncFailureRoll && isRunning()) { 906 LOG.error("Sync slots after log roll failed, abort.", e); 907 throw e; 908 } 909 910 if (!rollWriterWithRetries()) { 911 throw e; 912 } 913 914 logRolled++; 915 retry = 0; 916 } 917 } 918 } while (isRunning()); 919 return totalSynced; 920 } 921 922 protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, 923 final int offset, final int count) throws IOException { 924 long totalSynced = 0; 925 for (int i = 0; i < count; ++i) { 926 final ByteSlot data = slots[offset + i]; 927 data.writeTo(stream); 928 totalSynced += data.size(); 929 } 930 931 syncStream(stream); 932 sendPostSyncSignal(); 933 934 if (LOG.isTraceEnabled()) { 935 LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + ", flushed=" 936 + StringUtils.humanSize(totalSynced)); 937 } 938 return totalSynced; 939 } 940 941 protected void syncStream(final FSDataOutputStream stream) throws IOException { 942 if (useHsync) { 943 stream.hsync(); 944 } else { 945 stream.hflush(); 946 } 947 } 948 949 private boolean 
rollWriterWithRetries() { 950 for (int i = 0; i < rollRetries && isRunning(); ++i) { 951 if (i > 0) { 952 Threads.sleepWithoutInterrupt(waitBeforeRoll * i); 953 } 954 955 try { 956 if (rollWriter()) { 957 return true; 958 } 959 } catch (IOException e) { 960 LOG.warn("Unable to roll the log, attempt=" + (i + 1), e); 961 } 962 } 963 LOG.error(HBaseMarkers.FATAL, "Unable to roll the log"); 964 return false; 965 } 966 967 private boolean tryRollWriter() { 968 try { 969 return rollWriter(); 970 } catch (IOException e) { 971 LOG.warn("Unable to roll the log", e); 972 return false; 973 } 974 } 975 976 public long getMillisToNextPeriodicRoll() { 977 if (lastRollTs.get() > 0 && periodicRollMsec > 0) { 978 return periodicRollMsec - getMillisFromLastRoll(); 979 } 980 return Long.MAX_VALUE; 981 } 982 983 public long getMillisFromLastRoll() { 984 return (EnvironmentEdgeManager.currentTime() - lastRollTs.get()); 985 } 986 987 void periodicRollForTesting() throws IOException { 988 lock.lock(); 989 try { 990 periodicRoll(); 991 } finally { 992 lock.unlock(); 993 } 994 } 995 996 public boolean rollWriterForTesting() throws IOException { 997 lock.lock(); 998 try { 999 return rollWriter(); 1000 } finally { 1001 lock.unlock(); 1002 } 1003 } 1004 1005 void removeInactiveLogsForTesting() throws Exception { 1006 lock.lock(); 1007 try { 1008 removeInactiveLogs(); 1009 } finally { 1010 lock.unlock(); 1011 } 1012 } 1013 1014 private void periodicRoll() throws IOException { 1015 if (storeTracker.isEmpty()) { 1016 LOG.trace("no active procedures"); 1017 tryRollWriter(); 1018 removeAllLogs(flushLogId - 1, "no active procedures"); 1019 } else { 1020 if (storeTracker.isAllModified()) { 1021 LOG.trace("all the active procedures are in the latest log"); 1022 removeAllLogs(flushLogId - 1, "all the active procedures are in the latest log"); 1023 } 1024 1025 // if the log size has exceeded the roll threshold 1026 // or the periodic roll timeout is expired, try to roll the wal. 1027 if (totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0) { 1028 tryRollWriter(); 1029 } 1030 1031 removeInactiveLogs(); 1032 } 1033 } 1034 1035 private boolean rollWriter() throws IOException { 1036 if (!isRunning()) { 1037 return false; 1038 } 1039 1040 // Create new state-log 1041 if (!rollWriter(flushLogId + 1)) { 1042 LOG.warn("someone else has already created log {}", flushLogId); 1043 return false; 1044 } 1045 1046 // We have the lease on the log, 1047 // but we should check if someone else has created new files 1048 if (getMaxLogId(getLogFiles()) > flushLogId) { 1049 LOG.warn("Someone else created new logs. Expected maxLogId < {}", flushLogId); 1050 logs.getLast().removeFile(this.walArchiveDir); 1051 return false; 1052 } 1053 1054 // We have the lease on the log 1055 return true; 1056 } 1057 1058 boolean rollWriter(long logId) throws IOException { 1059 assert logId > flushLogId : "logId=" + logId + " flushLogId=" + flushLogId; 1060 assert lock.isHeldByCurrentThread() : "expected to be the lock owner. 
" + lock.isLocked(); 1061 1062 ProcedureWALHeader header = ProcedureWALHeader.newBuilder() 1063 .setVersion(ProcedureWALFormat.HEADER_VERSION).setType(ProcedureWALFormat.LOG_TYPE_STREAM) 1064 .setMinProcId(storeTracker.getActiveMinProcId()).setLogId(logId).build(); 1065 1066 FSDataOutputStream newStream = null; 1067 Path newLogFile = null; 1068 long startPos = -1; 1069 newLogFile = getLogFilePath(logId); 1070 try { 1071 FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(newLogFile).overwrite(false); 1072 if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) { 1073 newStream = 1074 ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build(); 1075 } else { 1076 newStream = builder.build(); 1077 } 1078 } catch (FileAlreadyExistsException e) { 1079 LOG.error("Log file with id={} already exists", logId, e); 1080 return false; 1081 } catch (RemoteException re) { 1082 LOG.warn("failed to create log file with id={}", logId, re); 1083 return false; 1084 } 1085 // After we create the stream but before we attempt to use it at all 1086 // ensure that we can provide the level of data safety we're configured 1087 // to provide. 1088 final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH; 1089 if (enforceStreamCapability && !newStream.hasCapability(durability)) { 1090 throw new IllegalStateException("The procedure WAL relies on the ability to " + durability 1091 + " for proper operation during component failures, but the underlying filesystem does " 1092 + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY 1093 + "' to set the desired level of robustness and ensure the config value of '" 1094 + CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it."); 1095 } 1096 try { 1097 ProcedureWALFormat.writeHeader(newStream, header); 1098 startPos = newStream.getPos(); 1099 } catch (IOException ioe) { 1100 LOG.warn("Encountered exception writing header", ioe); 1101 newStream.close(); 1102 return false; 1103 } 1104 1105 closeCurrentLogStream(false); 1106 1107 storeTracker.resetModified(); 1108 stream = newStream; 1109 flushLogId = logId; 1110 totalSynced.set(0); 1111 long rollTs = EnvironmentEdgeManager.currentTime(); 1112 lastRollTs.set(rollTs); 1113 logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos, rollTs)); 1114 1115 // if it's the first next WAL being added, build the holding cleanup tracker 1116 if (logs.size() == 2) { 1117 buildHoldingCleanupTracker(); 1118 } else if (logs.size() > walCountWarnThreshold) { 1119 LOG.warn("procedure WALs count={} above the warning threshold {}. check running procedures" 1120 + " to see if something is stuck.", logs.size(), walCountWarnThreshold); 1121 // This is just like what we have done at RS side when there are too many wal files. For RS, 1122 // if there are too many wal files, we will find out the wal entries in the oldest file, and 1123 // tell the upper layer to flush these regions so the wal entries will be useless and then we 1124 // can delete the wal file. For WALProcedureStore, the assumption is that, if all the 1125 // procedures recorded in a proc wal file are modified or deleted in a new proc wal file, then 1126 // we are safe to delete it. 
So here if there are too many proc wal files, we will find out 1127 // the procedure ids in the oldest file, which are neither modified nor deleted in newer proc 1128 // wal files, and tell upper layer to update the state of these procedures to the newest proc 1129 // wal file(by calling ProcedureStore.update), then we are safe to delete the oldest proc wal 1130 // file. 1131 sendForceUpdateSignal(holdingCleanupTracker.getAllActiveProcIds()); 1132 } 1133 1134 LOG.info("Rolled new Procedure Store WAL, id={}", logId); 1135 return true; 1136 } 1137 1138 private void closeCurrentLogStream(boolean abort) { 1139 if (stream == null || logs.isEmpty()) { 1140 return; 1141 } 1142 1143 try { 1144 ProcedureWALFile log = logs.getLast(); 1145 // If the loading flag is true, it usually means that we fail when loading procedures, so we 1146 // should not persist the store tracker, as its state may not be correct. 1147 if (!loading.get()) { 1148 log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); 1149 log.updateLocalTracker(storeTracker); 1150 if (!abort) { 1151 long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); 1152 log.addToSize(trailerSize); 1153 } 1154 } 1155 } catch (IOException | FSError e) { 1156 LOG.warn("Unable to write the trailer", e); 1157 } 1158 try { 1159 stream.close(); 1160 } catch (IOException | FSError e) { 1161 LOG.error("Unable to close the stream", e); 1162 } 1163 stream = null; 1164 } 1165 1166 // ========================================================================== 1167 // Log Files cleaner helpers 1168 // ========================================================================== 1169 private void removeInactiveLogs() throws IOException { 1170 // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. 1171 // once there is nothing olding the oldest WAL we can remove it. 1172 while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { 1173 LOG.info("Remove the oldest log {}", logs.getFirst()); 1174 removeLogFile(logs.getFirst(), walArchiveDir); 1175 buildHoldingCleanupTracker(); 1176 } 1177 1178 // TODO: In case we are holding up a lot of logs for long time we should 1179 // rewrite old procedures (in theory parent procs) to the new WAL. 1180 } 1181 1182 private void buildHoldingCleanupTracker() { 1183 if (logs.size() <= 1) { 1184 // we only have one wal, so nothing to do 1185 holdingCleanupTracker.reset(); 1186 return; 1187 } 1188 1189 // compute the holding tracker. 1190 // - the first WAL is used for the 'updates' 1191 // - the global tracker will be used to determine whether a procedure has been deleted 1192 // - other trackers will be used to determine whether a procedure has been updated, as a deleted 1193 // procedure can always be detected by checking the global tracker, we can save the deleted 1194 // checks when applying other trackers 1195 holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); 1196 holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker); 1197 // the logs is a linked list, so avoid calling get(index) on it. 1198 Iterator<ProcedureWALFile> iter = logs.iterator(); 1199 // skip the tracker for the first file when creating the iterator. 1200 iter.next(); 1201 ProcedureStoreTracker tracker = iter.next().getTracker(); 1202 // testing iter.hasNext after calling iter.next to skip applying the tracker for last file, 1203 // which is just the storeTracker above. 
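    // Illustrative example with hypothetical proc ids: if the oldest WAL modified procs 3 and 5,
    // and every newer WAL (or the global tracker) shows both as modified or deleted again, the
    // holding tracker drains to empty and removeInactiveLogs above can drop the oldest WAL.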
1204 while (iter.hasNext()) { 1205 holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker); 1206 if (holdingCleanupTracker.isEmpty()) { 1207 break; 1208 } 1209 tracker = iter.next().getTracker(); 1210 } 1211 } 1212 1213 /** 1214 * Remove all logs with logId <= {@code lastLogId}. 1215 */ 1216 private void removeAllLogs(long lastLogId, String why) { 1217 if (logs.size() <= 1) { 1218 return; 1219 } 1220 1221 LOG.info("Remove all state logs with ID less than {}, since {}", lastLogId, why); 1222 1223 boolean removed = false; 1224 while (logs.size() > 1) { 1225 ProcedureWALFile log = logs.getFirst(); 1226 if (lastLogId < log.getLogId()) { 1227 break; 1228 } 1229 removeLogFile(log, walArchiveDir); 1230 removed = true; 1231 } 1232 1233 if (removed) { 1234 buildHoldingCleanupTracker(); 1235 } 1236 } 1237 1238 private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { 1239 try { 1240 LOG.trace("Removing log={}", log); 1241 log.removeFile(walArchiveDir); 1242 logs.remove(log); 1243 LOG.debug("Removed log={}, activeLogs={}", log, logs); 1244 assert logs.size() > 0 : "expected at least one log"; 1245 } catch (IOException e) { 1246 LOG.error("Unable to remove log: " + log, e); 1247 return false; 1248 } 1249 return true; 1250 } 1251 1252 // ========================================================================== 1253 // FileSystem Log Files helpers 1254 // ========================================================================== 1255 public Path getWALDir() { 1256 return this.walDir; 1257 } 1258 1259 Path getWalArchiveDir() { 1260 return this.walArchiveDir; 1261 } 1262 1263 public FileSystem getFileSystem() { 1264 return this.fs; 1265 } 1266 1267 protected Path getLogFilePath(final long logId) throws IOException { 1268 return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId)); 1269 } 1270 1271 private static long getLogIdFromName(final String name) { 1272 int end = name.lastIndexOf(".log"); 1273 int start = name.lastIndexOf('-') + 1; 1274 return Long.parseLong(name.substring(start, end)); 1275 } 1276 1277 private static final PathFilter WALS_PATH_FILTER = new PathFilter() { 1278 @Override 1279 public boolean accept(Path path) { 1280 String name = path.getName(); 1281 return name.startsWith(LOG_PREFIX) && name.endsWith(".log"); 1282 } 1283 }; 1284 1285 private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR = 1286 new Comparator<FileStatus>() { 1287 @Override 1288 public int compare(FileStatus a, FileStatus b) { 1289 final long aId = getLogIdFromName(a.getPath().getName()); 1290 final long bId = getLogIdFromName(b.getPath().getName()); 1291 return Long.compare(aId, bId); 1292 } 1293 }; 1294 1295 private FileStatus[] getLogFiles() throws IOException { 1296 try { 1297 FileStatus[] files = fs.listStatus(walDir, WALS_PATH_FILTER); 1298 Arrays.sort(files, FILE_STATUS_ID_COMPARATOR); 1299 return files; 1300 } catch (FileNotFoundException e) { 1301 LOG.warn("Log directory not found: " + e.getMessage()); 1302 return null; 1303 } 1304 } 1305 1306 /** 1307 * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort 1308 * the file set by log id. 
1309 * @return Max-LogID of the specified log file set 1310 */ 1311 private static long getMaxLogId(FileStatus[] logFiles) { 1312 if (logFiles == null || logFiles.length == 0) { 1313 return 0L; 1314 } 1315 return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); 1316 } 1317 1318 /** 1319 * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort 1320 * the file set by log id. 1321 * @return Max-LogID of the specified log file set 1322 */ 1323 private long initOldLogs(FileStatus[] logFiles) throws IOException { 1324 if (logFiles == null || logFiles.length == 0) { 1325 return 0L; 1326 } 1327 long maxLogId = 0; 1328 for (int i = 0; i < logFiles.length; ++i) { 1329 final Path logPath = logFiles[i].getPath(); 1330 leaseRecovery.recoverFileLease(fs, logPath); 1331 if (!isRunning()) { 1332 throw new IOException("wal aborting"); 1333 } 1334 1335 maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); 1336 ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); 1337 if (log != null) { 1338 this.logs.add(log); 1339 } 1340 } 1341 initTrackerFromOldLogs(); 1342 return maxLogId; 1343 } 1344 1345 /** 1346 * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker 1347 * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. 1348 */ 1349 private void initTrackerFromOldLogs() { 1350 if (logs.isEmpty() || !isRunning()) { 1351 return; 1352 } 1353 ProcedureWALFile log = logs.getLast(); 1354 if (!log.getTracker().isPartial()) { 1355 storeTracker.resetTo(log.getTracker()); 1356 } else { 1357 storeTracker.reset(); 1358 storeTracker.setPartialFlag(true); 1359 } 1360 } 1361 1362 /** 1363 * Loads given log file and it's tracker. 1364 */ 1365 private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) 1366 throws IOException { 1367 final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); 1368 if (logFile.getLen() == 0) { 1369 LOG.warn("Remove uninitialized log: {}", logFile); 1370 log.removeFile(walArchiveDir); 1371 return null; 1372 } 1373 LOG.debug("Opening Pv2 {}", logFile); 1374 try { 1375 log.open(); 1376 } catch (ProcedureWALFormat.InvalidWALDataException e) { 1377 LOG.warn("Remove uninitialized log: {}", logFile, e); 1378 log.removeFile(walArchiveDir); 1379 return null; 1380 } catch (IOException e) { 1381 String msg = "Unable to read state log: " + logFile; 1382 LOG.error(msg, e); 1383 throw new IOException(msg, e); 1384 } 1385 1386 try { 1387 log.readTracker(); 1388 } catch (IOException e) { 1389 log.getTracker().reset(); 1390 log.getTracker().setPartialFlag(true); 1391 LOG.warn("Unable to read tracker for {}", log, e); 1392 } 1393 1394 log.close(); 1395 return log; 1396 } 1397 1398 /** 1399 * Parses a directory of WALs building up ProcedureState. For testing parse and profiling. 1400 * @param args Include pointer to directory of WAL files for a store instance to parse & load. 
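 *             Example (the path below is illustrative only):
 *             {@code hdfs://namenode:8020/hbase/MasterProcWALs}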
 */
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    if (args == null || args.length != 1) {
      System.out.println("ERROR: Expected exactly one argument, the path to MASTERPROCWALS_DIR.");
      System.out.println("Usage: WALProcedureStore MASTERPROCWALS_DIR");
      System.exit(-1);
    }
    WALProcedureStore store =
      new WALProcedureStore(conf, new Path(args[0]), null, new LeaseRecovery() {
        @Override
        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
          // no-op
        }
      });
    try {
      store.start(16);
      ProcedureExecutor<?> pe =
        new ProcedureExecutor<>(conf, new Object()/* Pass anything */, store);
      pe.init(1, true);
    } finally {
      store.stop(true);
    }
  }
}