001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.concurrent.CompletableFuture;
023import org.apache.hadoop.hbase.HConstants;
024import org.apache.hadoop.hbase.ServerName;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.RegionInfo;
027import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
029import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
030import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
033import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
036import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
037import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
040import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
041import org.apache.hadoop.hbase.util.RetryCounter;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseState;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionRemoteProcedureBaseStateData;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
052
053/**
054 * The base class for the remote procedures used to open/close a region.
055 * <p/>
056 * Notice that here we do not care about the result of the remote call, if the remote call is
057 * finished, either succeeded or not, we will always finish the procedure. The parent procedure
058 * should take care of the result and try to reschedule if the result is not good.
059 */
060@InterfaceAudience.Private
061public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedureEnv>
062  implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
063
  private static final Logger LOG = LoggerFactory.getLogger(RegionRemoteProcedureBase.class);

  // The next five fields are written by serializeStateData and restored by deserializeStateData.

  /** The region this procedure operates on. */
  protected RegionInfo region;

  /** The region server the remote operation is sent to and whose report is expected. */
  protected ServerName targetServer;

  /** Current step of this procedure; a newly created instance starts in the DISPATCH step. */
  private RegionRemoteProcedureBaseState state =
    RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;

  /** Transition code from the accepted region server report; {@code null} until one is accepted. */
  private TransitionCode transitionCode;

  /** Sequence id from the accepted report; only serialized together with transitionCode. */
  private long seqId;

  // The remaining fields are in-memory only; they are not part of the serialized state.

  /** Backoff tracker, created lazily the first time execute catches an IOException. */
  private RetryCounter retryCounter;

  /** Pending async operation, if any; afterExec keeps the region node lock while it is set. */
  private CompletableFuture<Void> future;
080
081  protected RegionRemoteProcedureBase() {
082  }
083
084  protected RegionRemoteProcedureBase(TransitRegionStateProcedure parent, RegionInfo region,
085    ServerName targetServer) {
086    this.region = region;
087    this.targetServer = targetServer;
088    parent.attachRemoteProc(this);
089  }
090
091  @Override
092  public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
093    ServerName remote) {
094    // REPORT_SUCCEED means that this remote open/close request already executed in RegionServer.
095    // So return empty operation and RSProcedureDispatcher no need to send it again.
096    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
097      return Optional.empty();
098    }
099    return Optional.of(newRemoteOperation(env));
100  }
101
  /**
   * Builds the remote operation that {@code remoteCallBuild} returns while the region server has
   * not reported back yet.
   * @param env the master procedure environment
   * @return the operation to send; it is wrapped with {@code Optional.of}, so it must not be null
   */
  protected abstract RemoteProcedureDispatcher.RemoteOperation
    newRemoteOperation(MasterProcedureEnv env);
104
105  @Override
106  public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) {
107    // should not be called since we use reportRegionStateTransition to report the result
108    throw new UnsupportedOperationException();
109  }
110
111  @Override
112  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
113    // should not be called since we use reportRegionStateTransition to report the result
114    throw new UnsupportedOperationException();
115  }
116
117  private RegionStateNode getRegionNode(MasterProcedureEnv env) {
118    return env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
119  }
120
121  @Override
122  public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
123    RegionStateNode regionNode = getRegionNode(env);
124    regionNode.lock();
125    try {
126      if (!env.getMasterServices().getServerManager().isServerOnline(remote)) {
127        // the SCP will interrupt us, give up
128        LOG.debug("{} for region {}, targetServer {} is dead, SCP will interrupt us, give up", this,
129          regionNode, remote);
130        return;
131      }
132      if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
133        // not sure how can this happen but anyway let's add a check here to avoid waking the wrong
134        // procedure...
135        LOG.warn("{} for region {}, targetServer={} has already been woken up, ignore", this,
136          regionNode, remote);
137        return;
138      }
139      LOG.warn("The remote operation {} for region {} to server {} failed", this, regionNode,
140        remote, exception);
141      // It is OK to not persist the state here, as we do not need to change the region state if the
142      // remote call is failed. If the master crashed before we actually execute the procedure and
143      // persist the new state, it is fine to retry on the same target server again.
144      state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH_FAIL;
145      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
146    } finally {
147      regionNode.unlock();
148    }
149  }
150
151  @Override
152  public TableName getTableName() {
153    return region.getTable();
154  }
155
156  @Override
157  protected boolean waitInitialized(MasterProcedureEnv env) {
158    if (isCriticalSystemTable()) {
159      return false;
160    }
161    if (TableName.isMetaTableName(getTableName())) {
162      return false;
163    }
164    // First we need meta to be loaded, and second, if meta is not online then we will likely to
165    // fail when updating meta so we wait until it is assigned.
166    AssignmentManager am = env.getAssignmentManager();
167    return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, region);
168  }
169
170  @Override
171  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
172    throw new UnsupportedOperationException();
173  }
174
175  @Override
176  protected boolean abort(MasterProcedureEnv env) {
177    return false;
178  }
179
  /**
   * Does some checks to see if a report is valid. {@code reportTransition} calls this before it
   * records the reported values, so a thrown exception leaves this procedure's state untouched.
   * @param regionNode     the node of the region the report is about
   * @param transitionCode the transition code reported by the region server
   * @param seqId          the sequence id that came with the report
   * @throws UnexpectedStateException if the report is not acceptable
   */
  protected abstract void checkTransition(RegionStateNode regionNode, TransitionCode transitionCode,
    long seqId) throws UnexpectedStateException;
