001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.ScheduledChore;
031import org.apache.hadoop.hbase.Server;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
035import org.apache.hadoop.hbase.replication.ReplicationFactory;
036import org.apache.hadoop.hbase.replication.ReplicationPeers;
037import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
038import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
039import org.apache.hadoop.hbase.replication.ReplicationUtils;
040import org.apache.hadoop.hbase.replication.SyncReplicationState;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
043import org.apache.hadoop.hbase.wal.WALFactory;
044import org.apache.hadoop.hbase.wal.WALProvider;
045import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.apache.zookeeper.KeeperException;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
053 */
054@InterfaceAudience.Private
055public class Replication implements ReplicationSourceService {
056  private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
057  private boolean isReplicationForBulkLoadDataEnabled;
058  private ReplicationSourceManager replicationManager;
059  private ReplicationQueueStorage queueStorage;
060  private ReplicationPeers replicationPeers;
061  private Configuration conf;
062  private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
063  // Hosting server
064  private Server server;
065  private int statsPeriodInSecond;
066  // ReplicationLoad to access replication metrics
067  private ReplicationLoad replicationLoad;
068  private MetricsReplicationGlobalSourceSource globalMetricsSource;
069
070  private PeerProcedureHandler peerProcedureHandler;
071
072  /**
073   * Empty constructor
074   */
075  public Replication() {
076  }
077
078  @Override
079  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
080    WALFactory walFactory) throws IOException {
081    this.server = server;
082    this.conf = this.server.getConfiguration();
083    this.isReplicationForBulkLoadDataEnabled =
084      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
085    if (this.isReplicationForBulkLoadDataEnabled) {
086      if (
087        conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
088          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()
089      ) {
090        throw new IllegalArgumentException(
091          HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when "
092            + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true.");
093      }
094    }
095
096    try {
097      this.queueStorage =
098        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
099      this.replicationPeers =
100        ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
101      this.replicationPeers.init();
102    } catch (Exception e) {
103      throw new IOException("Failed replication handler create", e);
104    }
105    UUID clusterId = null;
106    try {
107      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
108    } catch (KeeperException ke) {
109      throw new IOException("Could not read cluster id", ke);
110    }
111    SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
112    this.globalMetricsSource = CompatibilitySingletonFactory
113      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
114    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf,
115      this.server, fs, logDir, oldLogDir, clusterId, walFactory, mapping, globalMetricsSource);
116    this.syncReplicationPeerInfoProvider =
117      new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
118    PeerActionListener peerActionListener = PeerActionListener.DUMMY;
119    // Get the user-space WAL provider
120    WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
121    if (walProvider != null) {
122      walProvider
123        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
124      if (walProvider instanceof SyncReplicationWALProvider) {
125        SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
126        peerActionListener = syncWALProvider;
127        syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
128        // for sync replication state change, we need to reload the state twice, you can see the
129        // code in PeerProcedureHandlerImpl, so here we need to go over the sync replication peers
130        // to see if any of them are in the middle of the two refreshes, if so, we need to manually
131        // repeat the action we have done in the first refresh, otherwise when the second refresh
132        // comes we will be in trouble, such as NPE.
133        replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
134          .filter(p -> p.getPeerConfig().isSyncReplication())
135          .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
136          .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(),
137            p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
138      }
139    }
140    this.statsPeriodInSecond = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
141    this.replicationLoad = new ReplicationLoad();
142
143    this.peerProcedureHandler =
144      new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
145  }
146
147  @Override
148  public PeerProcedureHandler getPeerProcedureHandler() {
149    return peerProcedureHandler;
150  }
151
152  /**
153   * Stops replication service.
154   */
155  @Override
156  public void stopReplicationService() {
157    this.replicationManager.join();
158  }
159
160  /**
161   * If replication is enabled and this cluster is a master, it starts
162   */
163  @Override
164  public void startReplicationService() throws IOException {
165    this.replicationManager.init();
166    this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
167      "ReplicationSourceStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
168    LOG.info("{} started", this.server.toString());
169  }
170
171  /**
172   * Get the replication sources manager
173   * @return the manager if replication is enabled, else returns false
174   */
175  public ReplicationSourceManager getReplicationManager() {
176    return this.replicationManager;
177  }
178
179  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
180    throws IOException {
181    try {
182      this.replicationManager.addHFileRefs(tableName, family, pairs);
183    } catch (IOException e) {
184      LOG.error("Failed to add hfile references in the replication queue.", e);
185      throw e;
186    }
187  }
188
189  /**
190   * Statistics task. Periodically prints the cache statistics to the log.
191   */
192  private final class ReplicationStatisticsChore extends ScheduledChore {
193
194    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
195      super(name, stopper, period);
196    }
197
198    @Override
199    protected void chore() {
200      printStats(replicationManager.getStats());
201    }
202
203    private void printStats(String stats) {
204      if (!stats.isEmpty()) {
205        LOG.info(stats);
206      }
207    }
208  }
209
210  @Override
211  public ReplicationLoad refreshAndGetReplicationLoad() {
212    if (this.replicationLoad == null) {
213      return null;
214    }
215    // always build for latest data
216    List<ReplicationSourceInterface> allSources = new ArrayList<>();
217    allSources.addAll(this.replicationManager.getSources());
218    allSources.addAll(this.replicationManager.getOldSources());
219    this.replicationLoad.buildReplicationLoad(allSources, null);
220    return this.replicationLoad;
221  }
222
223  @Override
224  public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
225    return syncReplicationPeerInfoProvider;
226  }
227
228  @Override
229  public ReplicationPeers getReplicationPeers() {
230    return replicationPeers;
231  }
232}