001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.ScheduledChore;
032import org.apache.hadoop.hbase.Server;
033import org.apache.hadoop.hbase.Stoppable;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
036import org.apache.hadoop.hbase.replication.ReplicationFactory;
037import org.apache.hadoop.hbase.replication.ReplicationPeers;
038import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
039import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
040import org.apache.hadoop.hbase.replication.ReplicationTracker;
041import org.apache.hadoop.hbase.replication.ReplicationUtils;
042import org.apache.hadoop.hbase.replication.SyncReplicationState;
043import org.apache.hadoop.hbase.util.Pair;
044import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
045import org.apache.hadoop.hbase.wal.WALFactory;
046import org.apache.hadoop.hbase.wal.WALProvider;
047import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.apache.zookeeper.KeeperException;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
055 */
056@InterfaceAudience.Private
057public class Replication implements ReplicationSourceService {
058  private static final Logger LOG =
059      LoggerFactory.getLogger(Replication.class);
060  private boolean isReplicationForBulkLoadDataEnabled;
061  private ReplicationSourceManager replicationManager;
062  private ReplicationQueueStorage queueStorage;
063  private ReplicationPeers replicationPeers;
064  private ReplicationTracker replicationTracker;
065  private Configuration conf;
066  private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
067  // Hosting server
068  private Server server;
069  private int statsPeriodInSecond;
070  // ReplicationLoad to access replication metrics
071  private ReplicationLoad replicationLoad;
072  private MetricsReplicationGlobalSourceSource globalMetricsSource;
073
074  private PeerProcedureHandler peerProcedureHandler;
075
076  /**
077   * Empty constructor
078   */
079  public Replication() {
080  }
081
082  @Override
083  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
084      WALFactory walFactory) throws IOException {
085    this.server = server;
086    this.conf = this.server.getConfiguration();
087    this.isReplicationForBulkLoadDataEnabled =
088      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
089    if (this.isReplicationForBulkLoadDataEnabled) {
090      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
091          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
092        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
093            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
094            + " is set to true.");
095      }
096    }
097
098    try {
099      this.queueStorage =
100          ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
101      this.replicationPeers =
102          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
103      this.replicationPeers.init();
104      this.replicationTracker =
105          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
106    } catch (Exception e) {
107      throw new IOException("Failed replication handler create", e);
108    }
109    UUID clusterId = null;
110    try {
111      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
112    } catch (KeeperException ke) {
113      throw new IOException("Could not read cluster id", ke);
114    }
115    SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
116    this.globalMetricsSource = CompatibilitySingletonFactory
117        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
118    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
119        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
120        mapping, globalMetricsSource);
121    this.syncReplicationPeerInfoProvider =
122        new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
123    PeerActionListener peerActionListener = PeerActionListener.DUMMY;
124    // Get the user-space WAL provider
125    WALProvider walProvider = walFactory != null? walFactory.getWALProvider(): null;
126    if (walProvider != null) {
127      walProvider
128        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
129      if (walProvider instanceof SyncReplicationWALProvider) {
130        SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
131        peerActionListener = syncWALProvider;
132        syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
133        // for sync replication state change, we need to reload the state twice, you can see the
134        // code in PeerProcedureHandlerImpl, so here we need to go over the sync replication peers
135        // to see if any of them are in the middle of the two refreshes, if so, we need to manually
136        // repeat the action we have done in the first refresh, otherwise when the second refresh
137        // comes we will be in trouble, such as NPE.
138        replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
139            .filter(p -> p.getPeerConfig().isSyncReplication())
140            .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
141            .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(),
142              p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
143      }
144    }
145    this.statsPeriodInSecond =
146        this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
147    this.replicationLoad = new ReplicationLoad();
148
149    this.peerProcedureHandler =
150      new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
151  }
152
153  @Override
154  public PeerProcedureHandler getPeerProcedureHandler() {
155    return peerProcedureHandler;
156  }
157
158  /**
159   * Stops replication service.
160   */
161  @Override
162  public void stopReplicationService() {
163    this.replicationManager.join();
164  }
165
166  /**
167   * If replication is enabled and this cluster is a master,
168   * it starts
169   */
170  @Override
171  public void startReplicationService() throws IOException {
172    this.replicationManager.init();
173    this.server.getChoreService().scheduleChore(
174      new ReplicationStatisticsChore("ReplicationSourceStatistics", server,
175          (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
176    LOG.info("{} started", this.server.toString());
177  }
178
179  /**
180   * Get the replication sources manager
181   * @return the manager if replication is enabled, else returns false
182   */
183  public ReplicationSourceManager getReplicationManager() {
184    return this.replicationManager;
185  }
186
187  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
188      throws IOException {
189    try {
190      this.replicationManager.addHFileRefs(tableName, family, pairs);
191    } catch (IOException e) {
192      LOG.error("Failed to add hfile references in the replication queue.", e);
193      throw e;
194    }
195  }
196
197  /**
198   * Statistics task. Periodically prints the cache statistics to the log.
199   */
200  private final class ReplicationStatisticsChore extends ScheduledChore {
201
202    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
203      super(name, stopper, period);
204    }
205
206    @Override
207    protected void chore() {
208      printStats(replicationManager.getStats());
209    }
210
211    private void printStats(String stats) {
212      if (!stats.isEmpty()) {
213        LOG.info(stats);
214      }
215    }
216  }
217
218  @Override
219  public ReplicationLoad refreshAndGetReplicationLoad() {
220    if (this.replicationLoad == null) {
221      return null;
222    }
223    // always build for latest data
224    List<ReplicationSourceInterface> allSources = new ArrayList<>();
225    allSources.addAll(this.replicationManager.getSources());
226    allSources.addAll(this.replicationManager.getOldSources());
227    this.replicationLoad.buildReplicationLoad(allSources, null);
228    return this.replicationLoad;
229  }
230
231  @Override
232  public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
233    return syncReplicationPeerInfoProvider;
234  }
235
236  @Override
237  public ReplicationPeers getReplicationPeers() {
238    return replicationPeers;
239  }
240}