/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.namequeues.impl;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;

/**
 * In-memory Queue service provider for Slow/LargeLog events.
 * <p>
 * Events are kept in a size-bounded {@link EvictingQueue} wrapped by
 * {@link Queues#synchronizedQueue(Queue)}. When the system-table feature is enabled the same
 * events are additionally handed to a {@link SlowLogPersistentService}.
 * <p>
 * When the online log provider is disabled by configuration, every operation of this service is
 * a no-op (see the individual methods for the value returned in that case).
 */
@InterfaceAudience.Private
public class SlowLogQueueService implements NamedQueueService {

  private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);

  private static final String SLOW_LOG_RING_BUFFER_SIZE =
    "hbase.regionserver.slowlog.ringbuffer.size";

  private final boolean isOnlineLogProviderEnabled;
  private final boolean isSlowLogTableEnabled;
  // Both references below are null when the online log provider is disabled;
  // slowLogPersistentService is also null when the sys-table feature is disabled.
  private final SlowLogPersistentService slowLogPersistentService;
  private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;

  /**
   * Creates the service from configuration.
   * <p>
   * If {@link HConstants#SLOW_LOG_BUFFER_ENABLED_KEY} is false, no queue and no persistent
   * service are created. Otherwise the queue capacity is read from
   * {@code hbase.regionserver.slowlog.ringbuffer.size} and the persistent service is created
   * only if {@link HConstants#SLOW_LOG_SYS_TABLE_ENABLED_KEY} is true.
   *
   * @param conf configuration to read the slow log settings from
   */
  public SlowLogQueueService(Configuration conf) {
    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);

    if (!isOnlineLogProviderEnabled) {
      this.isSlowLogTableEnabled = false;
      this.slowLogPersistentService = null;
      this.slowLogQueue = null;
      return;
    }

    // Initialize SlowLog Queue
    int slowLogQueueSize =
      conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);

    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
      EvictingQueue.create(slowLogQueueSize);
    this.slowLogQueue = Queues.synchronizedQueue(evictingQueue);

    this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
    this.slowLogPersistentService =
      isSlowLogTableEnabled ? new SlowLogPersistentService(conf) : null;
  }

  /**
   * @return the event type served by this queue, always
   *   {@link NamedQueuePayload.NamedQueueEvent#SLOW_LOG}
   */
  @Override
  public NamedQueuePayload.NamedQueueEvent getEvent() {
    return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
  }

  /**
   * This implementation is specific to slowLog event. This consumes slowLog event from
   * disruptor and inserts records to EvictingQueue.
   * <p>
   * The event is ignored when the online log provider is disabled, when the payload is not an
   * {@link RpcLogDetails}, or when the payload is flagged neither as slow nor as large.
   *
   * @param namedQueuePayload namedQueue payload from disruptor ring buffer
   */
  @Override
  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
    if (!isOnlineLogProviderEnabled) {
      return;
    }
    if (!(namedQueuePayload instanceof RpcLogDetails)) {
      LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
      return;
    }
    final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
    final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
    if (type == null) {
      return;
    }
    final TooSlowLog.SlowLogPayload slowLogPayload = buildSlowLogPayload(rpcLogDetails, type);
    slowLogQueue.add(slowLogPayload);
    // Records whose region name starts with "hbase:slowlog" are kept in memory only.
    // NOTE(review): presumably this avoids persisting slow writes to the slowlog table into
    // that same table - confirm.
    if (isSlowLogTableEnabled
      && !slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
      slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
    }
  }

  /**
   * Builds the protobuf payload stored for one RPC.
   * <p>
   * Processing time is measured from the call's start time to "now"; queue time from the call's
   * receive time to its start time. Both are narrowed to {@code int}. For a
   * {@link ClientProtos.MultiRequest} the gets, mutations and service calls of all region
   * actions are counted; for every other request type these counters stay 0.
   *
   * @param rpcLogDetails details of the RPC being recorded
   * @param type          log type already resolved for this RPC
   * @return the payload to put on the queue
   */
  private static TooSlowLog.SlowLogPayload buildSlowLogPayload(RpcLogDetails rpcLogDetails,
    TooSlowLog.SlowLogPayload.Type type) {
    final RpcCall rpcCall = rpcLogDetails.getRpcCall();
    final Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
    final Message param = rpcLogDetails.getParam();
    final long receiveTime = rpcCall.getReceiveTime();
    final long startTime = rpcCall.getStartTime();
    final long endTime = System.currentTimeMillis();
    final int processingTime = (int) (endTime - startTime);
    final int qTime = (int) (startTime - receiveTime);
    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);

    int numGets = 0;
    int numMutations = 0;
    int numServiceCalls = 0;
    if (param instanceof ClientProtos.MultiRequest) {
      final ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
        for (ClientProtos.Action action : regionAction.getActionList()) {
          if (action.hasMutation()) {
            numMutations++;
          }
          if (action.hasGet()) {
            numGets++;
          }
          if (action.hasServiceCall()) {
            numServiceCalls++;
          }
        }
      }
    }

    final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
    final String methodDescriptorName =
      methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
    // slowLogParams is treated as possibly-null below, so guard param the same way instead of
    // failing the whole event with a NullPointerException.
    // NOTE(review): confirm whether RpcLogDetails#getParam can actually return null.
    final String paramClassName =
      param != null ? param.getClass().getName() : StringUtils.EMPTY;
    final String params =
      slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY;
    final String regionName =
      slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY;

    return TooSlowLog.SlowLogPayload.newBuilder()
      .setCallDetails(methodDescriptorName + "(" + paramClassName + ")")
      .setClientAddress(rpcLogDetails.getClientAddress())
      .setMethodName(methodDescriptorName)
      .setMultiGets(numGets)
      .setMultiMutations(numMutations)
      .setMultiServiceCalls(numServiceCalls)
      .setParam(params)
      .setProcessingTime(processingTime)
      .setQueueTime(qTime)
      .setRegionName(regionName)
      .setResponseSize(rpcLogDetails.getResponseSize())
      .setServerClass(rpcLogDetails.getClassName())
      .setStartTime(startTime)
      .setType(type)
      .setUserName(userName)
      .build();
  }

  /**
   * Removes every record from the in-memory queue.
   *
   * @return true if the queue was cleared, false if the online log provider is disabled
   */
  @Override
  public boolean clearNamedQueue() {
    if (!isOnlineLogProviderEnabled) {
      return false;
    }
    LOG.debug("Received request to clean up online slowlog buffer.");
    slowLogQueue.clear();
    return true;
  }

  /**
   * Retrieves records from the in-memory queue.
   * <p>
   * A request with log type {@code LARGE_LOG} selects records typed {@code ALL} or
   * {@code LARGE_LOG}; any other request selects records typed {@code ALL} or {@code SLOW_LOG}.
   * The selection is reversed (latest first) and then passed through
   * {@link LogHandlerUtils#getFilteredLogs}.
   *
   * @param request request carrying the slow log query
   * @return response holding the matching payloads, or null if the online log provider is
   *   disabled
   */
  @Override
  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
    if (!isOnlineLogProviderEnabled) {
      return null;
    }
    final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
      request.getSlowLogResponseRequest();
    final boolean largeLogRequested = AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
      .equals(slowLogResponseRequest.getLogType());
    final TooSlowLog.SlowLogPayload.Type wantedType = largeLogRequested
      ? TooSlowLog.SlowLogPayload.Type.LARGE_LOG
      : TooSlowLog.SlowLogPayload.Type.SLOW_LOG;

    final NamedQueueGetResponse response = new NamedQueueGetResponse();
    response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
    response.setSlowLogPayloads(getPayloadsOfType(slowLogResponseRequest, wantedType));
    return response;
  }

  /**
   * Maps the slow/large flags of an RPC to a payload type.
   *
   * @param rpcCallDetails details of the RPC
   * @return {@code ALL} if both flags are set, {@code SLOW_LOG} or {@code LARGE_LOG} if exactly
   *   one is set, or null (after logging an error) if neither is set
   */
  private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
    final boolean isSlowLog = rpcCallDetails.isSlowLog();
    final boolean isLargeLog = rpcCallDetails.isLargeLog();
    if (!isSlowLog && !isLargeLog) {
      LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
        rpcCallDetails);
      return null;
    }
    if (isSlowLog && isLargeLog) {
      return TooSlowLog.SlowLogPayload.Type.ALL;
    }
    return isSlowLog
      ? TooSlowLog.SlowLogPayload.Type.SLOW_LOG
      : TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
  }

  /**
   * Add all slowLog events to system table. This is only for slowLog event's persistence on
   * system table. Does nothing if the online log provider is disabled or no persistent service
   * was created.
   */
  @Override
  public void persistAll() {
    if (!isOnlineLogProviderEnabled) {
      return;
    }
    if (slowLogPersistentService != null) {
      slowLogPersistentService.addAllLogsToSysTable();
    }
  }

  /**
   * Collects the queued payloads typed {@code ALL} or {@code wantedType}, latest first, and
   * applies the request filters.
   *
   * @param request    request whose filters are applied by {@link LogHandlerUtils}
   * @param wantedType {@code SLOW_LOG} or {@code LARGE_LOG}
   * @return filtered payloads
   */
  private List<TooSlowLog.SlowLogPayload> getPayloadsOfType(
    final AdminProtos.SlowLogResponseRequest request,
    final TooSlowLog.SlowLogPayload.Type wantedType) {
    // Copy through toArray() on the synchronized wrapper and stream the copy, rather than
    // iterating the live queue while the disruptor thread may be adding to it.
    final List<TooSlowLog.SlowLogPayload> payloads =
      Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0]))
        .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
          || e.getType() == wantedType)
        .collect(Collectors.toList());
    // latest logs first, operator is interested in latest records from in-memory buffer
    Collections.reverse(payloads);
    return LogHandlerUtils.getFilteredLogs(request, payloads);
  }

}