/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
 */
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
  private static final Logger LOG =
      LoggerFactory.getLogger(Replication.class);
  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueueStorage queueStorage;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  private ReplicationSink replicationSink;
  private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;
  private MetricsReplicationGlobalSourceSource globalMetricsSource;

  private PeerProcedureHandler peerProcedureHandler;

  /**
   * Empty constructor
   */
  public Replication() {
  }

  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
      WALProvider walProvider) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
    if (this.isReplicationForBulkLoadDataEnabled) {
      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
            + " is set to true.");
      }
    }

    try {
      this.queueStorage =
          ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
      this.replicationPeers =
          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
      this.replicationPeers.init();
      this.replicationTracker =
          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
    } catch (Exception e) {
      throw new IOException("Failed to create replication handlers", e);
    }
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
    SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
    this.globalMetricsSource = CompatibilitySingletonFactory
        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
        mapping, globalMetricsSource);
    this.syncReplicationPeerInfoProvider =
        new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
    PeerActionListener peerActionListener = PeerActionListener.DUMMY;
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
      if (walProvider instanceof SyncReplicationWALProvider) {
        SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
        peerActionListener = syncWALProvider;
        syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
        // For a sync replication state change we need to reload the state twice (see the code in
        // PeerProcedureHandlerImpl). So here we go over the sync replication peers and check
        // whether any of them are in the middle of the two refreshes; if so, we manually repeat
        // the action done in the first refresh, otherwise the second refresh will run into
        // trouble, such as an NPE.
        replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
            .filter(p -> p.getPeerConfig().isSyncReplication())
            .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
            .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(),
              p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
      }
    }
    this.statsThreadPeriod =
        this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();

    this.peerProcedureHandler =
      new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
  }
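
  // A minimal lifecycle sketch (illustrative only; in a real deployment the hosting
  // HRegionServer drives these calls and supplies the Server, FileSystem and WALProvider):
  //
  //   Replication replication = new Replication();
  //   replication.initialize(server, fs, logDir, oldLogDir, walProvider);
  //   replication.startReplicationService(); // starts sources, the sink and the stats task
  //   ...
  //   replication.stopReplicationService();  // joins replication threads and stops the sink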

  @Override
  public PeerProcedureHandler getPeerProcedureHandler() {
    return peerProcedureHandler;
  }

  /**
   * Stops replication service.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carry the list of log entries down to the sink
   * @param entries list of entries to replicate
   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
   *          contain the Cells we are replicating; they are passed here on the side in this
   *          CellScanner).
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException on failure to replicate the entries to the sink
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }
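
  // A hedged usage sketch (the variable names below are assumptions, not fields of this class):
  // the sink-side RPC handler hands over the batched WAL entries together with their Cells,
  // which travel on the side in the CellScanner rather than inside the entries themselves:
  //
  //   replicationSinkService.replicateLogEntries(entries, cellScanner, replicationClusterId,
  //     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);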

  /**
   * If replication is enabled and this cluster is a master, it starts the replication services.
   * @throws IOException on failure to start the replication services
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationManager.init();
    this.replicationSink = new ReplicationSink(this.conf);
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    LOG.info("{} started", this.server.toString());
  }
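
  // The statistics period is configurable; an illustrative example of lowering it from the
  // five minute default (the key is read in initialize() above):
  //
  //   conf.setInt("replication.stats.thread.period.seconds", 60);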

  /**
   * Get the replication sources manager
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

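  /**
   * Add the given hfile references for {@code tableName}/{@code family} to the replication queue
   * so they can be shipped as part of bulk load replication.
   */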
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (IOException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw e;
    }
  }

  /**
   * Statistics task. Periodically prints the replication statistics to the log.
   */
  private static final class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
        ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(this.replicationManager.getSources());
    allSources.addAll(this.replicationManager.getOldSources());
    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
  }

  @Override
  public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
    return syncReplicationPeerInfoProvider;
  }

  @Override
  public ReplicationPeers getReplicationPeers() {
    return replicationPeers;
  }
}