001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.LongConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
051
052/**
053 * The base class for all replication peer related procedure except sync replication state
054 * transition.
055 */
056@InterfaceAudience.Private
057public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
058
059  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
060
061  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
062
063  // The sleep interval when waiting table to be enabled or disabled.
064  protected static final int SLEEP_INTERVAL_MS = 1000;
065
066  private RetryCounter retryCounter;
067
068  protected ModifyPeerProcedure() {
069  }
070
071  protected ModifyPeerProcedure(String peerId) {
072    super(peerId);
073  }
074
075  /**
076   * Called before we start the actual processing. The implementation should call the pre CP hook,
077   * and also the pre-check for the peer modification.
078   * <p>
079   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
080   * all checks passes then the procedure can not be rolled back any more.
081   */
082  protected abstract void prePeerModification(MasterProcedureEnv env)
083      throws IOException, ReplicationException;
084
085  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
086
087  /**
088   * Called before we finish the procedure. The implementation can do some logging work, and also
089   * call the coprocessor hook if any.
090   * <p>
091   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
092   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
093   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
094   * update the peer storage.
095   */
096  protected abstract void postPeerModification(MasterProcedureEnv env)
097      throws IOException, ReplicationException;
098
099  private void releaseLatch() {
100    ProcedurePrepareLatch.releaseLatch(latch, this);
101  }
102
103  /**
104   * Implementation class can override this method. By default we will jump to
105   * POST_PEER_MODIFICATION and finish the procedure.
106   */
107  protected PeerModificationState nextStateAfterRefresh() {
108    return PeerModificationState.POST_PEER_MODIFICATION;
109  }
110
111  /**
112   * The implementation class should override this method if the procedure may enter the serial
113   * related states.
114   */
115  protected boolean enablePeerBeforeFinish() {
116    throw new UnsupportedOperationException();
117  }
118
119  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
120    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
121      .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
122      .toArray(RefreshPeerProcedure[]::new));
123  }
124
125  protected ReplicationPeerConfig getOldPeerConfig() {
126    return null;
127  }
128
129  protected ReplicationPeerConfig getNewPeerConfig() {
130    throw new UnsupportedOperationException();
131  }
132
133  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
134      throws IOException, ReplicationException {
135    throw new UnsupportedOperationException();
136  }
137
138  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
139  // regions.
140  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
141    for (;;) {
142      try {
143        TableState state = tsm.getTableState(tn);
144        if (state.isEnabled()) {
145          return true;
146        }
147        if (!state.isEnabling()) {
148          return false;
149        }
150        Thread.sleep(SLEEP_INTERVAL_MS);
151      } catch (TableStateNotFoundException e) {
152        return false;
153      } catch (InterruptedException e) {
154        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
155      }
156    }
157  }
158
159  // will be override in test to simulate error
160  @VisibleForTesting
161  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
162    ReplicationPeerConfig peerConfig = getNewPeerConfig();
163    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
164    TableStateManager tsm = env.getMasterServices().getTableStateManager();
165    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
166      if (!td.hasGlobalReplicationScope()) {
167        continue;
168      }
169      TableName tn = td.getTableName();
170      if (!peerConfig.needToReplicate(tn)) {
171        continue;
172      }
173      if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
174        oldPeerConfig.needToReplicate(tn)) {
175        continue;
176      }
177      if (needReopen(tsm, tn)) {
178        addChildProcedure(new ReopenTableRegionsProcedure(tn));
179      }
180    }
181  }
182
183  // will be override in test to simulate error
184  @VisibleForTesting
185  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
186    env.getReplicationPeerManager().enablePeer(peerId);
187  }
188
189  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
190      ReplicationQueueStorage queueStorage) throws ReplicationException {
191    if (barrier >= 0) {
192      lastSeqIds.put(encodedRegionName, barrier);
193      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
194        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
195        lastSeqIds.clear();
196      }
197    }
198  }
199
200  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
201      ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
202    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
203    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
204      if (!td.hasGlobalReplicationScope()) {
205        continue;
206      }
207      TableName tn = td.getTableName();
208      if (!peerConfig.needToReplicate(tn)) {
209        continue;
210      }
211      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
212    }
213    if (!lastSeqIds.isEmpty()) {
214      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
215    }
216  }
217
218  // If the table is currently disabling, then we need to wait until it is disabled.We will write
219  // replication barrier for a disabled table. And return whether we need to update the last pushed
220  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
221  // then we do not need to update last pushed sequence id for this table.
222  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
223      throws IOException {
224    for (;;) {
225      try {
226        if (!tsm.getTableState(tn).isDisabling()) {
227          return true;
228        }
229        Thread.sleep(SLEEP_INTERVAL_MS);
230      } catch (TableStateNotFoundException e) {
231        return false;
232      } catch (InterruptedException e) {
233        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
234      }
235    }
236  }
237
238  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
239  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
240  // should not forget to check whether the map is empty at last, if not you should call
241  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
242  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
243      Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
244    TableStateManager tsm = env.getMasterServices().getTableStateManager();
245    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
246    Connection conn = env.getMasterServices().getConnection();
247    if (!needSetLastPushedSequenceId(tsm, tableName)) {
248      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
249      return;
250    }
251    for (Pair<String, Long> name2Barrier : MetaTableAccessor
252      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
253      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
254      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
255        queueStorage);
256    }
257  }
258
259  @Override
260  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
261    setState(ProcedureProtos.ProcedureState.RUNNABLE);
262    env.getProcedureScheduler().addFront(this);
263    return false;
264  }
265
266  private ProcedureSuspendedException suspend(Configuration conf,
267      LongConsumer backoffConsumer) throws ProcedureSuspendedException {
268    if (retryCounter == null) {
269      retryCounter = ProcedureUtil.createRetryCounter(conf);
270    }
271    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
272    backoffConsumer.accept(backoff);
273    setTimeout(Math.toIntExact(backoff));
274    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
275    skipPersistence();
276    throw new ProcedureSuspendedException();
277  }
278
279  @Override
280  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
281      throws ProcedureSuspendedException {
282    switch (state) {
283      case PRE_PEER_MODIFICATION:
284        try {
285          prePeerModification(env);
286        } catch (IOException e) {
287          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
288            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
289          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
290          releaseLatch();
291          return Flow.NO_MORE_STATE;
292        } catch (ReplicationException e) {
293          throw suspend(env.getMasterConfiguration(),
294            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
295              getClass().getName(), peerId, backoff / 1000, e));
296        }
297        retryCounter = null;
298        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
299        return Flow.HAS_MORE_STATE;
300      case UPDATE_PEER_STORAGE:
301        try {
302          updatePeerStorage(env);
303        } catch (ReplicationException e) {
304          throw suspend(env.getMasterConfiguration(),
305            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
306              getClass().getName(), peerId, backoff / 1000, e));
307        }
308        retryCounter = null;
309        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
310        return Flow.HAS_MORE_STATE;
311      case REFRESH_PEER_ON_RS:
312        refreshPeer(env, getPeerOperationType());
313        setNextState(nextStateAfterRefresh());
314        return Flow.HAS_MORE_STATE;
315      case SERIAL_PEER_REOPEN_REGIONS:
316        try {
317          reopenRegions(env);
318        } catch (Exception e) {
319          throw suspend(env.getMasterConfiguration(),
320            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
321              getClass().getName(), peerId, backoff / 1000, e));
322        }
323        retryCounter = null;
324        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
325        return Flow.HAS_MORE_STATE;
326      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
327        try {
328          updateLastPushedSequenceIdForSerialPeer(env);
329        } catch (Exception e) {
330          throw suspend(env.getMasterConfiguration(),
331            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
332              getClass().getName(), peerId, backoff / 1000, e));
333        }
334        retryCounter = null;
335        setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
336          : PeerModificationState.POST_PEER_MODIFICATION);
337        return Flow.HAS_MORE_STATE;
338      case SERIAL_PEER_SET_PEER_ENABLED:
339        try {
340          enablePeer(env);
341        } catch (ReplicationException e) {
342          throw suspend(env.getMasterConfiguration(),
343            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
344              getClass().getName(), peerId, backoff / 1000, e));
345        }
346        retryCounter = null;
347        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
348        return Flow.HAS_MORE_STATE;
349      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
350        refreshPeer(env, PeerOperationType.ENABLE);
351        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
352        return Flow.HAS_MORE_STATE;
353      case POST_PEER_MODIFICATION:
354        try {
355          postPeerModification(env);
356        } catch (ReplicationException e) {
357          throw suspend(env.getMasterConfiguration(),
358            backoff -> LOG.warn(
359              "{} failed to call postPeerModification for peer {},  sleep {} secs",
360              getClass().getName(), peerId, backoff / 1000, e));
361        } catch (IOException e) {
362          LOG.warn("{} failed to call post CP hook for peer {}, " +
363            "ignore since the procedure has already done", getClass().getName(), peerId, e);
364        }
365        releaseLatch();
366        return Flow.NO_MORE_STATE;
367      default:
368        throw new UnsupportedOperationException("unhandled state=" + state);
369    }
370  }
371
372  @Override
373  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
374      throws IOException, InterruptedException {
375    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
376      // actually the peer related operations has no rollback, but if we haven't done any
377      // modifications on the peer storage yet, we can just return.
378      return;
379    }
380    throw new UnsupportedOperationException();
381  }
382
383  @Override
384  protected PeerModificationState getState(int stateId) {
385    return PeerModificationState.forNumber(stateId);
386  }
387
388  @Override
389  protected int getStateId(PeerModificationState state) {
390    return state.getNumber();
391  }
392
393  @Override
394  protected PeerModificationState getInitialState() {
395    return PeerModificationState.PRE_PEER_MODIFICATION;
396  }
397}