001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
040
041/**
042 * The base class for all replication peer related procedure except sync replication state
043 * transition.
044 */
045@InterfaceAudience.Private
046public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
047
048  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
049
050  protected ModifyPeerProcedure() {
051  }
052
053  protected ModifyPeerProcedure(String peerId) {
054    super(peerId);
055  }
056
057  /**
058   * Called before we start the actual processing. The implementation should call the pre CP hook,
059   * and also the pre-check for the peer modification.
060   * <p>
061   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
062   * all checks passes then the procedure can not be rolled back any more.
063   */
064  protected abstract void prePeerModification(MasterProcedureEnv env)
065    throws IOException, ReplicationException, ProcedureSuspendedException;
066
067  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
068
069  /**
070   * Called before we finish the procedure. The implementation can do some logging work, and also
071   * call the coprocessor hook if any.
072   * <p>
073   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
074   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
075   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
076   * update the peer storage.
077   */
078  protected abstract void postPeerModification(MasterProcedureEnv env)
079    throws IOException, ReplicationException, ProcedureSuspendedException;
080
081  protected void releaseLatch(MasterProcedureEnv env) {
082    ProcedurePrepareLatch.releaseLatch(latch, this);
083  }
084
085  /**
086   * Implementation class can override this method. By default we will jump to
087   * POST_PEER_MODIFICATION and finish the procedure.
088   */
089  protected PeerModificationState nextStateAfterRefresh() {
090    return PeerModificationState.POST_PEER_MODIFICATION;
091  }
092
093  /**
094   * The implementation class should override this method if the procedure may enter the serial
095   * related states.
096   */
097  protected boolean enablePeerBeforeFinish() {
098    throw new UnsupportedOperationException();
099  }
100
101  protected ReplicationPeerConfig getOldPeerConfig() {
102    return null;
103  }
104
105  protected ReplicationPeerConfig getNewPeerConfig() {
106    throw new UnsupportedOperationException();
107  }
108
109  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
110    throws IOException, ReplicationException {
111    throw new UnsupportedOperationException();
112  }
113
114  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
115  // regions.
116  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
117    for (;;) {
118      try {
119        TableState state = tsm.getTableState(tn);
120        if (state.isEnabled()) {
121          return true;
122        }
123        if (!state.isEnabling()) {
124          return false;
125        }
126        Thread.sleep(SLEEP_INTERVAL_MS);
127      } catch (TableNotFoundException e) {
128        return false;
129      } catch (InterruptedException e) {
130        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
131      }
132    }
133  }
134
135  // will be override in test to simulate error
136  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
137    ReplicationPeerConfig peerConfig = getNewPeerConfig();
138    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
139    TableStateManager tsm = env.getMasterServices().getTableStateManager();
140    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
141      if (!td.hasGlobalReplicationScope()) {
142        continue;
143      }
144      TableName tn = td.getTableName();
145      if (!peerConfig.needToReplicate(tn)) {
146        continue;
147      }
148      if (oldPeerConfig != null && oldPeerConfig.isSerial() && oldPeerConfig.needToReplicate(tn)) {
149        continue;
150      }
151      if (needReopen(tsm, tn)) {
152        addChildProcedure(new ReopenTableRegionsProcedure(tn));
153      }
154    }
155  }
156
157  private boolean shouldFailForMigrating(MasterProcedureEnv env) throws IOException {
158    long parentProcId = getParentProcId();
159    if (
160      parentProcId != Procedure.NO_PROC_ID && env.getMasterServices().getMasterProcedureExecutor()
161        .getProcedure(parentProcId) instanceof MigrateReplicationQueueFromZkToTableProcedure
162    ) {
163      // this is scheduled by MigrateReplicationQueueFromZkToTableProcedure, should not fail it
164      return false;
165    }
166    return env.getMasterServices().getProcedures().stream()
167      .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
168      .anyMatch(p -> !p.isFinished());
169  }
170
171  @Override
172  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
173    throws ProcedureSuspendedException, InterruptedException {
174    switch (state) {
175      case PRE_PEER_MODIFICATION:
176        try {
177          if (shouldFailForMigrating(env)) {
178            LOG.info("There is a pending {}, give up execution of {}",
179              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
180              getClass().getName());
181            setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer",
182              new DoNotRetryIOException("There is a pending "
183                + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
184            releaseLatch(env);
185            return Flow.NO_MORE_STATE;
186          }
187          checkPeerModificationEnabled(env);
188          prePeerModification(env);
189        } catch (IOException e) {
190          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
191            + "mark the procedure as failure and give up", getClass().getName(), peerId, e);
192          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
193          releaseLatch(env);
194          return Flow.NO_MORE_STATE;
195        } catch (ReplicationException e) {
196          throw suspend(env.getMasterConfiguration(),
197            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
198              getClass().getName(), peerId, backoff / 1000, e));
199        }
200        resetRetry();
201        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
202        return Flow.HAS_MORE_STATE;
203      case UPDATE_PEER_STORAGE:
204        try {
205          updatePeerStorage(env);
206        } catch (ReplicationException e) {
207          throw suspend(env.getMasterConfiguration(),
208            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
209              getClass().getName(), peerId, backoff / 1000, e));
210        }
211        resetRetry();
212        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
213        return Flow.HAS_MORE_STATE;
214      case REFRESH_PEER_ON_RS:
215        refreshPeer(env, getPeerOperationType());
216        setNextState(nextStateAfterRefresh());
217        return Flow.HAS_MORE_STATE;
218      case SERIAL_PEER_REOPEN_REGIONS:
219        try {
220          reopenRegions(env);
221        } catch (Exception e) {
222          throw suspend(env.getMasterConfiguration(),
223            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
224              getClass().getName(), peerId, backoff / 1000, e));
225        }
226        resetRetry();
227        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
228        return Flow.HAS_MORE_STATE;
229      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
230        try {
231          updateLastPushedSequenceIdForSerialPeer(env);
232        } catch (Exception e) {
233          throw suspend(env.getMasterConfiguration(),
234            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
235              getClass().getName(), peerId, backoff / 1000, e));
236        }
237        resetRetry();
238        setNextState(enablePeerBeforeFinish()
239          ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
240          : PeerModificationState.POST_PEER_MODIFICATION);
241        return Flow.HAS_MORE_STATE;
242      case SERIAL_PEER_SET_PEER_ENABLED:
243        try {
244          enablePeer(env);
245        } catch (ReplicationException e) {
246          throw suspend(env.getMasterConfiguration(),
247            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
248              getClass().getName(), peerId, backoff / 1000, e));
249        }
250        resetRetry();
251        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
252        return Flow.HAS_MORE_STATE;
253      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
254        refreshPeer(env, PeerOperationType.ENABLE);
255        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
256        return Flow.HAS_MORE_STATE;
257      case POST_PEER_MODIFICATION:
258        try {
259          postPeerModification(env);
260        } catch (ReplicationException e) {
261          throw suspend(env.getMasterConfiguration(),
262            backoff -> LOG.warn(
263              "{} failed to call postPeerModification for peer {},  sleep {} secs",
264              getClass().getName(), peerId, backoff / 1000, e));
265        } catch (IOException e) {
266          LOG.warn("{} failed to call post CP hook for peer {}, "
267            + "ignore since the procedure has already done", getClass().getName(), peerId, e);
268        }
269        releaseLatch(env);
270        return Flow.NO_MORE_STATE;
271      default:
272        throw new UnsupportedOperationException("unhandled state=" + state);
273    }
274  }
275
276  @Override
277  protected PeerModificationState getState(int stateId) {
278    return PeerModificationState.forNumber(stateId);
279  }
280
281  @Override
282  protected int getStateId(PeerModificationState state) {
283    return state.getNumber();
284  }
285
286  @Override
287  protected PeerModificationState getInitialState() {
288    return PeerModificationState.PRE_PEER_MODIFICATION;
289  }
290}