001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Locale;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
040
041/**
042 * The base class for all replication peer related procedure except sync replication state
043 * transition.
044 */
045@InterfaceAudience.Private
046public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
047
048  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
049
050  protected ModifyPeerProcedure() {
051  }
052
053  protected ModifyPeerProcedure(String peerId) {
054    super(peerId);
055  }
056
057  /**
058   * Called before we start the actual processing. The implementation should call the pre CP hook,
059   * and also the pre-check for the peer modification.
060   * <p>
061   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
062   * all checks passes then the procedure can not be rolled back any more.
063   */
064  protected abstract void prePeerModification(MasterProcedureEnv env)
065      throws IOException, ReplicationException, InterruptedException;
066
067  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
068
069  /**
070   * Called before we finish the procedure. The implementation can do some logging work, and also
071   * call the coprocessor hook if any.
072   * <p>
073   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
074   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
075   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
076   * update the peer storage.
077   */
078  protected abstract void postPeerModification(MasterProcedureEnv env)
079      throws IOException, ReplicationException;
080
081  protected void releaseLatch(MasterProcedureEnv env) {
082    ProcedurePrepareLatch.releaseLatch(latch, this);
083  }
084
085  /**
086   * Implementation class can override this method. By default we will jump to
087   * POST_PEER_MODIFICATION and finish the procedure.
088   */
089  protected PeerModificationState nextStateAfterRefresh() {
090    return PeerModificationState.POST_PEER_MODIFICATION;
091  }
092
093  /**
094   * The implementation class should override this method if the procedure may enter the serial
095   * related states.
096   */
097  protected boolean enablePeerBeforeFinish() {
098    throw new UnsupportedOperationException();
099  }
100
101  protected ReplicationPeerConfig getOldPeerConfig() {
102    return null;
103  }
104
105  protected ReplicationPeerConfig getNewPeerConfig() {
106    throw new UnsupportedOperationException();
107  }
108
109  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
110      throws IOException, ReplicationException {
111    throw new UnsupportedOperationException();
112  }
113
114  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
115  // regions.
116  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
117    for (;;) {
118      try {
119        TableState state = tsm.getTableState(tn);
120        if (state.isEnabled()) {
121          return true;
122        }
123        if (!state.isEnabling()) {
124          return false;
125        }
126        Thread.sleep(SLEEP_INTERVAL_MS);
127      } catch (TableNotFoundException e) {
128        return false;
129      } catch (InterruptedException e) {
130        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
131      }
132    }
133  }
134
135  // will be override in test to simulate error
136  @VisibleForTesting
137  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
138    ReplicationPeerConfig peerConfig = getNewPeerConfig();
139    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
140    TableStateManager tsm = env.getMasterServices().getTableStateManager();
141    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
142      if (!td.hasGlobalReplicationScope()) {
143        continue;
144      }
145      TableName tn = td.getTableName();
146      if (!peerConfig.needToReplicate(tn)) {
147        continue;
148      }
149      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
150        oldPeerConfig.needToReplicate(tn)) {
151        continue;
152      }
153      if (needReopen(tsm, tn)) {
154        addChildProcedure(new ReopenTableRegionsProcedure(tn));
155      }
156    }
157  }
158
159  @Override
160  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
161      throws ProcedureSuspendedException, InterruptedException {
162    switch (state) {
163      case PRE_PEER_MODIFICATION:
164        try {
165          prePeerModification(env);
166        } catch (IOException e) {
167          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
168            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
169          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
170          releaseLatch(env);
171          return Flow.NO_MORE_STATE;
172        } catch (ReplicationException e) {
173          throw suspend(env.getMasterConfiguration(),
174            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
175              getClass().getName(), peerId, backoff / 1000, e));
176        }
177        resetRetry();
178        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
179        return Flow.HAS_MORE_STATE;
180      case UPDATE_PEER_STORAGE:
181        try {
182          updatePeerStorage(env);
183        } catch (ReplicationException e) {
184          throw suspend(env.getMasterConfiguration(),
185            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
186              getClass().getName(), peerId, backoff / 1000, e));
187        }
188        resetRetry();
189        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
190        return Flow.HAS_MORE_STATE;
191      case REFRESH_PEER_ON_RS:
192        refreshPeer(env, getPeerOperationType());
193        setNextState(nextStateAfterRefresh());
194        return Flow.HAS_MORE_STATE;
195      case SERIAL_PEER_REOPEN_REGIONS:
196        try {
197          reopenRegions(env);
198        } catch (Exception e) {
199          throw suspend(env.getMasterConfiguration(),
200            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
201              getClass().getName(), peerId, backoff / 1000, e));
202        }
203        resetRetry();
204        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
205        return Flow.HAS_MORE_STATE;
206      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
207        try {
208          updateLastPushedSequenceIdForSerialPeer(env);
209        } catch (Exception e) {
210          throw suspend(env.getMasterConfiguration(),
211            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
212              getClass().getName(), peerId, backoff / 1000, e));
213        }
214        resetRetry();
215        setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
216          : PeerModificationState.POST_PEER_MODIFICATION);
217        return Flow.HAS_MORE_STATE;
218      case SERIAL_PEER_SET_PEER_ENABLED:
219        try {
220          enablePeer(env);
221        } catch (ReplicationException e) {
222          throw suspend(env.getMasterConfiguration(),
223            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
224              getClass().getName(), peerId, backoff / 1000, e));
225        }
226        resetRetry();
227        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
228        return Flow.HAS_MORE_STATE;
229      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
230        refreshPeer(env, PeerOperationType.ENABLE);
231        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
232        return Flow.HAS_MORE_STATE;
233      case POST_PEER_MODIFICATION:
234        try {
235          postPeerModification(env);
236        } catch (ReplicationException e) {
237          throw suspend(env.getMasterConfiguration(),
238            backoff -> LOG.warn(
239              "{} failed to call postPeerModification for peer {},  sleep {} secs",
240              getClass().getName(), peerId, backoff / 1000, e));
241        } catch (IOException e) {
242          LOG.warn("{} failed to call post CP hook for peer {}, " +
243            "ignore since the procedure has already done", getClass().getName(), peerId, e);
244        }
245        releaseLatch(env);
246        return Flow.NO_MORE_STATE;
247      default:
248        throw new UnsupportedOperationException("unhandled state=" + state);
249    }
250  }
251
252  @Override
253  protected PeerModificationState getState(int stateId) {
254    return PeerModificationState.forNumber(stateId);
255  }
256
257  @Override
258  protected int getStateId(PeerModificationState state) {
259    return state.getNumber();
260  }
261
262  @Override
263  protected PeerModificationState getInitialState() {
264    return PeerModificationState.PRE_PEER_MODIFICATION;
265  }
266}