/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReopenTableRegionsStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Used for reopening the regions for a table.
 * <p>
 * The procedure moves through three states:
 * <ol>
 * <li>{@code REOPEN_TABLE_REGIONS_GET_REGIONS}: collect the region locations to reopen (all
 * regions of the table, or only those named in {@code regionNames}).</li>
 * <li>{@code REOPEN_TABLE_REGIONS_REOPEN_REGIONS}: pick a batch of at most {@code reopenBatchSize}
 * regions and add a reopen {@link TransitRegionStateProcedure} child for each region that has no
 * procedure attached.</li>
 * <li>{@code REOPEN_TABLE_REGIONS_CONFIRM_REOPENED}: drop the regions that no longer need
 * reopening, then either finish, retry the current batch, or schedule the next batch (optionally
 * after a backoff, doubling the batch size up to {@code reopenBatchSizeMax}).</li>
 * </ol>
 * Only the table name, the pending region locations and the requested region names are persisted;
 * the batching settings and counters are kept in memory only.
 */
@InterfaceAudience.Private
public class ReopenTableRegionsProcedure
  extends AbstractStateMachineTableProcedure<ReopenTableRegionsState> {

  private static final Logger LOG = LoggerFactory.getLogger(ReopenTableRegionsProcedure.class);

  public static final String PROGRESSIVE_BATCH_BACKOFF_MILLIS_KEY =
    "hbase.reopen.table.regions.progressive.batch.backoff.ms";
  public static final long PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT = 0L;
  public static final String PROGRESSIVE_BATCH_SIZE_MAX_KEY =
    "hbase.reopen.table.regions.progressive.batch.size.max";
  public static final int PROGRESSIVE_BATCH_SIZE_MAX_DISABLED = -1;
  private static final int PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE = Integer.MAX_VALUE;

  // this minimum prevents a max which would break this procedure
  private static final int MINIMUM_BATCH_SIZE_MAX = 1;

  private TableName tableName;

  // Specific regions of the table to reopen.
  // If empty, all regions of the table will be reopened. A null passed to the constructor is
  // normalized to an empty list, so this field is never null after construction.
  private List<byte[]> regionNames;

  // Region locations that still need to be reopened.
  private List<HRegionLocation> regions = Collections.emptyList();

  // Subset of 'regions' being reopened in the batch currently in flight.
  private List<HRegionLocation> currentRegionBatch = Collections.emptyList();

  // Created lazily when nothing can be scheduled; reset to null once scheduling is possible again.
  private RetryCounter retryCounter;

  private long reopenBatchBackoffMillis;
  private int reopenBatchSize;
  private int reopenBatchSizeMax;
  private long regionsReopened = 0;
  private long batchesProcessed = 0;

  /**
   * Creates a procedure with no table name, no region filter and batching disabled. The table
   * name and region lists are filled in by {@link #deserializeStateData}.
   */
  public ReopenTableRegionsProcedure() {
    this(null);
  }

  /**
   * Reopens all regions of the given table with batching disabled.
   * @param tableName the table whose regions should be reopened
   */
  public ReopenTableRegionsProcedure(TableName tableName) {
    this(tableName, Collections.emptyList());
  }

  /**
   * Reopens the named regions of the given table with batching disabled.
   * @param tableName   the table whose regions should be reopened
   * @param regionNames names of the regions to reopen; {@code null} or empty means all regions
   */
  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames) {
    this(tableName, regionNames, PROGRESSIVE_BATCH_BACKOFF_MILLIS_DEFAULT,
      PROGRESSIVE_BATCH_SIZE_MAX_DISABLED);
  }

  /**
   * Reopens all regions of the given table using the given batching settings.
   * @param tableName                the table whose regions should be reopened
   * @param reopenBatchBackoffMillis time to suspend between batches, in milliseconds; values
   *                                 {@code <= 0} mean no backoff between batches
   * @param reopenBatchSizeMax       upper bound for the batch size, or
   *                                 {@link #PROGRESSIVE_BATCH_SIZE_MAX_DISABLED} to disable
   *                                 batching
   */
  public ReopenTableRegionsProcedure(final TableName tableName, long reopenBatchBackoffMillis,
    int reopenBatchSizeMax) {
    this(tableName, Collections.emptyList(), reopenBatchBackoffMillis, reopenBatchSizeMax);
  }

  /**
   * Reopens the named regions of the given table using the given batching settings.
   * @param tableName                the table whose regions should be reopened
   * @param regionNames              names of the regions to reopen; {@code null} or empty means
   *                                 all regions
   * @param reopenBatchBackoffMillis time to suspend between batches, in milliseconds; values
   *                                 {@code <= 0} mean no backoff between batches
   * @param reopenBatchSizeMax       upper bound for the batch size, or
   *                                 {@link #PROGRESSIVE_BATCH_SIZE_MAX_DISABLED} to disable
   *                                 batching; other values below {@code 1} are raised to
   *                                 {@code 1}
   */
  public ReopenTableRegionsProcedure(final TableName tableName, final List<byte[]> regionNames,
    long reopenBatchBackoffMillis, int reopenBatchSizeMax) {
    this.tableName = tableName;
    // Normalize null to an empty list so the field has a single "all regions" representation.
    this.regionNames = regionNames == null ? Collections.emptyList() : regionNames;
    this.reopenBatchBackoffMillis = reopenBatchBackoffMillis;
    if (reopenBatchSizeMax == PROGRESSIVE_BATCH_SIZE_MAX_DISABLED) {
      // Batching disabled: a single batch large enough to hold every region.
      this.reopenBatchSize = PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE;
      this.reopenBatchSizeMax = PROGRESSIVE_BATCH_SIZE_MAX_DEFAULT_VALUE;
    } else {
      // Batching enabled: start with one region and grow towards the (sanitized) maximum.
      this.reopenBatchSize = 1;
      this.reopenBatchSizeMax = Math.max(reopenBatchSizeMax, MINIMUM_BATCH_SIZE_MAX);
    }
  }

  @Override
  public TableName getTableName() {
    return tableName;
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.REGION_EDIT;
  }

  /** Returns the number of reopen child procedures this procedure has added so far. */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public long getRegionsReopened() {
    return regionsReopened;
  }

  /** Returns the number of batches this procedure has started so far. */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public long getBatchesProcessed() {
    return batchesProcessed;
  }

  /**
   * Doubles the batch size, capped at {@code reopenBatchSizeMax}.
   * @return the new batch size
   */
  @RestrictedApi(explanation = "Should only be called internally or in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|ReopenTableRegionsProcedure).java")
  protected int progressBatchSize() {
    // Double in long arithmetic so the multiplication cannot overflow; the result is capped at
    // reopenBatchSizeMax, which is an int, so the narrowing cast is safe.
    reopenBatchSize = (int) Math.min((long) reopenBatchSizeMax, 2L * reopenBatchSize);
    return reopenBatchSize;
  }

  /**
   * Tells whether a reopen can be scheduled for the given location right now.
   * @return {@code false} if the location has a negative sequence number; otherwise {@code true}
   *         if the region has no state node or its state node is not in transition
   */
  private boolean canSchedule(MasterProcedureEnv env, HRegionLocation loc) {
    if (loc.getSeqNum() < 0) {
      return false;
    }
    RegionStateNode regionNode =
      env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
    // If the region node is null, then at least in the next round we can remove this region to make
    // progress. And the second condition is a normal one, if there are no TRSP with it then we can
    // schedule one to make progress.
    return regionNode == null || !regionNode.isInTransition();
  }

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    switch (state) {
      case REOPEN_TABLE_REGIONS_GET_REGIONS:
        if (!isTableEnabled(env)) {
          LOG.info("Table {} is disabled, give up reopening its regions", tableName);
          return Flow.NO_MORE_STATE;
        }
        List<HRegionLocation> tableRegions =
          env.getAssignmentManager().getRegionStates().getRegionsOfTableForReopen(tableName);
        regions = getRegionLocationsForReopen(tableRegions);
        setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
        return Flow.HAS_MORE_STATE;
      case REOPEN_TABLE_REGIONS_REOPEN_REGIONS:
        // if we didn't finish reopening the last batch yet, let's keep trying until we do.
        // at that point, the batch will be empty and we can generate a new batch
        if (!regions.isEmpty() && currentRegionBatch.isEmpty()) {
          currentRegionBatch = regions.stream().limit(reopenBatchSize).collect(Collectors.toList());
          batchesProcessed++;
        }
        for (HRegionLocation loc : currentRegionBatch) {
          RegionStateNode regionNode =
            env.getAssignmentManager().getRegionStates().getRegionStateNode(loc.getRegion());
          // this is possible, maybe the region has already been merged or split, see HBASE-20921
          if (regionNode == null) {
            continue;
          }
          TransitRegionStateProcedure proc;
          regionNode.lock();
          try {
            // Another procedure already owns this region; leave it for a later round.
            if (regionNode.getProcedure() != null) {
              continue;
            }
            proc = TransitRegionStateProcedure.reopen(env, regionNode.getRegionInfo());
            regionNode.setProcedure(proc);
          } finally {
            regionNode.unlock();
          }
          addChildProcedure(proc);
          regionsReopened++;
        }
        setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_CONFIRM_REOPENED);
        return Flow.HAS_MORE_STATE;
      case REOPEN_TABLE_REGIONS_CONFIRM_REOPENED:
        // update region lists based on what's been reopened
        regions = filterReopened(env, regions);
        currentRegionBatch = filterReopened(env, currentRegionBatch);

        // existing batch didn't fully reopen, so try to resolve that first.
        // since this is a retry, don't do the batch backoff
        if (!currentRegionBatch.isEmpty()) {
          return reopenIfSchedulable(env, currentRegionBatch, false);
        }

        if (regions.isEmpty()) {
          return Flow.NO_MORE_STATE;
        }

        // current batch is finished, schedule more regions
        return reopenIfSchedulable(env, regions, true);
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  /**
   * Maps every location through {@code RegionStates#checkReopened} and keeps the non-null
   * results. NOTE(review): presumably a null result means the region no longer needs to be
   * reopened; confirm against {@code RegionStates#checkReopened}.
   */
  private List<HRegionLocation> filterReopened(MasterProcedureEnv env,
    List<HRegionLocation> regionsToCheck) {
    return regionsToCheck.stream().map(env.getAssignmentManager().getRegionStates()::checkReopened)
      .filter(l -> l != null).collect(Collectors.toList());
  }

  /**
   * Moves back to the reopen state if at least one of the given regions can be scheduled;
   * otherwise suspends this procedure with a retry backoff.
   * @param regionsToReopen    the regions to examine
   * @param shouldBatchBackoff whether the configured batch backoff applies; when it does and the
   *                           backoff is positive, the batch size is progressed and the procedure
   *                           is suspended for the backoff period
   * @return {@link Flow#HAS_MORE_STATE} when a region can be scheduled and no batch backoff applies
   * @throws ProcedureSuspendedException when the procedure has to wait (batch backoff or retry)
   */
  private Flow reopenIfSchedulable(MasterProcedureEnv env, List<HRegionLocation> regionsToReopen,
    boolean shouldBatchBackoff) throws ProcedureSuspendedException {
    if (regionsToReopen.stream().anyMatch(loc -> canSchedule(env, loc))) {
      retryCounter = null;
      setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
      if (shouldBatchBackoff && reopenBatchBackoffMillis > 0) {
        progressBatchSize();
        setBackoffState(reopenBatchBackoffMillis);
        throw new ProcedureSuspendedException();
      } else {
        return Flow.HAS_MORE_STATE;
      }
    }

    // We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
    // again.
    if (retryCounter == null) {
      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
    }
    long backoffMillis = retryCounter.getBackoffTimeAndIncrementAttempts();
    LOG.info(
      "There are still {} region(s) which need to be reopened for table {}. {} are in "
        + "OPENING state, suspend {}secs and try again later",
      regions.size(), tableName, currentRegionBatch.size(), backoffMillis / 1000);
    setBackoffState(backoffMillis);
    throw new ProcedureSuspendedException();
  }

  /**
   * Puts this procedure into {@code WAITING_TIMEOUT} for the given time and skips persistence.
   * @param millis how long to wait, in milliseconds; values above {@link Integer#MAX_VALUE} are
   *               clamped to {@link Integer#MAX_VALUE}
   */
  private void setBackoffState(long millis) {
    // The timeout is an int. Clamp instead of using Math.toIntExact, which would throw
    // ArithmeticException for a backoff larger than Integer.MAX_VALUE milliseconds; this
    // procedure does not support rollback, so such a failure would have no recovery path.
    setTimeout((int) Math.min(millis, Integer.MAX_VALUE));
    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
    skipPersistence();
  }

  /**
   * Selects the locations to reopen from the table's candidate locations.
   * @param tableRegionsForReopen candidate locations for the whole table
   * @return the locations whose region name matches an entry of {@code regionNames}, in the order
   *         of {@code regionNames}; or {@code tableRegionsForReopen} itself when either
   *         {@code regionNames} or {@code tableRegionsForReopen} is null or empty
   */
  private List<HRegionLocation>
    getRegionLocationsForReopen(List<HRegionLocation> tableRegionsForReopen) {

    List<HRegionLocation> regionsToReopen = new ArrayList<>();
    if (
      CollectionUtils.isNotEmpty(regionNames) && CollectionUtils.isNotEmpty(tableRegionsForReopen)
    ) {
      for (byte[] regionName : regionNames) {
        for (HRegionLocation hRegionLocation : tableRegionsForReopen) {
          if (Bytes.equals(regionName, hRegionLocation.getRegion().getRegionName())) {
            regionsToReopen.add(hRegionLocation);
            // At most one location is taken per requested region name.
            break;
          }
        }
      }
    } else {
      regionsToReopen = tableRegionsForReopen;
    }
    return regionsToReopen;
  }

  /**
   * At end of timeout, wake ourselves up so we run again.
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false; // 'false' means that this procedure handled the timeout
  }

  /**
   * Rollback is not supported by this procedure.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env, ReopenTableRegionsState state)
    throws IOException, InterruptedException {
    throw new UnsupportedOperationException("unhandled state=" + state);
  }

  @Override
  protected ReopenTableRegionsState getState(int stateId) {
    return ReopenTableRegionsState.forNumber(stateId);
  }

  @Override
  protected int getStateId(ReopenTableRegionsState state) {
    return state.getNumber();
  }

  @Override
  protected ReopenTableRegionsState getInitialState() {
    return ReopenTableRegionsState.REOPEN_TABLE_REGIONS_GET_REGIONS;
  }

  /**
   * Persists the table name, the pending region locations and, when non-empty, the requested
   * region names. The batching settings and counters are not written.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    ReopenTableRegionsStateData.Builder builder = ReopenTableRegionsStateData.newBuilder()
      .setTableName(ProtobufUtil.toProtoTableName(tableName));
    regions.stream().map(ProtobufUtil::toRegionLocation).forEachOrdered(builder::addRegion);
    if (CollectionUtils.isNotEmpty(regionNames)) {
      // As of this writing, wrapping this statement within an if condition is only required
      // for backward compatibility as we used to have 'regionNames' as null for cases
      // where all regions of given table should be reopened. Now, we have kept emptyList()
      // for 'regionNames' to indicate all regions of given table should be reopened unless
      // 'regionNames' contains at least one specific region, in which case only list of regions
      // that 'regionNames' contain should be reopened, not all regions of given table.
      // Now, we don't need this check since we are not dealing with null 'regionNames' and hence,
      // guarding by this if condition can be removed in HBase 4.0.0.
      regionNames.stream().map(ByteString::copyFrom).forEachOrdered(builder::addRegionNames);
    }
    serializer.serialize(builder.build());
  }

  /**
   * Restores the table name, the pending region locations and the requested region names; an
   * absent or empty region-name list is restored as an empty list.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    ReopenTableRegionsStateData data = serializer.deserialize(ReopenTableRegionsStateData.class);
    tableName = ProtobufUtil.toTableName(data.getTableName());
    regions = data.getRegionList().stream().map(ProtobufUtil::toRegionLocation)
      .collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(data.getRegionNamesList())) {
      regionNames = data.getRegionNamesList().stream().map(ByteString::toByteArray)
        .collect(Collectors.toList());
    } else {
      regionNames = Collections.emptyList();
    }
  }
}