/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.region;

import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.math.IntMath;

/**
 * A region that stores data in a separated directory, which can be used to store master local data.
 * <p/>
 * FileSystem layout:
 *
 * <pre>
 * hbase
 *   |
 *   --&lt;region dir&gt;
 *       |
 *       --data
 *       |  |
 *       |  --/&lt;ns&gt;/&lt;table&gt;/&lt;encoded-region-name&gt; &lt;---- The region data
 *       |      |
 *       |      --replay &lt;---- The edits to replay
 *       |
 *       --WALs
 *          |
 *          --&lt;master-server-name&gt; &lt;---- The WAL dir for active master
 *          |
 *          --&lt;master-server-name&gt;-dead &lt;---- The WAL dir for dead master
 * </pre>
 *
 * Notice that, you can use different root file system and WAL file system. Then the above directory
 * will be on two file systems, the root file system will have the data directory while the WAL
 * filesystem will have the WALs directory. The archived HFile will be moved to the global HFile
 * archived directory with the {@link MasterRegionParams#archivedHFileSuffix()} suffix. The archived
 * WAL will be moved to the global WAL archived directory with the
 * {@link MasterRegionParams#archivedWalSuffix()} suffix.
 */
@InterfaceAudience.Private
public final class MasterRegion {

  private static final Logger LOG = LoggerFactory.getLogger(MasterRegion.class);

  /** Name of the directory, under the region's WAL directory, that holds the WALs to replay. */
  private static final String REPLAY_EDITS_DIR = "recovered.wals";

  /** Suffix appended to a WAL directory once it has been claimed for replay. */
  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";

  /** Marker under the table directory which is present while the region is being bootstrapped. */
  static final String INITIALIZING_FLAG = ".initializing";

  /** Marker under the table directory which is present once bootstrapping has completed. */
  static final String INITIALIZED_FLAG = ".initialized";

  private static final int REGION_ID = 1;

  private final Server server;

  private final WALFactory walFactory;

  final HRegion region;

  final MasterRegionFlusherAndCompactor flusherAndCompactor;

  private MasterRegionWALRoller walRoller;

  private MasterRegion(Server server, HRegion region, WALFactory walFactory,
    MasterRegionFlusherAndCompactor flusherAndCompactor, MasterRegionWALRoller walRoller) {
    this.server = server;
    this.region = region;
    this.walFactory = walFactory;
    this.flusherAndCompactor = flusherAndCompactor;
    this.walRoller = walRoller;
  }

  /** Closes the underlying region; a failure is logged and otherwise ignored (best effort). */
  private void closeRegion(boolean abort) {
    try {
      region.close(abort);
    } catch (IOException e) {
      LOG.warn("Failed to close region", e);
    }
  }

  /** Shuts down the WAL factory; a failure is logged and otherwise ignored (best effort). */
  private void shutdownWAL() {
    try {
      walFactory.shutdown();
    } catch (IOException e) {
      LOG.warn("Failed to shutdown WAL", e);
    }
  }

  /**
   * Runs the given action against the underlying region and then notifies the flusher and
   * compactor that an update happened.
   * @param action the update to apply to the region
   * @throws IOException if the action fails. If the failure is a {@link WALSyncTimeoutIOException}
   *                     the server is aborted before the exception is rethrown.
   */
  public void update(UpdateMasterRegion action) throws IOException {
    try {
      action.update(region);
      flusherAndCompactor.onUpdate();
    } catch (WALSyncTimeoutIOException e) {
      LOG.error(HBaseMarkers.FATAL, "WAL sync timeout. Aborting server.");
      server.abort("WAL sync timeout", e);
      throw e;
    }
  }

  /**
   * Executes the given {@link Get} against the underlying region.
   * @param get the get to execute
   * @return the result returned by the region
   */
  public Result get(Get get) throws IOException {
    return region.get(get);
  }

  /**
   * Opens a scanner on the underlying region and exposes it as a {@link ResultScanner}.
   * @param scan the scan to execute
   * @return a {@link ResultScanner} wrapping the region scanner
   */
  public ResultScanner getScanner(Scan scan) throws IOException {
    return new RegionScannerAsResultScanner(region.getScanner(scan));
  }

