001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.util.Date;
022import java.util.HashMap;
023import java.util.List;
024import java.util.ArrayList;
025import java.util.Map;
026
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.hadoop.hbase.util.Strings;
031
032/**
033 * This class is used for exporting some of the info from replication metrics
034 */
035@InterfaceAudience.Private
036public class ReplicationLoad {
037
038  // Empty load instance.
039  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
040
041  private List<MetricsSource> sourceMetricsList;
042  private MetricsSink sinkMetrics;
043
044  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
045  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
046
047  /** default constructor */
048  public ReplicationLoad() {
049    super();
050  }
051
052  /**
053   * buildReplicationLoad
054   * @param srMetricsList
055   * @param skMetrics
056   */
057
058  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
059      final MetricsSink skMetrics) {
060    this.sourceMetricsList = srMetricsList;
061    this.sinkMetrics = skMetrics;
062
063    // build the SinkLoad
064    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
065        ClusterStatusProtos.ReplicationLoadSink.newBuilder();
066    rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
067    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
068    this.replicationLoadSink = rLoadSinkBuild.build();
069
070    // build the SourceLoad List
071    Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
072        new HashMap<>();
073    for (MetricsSource sm : this.sourceMetricsList) {
074      // Get the actual peer id
075      String peerId = sm.getPeerID();
076      String[] parts = peerId.split("-", 2);
077      peerId = parts.length != 1 ? parts[0] : peerId;
078
079      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
080      int sizeOfLogQueue = sm.getSizeOfLogQueue();
081      long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
082      long replicationLag =
083          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
084
085      ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
086      if (rLoadSource != null) {
087        ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
088        sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
089        timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
090          timeStampOfLastShippedOp);
091        replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
092      }
093      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
094          ClusterStatusProtos.ReplicationLoadSource.newBuilder();
095      rLoadSourceBuild.setPeerID(peerId);
096      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
097      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
098      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
099      rLoadSourceBuild.setReplicationLag(replicationLag);
100
101      replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
102    }
103    this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
104  }
105
106  static long calculateReplicationDelay(long ageOfLastShippedOp,
107      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
108    long replicationLag;
109    long timePassedAfterLastShippedOp;
110    if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to Long.MAX_VALUE
111      return Long.MAX_VALUE;
112    } else {
113      timePassedAfterLastShippedOp =
114          EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
115    }
116    if (sizeOfLogQueue > 1) {
117      // err on the large side
118      replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
119    } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
120      replicationLag = ageOfLastShippedOp; // last shipped happen recently
121    } else {
122      // last shipped may happen last night,
123      // so NO real lag although ageOfLastShippedOp is non-zero
124      replicationLag = 0;
125    }
126    return replicationLag;
127  }
128
129  /**
130   * sourceToString
131   * @return a string contains sourceReplicationLoad information
132   */
133  public String sourceToString() {
134    if (this.sourceMetricsList == null) return null;
135
136    StringBuilder sb = new StringBuilder();
137
138    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) {
139
140      sb = Strings.appendKeyValue(sb, "\n           PeerID", rls.getPeerID());
141      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
142      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
143      sb =
144          Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
145            (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
146      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
147    }
148
149    return sb.toString();
150  }
151
152  /**
153   * sinkToString
154   * @return a string contains sinkReplicationLoad information
155   */
156  public String sinkToString() {
157    if (this.replicationLoadSink == null) return null;
158
159    StringBuilder sb = new StringBuilder();
160    sb =
161        Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
162          this.replicationLoadSink.getAgeOfLastAppliedOp());
163    sb =
164        Strings.appendKeyValue(sb, "TimestampsOfLastAppliedOp",
165          (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));
166
167    return sb.toString();
168  }
169
170  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
171    return this.replicationLoadSink;
172  }
173
174  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() {
175    return this.replicationLoadSourceList;
176  }
177
178  /**
179   * @see java.lang.Object#toString()
180   */
181  @Override
182  public String toString() {
183    return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
184  }
185
186}