/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState.WAITING_TIMEOUT;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaStateData;

/**
 * State-machine procedure that reconciles the region rows of hbase:meta with the
 * {@code .regioninfo} files found under the data directory of the HBase root dir.
 * <p>
 * The states run in this order:
 * <ol>
 * <li>{@code REFRESH_META_INIT} - read all regions currently listed in hbase:meta.</li>
 * <li>{@code REFRESH_META_SCAN_STORAGE} - read all regions present on backing storage.</li>
 * <li>{@code REFRESH_META_PREPARE} - diff both views and build the list of meta mutations.</li>
 * <li>{@code REFRESH_META_APPLY} - apply the mutations in batches, suspending with backoff on
 * {@link IOException}.</li>
 * <li>{@code REFRESH_META_FOLLOWUP} - assign new regions and drop master-side state of tables that
 * no longer have regions on storage.</li>
 * <li>{@code REFRESH_META_FINISH} - invalidate the table descriptor cache and release working
 * state.</li>
 * </ol>
 * <p>
 * The region lists and pending mutations are kept in memory only; see
 * {@link #serializeStateData(ProcedureStateSerializer)}.
 */
@InterfaceAudience.Private
public class RefreshMetaProcedure extends AbstractStateMachineTableProcedure<RefreshMetaState> {
  private static final Logger LOG = LoggerFactory.getLogger(RefreshMetaProcedure.class);

  /** Matches directory names starting with '.', '_' or '-'; compiled once and reused. */
  private static final Pattern HIDDEN_DIR_PATTERN = Pattern.compile("^[._-].*");

  /** Maximum number of mutations handed to the meta table in one batch. */
  private static final int MUTATION_BATCH_SIZE = 100;

  /** Regions read from hbase:meta in the INIT state. */
  private List<RegionInfo> currentRegions;
  /** Regions read from backing storage in the SCAN_STORAGE state. */
  private List<RegionInfo> latestRegions;
  /** Puts/Deletes computed in the PREPARE state and applied in the APPLY state. */
  private List<Mutation> pendingMutations;
  /** Lazily created on the first apply failure; drives the suspend backoff. */
  private RetryCounter retryCounter;
  /** Regions for which a Put was prepared; assigned in the FOLLOWUP state. */
  private List<RegionInfo> newlyAddedRegions;
  /** Tables present in meta but without any region on storage; cleaned up in FOLLOWUP. */
  private List<TableName> deletedTables;

  /** No-arg constructor; delegates to the superclass no-arg constructor. */
  public RefreshMetaProcedure() {
    super();
  }

  /**
   * Creates a procedure bound to the given master procedure environment.
   * @param env the master procedure environment passed to the superclass
   */
  public RefreshMetaProcedure(MasterProcedureEnv env) {
    super(env);
  }

  /** Returns {@link TableName#META_TABLE_NAME}; this procedure always operates on hbase:meta. */
  @Override
  public TableName getTableName() {
    return TableName.META_TABLE_NAME;
  }

