001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Optional; 022import org.apache.hadoop.hbase.ServerName; 023import org.apache.hadoop.hbase.TableName; 024import org.apache.hadoop.hbase.client.RegionInfo; 025import org.apache.hadoop.hbase.master.RegionState; 026import org.apache.hadoop.hbase.master.assignment.RegionStateNode; 027import org.apache.hadoop.hbase.master.assignment.RegionStates; 028import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 029import org.apache.hadoop.hbase.procedure2.Procedure; 030import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 031import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 032import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 033import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 034import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 035import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 036import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 037import org.apache.hadoop.hbase.regionserver.RefreshHFilesCallable; 038import 
org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * A master-side procedure that refreshes the HFiles (store files) of a single region. It dispatches
 * a {@link RefreshHFilesCallable} to the RegionServer currently hosting the region and suspends
 * until the remote call reports back.
 * <p>
 * The procedure waits with a backoff timeout and then tries again in any of these cases:
 * <ul>
 * <li>the region has a TRSP attached;</li>
 * <li>the region is not OPEN;</li>
 * <li>the region has no location;</li>
 * <li>the dispatch fails.</li>
 * </ul>
 * A failed remote call triggers a fresh dispatch. If the region is no longer present in the
 * assignment manager's region states, the procedure finishes without doing anything.
 */
@InterfaceAudience.Private
public class RefreshHFilesRegionProcedure extends Procedure<MasterProcedureEnv>
  implements TableProcedureInterface,
  RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
  private static final Logger LOG = LoggerFactory.getLogger(RefreshHFilesRegionProcedure.class);

  /** The region whose store files are refreshed; this is the only persisted state. */
  private RegionInfo region;
  /** Event this procedure is suspended on while a remote call is in flight; null otherwise. */
  private ProcedureEvent<?> event;
  /** True once the current attempt has been handed to the remote dispatcher. */
  private boolean dispatched;
  /** True once the remote call has reported success. */
  private boolean succ;
  /** Backoff counter for suspend/retry; created lazily on the first wait. */
  private RetryCounter retryCounter;

  /**
   * No-arg constructor. The procedure framework presumably uses it on recovery before calling
   * {@link #deserializeStateData}.
   */
  public RefreshHFilesRegionProcedure() {
  }

  /**
   * @param region the region whose HFiles should be refreshed
   */
  public RefreshHFilesRegionProcedure(RegionInfo region) {
    this.region = region;
  }

  /**
   * Restores the target region. The in-memory fields {@code dispatched}, {@code succ} and
   * {@code event} are not persisted, so a recovered procedure dispatches again.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    MasterProcedureProtos.RefreshHFilesRegionProcedureStateData data =
      serializer.deserialize(MasterProcedureProtos.RefreshHFilesRegionProcedureStateData.class);
    this.region = ProtobufUtil.toRegionInfo(data.getRegion());
  }

  /** Persists only the target region. */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    MasterProcedureProtos.RefreshHFilesRegionProcedureStateData.Builder builder =
      MasterProcedureProtos.RefreshHFilesRegionProcedureStateData.newBuilder();
    builder.setRegion(ProtobufUtil.toRegionInfo(region));
    serializer.serialize(builder.build());
  }

  /** This procedure cannot be aborted. */
  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

  /** Rollback is not supported. */
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  /**
   * Puts the procedure into WAITING_TIMEOUT for the next backoff interval and skips persisting this
   * transition. The caller is expected to throw {@link ProcedureSuspendedException} right after.
   * @param env    the master procedure environment, used to build the retry counter
   * @param reason human-readable explanation, included in the warning log
   */
  private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) {
    if (retryCounter == null) {
      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
    }
    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
    LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff);
    setTimeout(Math.toIntExact(backoff));
    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
    skipPersistence();
  }

  /**
   * Invoked when the WAITING_TIMEOUT set by {@link #setTimeoutForSuspend} expires. That timeout is
   * only a retry backoff, so this override does not let the procedure fail (the base-class
   * behaviour). Instead it puts the procedure back to RUNNABLE at the front of the scheduler queue.
   * @return {@code false} so the procedure is not marked as failed
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  @Override
  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    if (dispatched) {
      if (succ) {
        return null;
      }
      // The previous remote attempt failed; fall through and dispatch again.
      dispatched = false;
    }

    RegionStates regionStates = env.getAssignmentManager().getRegionStates();
    RegionStateNode regionNode = regionStates.getRegionStateNode(region);

    if (regionNode == null) {
      // The region is no longer tracked (most likely cleared by a split or merge). Retrying can
      // never succeed, and dereferencing the node would throw, so finish without refreshing.
      LOG.info("Region {} is not in region states, skip {}", region.getRegionNameAsString(), this);
      return null;
    }

    if (regionNode.getProcedure() != null) {
      setTimeoutForSuspend(env, String.format("region %s has a TRSP attached %s",
        region.getRegionNameAsString(), regionNode.getProcedure()));
      throw new ProcedureSuspendedException();
    }

    if (!regionNode.isInState(RegionState.State.OPEN)) {
      // setTimeoutForSuspend already logs the state; this branch waits and retries, it does not skip.
      setTimeoutForSuspend(env, String.format("region state of %s is %s",
        region.getRegionNameAsString(), regionNode.getState()));
      throw new ProcedureSuspendedException();
    }

    ServerName targetServer = regionNode.getRegionLocation();
    if (targetServer == null) {
      setTimeoutForSuspend(env,
        String.format("target server of region %s is null", region.getRegionNameAsString()));
      throw new ProcedureSuspendedException();
    }

    try {
      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
      dispatched = true;
      // Suspend until one of the remote callbacks wakes the event via complete().
      event = new ProcedureEvent<>(this);
      event.suspendIfNotReady(this);
      throw new ProcedureSuspendedException();
    } catch (FailedRemoteDispatchException e) {
      setTimeoutForSuspend(env, "Failed send request to " + targetServer);
      throw new ProcedureSuspendedException();
    }
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.REFRESH_HFILES;
  }

  @Override
  public TableName getTableName() {
    return region.getTable();
  }

  /** The remote side reported a failure; wake the procedure so it dispatches again. */
  @Override
  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
    complete(env, error);
  }

  /** The remote side reported success; wake the procedure so it can finish. */
  @Override
  public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) {
    complete(env, null);
  }

  /** The RPC to {@code serverName} failed; wake the procedure so it re-checks and dispatches again. */
  @Override
  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) {
    complete(env, e);
  }

  /**
   * Records the outcome of a remote attempt and wakes the suspended procedure.
   * @param env   the master procedure environment
   * @param error the failure, or {@code null} on success
   */
  private void complete(MasterProcedureEnv env, Throwable error) {
    if (isFinished()) {
      LOG.info("This procedure {} is already finished. Skip the rest of the processes",
        this.getProcId());
      return;
    }
    if (event == null) {
      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
        getProcId());
      return;
    }
    if (error == null) {
      succ = true;
    }
    event.wake(env.getProcedureScheduler());
    event = null;
  }

  /**
   * Builds the operation sent to the RegionServer. It runs {@link RefreshHFilesCallable} with the
   * serialized target region as its parameter.
   */
  @Override
  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
    ServerName serverName) {
    MasterProcedureProtos.RefreshHFilesRegionParameter.Builder builder =
      MasterProcedureProtos.RefreshHFilesRegionParameter.newBuilder();
    builder.setRegion(ProtobufUtil.toRegionInfo(region));
    return Optional
      .of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), RefreshHFilesCallable.class,
        builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime()));
  }
}