001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.assignment;
020
021import java.io.IOException;
022import java.util.concurrent.atomic.AtomicBoolean;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.client.RegionInfo;
026import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
027import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
028import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
029import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
030import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
031import org.apache.hadoop.hbase.procedure2.Procedure;
032import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
033import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
034import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
035import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
036import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
037import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
043
044import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
047
048/**
049 * Base class for the Assign and Unassign Procedure.
050 *
051 * Locking:
052 * Takes exclusive lock on the region being assigned/unassigned. Thus, there can only be one
053 * RegionTransitionProcedure per region running at a time (see MasterProcedureScheduler).
054 *
055 * <p>This procedure is asynchronous and responds to external events.
056 * The AssignmentManager will notify this procedure when the RS completes
057 * the operation and reports the transitioned state
058 * (see the Assign and Unassign class for more detail).</p>
059 *
060 * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
061 * first submitted, to the REGION_TRANSITION_DISPATCH state when the request
062 * to remote server is sent and the Procedure is suspended waiting on external
063 * event to be woken again. Once the external event is triggered, Procedure
064 * moves to the REGION_TRANSITION_FINISH state.</p>
065 *
066 * <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of
067 * as being asymmetric, at least currently.
068 * <ul>
069 * <li>{@link AssignProcedure} moves through all the above described states and implements methods
070 * associated with each while {@link UnassignProcedure} starts at state
071 * REGION_TRANSITION_DISPATCH and state REGION_TRANSITION_QUEUE is not supported.</li>
072 *
073 * <li>When any step in {@link AssignProcedure} fails, failure handler
074 * AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the
075 * assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces
076 * assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
077 * the number of attempts reaches threshold configuration 'hbase.assignment.maximum.attempts',
078 * the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
079 * intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it
080 * handles failure.
081 * </li>
082 * <li>If we find a region in an 'unexpected' state, we'll complain and retry with backoff forever.
083 * The 'unexpected' state needs to be fixed either by another running Procedure or by operator
084 * intervention (Regions in 'unexpected' state indicates bug or unexpected transition type).
085 * For this to work, subclasses need to persist the 'attempt' counter kept in this class when
086 * they do serializeStateData and restore it inside their deserializeStateData, just as they do
087 * for {@link #regionInfo}.
088 * </li>
089 * </ul>
090 * </p>
091 *
092 * <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as
093 * possible, re-attempting with any target makes sense if specified target fails in case of
094 * {@link AssignProcedure}. For {@link UnassignProcedure}, our concern is preventing data loss
095 * on failed unassign. See class doc for explanation.
096 */
097@InterfaceAudience.Private
098public abstract class RegionTransitionProcedure
099    extends Procedure<MasterProcedureEnv>
100    implements TableProcedureInterface,
101      RemoteProcedure<MasterProcedureEnv, ServerName> {
102  private static final Logger LOG = LoggerFactory.getLogger(RegionTransitionProcedure.class);
103
104  protected final AtomicBoolean aborted = new AtomicBoolean(false);
105
106  private RegionTransitionState transitionState = RegionTransitionState.REGION_TRANSITION_QUEUE;
107  /**
108   * This data member must be persisted. Expectation is that it is done by subclasses in their
109   * {@link #serializeStateData(ProcedureStateSerializer)} call, restoring {@link #regionInfo}
110   * in their {@link #deserializeStateData(ProcedureStateSerializer)} method.
111   */
112  private RegionInfo regionInfo;
113
114  /**
115   * this data member must also be persisted.
116   * @see #regionInfo
117   */
118  private boolean override;
119
120  /**
121   * Like {@link #regionInfo}, the expectation is that subclasses persist the value of this
122   * data member. It is used doing backoff when Procedure gets stuck.
123   */
124  private int attempt;
125
126  // Required by the Procedure framework to create the procedure on replay
127  public RegionTransitionProcedure() {}
128
129  public RegionTransitionProcedure(final RegionInfo regionInfo, boolean override) {
130    this.regionInfo = regionInfo;
131    this.override = override;
132  }
133
134  @VisibleForTesting
135  public RegionInfo getRegionInfo() {
136    return regionInfo;
137  }
138
139  /**
140   * This setter is for subclasses to call in their
141   * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that
142   * subclasses will persist `regioninfo` in their
143   * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `regionInfo` on
144   * deserialization by calling this.
145   */
146  protected void setRegionInfo(final RegionInfo regionInfo) {
147    this.regionInfo = regionInfo;
148  }
149
150  /**
151   * This setter is for subclasses to call in their
152   * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that
153   * subclasses will persist `override` in their
154   * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `override` on
155   * deserialization by calling this.
156   */
157  protected void setOverride(boolean override) {
158    this.override = override;
159  }
160
161
162    /**
163     * This setter is for subclasses to call in their
164     * {@link #deserializeStateData(ProcedureStateSerializer)} method.
165     * @see #setRegionInfo(RegionInfo)
166     */
167  protected void setAttempt(int attempt) {
168    this.attempt = attempt;
169  }
170
171  protected int getAttempt() {
172    return this.attempt;
173  }
174
175  @Override
176  public TableName getTableName() {
177    RegionInfo hri = getRegionInfo();
178    return hri != null? hri.getTable(): null;
179  }
180
181  public boolean isMeta() {
182    return TableName.isMetaTableName(getTableName());
183  }
184
185  @Override
186  public void toStringClassDetails(final StringBuilder sb) {
187    sb.append(getClass().getSimpleName());
188    sb.append(" table=");
189    sb.append(getTableName());
190    sb.append(", region=");
191    sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName());
192    if (isOverride()) {
193      // Only log if set.
194      sb.append(", override=");
195      sb.append(isOverride());
196    }
197  }
198
199  public RegionStateNode getRegionState(final MasterProcedureEnv env) {
200    return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegionInfo());
201  }
202
203  void setTransitionState(final RegionTransitionState state) {
204    this.transitionState = state;
205  }
206
207  protected RegionTransitionState getTransitionState() {
208    return transitionState;
209  }
210
211  protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode)
212    throws IOException, ProcedureSuspendedException;
213
214  /**
215   * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state.
216   * In here we do the RPC call to OPEN/CLOSE the region. The suspending of
217   * the thread so it sleeps until it gets update that the OPEN/CLOSE has
218   * succeeded is complicated. Read the implementations to learn more.
219   */
220  protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode)
221    throws IOException, ProcedureSuspendedException;
222
223  protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode)
224    throws IOException, ProcedureSuspendedException;
225
226  protected abstract void reportTransition(MasterProcedureEnv env,
227      RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;
228
229  @Override
230  public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
231
232  /**
233   * @return True if processing of fail is complete; the procedure will be woken from its suspend
234   * and we'll go back to running through procedure steps:
235   * otherwise if false we leave the procedure in suspended state.
236   */
237  protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
238      RegionStateNode regionNode, IOException exception);
239
240  @Override
241  public synchronized boolean remoteCallFailed(final MasterProcedureEnv env,
242      final ServerName serverName, final IOException exception) {
243    final RegionStateNode regionNode = getRegionState(env);
244    LOG.warn("Remote call failed {} {}", this, regionNode.toShortString(), exception);
245    if (remoteCallFailed(env, regionNode, exception)) {
246      // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
247      // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
248      // this method. Just get out of this current processing quickly.
249      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
250      return true;
251    }
252    // else leave the procedure in suspended state; it is waiting on another call to this callback
253    return false;
254  }
255
256  /**
257   * Be careful! At the end of this method, the procedure has either succeeded
258   * and this procedure has been set into a suspended state OR, we failed and
259   * this procedure has been put back on the scheduler ready for another worker
260   * to pick it up. In both cases, we need to exit the current Worker processing
261   * immediately!
262   * @return True if we successfully dispatched the call and false if we failed;
263   * if failed, we need to roll back any setup done for the dispatch.
264   */
265  protected boolean addToRemoteDispatcher(final MasterProcedureEnv env,
266      final ServerName targetServer) {
267    LOG.info("Dispatch {}", this);
268
269    // Put this procedure into suspended mode to wait on report of state change
270    // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
271    getRegionState(env).getProcedureEvent().suspend();
272
273    // Tricky because the below call to addOperationToNode can fail. If it fails, we need to
274    // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
275    // ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
276    // wake to undo the above suspend.
277    try {
278      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
279    } catch (FailedRemoteDispatchException frde) {
280      remoteCallFailed(env, targetServer, frde);
281      return false;
282    }
283    return true;
284  }
285
286  @Override
287  public boolean storeInDispatchedQueue() {
288    return false;
289  }
290
291  protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName,
292      final TransitionCode code, final long seqId) throws UnexpectedStateException {
293    final RegionStateNode regionNode = getRegionState(env);
294    if (LOG.isDebugEnabled()) {
295      LOG.debug("Received report " + code + " seqId=" + seqId + ", " +
296            this + "; " + regionNode.toShortString());
297    }
298    if (!serverName.equals(regionNode.getRegionLocation())) {
299      if (isMeta() && regionNode.getRegionLocation() == null) {
300        regionNode.setRegionLocation(serverName);
301      } else {
302        throw new UnexpectedStateException(String.format(
303          "Unexpected state=%s from server=%s; expected server=%s; %s; %s",
304          code, serverName, regionNode.getRegionLocation(),
305          this, regionNode.toShortString()));
306      }
307    }
308
309    reportTransition(env, regionNode, code, seqId);
310
311    // NOTE: This call adds this procedure back on the scheduler.
312    // This makes it so this procedure can run again. Another worker will take
313    // processing to the next stage. At an extreme, the other worker may run in
314    // parallel so DO  NOT CHANGE any state hereafter! This should be last thing
315    // done in this processing step.
316    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
317  }
318
319  protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
320    return isServerOnline(env, regionNode.getRegionLocation());
321  }
322
323  protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) {
324    return env.getMasterServices().getServerManager().isServerOnline(serverName);
325  }
326
327  @Override
328  protected void toStringState(StringBuilder builder) {
329    super.toStringState(builder);
330    RegionTransitionState ts = this.transitionState;
331    if (!isFinished() && ts != null) {
332      builder.append(":").append(ts);
333    }
334  }
335
336  @Override
337  protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
338    final AssignmentManager am = env.getAssignmentManager();
339    final RegionStateNode regionNode = getRegionState(env);
340    if (!am.addRegionInTransition(regionNode, this)) {
341      if (this.isOverride()) {
342        LOG.info("{} owned by pid={}, OVERRIDDEN by 'this' (pid={}, override=true).",
343            regionNode.getRegionInfo().getEncodedName(),
344            regionNode.getProcedure().getProcId(), getProcId());
345        regionNode.unsetProcedure(regionNode.getProcedure());
346      } else {
347        String msg = String.format("%s owned by pid=%d, CANNOT run 'this' (pid=%d).",
348            regionNode.getRegionInfo().getEncodedName(),
349            regionNode.getProcedure().getProcId(), getProcId());
350        LOG.warn(msg);
351        setAbortFailure(getClass().getSimpleName(), msg);
352        return null;
353      }
354    }
355    try {
356      boolean retry;
357      do {
358        retry = false;
359        switch (transitionState) {
360          case REGION_TRANSITION_QUEUE:
361            // 1. push into the AM queue for balancer policy
362            if (!startTransition(env, regionNode)) {
363              // The operation figured it is done or it aborted; check getException()
364              am.removeRegionInTransition(getRegionState(env), this);
365              return null;
366            }
367            transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
368            if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
369              // Why this suspend? Because we want to ensure Store happens before proceed?
370              throw new ProcedureSuspendedException();
371            }
372            break;
373
374          case REGION_TRANSITION_DISPATCH:
375            // 2. send the request to the target server
376            if (!updateTransition(env, regionNode)) {
377              // The operation figured it is done or it aborted; check getException()
378              am.removeRegionInTransition(regionNode, this);
379              return null;
380            }
381            if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) {
382              retry = true;
383              break;
384            }
385            if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
386              throw new ProcedureSuspendedException();
387            }
388            break;
389
390          case REGION_TRANSITION_FINISH:
391            // 3. wait assignment response. completion/failure
392            LOG.debug("Finishing {}; {}", this, regionNode.toShortString());
393            finishTransition(env, regionNode);
394            am.removeRegionInTransition(regionNode, this);
395            return null;
396        }
397      } while (retry);
398      // If here, success so clear out the attempt counter so we start fresh each time we get stuck.
399      this.attempt = 0;
400    } catch (IOException e) {
401      long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
402      LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
403              "by other Procedure or operator intervention", backoff / 1000, this,
404          regionNode.toShortString(), e);
405      setTimeout(Math.toIntExact(backoff));
406      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
407      throw new ProcedureSuspendedException();
408    }
409
410    return new Procedure[] {this};
411  }
412
413  /**
414   * At end of timeout, wake ourselves up so we run again.
415   */
416  @Override
417  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
418    setState(ProcedureProtos.ProcedureState.RUNNABLE);
419    env.getProcedureScheduler().addFront(this);
420    return false; // 'false' means that this procedure handled the timeout
421  }
422
423  @Override
424  protected void rollback(final MasterProcedureEnv env) {
425    if (isRollbackSupported(transitionState)) {
426      // Nothing done up to this point. abort safely.
427      // This should happen when something like disableTable() is triggered.
428      env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this);
429      return;
430    }
431
432    // There is no rollback for assignment unless we cancel the operation by
433    // dropping/disabling the table.
434    LOG.warn("Unhandled state {}; no rollback for assignment! Doing NOTHING!" +
435        " May need manual intervention; {}",
436        transitionState, this);
437  }
438
439  protected abstract boolean isRollbackSupported(final RegionTransitionState state);
440
441  @Override
442  protected boolean abort(final MasterProcedureEnv env) {
443    if (isRollbackSupported(transitionState)) {
444      aborted.set(true);
445      return true;
446    }
447    return false;
448  }
449
450  @Override
451  protected boolean waitInitialized(MasterProcedureEnv env) {
452    // Unless we are assigning meta, wait for meta to be available and loaded.
453    if (isMeta()) {
454      return false;
455    }
456    AssignmentManager am = env.getAssignmentManager();
457    return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo);
458  }
459
460  @Override
461  protected LockState acquireLock(final MasterProcedureEnv env) {
462    // TODO: Revisit this and move it to the executor
463    if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
464      try {
465        // Enable TRACE on this class to see lock dump. Can be really large when cluster is big
466        // or big tables being enabled/disabled.
467        if (LOG.isTraceEnabled()) {
468          LOG.trace("{} pid={} {}", LockState.LOCK_EVENT_WAIT, getProcId(),
469              env.getProcedureScheduler().dumpLocks());
470        }
471      } catch (IOException e) {
472        // ignore, just for logging
473      }
474      return LockState.LOCK_EVENT_WAIT;
475    }
476    return LockState.LOCK_ACQUIRED;
477  }
478
479  @Override
480  protected void releaseLock(final MasterProcedureEnv env) {
481    env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
482  }
483
484  @Override
485  protected boolean holdLock(final MasterProcedureEnv env) {
486    return true;
487  }
488
489  @Override
490  protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
491    // The operation is triggered internally on the server
492    // the client does not know about this procedure.
493    return false;
494  }
495
496  /**
497   * Used by ServerCrashProcedure to see if this Assign/Unassign needs processing.
498   * @return ServerName the Assign or Unassign is going against.
499   */
500  public abstract ServerName getServer(final MasterProcedureEnv env);
501
502  @Override
503  public void remoteOperationCompleted(MasterProcedureEnv env) {
504    // should not be called for region operation until we modified the open/close region procedure
505    throw new UnsupportedOperationException();
506  }
507
508  @Override
509  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
510    // should not be called for region operation until we modified the open/close region procedure
511    throw new UnsupportedOperationException();
512  }
513
514  @Override
515  protected void bypass(MasterProcedureEnv env) {
516    // This override is just so I can write a note on how bypass is done in
517    // RTP. For RTP procedures -- i.e. assign/unassign -- if bypass is called,
518    // we intentionally do NOT cleanup our state. We leave a reference to the
519    // bypassed Procedure in the RegionStateNode. Doing this makes it so the
520    // RSN is in an odd state. The bypassed Procedure is finished but no one
521    // else can make progress on this RSN entity (see the #execute above where
522    // we check the RSN to see if an already registered procedure and if so,
523    // we exit without proceeding). This is done to intentionally block
524    // subsequent Procedures from running. Only a Procedure with the 'override' flag
525    // set can overwrite the RSN and make progress.
526    super.bypass(env);
527  }
528
529  boolean isOverride() {
530    return this.override;
531  }
532}