  /** Returns {@link TableOperationType#EDIT}. */
  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.EDIT;
  }

  /**
   * Dispatches to the handler of the given state.
   * <p>
   * Any exception other than {@link ProcedureSuspendedException} is logged, recorded through
   * {@code setFailure} and ends the state machine.
   * @param env              the master procedure environment
   * @param refreshMetaState the state to execute
   * @return {@link Flow#HAS_MORE_STATE} while more states remain, {@link Flow#NO_MORE_STATE} when
   *         the procedure finished or failed
   * @throws ProcedureSuspendedException when the APPLY state asks to be retried after a backoff
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, RefreshMetaState refreshMetaState)
    throws ProcedureSuspendedException {
    LOG.info("Executing RefreshMetaProcedure state: {}", refreshMetaState);

    try {
      return switch (refreshMetaState) {
        case REFRESH_META_INIT -> executeInit(env);
        case REFRESH_META_SCAN_STORAGE -> executeScanStorage(env);
        case REFRESH_META_PREPARE -> executePrepare();
        case REFRESH_META_APPLY -> executeApply(env);
        case REFRESH_META_FOLLOWUP -> executeFollowup(env);
        case REFRESH_META_FINISH -> executeFinish(env);
        default -> throw new UnsupportedOperationException("Unhandled state: " + refreshMetaState);
      };
    } catch (ProcedureSuspendedException pse) {
      // Suspension is the retry signal raised by executeApply, not an error. It must reach the
      // caller; the generic handler below would otherwise turn every retry into a hard failure.
      throw pse;
    } catch (Exception ex) {
      LOG.error("Error in RefreshMetaProcedure state {}", refreshMetaState, ex);
      setFailure("RefreshMetaProcedure", ex);
      return Flow.NO_MORE_STATE;
    }
  }

  /**
   * INIT state: loads the regions currently listed in hbase:meta into {@code currentRegions}.
   * @throws IOException if reading hbase:meta fails (logged and rethrown)
   */
  private Flow executeInit(MasterProcedureEnv env) throws IOException {
    LOG.trace("Getting current regions from {} table", TableName.META_TABLE_NAME);
    try {
      currentRegions = getCurrentRegions(env.getMasterServices().getConnection());
      LOG.info("Found {} current regions in meta table", currentRegions.size());
      setNextState(RefreshMetaState.REFRESH_META_SCAN_STORAGE);
      return Flow.HAS_MORE_STATE;
    } catch (IOException ioe) {
      LOG.error("Failed to get current regions from meta table", ioe);
      throw ioe;
    }
  }

  /**
   * SCAN_STORAGE state: loads the regions found on backing storage into {@code latestRegions}.
   * @throws IOException if the storage scan fails (logged and rethrown)
   */
  private Flow executeScanStorage(MasterProcedureEnv env) throws IOException {
    try {
      latestRegions = scanBackingStorage(env.getMasterServices().getConnection());
      LOG.info("Found {} regions in backing storage", latestRegions.size());
      setNextState(RefreshMetaState.REFRESH_META_PREPARE);
      return Flow.HAS_MORE_STATE;
    } catch (IOException ioe) {
      LOG.error("Failed to scan backing storage", ioe);
      throw ioe;
    }
  }

  /**
   * PREPARE state: diffs {@code currentRegions} against {@code latestRegions} and stores the
   * resulting mutations in {@code pendingMutations}. Moves to FINISH when nothing has to change,
   * otherwise to APPLY.
   * @throws IOException if either region list is null or a mutation cannot be created
   */
  private Flow executePrepare() throws IOException {
    if (currentRegions == null || latestRegions == null) {
      LOG.error(
        "Can not execute update on null lists. " + "Meta Table Regions - {}, Storage Regions - {}",
        currentRegions, latestRegions);
      throw new IOException(
        (currentRegions == null ? "current regions" : "latest regions") + " list is null");
    }
    LOG.info("Comparing regions. Current regions: {}, Latest regions: {}", currentRegions.size(),
      latestRegions.size());

    // prepareMutations fills these two lists as a side effect, so they must exist beforehand.
    this.newlyAddedRegions = new ArrayList<>();
    this.deletedTables = new ArrayList<>();

    // Both views are keyed by encoded region name. Collectors.toMap throws
    // IllegalStateException on a duplicate key, which fails the procedure.
    pendingMutations = prepareMutations(
      currentRegions.stream()
        .collect(Collectors.toMap(RegionInfo::getEncodedName, Function.identity())),
      latestRegions.stream()
        .collect(Collectors.toMap(RegionInfo::getEncodedName, Function.identity())));

    if (pendingMutations.isEmpty()) {
      LOG.info("RefreshMetaProcedure completed, No update needed.");
      setNextState(RefreshMetaState.REFRESH_META_FINISH);
    } else {
      LOG.info("Prepared {} region mutations and {} tables for cleanup.", pendingMutations.size(),
        deletedTables.size());
      setNextState(RefreshMetaState.REFRESH_META_APPLY);
    }
    return Flow.HAS_MORE_STATE;
  }

  /**
   * APPLY state: writes {@code pendingMutations} to hbase:meta. On {@link IOException} the
   * procedure is put into {@code WAITING_TIMEOUT} with a backoff taken from the retry counter and
   * suspended; the whole mutation list is applied again when the state is re-executed.
   * @throws ProcedureSuspendedException when applying failed and a timed retry was scheduled
   */
  private Flow executeApply(MasterProcedureEnv env) throws ProcedureSuspendedException {
    try {
      if (pendingMutations != null && !pendingMutations.isEmpty()) {
        applyMutations(env.getMasterServices().getConnection(), pendingMutations);
        LOG.debug("RefreshMetaProcedure applied {} mutations to meta table",
          pendingMutations.size());
      }
    } catch (IOException ioe) {
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("Failed to apply mutations to meta table, suspending for {} ms", backoff, ioe);
      setTimeout(Math.toIntExact(backoff));
      setState(WAITING_TIMEOUT);
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
    // The apply succeeded; drop the counter so an earlier failure streak does not leak into any
    // later use of it.
    retryCounter = null;

    boolean hasNewRegions = this.newlyAddedRegions != null && !this.newlyAddedRegions.isEmpty();
    boolean hasDeletedTables = this.deletedTables != null && !this.deletedTables.isEmpty();
    if (hasNewRegions || hasDeletedTables) {
      setNextState(RefreshMetaState.REFRESH_META_FOLLOWUP);
    } else {
      LOG.info("RefreshMetaProcedure completed. No follow-up actions were required.");
      setNextState(RefreshMetaState.REFRESH_META_FINISH);
    }
    return Flow.HAS_MORE_STATE;
  }

  /**
   * FOLLOWUP state: adds assign procedures for {@code newlyAddedRegions} as children and, for each
   * table in {@code deletedTables}, removes it from the assignment manager, marks it deleted in
   * the table state manager and removes its descriptor.
   * @throws IOException if any of the per-table cleanup calls fails
   */
  private Flow executeFollowup(MasterProcedureEnv env) throws IOException {
    // Guard both lists the same way executeApply does; either one may be absent or empty when
    // only the other kind of follow-up is needed.
    if (this.newlyAddedRegions != null && !this.newlyAddedRegions.isEmpty()) {
      LOG.info("Submitting assignment for new regions: {}", this.newlyAddedRegions);
      addChildProcedure(env.getAssignmentManager().createAssignProcedures(newlyAddedRegions));
    }

    if (this.deletedTables != null) {
      for (TableName tableName : this.deletedTables) {
        LOG.debug("Submitting deletion for empty table {}", tableName);
        env.getMasterServices().getAssignmentManager().deleteTable(tableName);
        env.getMasterServices().getTableStateManager().setDeletedTable(tableName);
        env.getMasterServices().getTableDescriptors().remove(tableName);
      }
    }
    setNextState(RefreshMetaState.REFRESH_META_FINISH);
    return Flow.HAS_MORE_STATE;
  }

  /**
   * FINISH state: invalidates the table descriptor cache and clears all working lists.
   * @return always {@link Flow#NO_MORE_STATE}
   */
  private Flow executeFinish(MasterProcedureEnv env) {
    invalidateTableDescriptorCache(env);
    LOG.info("RefreshMetaProcedure completed successfully. All follow-up actions finished.");
    currentRegions = null;
    latestRegions = null;
    pendingMutations = null;
    deletedTables = null;
    newlyAddedRegions = null;
    return Flow.NO_MORE_STATE;
  }

  /** Asks the master's table descriptors to drop their cache. */
  private void invalidateTableDescriptorCache(MasterProcedureEnv env) {
    LOG.debug("Invalidating the table descriptor cache to ensure new tables are discovered");
    env.getMasterServices().getTableDescriptors().invalidateTableDescriptorCache();
  }

  /**
   * Prepares mutations by comparing the current regions in hbase:meta with the latest regions from
   * backing storage. Also populates newlyAddedRegions and deletedTables lists for follow-up
   * actions.
   * <p>
   * Per region: a region on storage that is missing from meta, or whose start/end key differs,
   * yields a Put and is recorded in {@code newlyAddedRegions}; a region only in meta yields a
   * Delete. Per table: a table only in meta is recorded in {@code deletedTables}; a table only on
   * storage yields a table-state Put with state ENABLED.
   * <p>
   * NOTE(review): the storage scan skips the system namespace while the meta view is not filtered
   * here - confirm that regions of system tables listed in meta cannot end up being deleted.
   * @param currentMap Current regions from hbase:meta, keyed by encoded region name
   * @param latestMap  Latest regions from backing storage, keyed by encoded region name
   * @return List of mutations to apply to the meta table
   * @throws IOException If there is an error creating mutations
   */
  private List<Mutation> prepareMutations(Map<String, RegionInfo> currentMap,
    Map<String, RegionInfo> latestMap) throws IOException {
    List<Mutation> mutations = new ArrayList<>();

    // Walk the union of both key sets so every region known to either side is looked at once.
    for (String regionId : Stream.concat(currentMap.keySet().stream(), latestMap.keySet().stream())
      .collect(Collectors.toSet())) {
      RegionInfo currentRegion = currentMap.get(regionId);
      RegionInfo latestRegion = latestMap.get(regionId);

      if (latestRegion != null) {
        if (currentRegion == null || hasBoundaryChanged(currentRegion, latestRegion)) {
          mutations.add(MetaTableAccessor.makePutFromRegionInfo(latestRegion));
          newlyAddedRegions.add(latestRegion);
        }
      } else {
        // The id came from the union, so a missing latest region implies currentRegion is set.
        mutations.add(MetaTableAccessor.makeDeleteFromRegionInfo(currentRegion,
          EnvironmentEdgeManager.currentTime()));
      }
    }

    if (!currentMap.isEmpty() || !latestMap.isEmpty()) {
      Set<TableName> currentTables =
        currentMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
      Set<TableName> latestTables =
        latestMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());

      Set<TableName> tablesToDeleteState = new HashSet<>(currentTables);
      tablesToDeleteState.removeAll(latestTables);
      if (!tablesToDeleteState.isEmpty()) {
        LOG.warn(
          "The following tables have no regions on storage and WILL BE REMOVED from the meta: {}",
          tablesToDeleteState);
        this.deletedTables.addAll(tablesToDeleteState);
      }

      Set<TableName> tablesToRestoreState = new HashSet<>(latestTables);
      tablesToRestoreState.removeAll(currentTables);
      if (!tablesToRestoreState.isEmpty()) {
        LOG.info("Adding missing table:state entry for recovered tables: {}", tablesToRestoreState);
        for (TableName tableName : tablesToRestoreState) {
          TableState tableState = new TableState(tableName, TableState.State.ENABLED);
          mutations.add(MetaTableAccessor.makePutFromTableState(tableState,
            EnvironmentEdgeManager.currentTime()));
        }
      }
    }
    return mutations;
  }

  /**
   * Applies the mutations to hbase:meta in chunks of {@link #MUTATION_BATCH_SIZE}. Within each
   * chunk all Puts are written before all Deletes.
   * @param connection connection used to reach the meta table
   * @param mutations  Puts and Deletes to apply; other mutation types are ignored
   * @throws IOException if writing a batch fails; earlier batches stay applied
   */
  private void applyMutations(Connection connection, List<Mutation> mutations) throws IOException {
    List<List<Mutation>> chunks = Lists.partition(mutations, MUTATION_BATCH_SIZE);

    for (int i = 0; i < chunks.size(); i++) {
      List<Mutation> chunk = chunks.get(i);

      List<Put> puts =
        chunk.stream().filter(Put.class::isInstance).map(Put.class::cast)
          .collect(Collectors.toList());

      List<Delete> deletes =
        chunk.stream().filter(Delete.class::isInstance).map(Delete.class::cast)
          .collect(Collectors.toList());

      if (!puts.isEmpty()) {
        MetaTableAccessor.putsToMetaTable(connection, puts);
      }
      if (!deletes.isEmpty()) {
        MetaTableAccessor.deleteFromMetaTable(connection, deletes);
      }
      LOG.debug("Successfully processed batch {}/{}", i + 1, chunks.size());
    }
  }

  /**
   * Tells whether two region infos differ in their start key or end key.
   * @param region1 first region
   * @param region2 second region
   * @return true if the start keys or the end keys are not byte-wise equal
   */
  boolean hasBoundaryChanged(RegionInfo region1, RegionInfo region2) {
    return !Arrays.equals(region1.getStartKey(), region2.getStartKey())
      || !Arrays.equals(region1.getEndKey(), region2.getEndKey());
  }

  /**
   * Scans the backing storage for all regions and returns a list of RegionInfo objects. This method
   * scans the filesystem for region directories and reads their .regioninfo files.
   * <p>
   * Hidden directories (names starting with '.', '_' or '-') and the system namespace are skipped.
   * <p>
   * NOTE(review): a namespace whose scan throws is logged and skipped, so its regions are absent
   * from the result. Callers that derive deletes from this list should confirm that this
   * best-effort behavior is acceptable.
   * @param connection The HBase connection to use.
   * @return List of RegionInfo objects found in the backing storage; empty if the data directory
   *         does not exist.
   * @throws IOException If there is an error accessing the filesystem or reading region info files.
   */
  List<RegionInfo> scanBackingStorage(Connection connection) throws IOException {
    List<RegionInfo> regions = new ArrayList<>();
    Configuration conf = connection.getConfiguration();
    Path rootDir = CommonFSUtils.getRootDir(conf);
    // Resolve the filesystem from the root dir itself: the default filesystem of the
    // configuration is not necessarily the one that hosts the HBase root directory.
    FileSystem fs = rootDir.getFileSystem(conf);
    Path dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);

    LOG.info("Scanning backing storage under: {}", dataDir);

    if (!fs.exists(dataDir)) {
      LOG.warn("Data directory does not exist: {}", dataDir);
      return regions;
    }

    FileStatus[] namespaceDirs =
      fs.listStatus(dataDir, path -> !HIDDEN_DIR_PATTERN.matcher(path.getName()).matches());
    LOG.debug("Found {} namespace directories in data dir: {}", namespaceDirs.length,
      Arrays.asList(namespaceDirs));

    for (FileStatus nsDir : namespaceDirs) {
      String namespaceName = nsDir.getPath().getName();
      if (NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR.equals(namespaceName)) {
        LOG.info("Skipping system namespace {}", namespaceName);
        continue;
      }
      try {
        List<RegionInfo> namespaceRegions = scanTablesInNamespace(fs, nsDir.getPath());
        regions.addAll(namespaceRegions);
        LOG.debug("Found {} regions in namespace {}", namespaceRegions.size(), namespaceName);
      } catch (IOException e) {
        LOG.error("Failed to scan namespace directory: {}", nsDir.getPath(), e);
      }
    }
    LOG.info("Scanned backing storage and found {} regions", regions.size());
    return regions;
  }

  /**
   * Collects the regions of every table directory inside one namespace directory. A table whose
   * region directories cannot be listed is logged and contributes no regions.
   * @param fs            filesystem holding the namespace directory
   * @param namespacePath path of the namespace directory
   * @return regions of all tables in the namespace that could be scanned
   * @throws IOException if the table directories of the namespace cannot be listed
   */
  private List<RegionInfo> scanTablesInNamespace(FileSystem fs, Path namespacePath)
    throws IOException {
    LOG.debug("Scanning namespace {}", namespacePath.getName());
    List<Path> tableDirs = FSUtils.getLocalTableDirs(fs, namespacePath);

    return tableDirs.stream().flatMap(tableDir -> {
      try {
        List<RegionInfo> tableRegions = scanRegionsInTable(fs, FSUtils.getRegionDirs(fs, tableDir));
        LOG.debug("Found {} regions in table {} in namespace {}", tableRegions.size(),
          tableDir.getName(), namespacePath.getName());
        return tableRegions.stream();
      } catch (IOException e) {
        LOG.warn("Failed to scan table directory: {} for namespace {}", tableDir,
          namespacePath.getName(), e);
        return Stream.empty();
      }
    }).toList();
  }

  /**
   * Reads the .regioninfo file of each region directory. Directories without the file, with an
   * unreadable file, or whose file names a different encoded region name than the directory are
   * logged and left out of the result.
   * @param fs         filesystem holding the region directories
   * @param regionDirs region directories of one table
   * @return the region infos that could be read and validated
   * @throws IOException declared for callers; failures on individual regions are caught and logged
   */
  private List<RegionInfo> scanRegionsInTable(FileSystem fs, List<Path> regionDirs)
    throws IOException {
    return regionDirs.stream().map(regionDir -> {
      String encodedRegionName = regionDir.getName();
      try {
        Path regionInfoPath = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
        if (fs.exists(regionInfoPath)) {
          RegionInfo ri = readRegionInfo(fs, regionInfoPath);
          if (ri != null && isValidRegionInfo(ri, encodedRegionName)) {
            LOG.debug("Found region: {} -> {}", encodedRegionName, ri.getRegionNameAsString());
            return ri;
          } else {
            LOG.warn("Invalid RegionInfo in file: {}", regionInfoPath);
          }
        } else {
          LOG.debug("No .regioninfo file found in region directory: {}", regionDir);
        }
      } catch (Exception e) {
        LOG.warn("Failed to read region info from directory: {}", encodedRegionName, e);
      }
      // Null marks "no usable region here"; filtered out right below.
      return null;
    }).filter(Objects::nonNull).collect(Collectors.toList());
  }

  /**
   * Checks that the encoded name stored in the region info equals the region directory name.
   * @param regionInfo          region info read from the .regioninfo file
   * @param expectedEncodedName name of the region directory
   * @return true if both names are equal; false (after logging a warning) otherwise
   */
  private boolean isValidRegionInfo(RegionInfo regionInfo, String expectedEncodedName) {
    if (!expectedEncodedName.equals(regionInfo.getEncodedName())) {
      LOG.warn("RegionInfo encoded name mismatch: directory={}, regioninfo={}", expectedEncodedName,
        regionInfo.getEncodedName());
      return false;
    }
    return true;
  }

  /**
   * Opens and parses one .regioninfo file.
   * @param fs             filesystem holding the file
   * @param regionInfoPath path of the .regioninfo file
   * @return the parsed region info, or null if the file could not be opened or parsed
   */
  private RegionInfo readRegionInfo(FileSystem fs, Path regionInfoPath) {
    try (FSDataInputStream inputStream = fs.open(regionInfoPath);
      DataInputStream dataInputStream = new DataInputStream(inputStream)) {
      return RegionInfo.parseFrom(dataInputStream);
    } catch (Exception e) {
      LOG.warn("Failed to parse .regioninfo file: {}", regionInfoPath, e);
      return null;
    }
  }

  /**
   * Retrieves the current regions from the hbase:meta table.
   * @param connection The HBase connection to use.
   * @return List of RegionInfo objects representing the current regions in meta.
   * @throws IOException If there is an error accessing the meta table.
   */
  List<RegionInfo> getCurrentRegions(Connection connection) throws IOException {
    LOG.info("Getting all regions from meta table");
    return MetaTableAccessor.getAllRegions(connection, true);
  }

  /**
   * Called when the timeout set by the APPLY state expires: marks the procedure RUNNABLE again and
   * puts it at the front of the scheduler queue so the APPLY state is retried.
   * @return false, after the procedure has been re-queued here
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(
      org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /** Rollback is a no-op for every state; only a trace message is logged. */
  @Override
  protected void rollbackState(MasterProcedureEnv env, RefreshMetaState refreshMetaState)
    throws IOException, InterruptedException {
    // No specific rollback needed as it is generally safe to re-run the procedure.
    LOG.trace("Rollback not implemented for RefreshMetaProcedure state: {}", refreshMetaState);
  }

  /** Maps a numeric state id to its {@link RefreshMetaState}. */
  @Override
  protected RefreshMetaState getState(int stateId) {
    return RefreshMetaState.forNumber(stateId);
  }

  /** Maps a {@link RefreshMetaState} to its numeric state id. */
  @Override
  protected int getStateId(RefreshMetaState refreshMetaState) {
    return refreshMetaState.getNumber();
  }

  /** Returns {@code REFRESH_META_INIT}, the first state of the procedure. */
  @Override
  protected RefreshMetaState getInitialState() {
    return RefreshMetaState.REFRESH_META_INIT;
  }

  /**
   * Writes an empty {@link RefreshMetaStateData} message; none of the in-memory lists is persisted.
   * <p>
   * NOTE(review): the superclass implementation is not invoked here - confirm that skipping the
   * state the superclass would persist is intended for this procedure.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    // For now, we'll use a simple approach since we do not need to store any state data
    RefreshMetaStateData.Builder builder = RefreshMetaStateData.newBuilder();
    serializer.serialize(builder.build());
  }

  /** Reads back the {@link RefreshMetaStateData} message and discards it. */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    // For now, we'll use a simple approach since we do not need to store any state data
    serializer.deserialize(RefreshMetaStateData.class);
  }
}