001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.Date;
022import java.util.List;
023import java.util.ArrayList;
024
025import org.apache.hadoop.hbase.util.Strings;
026
027import org.apache.yetus.audience.InterfaceAudience;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
030
031/**
032 * This class is used for exporting some of the info from replication metrics
033 */
034@InterfaceAudience.Private
035public class ReplicationLoad {
036
037  // Empty load instance.
038  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
039  private MetricsSink sinkMetrics;
040
041  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
042  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
043
044  /** default constructor */
045  public ReplicationLoad() {
046    super();
047  }
048
049  /**
050   * buildReplicationLoad
051   * @param sources List of ReplicationSource instances for which metrics should be reported
052   * @param skMetrics
053   */
054
055  public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
056      final MetricsSink skMetrics) {
057    this.sinkMetrics = skMetrics;
058
059    // build the SinkLoad
060    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
061        ClusterStatusProtos.ReplicationLoadSink.newBuilder();
062    rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
063    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
064    rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
065    rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
066    this.replicationLoadSink = rLoadSinkBuild.build();
067
068    this.replicationLoadSourceEntries = new ArrayList<>();
069    for (ReplicationSourceInterface source : sources) {
070      MetricsSource sm = source.getSourceMetrics();
071      // Get the actual peer id
072      String peerId = sm.getPeerID();
073      String[] parts = peerId.split("-", 2);
074      peerId = parts.length != 1 ? parts[0] : peerId;
075
076      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
077      int sizeOfLogQueue = sm.getSizeOfLogQueue();
078      long editsRead = sm.getReplicableEdits();
079      long oPsShipped = sm.getOpsShipped();
080      long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
081      long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate();
082      long replicationLag = sm.getReplicationDelay();
083      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
084          ClusterStatusProtos.ReplicationLoadSource.newBuilder();
085      rLoadSourceBuild.setPeerID(peerId);
086      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
087      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
088      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
089      rLoadSourceBuild.setReplicationLag(replicationLag);
090      rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate);
091      rLoadSourceBuild.setEditsRead(editsRead);
092      rLoadSourceBuild.setOPsShipped(oPsShipped);
093      if (source instanceof ReplicationSource){
094        ReplicationSource replSource = (ReplicationSource)source;
095        rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered());
096        rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId());
097        rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
098        rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate>0);
099      }
100
101      this.replicationLoadSourceEntries.add(rLoadSourceBuild.build());
102    }
103  }
104
105  /**
106   * sourceToString
107   * @return a string contains sourceReplicationLoad information
108   */
109  public String sourceToString() {
110    StringBuilder sb = new StringBuilder();
111
112    for (ClusterStatusProtos.ReplicationLoadSource rls :
113        this.replicationLoadSourceEntries) {
114
115      sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
116      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
117      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
118      sb =
119          Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
120              (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
121      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
122    }
123
124    return sb.toString();
125  }
126
127  /**
128   * sinkToString
129   * @return a string contains sinkReplicationLoad information
130   */
131  public String sinkToString() {
132    if (this.replicationLoadSink == null) return null;
133
134    StringBuilder sb = new StringBuilder();
135    sb =
136        Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
137          this.replicationLoadSink.getAgeOfLastAppliedOp());
138    sb =
139        Strings.appendKeyValue(sb, "TimestampsOfLastAppliedOp",
140          (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
141
142    return sb.toString();
143  }
144
145  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
146    return this.replicationLoadSink;
147  }
148
149  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceEntries() {
150    return this.replicationLoadSourceEntries;
151  }
152
153  /**
154   * @see java.lang.Object#toString()
155   */
156  @Override
157  public String toString() {
158    return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
159  }
160
161}