001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues.impl; 019 020import java.util.Arrays; 021import java.util.Collections; 022import java.util.List; 023import java.util.Queue; 024import java.util.stream.Collectors; 025import org.apache.commons.lang3.StringUtils; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.client.SlowLogParams; 029import org.apache.hadoop.hbase.ipc.RpcCall; 030import org.apache.hadoop.hbase.namequeues.LogHandlerUtils; 031import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 032import org.apache.hadoop.hbase.namequeues.NamedQueueService; 033import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 034import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService; 035import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 036import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 043import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 044import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 045import org.apache.hbase.thirdparty.com.google.protobuf.Message; 046 047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; 051 052/** 053 * In-memory Queue service provider for Slow/LargeLog events 054 */ 055@InterfaceAudience.Private 056public class SlowLogQueueService implements NamedQueueService { 057 058 private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class); 059 060 private static final String SLOW_LOG_RING_BUFFER_SIZE = 061 "hbase.regionserver.slowlog.ringbuffer.size"; 062 063 private final boolean isOnlineLogProviderEnabled; 064 private final boolean isSlowLogTableEnabled; 065 private final SlowLogPersistentService slowLogPersistentService; 066 private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue; 067 068 public SlowLogQueueService(Configuration conf) { 069 this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, 070 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 071 072 if (!isOnlineLogProviderEnabled) { 073 this.isSlowLogTableEnabled = false; 074 this.slowLogPersistentService = null; 075 this.slowLogQueue = null; 076 return; 077 } 078 079 // Initialize SlowLog Queue 080 int slowLogQueueSize = 081 conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); 082 083 EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue = EvictingQueue.create(slowLogQueueSize); 084 slowLogQueue = Queues.synchronizedQueue(evictingQueue); 085 086 this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 087 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 088 if (isSlowLogTableEnabled) { 089 slowLogPersistentService = new SlowLogPersistentService(conf); 090 } else { 091 slowLogPersistentService = null; 092 } 093 } 094 095 @Override 096 public NamedQueuePayload.NamedQueueEvent getEvent() { 097 return NamedQueuePayload.NamedQueueEvent.SLOW_LOG; 098 } 099 100 /** 101 * This implementation is specific to slowLog event. This consumes slowLog event from disruptor 102 * and inserts records to EvictingQueue. 103 * @param namedQueuePayload namedQueue payload from disruptor ring buffer 104 */ 105 @Override 106 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 107 if (!isOnlineLogProviderEnabled) { 108 return; 109 } 110 if (!(namedQueuePayload instanceof RpcLogDetails)) { 111 LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails."); 112 return; 113 } 114 final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload; 115 final RpcCall rpcCall = rpcLogDetails.getRpcCall(); 116 final String clientAddress = rpcLogDetails.getClientAddress(); 117 final long responseSize = rpcLogDetails.getResponseSize(); 118 final String className = rpcLogDetails.getClassName(); 119 final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails); 120 if (type == null) { 121 return; 122 } 123 Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); 124 Message param = rpcLogDetails.getParam(); 125 long receiveTime = rpcCall.getReceiveTime(); 126 long startTime = rpcCall.getStartTime(); 127 long endTime = EnvironmentEdgeManager.currentTime(); 128 int processingTime = (int) (endTime - startTime); 129 int qTime = (int) (startTime - receiveTime); 130 final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); 131 int numGets = 0; 132 int numMutations = 0; 133 int numServiceCalls = 0; 134 if (param instanceof ClientProtos.MultiRequest) { 135 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 136 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 137 for (ClientProtos.Action action : regionAction.getActionList()) { 138 if (action.hasMutation()) { 139 numMutations++; 140 } 141 if (action.hasGet()) { 142 numGets++; 143 } 144 if (action.hasServiceCall()) { 145 numServiceCalls++; 146 } 147 } 148 } 149 } 150 final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); 151 final String methodDescriptorName = 152 methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; 153 TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder() 154 .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") 155 .setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets) 156 .setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls) 157 .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) 158 .setProcessingTime(processingTime).setQueueTime(qTime) 159 .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) 160 .setResponseSize(responseSize).setServerClass(className).setStartTime(startTime).setType(type) 161 .setUserName(userName).build(); 162 slowLogQueue.add(slowLogPayload); 163 if (isSlowLogTableEnabled) { 164 if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { 165 slowLogPersistentService.addToQueueForSysTable(slowLogPayload); 166 } 167 } 168 } 169 170 @Override 171 public boolean clearNamedQueue() { 172 if (!isOnlineLogProviderEnabled) { 173 return false; 174 } 175 LOG.debug("Received request to clean up online slowlog buffer."); 176 slowLogQueue.clear(); 177 return true; 178 } 179 180 @Override 181 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 182 if (!isOnlineLogProviderEnabled) { 183 return null; 184 } 185 final AdminProtos.SlowLogResponseRequest slowLogResponseRequest = 186 request.getSlowLogResponseRequest(); 187 final List<TooSlowLog.SlowLogPayload> slowLogPayloads; 188 if ( 189 AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG 190 .equals(slowLogResponseRequest.getLogType()) 191 ) { 192 slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest); 193 } else { 194 slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest); 195 } 196 NamedQueueGetResponse response = new NamedQueueGetResponse(); 197 response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 198 response.setSlowLogPayloads(slowLogPayloads); 199 return response; 200 } 201 202 private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { 203 final boolean isSlowLog = rpcCallDetails.isSlowLog(); 204 final boolean isLargeLog = rpcCallDetails.isLargeLog(); 205 final TooSlowLog.SlowLogPayload.Type type; 206 if (!isSlowLog && !isLargeLog) { 207 LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}", 208 rpcCallDetails); 209 return null; 210 } 211 if (isSlowLog && isLargeLog) { 212 type = TooSlowLog.SlowLogPayload.Type.ALL; 213 } else if (isSlowLog) { 214 type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG; 215 } else { 216 type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG; 217 } 218 return type; 219 } 220 221 /** 222 * Add all slowLog events to system table. This is only for slowLog event's persistence on system 223 * table. 224 */ 225 @Override 226 public void persistAll() { 227 if (!isOnlineLogProviderEnabled) { 228 return; 229 } 230 if (slowLogPersistentService != null) { 231 slowLogPersistentService.addAllLogsToSysTable(); 232 } 233 } 234 235 private List<TooSlowLog.SlowLogPayload> 236 getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { 237 List<TooSlowLog.SlowLogPayload> slowLogPayloadList = 238 Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])) 239 .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL 240 || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG) 241 .collect(Collectors.toList()); 242 // latest slow logs first, operator is interested in latest records from in-memory buffer 243 Collections.reverse(slowLogPayloadList); 244 return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); 245 } 246 247 private List<TooSlowLog.SlowLogPayload> 248 getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) { 249 List< 250 TooSlowLog.SlowLogPayload> slowLogPayloadList = 251 Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])) 252 .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL 253 || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG) 254 .collect(Collectors.toList()); 255 // latest large logs first, operator is interested in latest records from in-memory buffer 256 Collections.reverse(slowLogPayloadList); 257 return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); 258 } 259 260}