001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import static org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState.WAITING_TIMEOUT;
021
022import java.io.DataInputStream;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Objects;
030import java.util.Set;
031import java.util.function.Function;
032import java.util.stream.Collectors;
033import java.util.stream.Stream;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataInputStream;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.MetaTableAccessor;
041import org.apache.hadoop.hbase.NamespaceDescriptor;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.TableState;
049import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
050import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
051import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
052import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
053import org.apache.hadoop.hbase.util.CommonFSUtils;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.FSUtils;
056import org.apache.hadoop.hbase.util.RetryCounter;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
062
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaState;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshMetaStateData;
065
066@InterfaceAudience.Private
067public class RefreshMetaProcedure extends AbstractStateMachineTableProcedure<RefreshMetaState> {
068  private static final Logger LOG = LoggerFactory.getLogger(RefreshMetaProcedure.class);
069  private static final String HIDDEN_DIR_PATTERN = "^[._-].*";
070
071  private List<RegionInfo> currentRegions;
072  private List<RegionInfo> latestRegions;
073  private List<Mutation> pendingMutations;
074  private RetryCounter retryCounter;
075  private static final int MUTATION_BATCH_SIZE = 100;
076  private List<RegionInfo> newlyAddedRegions;
077  private List<TableName> deletedTables;
078
079  public RefreshMetaProcedure() {
080    super();
081  }
082
083  public RefreshMetaProcedure(MasterProcedureEnv env) {
084    super(env);
085  }
086
087  @Override
088  public TableName getTableName() {
089    return TableName.META_TABLE_NAME;
090  }
091
092  @Override
093  public TableOperationType getTableOperationType() {
094    return TableOperationType.EDIT;
095  }
096
097  @Override
098  protected Flow executeFromState(MasterProcedureEnv env, RefreshMetaState refreshMetaState) {
099    LOG.info("Executing RefreshMetaProcedure state: {}", refreshMetaState);
100
101    try {
102      return switch (refreshMetaState) {
103        case REFRESH_META_INIT -> executeInit(env);
104        case REFRESH_META_SCAN_STORAGE -> executeScanStorage(env);
105        case REFRESH_META_PREPARE -> executePrepare();
106        case REFRESH_META_APPLY -> executeApply(env);
107        case REFRESH_META_FOLLOWUP -> executeFollowup(env);
108        case REFRESH_META_FINISH -> executeFinish(env);
109        default -> throw new UnsupportedOperationException("Unhandled state: " + refreshMetaState);
110      };
111    } catch (Exception ex) {
112      LOG.error("Error in RefreshMetaProcedure state {}", refreshMetaState, ex);
113      setFailure("RefreshMetaProcedure", ex);
114      return Flow.NO_MORE_STATE;
115    }
116  }
117
118  private Flow executeInit(MasterProcedureEnv env) throws IOException {
119    LOG.trace("Getting current regions from {} table", TableName.META_TABLE_NAME);
120    try {
121      currentRegions = getCurrentRegions(env.getMasterServices().getConnection());
122      LOG.info("Found {} current regions in meta table", currentRegions.size());
123      setNextState(RefreshMetaState.REFRESH_META_SCAN_STORAGE);
124      return Flow.HAS_MORE_STATE;
125    } catch (IOException ioe) {
126      LOG.error("Failed to get current regions from meta table", ioe);
127      throw ioe;
128    }
129  }
130
131  private Flow executeScanStorage(MasterProcedureEnv env) throws IOException {
132    try {
133      latestRegions = scanBackingStorage(env.getMasterServices().getConnection());
134      LOG.info("Found {} regions in backing storage", latestRegions.size());
135      setNextState(RefreshMetaState.REFRESH_META_PREPARE);
136      return Flow.HAS_MORE_STATE;
137    } catch (IOException ioe) {
138      LOG.error("Failed to scan backing storage", ioe);
139      throw ioe;
140    }
141  }
142
143  private Flow executePrepare() throws IOException {
144    if (currentRegions == null || latestRegions == null) {
145      LOG.error(
146        "Can not execute update on null lists. " + "Meta Table Regions - {}, Storage Regions - {}",
147        currentRegions, latestRegions);
148      throw new IOException(
149        (currentRegions == null ? "current regions" : "latest regions") + " list is null");
150    }
151    LOG.info("Comparing regions. Current regions: {}, Latest regions: {}", currentRegions.size(),
152      latestRegions.size());
153
154    this.newlyAddedRegions = new ArrayList<>();
155    this.deletedTables = new ArrayList<>();
156
157    pendingMutations = prepareMutations(
158      currentRegions.stream()
159        .collect(Collectors.toMap(RegionInfo::getEncodedName, Function.identity())),
160      latestRegions.stream()
161        .collect(Collectors.toMap(RegionInfo::getEncodedName, Function.identity())));
162
163    if (pendingMutations.isEmpty()) {
164      LOG.info("RefreshMetaProcedure completed, No update needed.");
165      setNextState(RefreshMetaState.REFRESH_META_FINISH);
166    } else {
167      LOG.info("Prepared {} region mutations and {} tables for cleanup.", pendingMutations.size(),
168        deletedTables.size());
169      setNextState(RefreshMetaState.REFRESH_META_APPLY);
170    }
171    return Flow.HAS_MORE_STATE;
172  }
173
174  private Flow executeApply(MasterProcedureEnv env) throws ProcedureSuspendedException {
175    try {
176      if (pendingMutations != null && !pendingMutations.isEmpty()) {
177        applyMutations(env.getMasterServices().getConnection(), pendingMutations);
178        LOG.debug("RefreshMetaProcedure applied {} mutations to meta table",
179          pendingMutations.size());
180      }
181    } catch (IOException ioe) {
182      if (retryCounter == null) {
183        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
184      }
185      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
186      LOG.warn("Failed to apply mutations to meta table, suspending for {} ms", backoff, ioe);
187      setTimeout(Math.toIntExact(backoff));
188      setState(WAITING_TIMEOUT);
189      skipPersistence();
190      throw new ProcedureSuspendedException();
191    }
192
193    if (
194      (this.newlyAddedRegions != null && !this.newlyAddedRegions.isEmpty())
195        || (this.deletedTables != null && !this.deletedTables.isEmpty())
196    ) {
197      setNextState(RefreshMetaState.REFRESH_META_FOLLOWUP);
198    } else {
199      LOG.info("RefreshMetaProcedure completed. No follow-up actions were required.");
200      setNextState(RefreshMetaState.REFRESH_META_FINISH);
201    }
202    return Flow.HAS_MORE_STATE;
203  }
204
205  private Flow executeFollowup(MasterProcedureEnv env) throws IOException {
206
207    LOG.info("Submitting assignment for new regions: {}", this.newlyAddedRegions);
208    addChildProcedure(env.getAssignmentManager().createAssignProcedures(newlyAddedRegions));
209
210    for (TableName tableName : this.deletedTables) {
211      LOG.debug("Submitting deletion for empty table {}", tableName);
212      env.getMasterServices().getAssignmentManager().deleteTable(tableName);
213      env.getMasterServices().getTableStateManager().setDeletedTable(tableName);
214      env.getMasterServices().getTableDescriptors().remove(tableName);
215    }
216    setNextState(RefreshMetaState.REFRESH_META_FINISH);
217    return Flow.HAS_MORE_STATE;
218  }
219
220  private Flow executeFinish(MasterProcedureEnv env) {
221    invalidateTableDescriptorCache(env);
222    LOG.info("RefreshMetaProcedure completed successfully. All follow-up actions finished.");
223    currentRegions = null;
224    latestRegions = null;
225    pendingMutations = null;
226    deletedTables = null;
227    newlyAddedRegions = null;
228    return Flow.NO_MORE_STATE;
229  }
230
231  private void invalidateTableDescriptorCache(MasterProcedureEnv env) {
232    LOG.debug("Invalidating the table descriptor cache to ensure new tables are discovered");
233    env.getMasterServices().getTableDescriptors().invalidateTableDescriptorCache();
234  }
235
236  /**
237   * Prepares mutations by comparing the current regions in hbase:meta with the latest regions from
238   * backing storage. Also populates newlyAddedRegions and deletedTables lists for follow-up
239   * actions.
240   * @param currentMap Current regions from hbase:meta
241   * @param latestMap  Latest regions from backing storage
242   * @return List of mutations to apply to the meta table
243   * @throws IOException If there is an error creating mutations
244   */
245  private List<Mutation> prepareMutations(Map<String, RegionInfo> currentMap,
246    Map<String, RegionInfo> latestMap) throws IOException {
247    List<Mutation> mutations = new ArrayList<>();
248
249    for (String regionId : Stream.concat(currentMap.keySet().stream(), latestMap.keySet().stream())
250      .collect(Collectors.toSet())) {
251      RegionInfo currentRegion = currentMap.get(regionId);
252      RegionInfo latestRegion = latestMap.get(regionId);
253
254      if (latestRegion != null) {
255        if (currentRegion == null || hasBoundaryChanged(currentRegion, latestRegion)) {
256          mutations.add(MetaTableAccessor.makePutFromRegionInfo(latestRegion));
257          newlyAddedRegions.add(latestRegion);
258        }
259      } else {
260        mutations.add(MetaTableAccessor.makeDeleteFromRegionInfo(currentRegion,
261          EnvironmentEdgeManager.currentTime()));
262      }
263    }
264
265    if (!currentMap.isEmpty() || !latestMap.isEmpty()) {
266      Set<TableName> currentTables =
267        currentMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
268      Set<TableName> latestTables =
269        latestMap.values().stream().map(RegionInfo::getTable).collect(Collectors.toSet());
270
271      Set<TableName> tablesToDeleteState = new HashSet<>(currentTables);
272      tablesToDeleteState.removeAll(latestTables);
273      if (!tablesToDeleteState.isEmpty()) {
274        LOG.warn(
275          "The following tables have no regions on storage and WILL BE REMOVED from the meta: {}",
276          tablesToDeleteState);
277        this.deletedTables.addAll(tablesToDeleteState);
278      }
279
280      Set<TableName> tablesToRestoreState = new HashSet<>(latestTables);
281      tablesToRestoreState.removeAll(currentTables);
282      if (!tablesToRestoreState.isEmpty()) {
283        LOG.info("Adding missing table:state entry for recovered tables: {}", tablesToRestoreState);
284        for (TableName tableName : tablesToRestoreState) {
285          TableState tableState = new TableState(tableName, TableState.State.ENABLED);
286          mutations.add(MetaTableAccessor.makePutFromTableState(tableState,
287            EnvironmentEdgeManager.currentTime()));
288        }
289      }
290    }
291    return mutations;
292  }
293
294  private void applyMutations(Connection connection, List<Mutation> mutations) throws IOException {
295    List<List<Mutation>> chunks = Lists.partition(mutations, MUTATION_BATCH_SIZE);
296
297    for (int i = 0; i < chunks.size(); i++) {
298      List<Mutation> chunk = chunks.get(i);
299
300      List<Put> puts =
301        chunk.stream().filter(m -> m instanceof Put).map(m -> (Put) m).collect(Collectors.toList());
302
303      List<Delete> deletes = chunk.stream().filter(m -> m instanceof Delete).map(m -> (Delete) m)
304        .collect(Collectors.toList());
305
306      if (!puts.isEmpty()) {
307        MetaTableAccessor.putsToMetaTable(connection, puts);
308      }
309      if (!deletes.isEmpty()) {
310        MetaTableAccessor.deleteFromMetaTable(connection, deletes);
311      }
312      LOG.debug("Successfully processed batch {}/{}", i + 1, chunks.size());
313    }
314  }
315
316  boolean hasBoundaryChanged(RegionInfo region1, RegionInfo region2) {
317    return !Arrays.equals(region1.getStartKey(), region2.getStartKey())
318      || !Arrays.equals(region1.getEndKey(), region2.getEndKey());
319  }
320
321  /**
322   * Scans the backing storage for all regions and returns a list of RegionInfo objects. This method
323   * scans the filesystem for region directories and reads their .regioninfo files.
324   * @param connection The HBase connection to use.
325   * @return List of RegionInfo objects found in the backing storage.
326   * @throws IOException If there is an error accessing the filesystem or reading region info files.
327   */
328  List<RegionInfo> scanBackingStorage(Connection connection) throws IOException {
329    List<RegionInfo> regions = new ArrayList<>();
330    Configuration conf = connection.getConfiguration();
331    FileSystem fs = FileSystem.get(conf);
332    Path rootDir = CommonFSUtils.getRootDir(conf);
333    Path dataDir = new Path(rootDir, HConstants.BASE_NAMESPACE_DIR);
334
335    LOG.info("Scanning backing storage under: {}", dataDir);
336
337    if (!fs.exists(dataDir)) {
338      LOG.warn("Data directory does not exist: {}", dataDir);
339      return regions;
340    }
341
342    FileStatus[] namespaceDirs =
343      fs.listStatus(dataDir, path -> !path.getName().matches(HIDDEN_DIR_PATTERN));
344    LOG.debug("Found {} namespace directories in data dir", Arrays.stream(namespaceDirs).toList());
345
346    for (FileStatus nsDir : namespaceDirs) {
347      String namespaceName = nsDir.getPath().getName();
348      if (NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR.equals(namespaceName)) {
349        LOG.info("Skipping system namespace {}", namespaceName);
350        continue;
351      }
352      try {
353        List<RegionInfo> namespaceRegions = scanTablesInNamespace(fs, nsDir.getPath());
354        regions.addAll(namespaceRegions);
355        LOG.debug("Found {} regions in namespace {}", namespaceRegions.size(),
356          nsDir.getPath().getName());
357      } catch (IOException e) {
358        LOG.error("Failed to scan namespace directory: {}", nsDir.getPath(), e);
359      }
360    }
361    LOG.info("Scanned backing storage and found {} regions", regions.size());
362    return regions;
363  }
364
365  private List<RegionInfo> scanTablesInNamespace(FileSystem fs, Path namespacePath)
366    throws IOException {
367    LOG.debug("Scanning namespace {}", namespacePath.getName());
368    List<Path> tableDirs = FSUtils.getLocalTableDirs(fs, namespacePath);
369
370    return tableDirs.stream().flatMap(tableDir -> {
371      try {
372        List<RegionInfo> tableRegions = scanRegionsInTable(fs, FSUtils.getRegionDirs(fs, tableDir));
373        LOG.debug("Found {} regions in table {} in namespace {}", tableRegions.size(),
374          tableDir.getName(), namespacePath.getName());
375        return tableRegions.stream();
376      } catch (IOException e) {
377        LOG.warn("Failed to scan table directory: {} for namespace {}", tableDir,
378          namespacePath.getName(), e);
379        return Stream.empty();
380      }
381    }).toList();
382  }
383
384  private List<RegionInfo> scanRegionsInTable(FileSystem fs, List<Path> regionDirs)
385    throws IOException {
386    return regionDirs.stream().map(regionDir -> {
387      String encodedRegionName = regionDir.getName();
388      try {
389        Path regionInfoPath = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
390        if (fs.exists(regionInfoPath)) {
391          RegionInfo ri = readRegionInfo(fs, regionInfoPath);
392          if (ri != null && isValidRegionInfo(ri, encodedRegionName)) {
393            LOG.debug("Found region: {} -> {}", encodedRegionName, ri.getRegionNameAsString());
394            return ri;
395          } else {
396            LOG.warn("Invalid RegionInfo in file: {}", regionInfoPath);
397          }
398        } else {
399          LOG.debug("No .regioninfo file found in region directory: {}", regionDir);
400        }
401      } catch (Exception e) {
402        LOG.warn("Failed to read region info from directory: {}", encodedRegionName, e);
403      }
404      return null;
405    }).filter(Objects::nonNull).collect(Collectors.toList());
406  }
407
408  private boolean isValidRegionInfo(RegionInfo regionInfo, String expectedEncodedName) {
409    if (!expectedEncodedName.equals(regionInfo.getEncodedName())) {
410      LOG.warn("RegionInfo encoded name mismatch: directory={}, regioninfo={}", expectedEncodedName,
411        regionInfo.getEncodedName());
412      return false;
413    }
414    return true;
415  }
416
417  private RegionInfo readRegionInfo(FileSystem fs, Path regionInfoPath) {
418    try (FSDataInputStream inputStream = fs.open(regionInfoPath);
419      DataInputStream dataInputStream = new DataInputStream(inputStream)) {
420      return RegionInfo.parseFrom(dataInputStream);
421    } catch (Exception e) {
422      LOG.warn("Failed to parse .regioninfo file: {}", regionInfoPath, e);
423      return null;
424    }
425  }
426
427  /**
428   * Retrieves the current regions from the hbase:meta table.
429   * @param connection The HBase connection to use.
430   * @return List of RegionInfo objects representing the current regions in meta.
431   * @throws IOException If there is an error accessing the meta table.
432   */
433  List<RegionInfo> getCurrentRegions(Connection connection) throws IOException {
434    LOG.info("Getting all regions from meta table");
435    return MetaTableAccessor.getAllRegions(connection, true);
436  }
437
438  @Override
439  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
440    setState(
441      org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState.RUNNABLE);
442    env.getProcedureScheduler().addFront(this);
443    return false;
444  }
445
446  @Override
447  protected void rollbackState(MasterProcedureEnv env, RefreshMetaState refreshMetaState)
448    throws IOException, InterruptedException {
449    // No specific rollback needed as it is generally safe to re-run the procedure.
450    LOG.trace("Rollback not implemented for RefreshMetaProcedure state: {}", refreshMetaState);
451  }
452
453  @Override
454  protected RefreshMetaState getState(int stateId) {
455    return RefreshMetaState.forNumber(stateId);
456  }
457
458  @Override
459  protected int getStateId(RefreshMetaState refreshMetaState) {
460    return refreshMetaState.getNumber();
461  }
462
463  @Override
464  protected RefreshMetaState getInitialState() {
465    return RefreshMetaState.REFRESH_META_INIT;
466  }
467
468  @Override
469  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
470    // For now, we'll use a simple approach since we do not need to store any state data
471    RefreshMetaStateData.Builder builder = RefreshMetaStateData.newBuilder();
472    serializer.serialize(builder.build());
473  }
474
475  @Override
476  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
477    // For now, we'll use a simple approach since we do not need to store any state data
478    serializer.deserialize(RefreshMetaStateData.class);
479  }
480}