001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.util.HashMap;
023import java.util.Map;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.TableNotFoundException;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.TableDescriptor;
028import org.apache.hadoop.hbase.master.TableStateManager;
029import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
030import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
031import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
032import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
033import org.apache.hadoop.hbase.replication.ReplicationException;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
036import org.apache.hadoop.hbase.util.Pair;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * The base class for all replication peer related procedure.
043 */
044@InterfaceAudience.Private
045public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
046  implements PeerProcedureInterface {
047
048  private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);
049
050  protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
051
052  // The sleep interval when waiting table to be enabled or disabled.
053  protected static final int SLEEP_INTERVAL_MS = 1000;
054
055  // used to keep compatible with old client where we can only return after updateStorage.
056  protected ProcedurePrepareLatch latch;
057
058  protected AbstractPeerProcedure() {
059  }
060
061  protected AbstractPeerProcedure(String peerId) {
062    super(peerId);
063    this.latch = ProcedurePrepareLatch.createLatch(2, 1);
064  }
065
066  public ProcedurePrepareLatch getLatch() {
067    return latch;
068  }
069
070  @Override
071  protected LockState acquireLock(MasterProcedureEnv env) {
072    if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) {
073      return LockState.LOCK_EVENT_WAIT;
074    }
075    return LockState.LOCK_ACQUIRED;
076  }
077
078  @Override
079  protected void releaseLock(MasterProcedureEnv env) {
080    env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
081  }
082
083  @Override
084  protected boolean holdLock(MasterProcedureEnv env) {
085    return true;
086  }
087
088  protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
089    addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
090      .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
091  }
092
093  // will be override in test to simulate error
094  protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
095    env.getReplicationPeerManager().enablePeer(peerId);
096  }
097
098  private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
099    ReplicationQueueStorage queueStorage) throws ReplicationException {
100    if (barrier >= 0) {
101      lastSeqIds.put(encodedRegionName, barrier);
102      if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
103        queueStorage.setLastSequenceIds(peerId, lastSeqIds);
104        lastSeqIds.clear();
105      }
106    }
107  }
108
109  protected final void setLastPushedSequenceId(MasterProcedureEnv env,
110    ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
111    Map<String, Long> lastSeqIds = new HashMap<String, Long>();
112    for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
113      if (!td.hasGlobalReplicationScope()) {
114        continue;
115      }
116      TableName tn = td.getTableName();
117      if (!peerConfig.needToReplicate(tn)) {
118        continue;
119      }
120      setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
121    }
122    if (!lastSeqIds.isEmpty()) {
123      env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
124    }
125  }
126
127  // If the table is currently disabling, then we need to wait until it is disabled.We will write
128  // replication barrier for a disabled table. And return whether we need to update the last pushed
129  // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
130  // then we do not need to update last pushed sequence id for this table.
131  private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
132    throws IOException {
133    for (;;) {
134      try {
135        if (!tsm.getTableState(tn).isDisabling()) {
136          return true;
137        }
138        Thread.sleep(SLEEP_INTERVAL_MS);
139      } catch (TableNotFoundException e) {
140        return false;
141      } catch (InterruptedException e) {
142        throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
143      }
144    }
145  }
146
147  // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
148  // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
149  // should not forget to check whether the map is empty at last, if not you should call
150  // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
151  protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
152    Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
153    TableStateManager tsm = env.getMasterServices().getTableStateManager();
154    ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
155    Connection conn = env.getMasterServices().getConnection();
156    if (!needSetLastPushedSequenceId(tsm, tableName)) {
157      LOG.debug("Skip settting last pushed sequence id for {}", tableName);
158      return;
159    }
160    for (Pair<String, Long> name2Barrier : ReplicationBarrierFamilyFormat
161      .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
162      LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
163      addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
164        queueStorage);
165    }
166  }
167
168  protected final void checkPeerModificationEnabled(MasterProcedureEnv env) throws IOException {
169    if (!env.getMasterServices().isReplicationPeerModificationEnabled()) {
170      throw new IOException("Replication peer modification disabled");
171    }
172  }
173}