001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.CellScanner;
029import org.apache.hadoop.hbase.ScheduledChore;
030import org.apache.hadoop.hbase.Server;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
033import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
034import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
035import org.apache.hadoop.hbase.wal.WALFactory;
036import org.apache.hadoop.hbase.wal.WALProvider;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
042
043@InterfaceAudience.Private
044public class ReplicationSinkServiceImpl implements ReplicationSinkService {
045  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);
046
047  private Configuration conf;
048
049  private Server server;
050
051  private ReplicationSink replicationSink;
052
053  // ReplicationLoad to access replication metrics
054  private ReplicationLoad replicationLoad;
055
056  private int statsPeriodInSecond;
057
058  @Override
059  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
060    String replicationClusterId, String sourceBaseNamespaceDirPath,
061    String sourceHFileArchiveDirPath) throws IOException {
062    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
063      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
064  }
065
066  @Override
067  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
068    WALFactory walFactory) throws IOException {
069    this.server = server;
070    this.conf = server.getConfiguration();
071    this.statsPeriodInSecond =
072      this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
073    this.replicationLoad = new ReplicationLoad();
074  }
075
076  @Override
077  public void startReplicationService() throws IOException {
078    this.replicationSink = new ReplicationSink(this.conf);
079    this.server.getChoreService().scheduleChore(
080        new ReplicationStatisticsChore("ReplicationSinkStatistics", server,
081            (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
082  }
083
084  @Override
085  public void stopReplicationService() {
086    if (this.replicationSink != null) {
087      this.replicationSink.stopReplicationSinkServices();
088    }
089  }
090
091  @Override
092  public ReplicationLoad refreshAndGetReplicationLoad() {
093    if (replicationLoad == null) {
094      return null;
095    }
096    // always build for latest data
097    replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
098    return replicationLoad;
099  }
100
101  private final class ReplicationStatisticsChore extends ScheduledChore {
102
103    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
104      super(name, stopper, period);
105    }
106
107    @Override
108    protected void chore() {
109      printStats(replicationSink.getStats());
110    }
111
112    private void printStats(String stats) {
113      if (!stats.isEmpty()) {
114        LOG.info(stats);
115      }
116    }
117  }
118}