/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
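 * <p>
 * Illustrative lifecycle sketch (the hosting region server drives these calls; this is only a
 * rough orientation, not a prescribed sequence):
 * <pre>
 * Replication replication = new Replication();
 * replication.initialize(server, fs, logDir, oldLogDir, walFactory);
 * replication.startReplicationService();
 * // ... serve edits; the sources ship WALs, the sink applies incoming entries ...
 * replication.stopReplicationService();
 * </pre>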
 */
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
  private static final Logger LOG =
      LoggerFactory.getLogger(Replication.class);
  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueueStorage queueStorage;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;
  private MetricsReplicationGlobalSourceSource globalMetricsSource;

  private PeerProcedureHandler peerProcedureHandler;

  /**
   * Empty constructor
   */
  public Replication() {
  }

  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
      WALFactory walFactory) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
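    // Bulk load (hfile) replication requires a non-empty cluster id so that sink clusters can
    // look up this cluster's FS client configuration; fail fast here if it is missing.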
    if (this.isReplicationForBulkLoadDataEnabled) {
      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
            + " is set to true.");
      }
    }

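    // Set up the ZooKeeper-backed replication bookkeeping: the queue storage (which WALs still
    // need to be shipped), the peer configurations, and the replication tracker.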
    try {
      this.queueStorage =
          ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
      this.replicationPeers =
          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
      this.replicationPeers.init();
      this.replicationTracker =
          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
    } catch (Exception e) {
      throw new IOException("Failed to create replication handlers", e);
    }
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
    this.globalMetricsSource = CompatibilitySingletonFactory
        .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
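    // Create the manager that owns and drives the per-peer replication sources; the cluster id
    // identifies this cluster to its peers.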
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
        globalMetricsSource);
    // Get the user-space WAL provider
    WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
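    // Register a listener so the replication machinery is notified of WAL rolls and of edits as
    // they are written.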
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    }
    this.statsThreadPeriod =
        this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();

    this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
  }

  @Override
  public PeerProcedureHandler getPeerProcedureHandler() {
    return peerProcedureHandler;
  }

  /**
   * Stops replication service.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carries the list of log entries down to the sink.
   * @param entries list of entries to replicate
   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
   *          contain the Cells we are replicating; they are passed here on the side in this
   *          CellScanner).
   * @param replicationClusterId Id which uniquely identifies the source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive
   *          directory
   * @throws IOException if the entries cannot be applied by the sink
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the sources manager, creates the sink, and
   * schedules the periodic statistics task.
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationManager.init();
    this.replicationSink = new ReplicationSink(this.conf);
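    // Periodically log the source and sink statistics (see ReplicationStatisticsTask below).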
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    LOG.info("{} started", this.server.toString());
  }

  /**
   * Get the replication sources manager.
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

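  /**
   * Adds the given hfile references to the replication queue so they can be shipped to peer
   * clusters as part of bulk load replication.
   */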
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (IOException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw e;
    }
  }

  /**
   * Statistics task. Periodically prints the replication statistics to the log.
   */
  private static final class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
        ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
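    // Aggregate both the active sources and the recovered (old) sources, together with the sink
    // metrics, into the ReplicationLoad snapshot.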
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(this.replicationManager.getSources());
    allSources.addAll(this.replicationManager.getOldSources());

    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
  }
}