001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.ArrayList;
021import java.util.Date;
022import java.util.List;
023import org.apache.hadoop.hbase.util.Strings;
024import org.apache.yetus.audience.InterfaceAudience;
025
026import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
027
028/**
029 * This class is used for exporting some of the info from replication metrics
030 */
031@InterfaceAudience.Private
032public class ReplicationLoad {
033
034  // Empty load instance.
035  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
036
037  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
038  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
039
040  /** default constructor */
041  public ReplicationLoad() {
042    super();
043  }
044
045  /**
046   * buildReplicationLoad
047   * @param sources     List of ReplicationSource instances for which metrics should be reported
048   * @param sinkMetrics metrics of the replication sink
049   */
050
051  public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
052    final MetricsSink sinkMetrics) {
053
054    if (sinkMetrics != null) {
055      // build the SinkLoad
056      ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
057        ClusterStatusProtos.ReplicationLoadSink.newBuilder();
058      rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
059      rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
060      rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
061      rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
062      this.replicationLoadSink = rLoadSinkBuild.build();
063    }
064
065    this.replicationLoadSourceEntries = new ArrayList<>();
066    for (ReplicationSourceInterface source : sources) {
067      MetricsSource sm = source.getSourceMetrics();
068      // Get the actual peer id
069      String peerId = sm.getPeerID();
070      String[] parts = peerId.split("-", 2);
071      peerId = parts.length != 1 ? parts[0] : peerId;
072
073      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
074      int sizeOfLogQueue = sm.getSizeOfLogQueue();
075      long editsRead = sm.getReplicableEdits();
076      long oPsShipped = sm.getOpsShipped();
077      long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
078      long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate();
079      long replicationLag = sm.getReplicationDelay();
080      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
081        ClusterStatusProtos.ReplicationLoadSource.newBuilder();
082      rLoadSourceBuild.setPeerID(peerId);
083      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
084      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
085      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
086      rLoadSourceBuild.setReplicationLag(replicationLag);
087      rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate);
088      rLoadSourceBuild.setEditsRead(editsRead);
089      rLoadSourceBuild.setOPsShipped(oPsShipped);
090      if (source instanceof ReplicationSource) {
091        ReplicationSource replSource = (ReplicationSource) source;
092        rLoadSourceBuild.setRecovered(replSource.getQueueId().isRecovered());
093        rLoadSourceBuild.setQueueId(replSource.getQueueId().toString());
094        rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
095        rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate > 0);
096      }
097
098      this.replicationLoadSourceEntries.add(rLoadSourceBuild.build());
099    }
100  }
101
102  /**
103   * sourceToString
104   * @return a string contains sourceReplicationLoad information
105   */
106  public String sourceToString() {
107    StringBuilder sb = new StringBuilder();
108
109    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceEntries) {
110
111      sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
112      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
113      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
114      sb = Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
115        (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
116      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
117    }
118
119    return sb.toString();
120  }
121
122  /**
123   * sinkToString
124   * @return a string contains sinkReplicationLoad information
125   */
126  public String sinkToString() {
127    if (this.replicationLoadSink == null) return null;
128
129    StringBuilder sb = new StringBuilder();
130    sb = Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
131      this.replicationLoadSink.getAgeOfLastAppliedOp());
132    sb = Strings.appendKeyValue(sb, "TimestampsOfLastAppliedOp",
133      (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
134
135    return sb.toString();
136  }
137
138  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
139    return this.replicationLoadSink;
140  }
141
142  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceEntries() {
143    return this.replicationLoadSourceEntries;
144  }
145
146  /**
147   * @see java.lang.Object#toString()
148   */
149  @Override
150  public String toString() {
151    return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
152  }
153}