001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.CellScanner;
028import org.apache.hadoop.hbase.ScheduledChore;
029import org.apache.hadoop.hbase.Server;
030import org.apache.hadoop.hbase.Stoppable;
031import org.apache.hadoop.hbase.regionserver.HRegionServer;
032import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
033import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
034import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
035import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
036import org.apache.hadoop.hbase.wal.WALFactory;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
042
043@InterfaceAudience.Private
044public class ReplicationSinkServiceImpl implements ReplicationSinkService {
045  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);
046
047  private Configuration conf;
048
049  private Server server;
050
051  private ReplicationSink replicationSink;
052
053  // ReplicationLoad to access replication metrics
054  private ReplicationLoad replicationLoad;
055
056  private int statsPeriodInSecond;
057
058  @Override
059  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
060    String replicationClusterId, String sourceBaseNamespaceDirPath,
061    String sourceHFileArchiveDirPath) throws IOException {
062    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
063      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
064  }
065
066  @Override
067  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
068    WALFactory walFactory) throws IOException {
069    this.server = server;
070    this.conf = server.getConfiguration();
071    this.statsPeriodInSecond = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
072    this.replicationLoad = new ReplicationLoad();
073  }
074
075  @Override
076  public void startReplicationService() throws IOException {
077    RegionServerCoprocessorHost rsServerHost = null;
078    if (server instanceof HRegionServer) {
079      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
080    }
081    this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
082    this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
083      "ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
084  }
085
086  @Override
087  public void stopReplicationService() {
088    if (this.replicationSink != null) {
089      this.replicationSink.stopReplicationSinkServices();
090    }
091  }
092
093  @Override
094  public ReplicationLoad refreshAndGetReplicationLoad() {
095    if (replicationLoad == null) {
096      return null;
097    }
098    // always build for latest data
099    replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
100    return replicationLoad;
101  }
102
103  private final class ReplicationStatisticsChore extends ScheduledChore {
104
105    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
106      super(name, stopper, period);
107    }
108
109    @Override
110    protected void chore() {
111      printStats(replicationSink.getStats());
112    }
113
114    private void printStats(String stats) {
115      if (!stats.isEmpty()) {
116        LOG.info(stats);
117      }
118    }
119  }
120}