  /**
   * Opens a scanner on the underlying region.
   * @param scan the scan to execute
   * @return the scanner returned by the region
   */
  public RegionScanner getRegionScanner(Scan scan) throws IOException {
    return region.getScanner(scan);
  }

  /**
   * Flushes the underlying region, resetting the flusher's change counter before the flush and
   * recording the flush time after it.
   * @param force passed through to {@link HRegion#flush(boolean)}
   * @return the result of the region flush
   * @throws IOException if the flush fails. If the failure is a {@link WALSyncTimeoutIOException}
   *                     the server is aborted before the exception is rethrown.
   */
  public FlushResult flush(boolean force) throws IOException {
    try {
      flusherAndCompactor.resetChangesAfterLastFlush();
      FlushResult flushResult = region.flush(force);
      flusherAndCompactor.recordLastFlushTime();
      return flushResult;
    } catch (WALSyncTimeoutIOException e) {
      LOG.error(HBaseMarkers.FATAL, "WAL sync timeout. Aborting server.");
      server.abort("WAL sync timeout", e);
      throw e;
    }
  }

  /** Asks the WAL roller to roll all WALs. Test only. */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public void requestRollAll() {
    walRoller.requestRollAll();
  }

  /** Blocks until the WAL roller reports that the WAL roll has finished. Test only. */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public void waitUntilWalRollFinished() throws InterruptedException {
    walRoller.waitUntilWalRollFinished();
  }

  /**
   * Closes the flusher/compactor, the region, the WAL and the WAL roller.
   * @param abort whether this is an abort; it decides the order in which the WAL and the region
   *              are closed, and is passed through to {@link HRegion#close(boolean)}
   */
  public void close(boolean abort) {
    LOG.info("Closing local region {}, isAbort={}", region.getRegionInfo(), abort);
    if (flusherAndCompactor != null) {
      flusherAndCompactor.close();
    }
    // if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
    // region, otherwise there will be dead lock.
    if (abort) {
      shutdownWAL();
      closeRegion(true);
    } else {
      closeRegion(false);
      shutdownWAL();
    }

    if (walRoller != null) {
      walRoller.close();
    }
  }

  /**
   * Creates the WAL directory for the given server, obtains a WAL for the region from the factory
   * and registers it with the roller.
   * @throws HBaseIOException if the WAL directory already exists
   * @throws IOException      if the WAL directory can not be created
   */
  private static WAL createWAL(WALFactory walFactory, MasterRegionWALRoller walRoller,
    String serverName, FileSystem walFs, Path walRootDir, RegionInfo regionInfo)
    throws IOException {
    String logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    Path walDir = new Path(walRootDir, logName);
    LOG.debug("WALDir={}", walDir);
    if (walFs.exists(walDir)) {
      throw new HBaseIOException(
        "Already created wal directory at " + walDir + " for local region " + regionInfo);
    }
    if (!walFs.mkdirs(walDir)) {
      throw new IOException(
        "Can not create wal directory " + walDir + " for local region " + regionInfo);
    }
    WAL wal = walFactory.getWAL(regionInfo);
    walRoller.addWAL(wal);
    return wal;
  }