183
  /**
   * Changes the in memory state of the regionNode, but does not update meta.
   * <p/>
   * {@code reportTransition} calls this after the procedure state has been persisted and turns an
   * IOException thrown here into an AssertionError.
   * @param env            the master procedure environment
   * @param regionNode     the node whose in memory state is changed
   * @param transitionCode the transition code reported by the region server
   * @param seqId          the sequence id that came with the report
   * @throws IOException declared for implementations; not expected by the caller
   */
  protected abstract void updateTransitionWithoutPersistingToMeta(MasterProcedureEnv env,
    RegionStateNode regionNode, TransitionCode transitionCode, long seqId) throws IOException;
187
188  // A bit strange but the procedure store will throw RuntimeException if we can not persist the
189  // state, so upper layer should take care of this...
190  private void persistAndWake(MasterProcedureEnv env, RegionStateNode regionNode) {
191    // The synchronization here is to guard with ProcedureExecutor.executeRollback, as here we will
192    // not hold the procedure execution lock, but we should not persist a procedure in ROLLEDBACK
193    // state to the procedure store.
194    // The ProcedureStore.update must be inside the lock, so here the check for procedure state and
195    // update could be atomic. In ProcedureExecutor.cleanupAfterRollbackOneStep, we will set the
196    // state to ROLLEDBACK, which will hold the same lock too as the Procedure.setState method is
197    // synchronized. This is the key to keep us safe.
198    synchronized (this) {
199      if (getState() == ProcedureState.ROLLEDBACK) {
200        LOG.warn("Procedure {} has already been rolled back, skip persistent", this);
201        return;
202      }
203      env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
204    }
205    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
206  }
207
208  // should be called with RegionStateNode locked, to avoid race with the execute method below
209  void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
210    TransitionCode transitionCode, long seqId) throws IOException {
211    if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
212      // should be a retry
213      return;
214    }
215    if (!targetServer.equals(serverName)) {
216      throw new UnexpectedStateException("Received report from " + serverName + ", expected "
217        + targetServer + ", " + regionNode + ", proc=" + this);
218    }
219    checkTransition(regionNode, transitionCode, seqId);
220    // this state means we have received the report from RS, does not mean the result is fine, as we
221    // may received a FAILED_OPEN.
222    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
223    this.transitionCode = transitionCode;
224    this.seqId = seqId;
225    // Persist the transition code and openSeqNum(if provided).
226    // We should not update the hbase:meta directly as this may cause races when master restarts,
227    // as the old active master may incorrectly report back to RS and cause the new master to hang
228    // on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
229    boolean succ = false;
230    try {
231      persistAndWake(env, regionNode);
232      succ = true;
233    } finally {
234      if (!succ) {
235        this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
236        this.transitionCode = null;
237        this.seqId = HConstants.NO_SEQNUM;
238      }
239    }
240    try {
241      updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId);
242    } catch (IOException e) {
243      throw new AssertionError("should not happen", e);
244    }
245  }
246
247  void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName) {
248    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH) {
249      // should be a retry
250      return;
251    }
252    RegionRemoteProcedureBaseState oldState = state;
253    // it is possible that the state is in REGION_REMOTE_PROCEDURE_SERVER_CRASH, think of this
254    // sequence
255    // 1. region is open on the target server and the above reportTransition call is succeeded
256    // 2. before we are woken up and update the meta, the target server crashes, and then we arrive
257    // here
258    this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_SERVER_CRASH;
259    boolean succ = false;
260    try {
261      persistAndWake(env, regionNode);
262      succ = true;
263    } finally {
264      if (!succ) {
265        this.state = oldState;
266      }
267    }
268  }
269
  /**
   * Restores the state of the given region node for a procedure that was loaded in the
   * REPORT_SUCCEED step; {@code stateLoaded} is the caller and passes the stored sequence id.
   * @param am         the assignment manager
   * @param regionNode the node to restore
   * @param seqId      the sequence id stored with this procedure
   * @throws IOException declared for implementations; the caller turns it into an AssertionError
   */
  protected abstract void restoreSucceedState(AssignmentManager am, RegionStateNode regionNode,
    long seqId) throws IOException;
