001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.master.assignment;
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicBoolean;
024
025import org.apache.hadoop.hbase.ServerName;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.RegionInfo;
028import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
029import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
030import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
031import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
032import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
033import org.apache.hadoop.hbase.procedure2.Procedure;
034import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
035import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
036import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
037import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
038import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
044
045import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
048
049/**
050 * Base class for the Assign and Unassign Procedure.
051 *
052 * Locking:
053 * Takes exclusive lock on the region being assigned/unassigned. Thus, there can only be one
054 * RegionTransitionProcedure per region running at a time (see MasterProcedureScheduler).
055 *
056 * <p>This procedure is asynchronous and responds to external events.
057 * The AssignmentManager will notify this procedure when the RS completes
058 * the operation and reports the transitioned state
059 * (see the Assign and Unassign class for more detail).</p>
060 *
061 * <p>Procedures move from the REGION_TRANSITION_QUEUE state when they are
062 * first submitted, to the REGION_TRANSITION_DISPATCH state when the request
063 * to remote server is sent and the Procedure is suspended waiting on external
064 * event to be woken again. Once the external event is triggered, Procedure
065 * moves to the REGION_TRANSITION_FINISH state.</p>
066 *
067 * <p>NOTE: {@link AssignProcedure} and {@link UnassignProcedure} should not be thought of
068 * as being asymmetric, at least currently.
069 * <ul>
070 * <li>{@link AssignProcedure} moves through all the above described states and implements methods
071 * associated with each while {@link UnassignProcedure} starts at state
072 * REGION_TRANSITION_DISPATCH and state REGION_TRANSITION_QUEUE is not supported.</li>
073 *
074 * <li>When any step in {@link AssignProcedure} fails, failure handler
075 * AssignProcedure#handleFailure(MasterProcedureEnv, RegionStateNode) re-attempts the
076 * assignment by setting the procedure state to REGION_TRANSITION_QUEUE and forces
077 * assignment to a different target server by setting {@link AssignProcedure#forceNewPlan}. When
078 * the number of attempts reaches threshold configuration 'hbase.assignment.maximum.attempts',
079 * the procedure is aborted. For {@link UnassignProcedure}, similar re-attempts are
080 * intentionally not implemented. It is a 'one shot' procedure. See its class doc for how it
081 * handles failure.
082 * </li>
083 * <li>If we find a region in an 'unexpected' state, we'll complain and retry with backoff forever.
084 * The 'unexpected' state needs to be fixed either by another running Procedure or by operator
085 * intervention (Regions in 'unexpected' state indicates bug or unexpected transition type).
086 * For this to work, subclasses need to persist the 'attempt' counter kept in this class when
087 * they do serializeStateData and restore it inside their deserializeStateData, just as they do
088 * for {@link #regionInfo}.
089 * </li>
090 * </ul>
091 * </p>
092 *
093 * <p>TODO: Considering it is a priority doing all we can to get make a region available as soon as
094 * possible, re-attempting with any target makes sense if specified target fails in case of
095 * {@link AssignProcedure}. For {@link UnassignProcedure}, our concern is preventing data loss
096 * on failed unassign. See class doc for explanation.
097 */
098@InterfaceAudience.Private
099public abstract class RegionTransitionProcedure
100    extends Procedure<MasterProcedureEnv>
101    implements TableProcedureInterface,
102      RemoteProcedure<MasterProcedureEnv, ServerName> {
103  private static final Logger LOG = LoggerFactory.getLogger(RegionTransitionProcedure.class);
104
105  protected final AtomicBoolean aborted = new AtomicBoolean(false);
106
107  private RegionTransitionState transitionState = RegionTransitionState.REGION_TRANSITION_QUEUE;
108  /**
109   * This data member must be persisted. Expectation is that it is done by subclasses in their
110   * {@link #serializeStateData(ProcedureStateSerializer)} call, restoring {@link #regionInfo}
111   * in their {@link #deserializeStateData(ProcedureStateSerializer)} method.
112   */
113  private RegionInfo regionInfo;
114
115  /**
116   * this data member must also be persisted.
117   * @see #regionInfo
118   */
119  private boolean override;
120
121  /**
122   * Like {@link #regionInfo}, the expectation is that subclasses persist the value of this
123   * data member. It is used doing backoff when Procedure gets stuck.
124   */
125  private int attempt;
126
127  // Required by the Procedure framework to create the procedure on replay
128  public RegionTransitionProcedure() {}
129
130  public RegionTransitionProcedure(final RegionInfo regionInfo, boolean override) {
131    this.regionInfo = regionInfo;
132    this.override = override;
133  }
134
135  @VisibleForTesting
136  public RegionInfo getRegionInfo() {
137    return regionInfo;
138  }
139
140  /**
141   * This setter is for subclasses to call in their
142   * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that
143   * subclasses will persist `regioninfo` in their
144   * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `regionInfo` on
145   * deserialization by calling this.
146   */
147  protected void setRegionInfo(final RegionInfo regionInfo) {
148    this.regionInfo = regionInfo;
149  }
150
151  /**
152   * This setter is for subclasses to call in their
153   * {@link #deserializeStateData(ProcedureStateSerializer)} method. Expectation is that
154   * subclasses will persist `override` in their
155   * {@link #serializeStateData(ProcedureStateSerializer)} method and then restore `override` on
156   * deserialization by calling this.
157   */
158  protected void setOverride(boolean override) {
159    this.override = override;
160  }
161
162
163    /**
164     * This setter is for subclasses to call in their
165     * {@link #deserializeStateData(ProcedureStateSerializer)} method.
166     * @see #setRegionInfo(RegionInfo)
167     */
168  protected void setAttempt(int attempt) {
169    this.attempt = attempt;
170  }
171
172  protected int getAttempt() {
173    return this.attempt;
174  }
175
176  @Override
177  public TableName getTableName() {
178    RegionInfo hri = getRegionInfo();
179    return hri != null? hri.getTable(): null;
180  }
181
182  public boolean isMeta() {
183    return TableName.isMetaTableName(getTableName());
184  }
185
186  @Override
187  public void toStringClassDetails(final StringBuilder sb) {
188    sb.append(getClass().getSimpleName());
189    sb.append(" table=");
190    sb.append(getTableName());
191    sb.append(", region=");
192    sb.append(getRegionInfo() == null? null: getRegionInfo().getEncodedName());
193    if (isOverride()) {
194      // Only log if set.
195      sb.append(", override=");
196      sb.append(isOverride());
197    }
198  }
199
200  public RegionStateNode getRegionState(final MasterProcedureEnv env) {
201    return env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegionInfo());
202  }
203
204  void setTransitionState(final RegionTransitionState state) {
205    this.transitionState = state;
206  }
207
208  protected RegionTransitionState getTransitionState() {
209    return transitionState;
210  }
211
212  protected abstract boolean startTransition(MasterProcedureEnv env, RegionStateNode regionNode)
213    throws IOException, ProcedureSuspendedException;
214
215  /**
216   * Called when the Procedure is in the REGION_TRANSITION_DISPATCH state.
217   * In here we do the RPC call to OPEN/CLOSE the region. The suspending of
218   * the thread so it sleeps until it gets update that the OPEN/CLOSE has
219   * succeeded is complicated. Read the implementations to learn more.
220   */
221  protected abstract boolean updateTransition(MasterProcedureEnv env, RegionStateNode regionNode)
222    throws IOException, ProcedureSuspendedException;
223
224  protected abstract void finishTransition(MasterProcedureEnv env, RegionStateNode regionNode)
225    throws IOException, ProcedureSuspendedException;
226
227  protected abstract void reportTransition(MasterProcedureEnv env,
228      RegionStateNode regionNode, TransitionCode code, long seqId) throws UnexpectedStateException;
229
230  @Override
231  public abstract RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName serverName);
232
233  /**
234   * @return True if processing of fail is complete; the procedure will be woken from its suspend
235   * and we'll go back to running through procedure steps:
236   * otherwise if false we leave the procedure in suspended state.
237   */
238  protected abstract boolean remoteCallFailed(MasterProcedureEnv env,
239      RegionStateNode regionNode, IOException exception);
240
241  @Override
242  public void remoteCallCompleted(final MasterProcedureEnv env,
243      final ServerName serverName, final RemoteOperation response) {
244    // Ignore the response? reportTransition() is the one that count?
245  }
246
247  @Override
248  public synchronized boolean remoteCallFailed(final MasterProcedureEnv env,
249      final ServerName serverName, final IOException exception) {
250    final RegionStateNode regionNode = getRegionState(env);
251    LOG.warn("Remote call failed {} {}", this, regionNode.toShortString(), exception);
252    if (remoteCallFailed(env, regionNode, exception)) {
253      // NOTE: This call to wakeEvent puts this Procedure back on the scheduler.
254      // Thereafter, another Worker can be in here so DO NOT MESS WITH STATE beyond
255      // this method. Just get out of this current processing quickly.
256      regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
257      return true;
258    }
259    return false;
260    // else leave the procedure in suspended state; it is waiting on another call to this callback
261  }
262
263  /**
264   * Be careful! At the end of this method, the procedure has either succeeded
265   * and this procedure has been set into a suspended state OR, we failed and
266   * this procedure has been put back on the scheduler ready for another worker
267   * to pick it up. In both cases, we need to exit the current Worker processing
268   * immediately!
269   * @return True if we successfully dispatched the call and false if we failed;
270   * if failed, we need to roll back any setup done for the dispatch.
271   */
272  protected boolean addToRemoteDispatcher(final MasterProcedureEnv env,
273      final ServerName targetServer) {
274    LOG.info("Dispatch {}", this);
275
276    // Put this procedure into suspended mode to wait on report of state change
277    // from remote regionserver. Means Procedure associated ProcedureEvent is marked not 'ready'.
278    getRegionState(env).getProcedureEvent().suspend();
279
280    // Tricky because the below call to addOperationToNode can fail. If it fails, we need to
281    // backtrack on stuff like the 'suspend' done above -- tricky as the 'wake' requests us -- and
282    // ditto up in the caller; it needs to undo state changes. Inside in remoteCallFailed, it does
283    // wake to undo the above suspend.
284    try {
285      env.getRemoteDispatcher().addOperationToNode(targetServer, this);
286    } catch (FailedRemoteDispatchException frde) {
287      remoteCallFailed(env, targetServer, frde);
288      return false;
289    }
290    return true;
291  }
292
293  protected void reportTransition(final MasterProcedureEnv env, final ServerName serverName,
294      final TransitionCode code, final long seqId) throws UnexpectedStateException {
295    final RegionStateNode regionNode = getRegionState(env);
296    if (LOG.isDebugEnabled()) {
297      LOG.debug("Received report " + code + " seqId=" + seqId + ", " +
298            this + "; " + regionNode.toShortString());
299    }
300    if (!serverName.equals(regionNode.getRegionLocation())) {
301      if (isMeta() && regionNode.getRegionLocation() == null) {
302        regionNode.setRegionLocation(serverName);
303      } else {
304        throw new UnexpectedStateException(String.format(
305          "Unexpected state=%s from server=%s; expected server=%s; %s; %s",
306          code, serverName, regionNode.getRegionLocation(),
307          this, regionNode.toShortString()));
308      }
309    }
310
311    reportTransition(env, regionNode, code, seqId);
312
313    // NOTE: This call adds this procedure back on the scheduler.
314    // This makes it so this procedure can run again. Another worker will take
315    // processing to the next stage. At an extreme, the other worker may run in
316    // parallel so DO  NOT CHANGE any state hereafter! This should be last thing
317    // done in this processing step.
318    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
319  }
320
321  protected boolean isServerOnline(final MasterProcedureEnv env, final RegionStateNode regionNode) {
322    return isServerOnline(env, regionNode.getRegionLocation());
323  }
324
325  protected boolean isServerOnline(final MasterProcedureEnv env, final ServerName serverName) {
326    return env.getMasterServices().getServerManager().isServerOnline(serverName);
327  }
328
329  @Override
330  protected void toStringState(StringBuilder builder) {
331    super.toStringState(builder);
332    RegionTransitionState ts = this.transitionState;
333    if (!isFinished() && ts != null) {
334      builder.append(":").append(ts);
335    }
336  }
337
338  @Override
339  protected Procedure[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException {
340    final AssignmentManager am = env.getAssignmentManager();
341    final RegionStateNode regionNode = getRegionState(env);
342    if (!am.addRegionInTransition(regionNode, this)) {
343      if (this.isOverride()) {
344        LOG.info("{} owned by pid={}, OVERRIDDEN by 'this' (pid={}, override=true).",
345            regionNode.getRegionInfo().getEncodedName(),
346            regionNode.getProcedure().getProcId(), getProcId());
347        regionNode.unsetProcedure(regionNode.getProcedure());
348      } else {
349        String msg = String.format("%s owned by pid=%d, CANNOT run 'this' (pid=%d).",
350            regionNode.getRegionInfo().getEncodedName(),
351            regionNode.getProcedure().getProcId(), getProcId());
352        LOG.warn(msg);
353        setAbortFailure(getClass().getSimpleName(), msg);
354        return null;
355      }
356    }
357    try {
358      boolean retry;
359      do {
360        retry = false;
361        switch (transitionState) {
362          case REGION_TRANSITION_QUEUE:
363            // 1. push into the AM queue for balancer policy
364            if (!startTransition(env, regionNode)) {
365              // The operation figured it is done or it aborted; check getException()
366              am.removeRegionInTransition(getRegionState(env), this);
367              return null;
368            }
369            transitionState = RegionTransitionState.REGION_TRANSITION_DISPATCH;
370            if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
371              // Why this suspend? Because we want to ensure Store happens before proceed?
372              throw new ProcedureSuspendedException();
373            }
374            break;
375
376          case REGION_TRANSITION_DISPATCH:
377            // 2. send the request to the target server
378            if (!updateTransition(env, regionNode)) {
379              // The operation figured it is done or it aborted; check getException()
380              am.removeRegionInTransition(regionNode, this);
381              return null;
382            }
383            if (transitionState != RegionTransitionState.REGION_TRANSITION_DISPATCH) {
384              retry = true;
385              break;
386            }
387            if (regionNode.getProcedureEvent().suspendIfNotReady(this)) {
388              throw new ProcedureSuspendedException();
389            }
390            break;
391
392          case REGION_TRANSITION_FINISH:
393            // 3. wait assignment response. completion/failure
394            LOG.debug("Finishing {}; {}", this, regionNode.toShortString());
395            finishTransition(env, regionNode);
396            am.removeRegionInTransition(regionNode, this);
397            return null;
398        }
399      } while (retry);
400      // If here, success so clear out the attempt counter so we start fresh each time we get stuck.
401      this.attempt = 0;
402    } catch (IOException e) {
403      long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
404      LOG.warn("Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
405              "by other Procedure or operator intervention", backoff / 1000, this,
406          regionNode.toShortString(), e);
407      setTimeout(Math.toIntExact(backoff));
408      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
409      throw new ProcedureSuspendedException();
410    }
411
412    return new Procedure[] {this};
413  }
414
415  /**
416   * At end of timeout, wake ourselves up so we run again.
417   */
418  @Override
419  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
420    setState(ProcedureProtos.ProcedureState.RUNNABLE);
421    env.getProcedureScheduler().addFront(this);
422    return false; // 'false' means that this procedure handled the timeout
423  }
424
425  @Override
426  protected void rollback(final MasterProcedureEnv env) {
427    if (isRollbackSupported(transitionState)) {
428      // Nothing done up to this point. abort safely.
429      // This should happen when something like disableTable() is triggered.
430      env.getAssignmentManager().removeRegionInTransition(getRegionState(env), this);
431      return;
432    }
433
434    // There is no rollback for assignment unless we cancel the operation by
435    // dropping/disabling the table.
436    LOG.warn("Unhandled state {}; no rollback for assignment! Doing NOTHING!" +
437        " May need manual intervention; {}",
438        transitionState, this);
439  }
440
441  protected abstract boolean isRollbackSupported(final RegionTransitionState state);
442
443  @Override
444  protected boolean abort(final MasterProcedureEnv env) {
445    if (isRollbackSupported(transitionState)) {
446      aborted.set(true);
447      return true;
448    }
449    return false;
450  }
451
452  @Override
453  protected boolean waitInitialized(MasterProcedureEnv env) {
454    // Unless we are assigning meta, wait for meta to be available and loaded.
455    if (isMeta()) {
456      return false;
457    }
458    AssignmentManager am = env.getAssignmentManager();
459    return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, regionInfo);
460  }
461
462  @Override
463  protected LockState acquireLock(final MasterProcedureEnv env) {
464    // TODO: Revisit this and move it to the executor
465    if (env.getProcedureScheduler().waitRegion(this, getRegionInfo())) {
466      try {
467        // Enable TRACE on this class to see lock dump. Can be really large when cluster is big
468        // or big tables being enabled/disabled.
469        if (LOG.isTraceEnabled()) {
470          LOG.trace("{} pid={} {}", LockState.LOCK_EVENT_WAIT, getProcId(),
471              env.getProcedureScheduler().dumpLocks());
472        }
473      } catch (IOException e) {
474        // ignore, just for logging
475      }
476      return LockState.LOCK_EVENT_WAIT;
477    }
478    return LockState.LOCK_ACQUIRED;
479  }
480
481  @Override
482  protected void releaseLock(final MasterProcedureEnv env) {
483    env.getProcedureScheduler().wakeRegion(this, getRegionInfo());
484  }
485
486  @Override
487  protected boolean holdLock(final MasterProcedureEnv env) {
488    return true;
489  }
490
491  @Override
492  protected boolean shouldWaitClientAck(MasterProcedureEnv env) {
493    // The operation is triggered internally on the server
494    // the client does not know about this procedure.
495    return false;
496  }
497
498  /**
499   * Used by ServerCrashProcedure to see if this Assign/Unassign needs processing.
500   * @return ServerName the Assign or Unassign is going against.
501   */
502  public abstract ServerName getServer(final MasterProcedureEnv env);
503
504  @Override
505  protected void bypass(MasterProcedureEnv env) {
506    // This override is just so I can write a note on how bypass is done in
507    // RTP. For RTP procedures -- i.e. assign/unassign -- if bypass is called,
508    // we intentionally do NOT cleanup our state. We leave a reference to the
509    // bypassed Procedure in the RegionStateNode. Doing this makes it so the
510    // RSN is in an odd state. The bypassed Procedure is finished but no one
511    // else can make progress on this RSN entity (see the #execute above where
512    // we check the RSN to see if an already registered procedure and if so,
513    // we exit without proceeding). This is done to intentionally block
514    // subsequent Procedures from running. Only a Procedure with the 'override' flag
515    // set can overwrite the RSN and make progress.
516    super.bypass(env);
517  }
518
519  boolean isOverride() {
520    return this.override;
521  }
522}