/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Strings;

/**
 * This class is used for exporting some of the info from replication metrics.
 * <p>
 * {@link #buildReplicationLoad(List, MetricsSink)} converts the given source and sink metrics
 * into {@code ClusterStatusProtos} messages; the remaining methods expose or format the result.
 * Until {@code buildReplicationLoad} has completed, the getters return {@code null}.
 */
@InterfaceAudience.Private
public class ReplicationLoad {

  // Empty load instance.
  public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();

  private List<MetricsSource> sourceMetricsList;
  private MetricsSink sinkMetrics;

  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
  private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;

  /** default constructor */
  public ReplicationLoad() {
    super();
  }

  /**
   * Builds the sink load and the per-peer source loads from the given metrics and stores them
   * on this instance.
   * <p>
   * Source metrics are grouped by the part of their peer id that precedes the first {@code '-'}.
   * Entries that fall into the same group are merged: the largest age of last shipped op, the
   * sum of the log queue sizes, the smallest timestamp of last shipped op and the largest
   * replication lag are kept.
   * @param srMetricsList the source metrics to aggregate; must not be {@code null}
   * @param skMetrics the sink metrics to export; must not be {@code null}
   */
  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
      final MetricsSink skMetrics) {
    this.sourceMetricsList = srMetricsList;
    this.sinkMetrics = skMetrics;

    // build the SinkLoad
    ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
        ClusterStatusProtos.ReplicationLoadSink.newBuilder();
    rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
    rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
    this.replicationLoadSink = rLoadSinkBuild.build();

    // build the SourceLoad List
    Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
        new HashMap<>();
    for (MetricsSource sm : this.sourceMetricsList) {
      // Get the actual peer id: everything before the first '-'. With a limit of 2 the split
      // always yields at least one element, so element 0 is the whole id when no '-' is present.
      String peerId = sm.getPeerID().split("-", 2)[0];

      long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
      int sizeOfLogQueue = sm.getSizeOfLogQueue();
      long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
      long replicationLag =
          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);

      // Merge with the entry already recorded for this peer id, if any.
      ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
      if (rLoadSource != null) {
        ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
        sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
        timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
          timeStampOfLastShippedOp);
        replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
      }
      ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
          ClusterStatusProtos.ReplicationLoadSource.newBuilder();
      rLoadSourceBuild.setPeerID(peerId);
      rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp);
      rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
      rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
      rLoadSourceBuild.setReplicationLag(replicationLag);

      replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
    }
    this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
  }

  /**
   * Estimates the replication lag of a single source.
   * <ul>
   * <li>If {@code timeStampOfLastShippedOp} is 0 (replication has not started yet), returns
   * {@link Long#MAX_VALUE}.</li>
   * <li>If more than one log is queued, returns the larger of {@code ageOfLastShippedOp} and the
   * time passed since {@code timeStampOfLastShippedOp}.</li>
   * <li>Otherwise returns {@code ageOfLastShippedOp} if the time passed is less than twice that
   * age, and 0 if not.</li>
   * </ul>
   * @param ageOfLastShippedOp age of the last shipped operation
   * @param timeStampOfLastShippedOp timestamp of the last shipped operation, 0 if none yet
   * @param sizeOfLogQueue number of logs in the queue
   * @return the estimated replication lag
   */
  static long calculateReplicationDelay(long ageOfLastShippedOp,
      long timeStampOfLastShippedOp, int sizeOfLogQueue) {
    if (timeStampOfLastShippedOp == 0) {
      // replication not start yet, set to Long.MAX_VALUE
      return Long.MAX_VALUE;
    }
    long timePassedAfterLastShippedOp =
        EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp;
    if (sizeOfLogQueue > 1) {
      // err on the large side
      return Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp);
    }
    if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) {
      // last shipped happen recently
      return ageOfLastShippedOp;
    }
    // last shipped may happen last night,
    // so NO real lag although ageOfLastShippedOp is non-zero
    return 0;
  }

  /**
   * sourceToString
   * @return a string contains sourceReplicationLoad information, or {@code null} if no source
   *         load list has been built yet
   */
  public String sourceToString() {
    // Guard on the list that is iterated below. sourceMetricsList is assigned at the start of
    // buildReplicationLoad, before the load list exists, so it can be non-null while the list
    // is still null if that build threw part-way through.
    if (this.replicationLoadSourceList == null) {
      return null;
    }

    StringBuilder sb = new StringBuilder();

    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) {

      sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID());
      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
      sb =
          Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
            (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
    }

    return sb.toString();
  }

  /**
   * sinkToString
   * @return a string contains sinkReplicationLoad information, or {@code null} if no sink load
   *         has been built yet
   */
  public String sinkToString() {
    if (this.replicationLoadSink == null) {
      return null;
    }

    StringBuilder sb = new StringBuilder();
    sb =
        Strings.appendKeyValue(sb, "AgeOfLastAppliedOp",
          this.replicationLoadSink.getAgeOfLastAppliedOp());
    sb =
        Strings.appendKeyValue(sb, "TimestampsOfLastAppliedOp",
          (new Date(this.replicationLoadSink.getTimeStampsOfLastAppliedOp()).toString()));

    return sb.toString();
  }

  /**
   * @return the sink load built by the last completed sink build, or {@code null} if none
   */
  public ClusterStatusProtos.ReplicationLoadSink getReplicationLoadSink() {
    return this.replicationLoadSink;
  }

  /**
   * @return the source load list built by the last completed build, or {@code null} if none
   */
  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() {
    return this.replicationLoadSourceList;
  }

  /**
   * Joins {@link #sourceToString()} and {@link #sinkToString()} with the platform line
   * separator. A part that is {@code null} appears as the text {@code "null"}.
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    return this.sourceToString() + System.lineSeparator() + this.sinkToString();
  }

}