272
273  void stateLoaded(AssignmentManager am, RegionStateNode regionNode) {
274    if (state == RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED) {
275      try {
276        restoreSucceedState(am, regionNode, seqId);
277      } catch (IOException e) {
278        // should not happen as we are just restoring the state
279        throw new AssertionError(e);
280      }
281    }
282  }
283
284  private TransitRegionStateProcedure getParent(MasterProcedureEnv env) {
285    return (TransitRegionStateProcedure) env.getMasterServices().getMasterProcedureExecutor()
286      .getProcedure(getParentProcId());
287  }
288
289  private void unattach(MasterProcedureEnv env) {
290    getParent(env).unattachRemoteProc(this);
291  }
292
293  private CompletableFuture<Void> getFuture() {
294    return future;
295  }
296
297  private void setFuture(CompletableFuture<Void> f) {
298    future = f;
299  }
300
301  @Override
302  protected void beforeExec(MasterProcedureEnv env) throws ProcedureSuspendedException {
303    RegionStateNode regionNode = getRegionNode(env);
304    if (!regionNode.isLockedBy(this)) {
305      // The wake up action will be called under the lock inside RegionStateNode for implementing
306      // RegionStateNodeLock, so if we call ProcedureUtil.wakeUp where we will acquire the procedure
307      // execution lock directly, it may cause dead lock since in normal case procedure execution
308      // case, we will acquire the procedure execution lock first and then acquire the lock inside
309      // RegionStateNodeLock. This is the reason why we need to schedule the task to a thread pool
310      // and execute asynchronously.
311      regionNode.lock(this,
312        () -> env.getAsyncTaskExecutor().execute(() -> ProcedureFutureUtil.wakeUp(this, env)));
313    }
314  }
315
316  @Override
317  protected void afterExec(MasterProcedureEnv env) {
318    // only release the lock if there is no pending updating meta operation
319    if (future == null) {
320      RegionStateNode regionNode = getRegionNode(env);
321      // in beforeExec, we may throw ProcedureSuspendedException which means we do not get the lock,
322      // in this case we should not call unlock
323      if (regionNode.isLockedBy(this)) {
324        regionNode.unlock(this);
325      }
326    }
327  }
328
  /**
   * Runs one step of this procedure, selected by the current {@code state}:
   * <ul>
   * <li>DISPATCH: hands this procedure to the remote dispatcher for the target server and then
   * suspends on the region node's procedure event; when the dispatcher rejects the operation the
   * procedure detaches from its parent and returns.</li>
   * <li>REPORT_SUCCEED: drives the future obtained from {@code persistToMeta}.</li>
   * <li>DISPATCH_FAIL: detaches from the parent and returns.</li>
   * <li>SERVER_CRASH: drives the future obtained from {@code regionClosedAbnormally}.</li>
   * </ul>
   * An IOException from any step is logged and answered with a timed suspend, using a lazily
   * created retry counter for the backoff.
   * @param env the master procedure environment
   * @return {@code null} on every path that does not suspend
   * @throws ProcedureSuspendedException when waiting for the region server report, or for the
   *                                     retry backoff
   */
  @Override
  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    RegionStateNode regionNode = getRegionNode(env);
    try {
      switch (state) {
        case REGION_REMOTE_PROCEDURE_DISPATCH: {
          // The code which wakes us up also needs to lock the RSN so here we do not need to
          // synchronize on the event.
          ProcedureEvent<?> event = regionNode.getProcedureEvent();
          try {
            env.getRemoteDispatcher().addOperationToNode(targetServer, this);
          } catch (FailedRemoteDispatchException e) {
            LOG.warn("Can not add remote operation {} for region {} to server {}, this usually "
              + "because the server is alread dead, give up and mark the procedure as complete, "
              + "the parent procedure will take care of this.", this, region, targetServer, e);
            unattach(env);
            return null;
          }
          // The operation has been queued; park on the event until one of remoteCallFailed,
          // reportTransition or serverCrashed wakes it.
          event.suspend();
          event.suspendIfNotReady(this);
          throw new ProcedureSuspendedException();
        }
        case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
          // NOTE(review): checkFuture presumably returns true once a previously stored future
          // has been consumed (running the unattach callback) - confirm in ProcedureFutureUtil.
          if (
            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
              () -> unattach(env))
          ) {
            return null;
          }
          // NOTE(review): suspendIfNecessary presumably stores the future through setFuture and
          // suspends while it is incomplete, otherwise runs the unattach callback - confirm.
          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
            env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env));
          return null;
        case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
          // the remote call is failed so we do not need to change the region state, just return.
          unattach(env);
          return null;
        case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
          // Same future handling as for REPORT_SUCCEED, but the operation being waited on is
          // regionClosedAbnormally instead of persistToMeta.
          if (
            ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
              () -> unattach(env))
          ) {
            return null;
          }
          ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
            env.getAssignmentManager().regionClosedAbnormally(regionNode), env,
            () -> unattach(env));
          return null;
        default:
          throw new IllegalStateException("Unknown state: " + state);
      }
    } catch (IOException e) {
      // The retry counter is created on first failure and reused afterwards, so the backoff
      // follows the counter's attempt count across retries.
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
      // suspend(...) hands back the exception to throw; Math.toIntExact raises an
      // ArithmeticException if the backoff does not fit into an int.
      throw suspend(Math.toIntExact(backoff), true);
    }
  }
