/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
 */
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
  private static final Logger LOG =
      LoggerFactory.getLogger(Replication.class);
  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;

  /**
   * Empty constructor
   */
  public Replication() {
  }

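  /**
   * Initializes the replication service: wires up the ZooKeeper-backed replication queues, peers
   * and tracker, creates the {@link ReplicationSourceManager}, and, when a {@link WALProvider} is
   * supplied, registers a WAL listener so the sources are notified of new WAL activity.
   */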
  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
      WALProvider walProvider) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
    if (this.isReplicationForBulkLoadDataEnabled) {
      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
            + " is set to true.");
      }
    }

    try {
      this.replicationQueues =
          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
            server.getZooKeeper()));
      this.replicationQueues.init(this.server.getServerName().toString());
      this.replicationPeers =
          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
      this.replicationPeers.init();
      this.replicationTracker =
          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
            this.conf, this.server, this.server);
    } catch (Exception e) {
      throw new IOException("Failed replication handler create", e);
    }
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
    this.replicationManager = new ReplicationSourceManager(replicationQueues, replicationPeers,
        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    }
    this.statsThreadPeriod =
        this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();
  }

  /**
   * Stops replication service.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carries the given list of log entries down to the replication sink.
   * @param entries list of entries to replicate
   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
   *          contain the Cells we are replicating; they are passed here on the side in this
   *          CellScanner).
   * @param replicationClusterId Id which will uniquely identify the source cluster's FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory, required for replicating hfiles
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException if the entries cannot be applied by the sink
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the replication source manager, creates the
   * replication sink, and schedules the periodic statistics task.
   * @throws IOException if the replication source manager fails to initialize
   */
  @Override
  public void startReplicationService() throws IOException {
    try {
      this.replicationManager.init();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    this.replicationSink = new ReplicationSink(this.conf, this.server);
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
  }

  /**
   * Get the replication sources manager
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

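  /**
   * Adds the given hfile references to the replication queue so they can be shipped to the peer
   * clusters as part of bulk load replication.
   * @param tableName table the hfiles belong to
   * @param family column family the hfiles belong to
   * @param pairs list of hfile path pairs to queue for replication
   * @throws IOException if the references could not be added to the replication queue
   */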
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (ReplicationException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw new IOException(e);
    }
  }

  /**
   * Statistics task. Periodically prints the replication statistics to the log.
   */
  private static final class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
        ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

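  /**
   * Rebuilds {@link #replicationLoad} from the current metrics of all normal and recovered
   * replication sources and of the replication sink.
   */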
  private void buildReplicationLoad() {
    List<MetricsSource> sourceMetricsList = new ArrayList<>();

    // get source
    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
    for (ReplicationSourceInterface source : sources) {
      sourceMetricsList.add(source.getSourceMetrics());
    }

    // get old source
    List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
    for (ReplicationSourceInterface source : oldSources) {
      if (source instanceof ReplicationSource) {
        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
      }
    }

    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
  }
}