  /**
   * Creates the region from scratch: persists the table descriptor, creates the region on the file
   * system, touches the initialized flag, removes the initializing flag, and finally opens the
   * region with a newly created WAL.
   * @param touchInitializingFlag whether the initializing flag must be created here before doing
   *                              anything else; callers pass {@code false} when the flag is
   *                              already present on the file system
   */
  private static HRegion bootstrap(Configuration conf, TableDescriptor td, FileSystem fs,
    Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
    MasterRegionWALRoller walRoller, String serverName, boolean touchInitializingFlag)
    throws IOException {
    TableName tn = td.getTableName();
    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).setRegionId(REGION_ID).build();
    Path tableDir = CommonFSUtils.getTableDir(rootDir, tn);
    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
    // Mark the directory as half-built before writing anything into it, so that create() can
    // detect an interrupted bootstrap (initializing flag present, initialized flag absent) and
    // start over instead of trying to open incomplete data.
    if (touchInitializingFlag && !fs.mkdirs(initializingFlag)) {
      throw new IOException("Can not touch initializing flag: " + initializingFlag);
    }
    // persist table descriptor
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, true);
    HRegion.createHRegion(conf, regionInfo, fs, tableDir, td).close();
    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
    if (!fs.mkdirs(initializedFlag)) {
      throw new IOException("Can not touch initialized flag: " + initializedFlag);
    }
    if (!fs.delete(initializingFlag, true)) {
      LOG.warn("failed to clean up initializing flag: {}", initializingFlag);
    }
    WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
  }

  /**
   * Loads the {@link RegionInfo} from the first directory under {@code tableDir} whose name is an
   * encoded region name.
   * @throws IOException if no such directory exists or the region info can not be read
   */
  private static RegionInfo loadRegionInfo(FileSystem fs, Path tableDir) throws IOException {
    FileStatus[] regionDirs =
      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())));
    if (regionDirs == null || regionDirs.length == 0) {
      // fail with a message that names the directory rather than an ArrayIndexOutOfBoundsException
      throw new IOException("Can not find region directory under " + tableDir);
    }
    return HRegionFileSystem.loadRegionInfoFileContent(fs, regionDirs[0].getPath());
  }

  /**
   * Opens an existing region: makes sure the replay directory exists, moves the WAL files found
   * under the WALs directory into it, creates a new WAL and then opens the region so that the
   * moved files are replayed as recovered edits.
   * <p/>
   * Note that this method sets {@link HRegion#SPECIAL_RECOVERED_EDITS_DIR} and
   * {@link HRegion#RECOVERED_EDITS_IGNORE_EOF} on the passed in {@code conf}.
   */
  private static HRegion open(Configuration conf, TableDescriptor td, RegionInfo regionInfo,
    FileSystem fs, Path rootDir, FileSystem walFs, Path walRootDir, WALFactory walFactory,
    MasterRegionWALRoller walRoller, String serverName) throws IOException {
    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
    Path walRegionDir = FSUtils.getRegionDirFromRootDir(walRootDir, regionInfo);
    Path replayEditsDir = new Path(walRegionDir, REPLAY_EDITS_DIR);
    if (!walFs.exists(replayEditsDir) && !walFs.mkdirs(replayEditsDir)) {
      throw new IOException("Failed to create replay directory: " + replayEditsDir);
    }

    // Replay any WALs for the Master Region before opening it.
    Path walsDir = new Path(walRootDir, HREGION_LOGDIR_NAME);
    // In open(...), we expect that the WAL directory for the MasterRegion to already exist.
    // This is in contrast to bootstrap() where we create the MasterRegion data and WAL dir.
    // However, it's possible that users directly remove the WAL directory. We expect walsDir
    // to always exist in normal situations, but we should guard against users changing the
    // filesystem outside of HBase's line of sight.
    if (walFs.exists(walsDir)) {
      replayWALs(conf, walFs, walRootDir, walsDir, regionInfo, serverName, replayEditsDir);
    } else {
      LOG.error(
        "UNEXPECTED: WAL directory for MasterRegion is missing." + " {} is unexpectedly missing.",
        walsDir);
    }

    // Create a new WAL
    WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
    conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
      replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
    // we do not do WAL splitting here so it is possible to have uncleanly closed WAL files, so we
    // need to ignore EOFException.
    conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
  }

  /**
   * For every directory under {@code walsDir}: renames it with the {@value #DEAD_WAL_DIR_SUFFIX}
   * suffix if it does not carry it yet, recovers the lease of each file in it, moves the files to
   * {@code replayEditsDir} and finally deletes the directory.
   * @throws IOException if listing, lease recovery or any rename fails
   */
  private static void replayWALs(Configuration conf, FileSystem walFs, Path walRootDir,
    Path walsDir, RegionInfo regionInfo, String serverName, Path replayEditsDir)
    throws IOException {
    for (FileStatus walDir : walFs.listStatus(walsDir)) {
      if (!walDir.isDirectory()) {
        continue;
      }
      if (walDir.getPath().getName().startsWith(serverName)) {
        LOG.warn("This should not happen in real production as we have not created our WAL "
          + "directory yet, ignore if you are running a local region related UT");
      }
      Path deadWALDir;
      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
        deadWALDir =
          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
        if (!walFs.rename(walDir.getPath(), deadWALDir)) {
          throw new IOException("Can not rename " + walDir + " to " + deadWALDir
            + " when recovering lease of proc store");
        }
        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
      } else {
        deadWALDir = walDir.getPath();
        LOG.info("{} is already marked as dead", deadWALDir);
      }
      for (FileStatus walFile : walFs.listStatus(deadWALDir)) {
        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
        RecoverLeaseFSUtils.recoverFileLease(walFs, walFile.getPath(), conf);
        if (!walFs.rename(walFile.getPath(), replayEditsFile)) {
          throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile
            + " when recovering lease for local region");
        }
        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
      }
      LOG.info("Delete empty local region wal dir {}", deadWALDir);
      walFs.delete(deadWALDir, true);
    }
  }

  /**
   * Brings the on-disk state in line with {@code newTd}. If the store file tracker implementation
   * is unchanged, only the table descriptor is rewritten (and only when it differs). Otherwise the
   * store file list of every column family of {@code oldTd} is loaded with the old tracker and
   * handed to the new tracker, after which the new table descriptor is persisted.
   */
  private static void tryMigrate(Configuration conf, FileSystem fs, Path tableDir,
    RegionInfo regionInfo, TableDescriptor oldTd, TableDescriptor newTd) throws IOException {
    Class<? extends StoreFileTracker> oldSft =
      StoreFileTrackerFactory.getTrackerClass(oldTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
    Class<? extends StoreFileTracker> newSft =
      StoreFileTrackerFactory.getTrackerClass(newTd.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
    if (oldSft.equals(newSft)) {
      LOG.debug("old store file tracker {} is the same with new store file tracker, skip migration",
        StoreFileTrackerFactory.getStoreFileTrackerName(oldSft));
      if (!oldTd.equals(newTd)) {
        // we may change other things such as adding a new family, so here we still need to persist
        // the new table descriptor
        LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
        FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
      }
      return;
    }
    LOG.info("Migrate store file tracker from {} to {}", oldSft.getSimpleName(),
      newSft.getSimpleName());
    HRegionFileSystem hfs =
      HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, regionInfo, false);
    for (ColumnFamilyDescriptor oldCfd : oldTd.getColumnFamilies()) {
      StoreFileTracker oldTracker = StoreFileTrackerFactory.create(conf, oldTd, oldCfd, hfs);
      // The new tracker must be configured from the new table descriptor; building it from oldTd
      // would yield the same implementation as oldTracker and make the migration a no-op.
      StoreFileTracker newTracker = StoreFileTrackerFactory.create(conf, newTd, oldCfd, hfs);
      List<StoreFileInfo> files = oldTracker.load();
      LOG.debug("Store file list for {}: {}", oldCfd.getNameAsString(), files);
      newTracker.set(files);
    }
    // persist the new table descriptor after migration
    LOG.info("Update table descriptor from {} to {}", oldTd, newTd);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, newTd, true);
  }

  /**
   * Creates a {@link MasterRegion}, either by bootstrapping a new region or by opening the one
   * already on the file system.
   * <p/>
   * The decision is based on the table directory and the two flags under it:
   * <ul>
   * <li>no table directory: bootstrap;</li>
   * <li>neither flag: treated as the old layout, so the descriptor is persisted with the DEFAULT
   * store file tracker, the initialized flag is touched, and the region is migrated and
   * opened;</li>
   * <li>only the initializing flag: a previous bootstrap did not finish, so everything except the
   * flag is deleted and the region is bootstrapped again;</li>
   * <li>initialized flag present: the region is migrated if needed and opened.</li>
   * </ul>
   * @param params the parameters describing the region to create
   * @return the created master region
   * @throws IOException if any file system operation, or bootstrapping/opening the region, fails
   */
  public static MasterRegion create(MasterRegionParams params) throws IOException {
    TableDescriptor td = params.tableDescriptor();
    LOG.info("Create or load local region for table {}", td);
    Server server = params.server();
    Configuration baseConf = server.getConfiguration();
    FileSystem fs = CommonFSUtils.getRootDirFileSystem(baseConf);
    FileSystem walFs = CommonFSUtils.getWALFileSystem(baseConf);
    Path globalRootDir = CommonFSUtils.getRootDir(baseConf);
    Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
    Path rootDir = new Path(globalRootDir, params.regionDirName());
    Path walRootDir = new Path(globalWALRootDir, params.regionDirName());
    // we will override some configurations so create a new one.
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setWALRootDir(conf, walRootDir);
    MasterRegionFlusherAndCompactor.setupConf(conf, params.flushSize(), params.flushPerChanges(),
      params.flushIntervalMs());
    conf.setInt(AbstractFSWAL.MAX_LOGS, params.maxWals());
    if (params.useHsync() != null) {
      conf.setBoolean(HRegion.WAL_HSYNC_CONF_KEY, params.useHsync());
    }
    if (params.useMetaCellComparator() != null) {
      conf.setBoolean(HRegion.USE_META_CELL_COMPARATOR, params.useMetaCellComparator());
    }
    conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT,
      IntMath.ceilingPowerOfTwo(params.ringBufferSlotCount()));

    MasterRegionWALRoller walRoller = MasterRegionWALRoller.create(
      td.getTableName() + "-WAL-Roller", conf, server, walFs, walRootDir, globalWALRootDir,
      params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
    walRoller.start();

    WALFactory walFactory = new WALFactory(conf, server.getServerName(), server);
    String serverName = server.getServerName().toString();
    Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
    Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
    Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
    HRegion region;
    if (!fs.exists(tableDir)) {
      // bootstrap, no doubt. bootstrap() touches the initializing flag first, so an interrupted
      // attempt is recognized and redone on the next start.
      region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
        serverName, true);
    } else if (fs.exists(initializedFlag)) {
      if (fs.exists(initializingFlag) && !fs.delete(initializingFlag, true)) {
        LOG.warn("failed to clean up initializing flag: {}", initializingFlag);
      }
      // open it, make sure to load the table descriptor from fs
      TableDescriptor oldTd = FSTableDescriptors.getTableDescriptorFromFs(fs, tableDir);
      RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
      tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
      region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
        serverName);
    } else if (fs.exists(initializingFlag)) {
      // delete all contents besides the initializing flag, here we can make sure tableDir
      // exists(unless someone delete it manually...), so we do not do null check here.
      for (FileStatus status : fs.listStatus(tableDir)) {
        if (!status.getPath().getName().equals(INITIALIZING_FLAG)) {
          fs.delete(status.getPath(), true);
        }
      }
      region = bootstrap(conf, td, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
        serverName, false);
    } else {
      // should be old style, where we do not have the initializing or initialized file, persist
      // the table descriptor, touch the initialized flag and then open the region.
      // the store file tracker must be DEFAULT
      LOG.info("No {} or {} file, try upgrading", INITIALIZING_FLAG, INITIALIZED_FLAG);
      TableDescriptor oldTd =
        TableDescriptorBuilder.newBuilder(td).setValue(StoreFileTrackerFactory.TRACKER_IMPL,
          StoreFileTrackerFactory.Trackers.DEFAULT.name()).build();
      FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, oldTd, true);
      if (!fs.mkdirs(initializedFlag)) {
        throw new IOException("Can not touch initialized flag: " + initializedFlag);
      }
      RegionInfo regionInfo = loadRegionInfo(fs, tableDir);
      tryMigrate(conf, fs, tableDir, regionInfo, oldTd, td);
      region = open(conf, td, regionInfo, fs, rootDir, walFs, walRootDir, walFactory, walRoller,
        serverName);
    }

    Path globalArchiveDir = HFileArchiveUtil.getArchivePath(baseConf);
    MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(conf,
      server, region, params.flushSize(), params.flushPerChanges(), params.flushIntervalMs(),
      params.compactMin(), globalArchiveDir, params.archivedHFileSuffix());
    walRoller.setFlusherAndCompactor(flusherAndCompactor);
    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
    if (!fs.mkdirs(archiveDir)) {
      LOG.warn("Failed to create archive directory {}. Usually this should not happen but it will"
        + " be created again when we actually archive the hfiles later, so continue", archiveDir);
    }
    return new MasterRegion(server, region, walFactory, flusherAndCompactor, walRoller);
  }
}