389
390  @Override
391  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
392    setState(ProcedureProtos.ProcedureState.RUNNABLE);
393    env.getProcedureScheduler().addFront(this);
394    return false; // 'false' means that this procedure handled the timeout
395  }
396
397  @Override
398  public boolean storeInDispatchedQueue() {
399    return false;
400  }
401
402  @Override
403  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
404    RegionRemoteProcedureBaseStateData.Builder builder =
405      RegionRemoteProcedureBaseStateData.newBuilder().setRegion(ProtobufUtil.toRegionInfo(region))
406        .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
407    if (transitionCode != null) {
408      builder.setTransitionCode(transitionCode);
409      builder.setSeqId(seqId);
410    }
411    serializer.serialize(builder.build());
412  }
413
414  @Override
415  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
416    RegionRemoteProcedureBaseStateData data =
417      serializer.deserialize(RegionRemoteProcedureBaseStateData.class);
418    region = ProtobufUtil.toRegionInfo(data.getRegion());
419    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
420    // 'state' may not be present if we are reading an 'old' form of this pb Message.
421    if (data.hasState()) {
422      state = data.getState();
423    }
424    if (data.hasTransitionCode()) {
425      transitionCode = data.getTransitionCode();
426      seqId = data.getSeqId();
427    }
428  }
429
430  @Override
431  protected void afterReplay(MasterProcedureEnv env) {
432    getParent(env).attachRemoteProc(this);
433  }
434
435  @Override
436  public String getProcName() {
437    return getClass().getSimpleName() + " " + region.getEncodedName();
438  }
439
440  @Override
441  protected void toStringClassDetails(StringBuilder builder) {
442    builder.append(getProcName());
443    if (targetServer != null) {
444      builder.append(", server=");
445      builder.append(this.targetServer);
446    }
447    if (this.retryCounter != null) {
448      builder.append(", retry=");
449      builder.append(this.retryCounter);
450    }
451  }
452}