001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.ScheduledChore;
031import org.apache.hadoop.hbase.Server;
032import org.apache.hadoop.hbase.Stoppable;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.conf.ConfigurationManager;
035import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
036import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
037import org.apache.hadoop.hbase.replication.ReplicationFactory;
038import org.apache.hadoop.hbase.replication.ReplicationPeers;
039import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
040import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
041import org.apache.hadoop.hbase.replication.ReplicationUtils;
042import org.apache.hadoop.hbase.replication.SyncReplicationState;
043import org.apache.hadoop.hbase.util.Pair;
044import org.apache.hadoop.hbase.wal.WALFactory;
045import org.apache.hadoop.hbase.wal.WALProvider;
046import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.apache.zookeeper.KeeperException;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
054 * <p>
055 * Implement {@link PropagatingConfigurationObserver} mainly for registering
056 * {@link ReplicationPeers}, so we can recreating the replication peer storage.
057 */
058@InterfaceAudience.Private
059public class Replication implements ReplicationSourceService, PropagatingConfigurationObserver {
060  private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
061  private boolean isReplicationForBulkLoadDataEnabled;
062  private ReplicationSourceManager replicationManager;
063  private ReplicationQueueStorage queueStorage;
064  private ReplicationPeers replicationPeers;
065  private volatile Configuration conf;
066  private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
067  // Hosting server
068  private Server server;
069  private int statsPeriodInSecond;
070  // ReplicationLoad to access replication metrics
071  private ReplicationLoad replicationLoad;
072  private MetricsReplicationGlobalSourceSource globalMetricsSource;
073
074  private PeerProcedureHandler peerProcedureHandler;
075
076  /**
077   * Empty constructor
078   */
079  public Replication() {
080  }
081
082  @Override
083  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
084    WALFactory walFactory) throws IOException {
085    this.server = server;
086    this.conf = this.server.getConfiguration();
087    this.isReplicationForBulkLoadDataEnabled =
088      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
089    if (this.isReplicationForBulkLoadDataEnabled) {
090      if (
091        conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
092          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()
093      ) {
094        throw new IllegalArgumentException(
095          HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when "
096            + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true.");
097      }
098    }
099
100    try {
101      this.queueStorage =
102        ReplicationStorageFactory.getReplicationQueueStorage(server.getConnection(), conf);
103      this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(),
104        server.getZooKeeper(), this.conf);
105      this.replicationPeers.init();
106    } catch (Exception e) {
107      throw new IOException("Failed replication handler create", e);
108    }
109    UUID clusterId = null;
110    try {
111      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
112    } catch (KeeperException ke) {
113      throw new IOException("Could not read cluster id", ke);
114    }
115    SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
116    this.globalMetricsSource = CompatibilitySingletonFactory
117      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
118    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf,
119      this.server, fs, logDir, oldLogDir, clusterId, walFactory, mapping, globalMetricsSource);
120    this.statsPeriodInSecond = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
121    this.replicationLoad = new ReplicationLoad();
122
123    this.syncReplicationPeerInfoProvider =
124      new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
125    // Get the user-space WAL provider
126    WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
127    if (walProvider != null) {
128      walProvider
129        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
130      PeerActionListener peerActionListener = walProvider.getPeerActionListener();
131      walProvider.setSyncReplicationPeerInfoProvider(syncReplicationPeerInfoProvider);
132      // for sync replication state change, we need to reload the state twice, you can see the
133      // code in PeerProcedureHandlerImpl, so here we need to go over the sync replication peers
134      // to see if any of them are in the middle of the two refreshes, if so, we need to manually
135      // repeat the action we have done in the first refresh, otherwise when the second refresh
136      // comes we will be in trouble, such as NPE.
137      replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
138        .filter(p -> p.getPeerConfig().isSyncReplication())
139        .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
140        .forEach(p -> peerActionListener.peerSyncReplicationStateChange(p.getId(),
141          p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
142      this.peerProcedureHandler =
143        new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
144    } else {
145      this.peerProcedureHandler =
146        new PeerProcedureHandlerImpl(replicationManager, PeerActionListener.DUMMY);
147    }
148  }
149
150  @Override
151  public PeerProcedureHandler getPeerProcedureHandler() {
152    return peerProcedureHandler;
153  }
154
155  /**
156   * Stops replication service.
157   */
158  @Override
159  public void stopReplicationService() {
160    this.replicationManager.join();
161  }
162
163  /**
164   * If replication is enabled and this cluster is a master, it starts
165   */
166  @Override
167  public void startReplicationService() throws IOException {
168    this.replicationManager.init();
169    this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
170      "ReplicationSourceStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
171    LOG.info("{} started", this.server.toString());
172  }
173
174  /**
175   * Get the replication sources manager
176   * @return the manager if replication is enabled, else returns false
177   */
178  public ReplicationSourceManager getReplicationManager() {
179    return this.replicationManager;
180  }
181
182  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
183    throws IOException {
184    try {
185      this.replicationManager.addHFileRefs(tableName, family, pairs);
186    } catch (IOException e) {
187      LOG.error("Failed to add hfile references in the replication queue.", e);
188      throw e;
189    }
190  }
191
192  /**
193   * Statistics task. Periodically prints the cache statistics to the log.
194   */
195  private final class ReplicationStatisticsChore extends ScheduledChore {
196
197    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
198      super(name, stopper, period);
199    }
200
201    @Override
202    protected void chore() {
203      printStats(replicationManager.getStats());
204    }
205
206    private void printStats(String stats) {
207      if (!stats.isEmpty()) {
208        LOG.info(stats);
209      }
210    }
211  }
212
213  @Override
214  public ReplicationLoad refreshAndGetReplicationLoad() {
215    if (this.replicationLoad == null) {
216      return null;
217    }
218    // always build for latest data
219    List<ReplicationSourceInterface> allSources = new ArrayList<>();
220    allSources.addAll(this.replicationManager.getSources());
221    allSources.addAll(this.replicationManager.getOldSources());
222    this.replicationLoad.buildReplicationLoad(allSources, null);
223    return this.replicationLoad;
224  }
225
226  @Override
227  public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
228    return syncReplicationPeerInfoProvider;
229  }
230
231  @Override
232  public ReplicationPeers getReplicationPeers() {
233    return replicationPeers;
234  }
235
236  @Override
237  public void onConfigurationChange(Configuration conf) {
238    this.conf = conf;
239  }
240
241  @Override
242  public void registerChildren(ConfigurationManager manager) {
243    manager.registerObserver(replicationPeers);
244  }
245
246  @Override
247  public void deregisterChildren(ConfigurationManager manager) {
248    manager.deregisterObserver(replicationPeers);
249  }
250}