001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.Optional; 023import java.util.stream.Collectors; 024import org.apache.hadoop.hbase.ServerName; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.RegionInfo; 027import org.apache.hadoop.hbase.master.RegionState.State; 028import org.apache.hadoop.hbase.master.assignment.RegionStateNode; 029import org.apache.hadoop.hbase.master.assignment.RegionStates; 030import org.apache.hadoop.hbase.master.assignment.ServerState; 031import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 032import org.apache.hadoop.hbase.procedure2.Procedure; 033import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 036import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; 039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; 040import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 041import org.apache.hadoop.hbase.regionserver.FlushRegionCallable; 042import org.apache.hadoop.hbase.util.RetryCounter; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 048import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 049 050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 054 055@InterfaceAudience.Private 056public class FlushRegionProcedure extends Procedure<MasterProcedureEnv> 057 implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> { 058 private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class); 059 060 private RegionInfo region; 061 private List<byte[]> columnFamilies; 062 private ProcedureEvent<?> event; 063 private boolean dispatched; 064 private boolean succ; 065 private RetryCounter retryCounter; 066 067 public FlushRegionProcedure() { 068 } 069 070 public FlushRegionProcedure(RegionInfo region) { 071 this(region, null); 072 } 073 074 public FlushRegionProcedure(RegionInfo region, List<byte[]> columnFamilies) { 075 this.region = region; 076 this.columnFamilies = columnFamilies; 077 } 078 079 @Override 080 protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 081 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 082 if (dispatched) { 083 if (succ) { 084 return null; 085 } 086 dispatched = false; 087 } 088 089 RegionStates regionStates = env.getAssignmentManager().getRegionStates(); 090 RegionStateNode regionNode = regionStates.getRegionStateNode(region); 091 if (regionNode == null) { 092 LOG.debug("Region {} is not in region states, it is very likely that it has been cleared by" 093 + " other procedures such as merge or split, so skip {}. See HBASE-28226", region, this); 094 return null; 095 } 096 regionNode.lock(); 097 try { 098 if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) { 099 LOG.info("State of region {} is not OPEN or in transition. Skip {} ...", region, this); 100 return null; 101 } 102 ServerName targetServer = regionNode.getRegionLocation(); 103 if (targetServer == null) { 104 setTimeoutForSuspend(env, 105 String.format("target server of region %s is null", region.getRegionNameAsString())); 106 throw new ProcedureSuspendedException(); 107 } 108 ServerState serverState = regionStates.getServerNode(targetServer).getState(); 109 if (serverState != ServerState.ONLINE) { 110 setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s", 111 region.getRegionNameAsString(), targetServer, serverState)); 112 throw new ProcedureSuspendedException(); 113 } 114 try { 115 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 116 dispatched = true; 117 event = new ProcedureEvent<>(this); 118 event.suspendIfNotReady(this); 119 throw new ProcedureSuspendedException(); 120 } catch (FailedRemoteDispatchException e) { 121 setTimeoutForSuspend(env, "Failed send request to " + targetServer); 122 throw new ProcedureSuspendedException(); 123 } 124 } finally { 125 regionNode.unlock(); 126 } 127 } 128 129 @Override 130 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 131 setState(ProcedureProtos.ProcedureState.RUNNABLE); 132 env.getProcedureScheduler().addFront(this); 133 return false; 134 } 135 136 @Override 137 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 138 throw new UnsupportedOperationException(); 139 } 140 141 @Override 142 protected boolean abort(MasterProcedureEnv env) { 143 return false; 144 } 145 146 @Override 147 public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) { 148 complete(env, e); 149 } 150 151 @Override 152 public void remoteOperationCompleted(MasterProcedureEnv env) { 153 complete(env, null); 154 } 155 156 @Override 157 public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { 158 complete(env, error); 159 } 160 161 private void complete(MasterProcedureEnv env, Throwable error) { 162 if (isFinished()) { 163 LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); 164 return; 165 } 166 if (event == null) { 167 LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", 168 getProcId()); 169 return; 170 } 171 if (error == null) { 172 succ = true; 173 } 174 event.wake(env.getProcedureScheduler()); 175 event = null; 176 } 177 178 private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) { 179 if (retryCounter == null) { 180 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 181 } 182 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 183 LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff); 184 setTimeout(Math.toIntExact(backoff)); 185 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 186 skipPersistence(); 187 } 188 189 @Override 190 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 191 FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder(); 192 builder.setRegion(ProtobufUtil.toRegionInfo(region)); 193 if (columnFamilies != null) { 194 for (byte[] columnFamily : columnFamilies) { 195 if (columnFamily != null && columnFamily.length > 0) { 196 builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); 197 } 198 } 199 } 200 serializer.serialize(builder.build()); 201 } 202 203 @Override 204 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 205 FlushRegionProcedureStateData data = 206 serializer.deserialize(FlushRegionProcedureStateData.class); 207 this.region = ProtobufUtil.toRegionInfo(data.getRegion()); 208 if (data.getColumnFamilyCount() > 0) { 209 this.columnFamilies = data.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()) 210 .map(ByteString::toByteArray).collect(Collectors.toList()); 211 } 212 } 213 214 @Override 215 public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { 216 FlushRegionParameter.Builder builder = FlushRegionParameter.newBuilder(); 217 builder.setRegion(ProtobufUtil.toRegionInfo(region)); 218 if (columnFamilies != null) { 219 for (byte[] columnFamily : columnFamilies) { 220 if (columnFamily != null && columnFamily.length > 0) { 221 builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); 222 } 223 } 224 } 225 return Optional 226 .of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), FlushRegionCallable.class, 227 builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime())); 228 } 229 230 @Override 231 public TableOperationType getTableOperationType() { 232 return TableOperationType.FLUSH; 233 } 234 235 @Override 236 protected boolean waitInitialized(MasterProcedureEnv env) { 237 return env.waitInitialized(this); 238 } 239 240 @Override 241 public TableName getTableName() { 242 return region.getTable(); 243 } 244}