001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.function.LongConsumer;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.MetaTableAccessor;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.TableDescriptor;
031import org.apache.hadoop.hbase.client.TableState;
032import org.apache.hadoop.hbase.master.TableStateManager;
033import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
034import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
035import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
036import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
037import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
038import org.apache.hadoop.hbase.replication.ReplicationException;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
040import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.hadoop.hbase.util.RetryCounter;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
049
050/**
051 * The base class for all replication peer related procedure except sync replication state
052 * transition.
053 */
054@InterfaceAudience.Private
055public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
056
057  private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
058
059  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
060
061  // The sleep interval when waiting table to be enabled or disabled.
062  protected static final int SLEEP_INTERVAL_MS = 1000;
063
064  private RetryCounter retryCounter;
065
066  protected ModifyPeerProcedure() {
067  }
068
069  protected ModifyPeerProcedure(String peerId) {
070    super(peerId);
071  }
072
073  /**
074   * Called before we start the actual processing. The implementation should call the pre CP hook,
075   * and also the pre-check for the peer modification.
076   * <p>
077   * If an IOException is thrown then we will give up and mark the procedure as failed directly. If
078   * all checks passes then the procedure can not be rolled back any more.
079   */
080  protected abstract void prePeerModification(MasterProcedureEnv env)
081    throws IOException, ReplicationException;
082
083  protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
084
085  /**
086   * Called before we finish the procedure. The implementation can do some logging work, and also
087   * call the coprocessor hook if any.
088   * <p>
089   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
090   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
091   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
092   * update the peer storage.
093   */
094  protected abstract void postPeerModification(MasterProcedureEnv env)
095    throws IOException, ReplicationException;
096
097  private void releaseLatch() {
098    ProcedurePrepareLatch.releaseLatch(latch, this);
099  }
100
101  /**
102   * Implementation class can override this method. By default we will jump to
103   * POST_PEER_MODIFICATION and finish the procedure.
104   */
105  protected PeerModificationState nextStateAfterRefresh() {
106    return PeerModificationState.POST_PEER_MODIFICATION;
107  }
108
109  /**
110   * The implementation class should override this method if the procedure may enter the serial
111   * related states.
112   */
113  protected boolean enablePeerBeforeFinish() {
114    throw new UnsupportedOperationException();
115  }
116
117  private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
118    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
119      .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
120  }
121
122  protected ReplicationPeerConfig getOldPeerConfig() {
123    return null;
124  }
125
126  protected ReplicationPeerConfig getNewPeerConfig() {
127    throw new UnsupportedOperationException();
128  }
129
130  protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
131    throws IOException, ReplicationException {
132    throw new UnsupportedOperationException();
133  }
134
135  // If the table is in enabling state, we need to wait until it is enabled and then reopen all its
136  // regions.
137  private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
138    for (;;) {
139      try {
140        TableState state = tsm.getTableState(tn);
141        if (state.isEnabled()) {
142          return true;
143        }
144        if (!state.isEnabling()) {
145          return false;
146        }
147        Thread.sleep(SLEEP_INTERVAL_MS);
148      } catch (TableNotFoundException e) {
149        return false;
150      } catch (InterruptedException e) {
151        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
152      }
153    }
154  }
155
156  // will be override in test to simulate error
157  protected void reopenRegions(MasterProcedureEnv env) throws IOException {
158    ReplicationPeerConfig peerConfig = getNewPeerConfig();
159    ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
160    TableStateManager tsm = env.getMasterServices().getTableStateManager();
161    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
162      if (!td.hasGlobalReplicationScope()) {
163        continue;
164      }
165      TableName tn = td.getTableName();
166      if (!peerConfig.needToReplicate(tn)) {
167        continue;
168      }
169      if (oldPeerConfig != null && oldPeerConfig.isSerial() && oldPeerConfig.needToReplicate(tn)) {
170        continue;
171      }
172      if (needReopen(tsm, tn)) {
173        addChildProcedure(new ReopenTableRegionsProcedure(tn));
174      }
175    }
176  }
177
178  // will be override in test to simulate error
179  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
180    env.getReplicationPeerManager().enablePeer(peerId);
181  }
182
183  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
184    ReplicationQueueStorage queueStorage) throws ReplicationException {
185    if (barrier >= 0) {
186      lastSeqIds.put(encodedRegionName, barrier);
187      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
188        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
189        lastSeqIds.clear();
190      }
191    }
192  }
193
194  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
195    ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
196    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
197    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
198      if (!td.hasGlobalReplicationScope()) {
199        continue;
200      }
201      TableName tn = td.getTableName();
202      if (!peerConfig.needToReplicate(tn)) {
203        continue;
204      }
205      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
206    }
207    if (!lastSeqIds.isEmpty()) {
208      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
209    }
210  }
211
212  // If the table is currently disabling, then we need to wait until it is disabled.We will write
213  // replication barrier for a disabled table. And return whether we need to update the last pushed
214  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
215  // then we do not need to update last pushed sequence id for this table.
216  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
217    throws IOException {
218    for (;;) {
219      try {
220        if (!tsm.getTableState(tn).isDisabling()) {
221          return true;
222        }
223        Thread.sleep(SLEEP_INTERVAL_MS);
224      } catch (TableNotFoundException e) {
225        return false;
226      } catch (InterruptedException e) {
227        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
228      }
229    }
230  }
231
232  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
233  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
234  // should not forget to check whether the map is empty at last, if not you should call
235  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
236  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
237    Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
238    TableStateManager tsm = env.getMasterServices().getTableStateManager();
239    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
240    Connection conn = env.getMasterServices().getConnection();
241    if (!needSetLastPushedSequenceId(tsm, tableName)) {
242      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
243      return;
244    }
245    for (Pair<String, Long> name2Barrier : MetaTableAccessor
246      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
247      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
248      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
249        queueStorage);
250    }
251  }
252
253  @Override
254  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
255    setState(ProcedureProtos.ProcedureState.RUNNABLE);
256    env.getProcedureScheduler().addFront(this);
257    return false;
258  }
259
260  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
261    throws ProcedureSuspendedException {
262    if (retryCounter == null) {
263      retryCounter = ProcedureUtil.createRetryCounter(conf);
264    }
265    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
266    backoffConsumer.accept(backoff);
267    setTimeout(Math.toIntExact(backoff));
268    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
269    skipPersistence();
270    throw new ProcedureSuspendedException();
271  }
272
273  @Override
274  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
275    throws ProcedureSuspendedException {
276    switch (state) {
277      case PRE_PEER_MODIFICATION:
278        try {
279          prePeerModification(env);
280        } catch (IOException e) {
281          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, "
282            + "mark the procedure as failure and give up", getClass().getName(), peerId, e);
283          setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
284          releaseLatch();
285          return Flow.NO_MORE_STATE;
286        } catch (ReplicationException e) {
287          throw suspend(env.getMasterConfiguration(),
288            backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
289              getClass().getName(), peerId, backoff / 1000, e));
290        }
291        retryCounter = null;
292        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
293        return Flow.HAS_MORE_STATE;
294      case UPDATE_PEER_STORAGE:
295        try {
296          updatePeerStorage(env);
297        } catch (ReplicationException e) {
298          throw suspend(env.getMasterConfiguration(),
299            backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
300              getClass().getName(), peerId, backoff / 1000, e));
301        }
302        retryCounter = null;
303        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
304        return Flow.HAS_MORE_STATE;
305      case REFRESH_PEER_ON_RS:
306        refreshPeer(env, getPeerOperationType());
307        setNextState(nextStateAfterRefresh());
308        return Flow.HAS_MORE_STATE;
309      case SERIAL_PEER_REOPEN_REGIONS:
310        try {
311          reopenRegions(env);
312        } catch (Exception e) {
313          throw suspend(env.getMasterConfiguration(),
314            backoff -> LOG.warn("{} reopen regions for peer {} failed,  sleep {} secs",
315              getClass().getName(), peerId, backoff / 1000, e));
316        }
317        retryCounter = null;
318        setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
319        return Flow.HAS_MORE_STATE;
320      case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
321        try {
322          updateLastPushedSequenceIdForSerialPeer(env);
323        } catch (Exception e) {
324          throw suspend(env.getMasterConfiguration(),
325            backoff -> LOG.warn("{} set last sequence id for peer {} failed,  sleep {} secs",
326              getClass().getName(), peerId, backoff / 1000, e));
327        }
328        retryCounter = null;
329        setNextState(enablePeerBeforeFinish()
330          ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
331          : PeerModificationState.POST_PEER_MODIFICATION);
332        return Flow.HAS_MORE_STATE;
333      case SERIAL_PEER_SET_PEER_ENABLED:
334        try {
335          enablePeer(env);
336        } catch (ReplicationException e) {
337          throw suspend(env.getMasterConfiguration(),
338            backoff -> LOG.warn("{} enable peer before finish for peer {} failed,  sleep {} secs",
339              getClass().getName(), peerId, backoff / 1000, e));
340        }
341        retryCounter = null;
342        setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
343        return Flow.HAS_MORE_STATE;
344      case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
345        refreshPeer(env, PeerOperationType.ENABLE);
346        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
347        return Flow.HAS_MORE_STATE;
348      case POST_PEER_MODIFICATION:
349        try {
350          postPeerModification(env);
351        } catch (ReplicationException e) {
352          throw suspend(env.getMasterConfiguration(),
353            backoff -> LOG.warn(
354              "{} failed to call postPeerModification for peer {},  sleep {} secs",
355              getClass().getName(), peerId, backoff / 1000, e));
356        } catch (IOException e) {
357          LOG.warn("{} failed to call post CP hook for peer {}, "
358            + "ignore since the procedure has already done", getClass().getName(), peerId, e);
359        }
360        releaseLatch();
361        return Flow.NO_MORE_STATE;
362      default:
363        throw new UnsupportedOperationException("unhandled state=" + state);
364    }
365  }
366
367  @Override
368  protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
369    throws IOException, InterruptedException {
370    if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
371      // actually the peer related operations has no rollback, but if we haven't done any
372      // modifications on the peer storage yet, we can just return.
373      return;
374    }
375    throw new UnsupportedOperationException();
376  }
377
378  @Override
379  protected PeerModificationState getState(int stateId) {
380    return PeerModificationState.forNumber(stateId);
381  }
382
383  @Override
384  protected int getStateId(PeerModificationState state) {
385    return state.getNumber();
386  }
387
388  @Override
389  protected PeerModificationState getInitialState() {
390    return PeerModificationState.PRE_PEER_MODIFICATION;
391  }
392}