001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues.impl; 019 020import java.util.Arrays; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.List; 024import java.util.Map; 025import java.util.Queue; 026import java.util.stream.Collectors; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.SlowLogParams; 032import org.apache.hadoop.hbase.ipc.RpcCall; 033import org.apache.hadoop.hbase.namequeues.LogHandlerUtils; 034import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 035import org.apache.hadoop.hbase.namequeues.NamedQueueService; 036import org.apache.hadoop.hbase.namequeues.RpcLogDetails; 037import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService; 038import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 039import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 046import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 047import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 049import org.apache.hbase.thirdparty.com.google.protobuf.Message; 050 051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; 056 057/** 058 * In-memory Queue service provider for Slow/LargeLog events 059 */ 060@InterfaceAudience.Private 061public class SlowLogQueueService implements NamedQueueService { 062 063 private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class); 064 065 private static final String SLOW_LOG_RING_BUFFER_SIZE = 066 "hbase.regionserver.slowlog.ringbuffer.size"; 067 068 private final boolean isOnlineLogProviderEnabled; 069 private final boolean isSlowLogTableEnabled; 070 private final SlowLogPersistentService slowLogPersistentService; 071 private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue; 072 private final boolean slowLogScanPayloadEnabled; 073 074 public SlowLogQueueService(Configuration conf) { 075 this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, 076 HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); 077 this.slowLogScanPayloadEnabled = conf.getBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED, 078 HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT); 079 080 if (!isOnlineLogProviderEnabled) { 081 this.isSlowLogTableEnabled = false; 082 this.slowLogPersistentService = null; 083 this.slowLogQueue = null; 084 return; 085 } 086 087 // Initialize SlowLog Queue 088 int slowLogQueueSize = 089 conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE); 090 091 EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue = EvictingQueue.create(slowLogQueueSize); 092 slowLogQueue = Queues.synchronizedQueue(evictingQueue); 093 094 this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 095 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 096 if (isSlowLogTableEnabled) { 097 slowLogPersistentService = new SlowLogPersistentService(conf); 098 } else { 099 slowLogPersistentService = null; 100 } 101 } 102 103 @Override 104 public NamedQueuePayload.NamedQueueEvent getEvent() { 105 return NamedQueuePayload.NamedQueueEvent.SLOW_LOG; 106 } 107 108 /** 109 * This implementation is specific to slowLog event. This consumes slowLog event from disruptor 110 * and inserts records to EvictingQueue. 111 * @param namedQueuePayload namedQueue payload from disruptor ring buffer 112 */ 113 @Override 114 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 115 if (!isOnlineLogProviderEnabled) { 116 return; 117 } 118 if (!(namedQueuePayload instanceof RpcLogDetails)) { 119 LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails."); 120 return; 121 } 122 final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload; 123 final RpcCall rpcCall = rpcLogDetails.getRpcCall(); 124 final String clientAddress = rpcLogDetails.getClientAddress(); 125 final long responseSize = rpcLogDetails.getResponseSize(); 126 final long blockBytesScanned = rpcLogDetails.getBlockBytesScanned(); 127 final long fsReadTime = rpcLogDetails.getFsReadTime(); 128 final String className = rpcLogDetails.getClassName(); 129 final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails); 130 if (type == null) { 131 return; 132 } 133 Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); 134 Message param = rpcLogDetails.getParam(); 135 long receiveTime = rpcCall.getReceiveTime(); 136 long startTime = rpcCall.getStartTime(); 137 long endTime = EnvironmentEdgeManager.currentTime(); 138 int processingTime = (int) (endTime - startTime); 139 int qTime = (int) (startTime - receiveTime); 140 final SlowLogParams slowLogParams = 141 ProtobufUtil.getSlowLogParams(param, slowLogScanPayloadEnabled); 142 int numGets = 0; 143 int numMutations = 0; 144 int numServiceCalls = 0; 145 if (param instanceof ClientProtos.MultiRequest) { 146 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 147 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 148 for (ClientProtos.Action action : regionAction.getActionList()) { 149 if (action.hasMutation()) { 150 numMutations++; 151 } 152 if (action.hasGet()) { 153 numGets++; 154 } 155 if (action.hasServiceCall()) { 156 numServiceCalls++; 157 } 158 } 159 } 160 } 161 final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); 162 final String methodDescriptorName = 163 methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY; 164 TooSlowLog.SlowLogPayload.Builder slowLogPayloadBuilder = TooSlowLog.SlowLogPayload.newBuilder() 165 .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") 166 .setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets) 167 .setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls) 168 .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) 169 .setProcessingTime(processingTime).setQueueTime(qTime) 170 .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) 171 .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned) 172 .setFsReadTime(fsReadTime).setServerClass(className).setStartTime(startTime).setType(type) 173 .setUserName(userName) 174 .addAllRequestAttribute(buildNameBytesPairs(rpcLogDetails.getRequestAttributes())) 175 .addAllConnectionAttribute(buildNameBytesPairs(rpcLogDetails.getConnectionAttributes())); 176 if (slowLogParams != null && slowLogParams.getScan() != null) { 177 slowLogPayloadBuilder.setScan(slowLogParams.getScan()); 178 } 179 TooSlowLog.SlowLogPayload slowLogPayload = slowLogPayloadBuilder.build(); 180 slowLogQueue.add(slowLogPayload); 181 if (isSlowLogTableEnabled) { 182 if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { 183 slowLogPersistentService.addToQueueForSysTable(slowLogPayload); 184 } 185 } 186 } 187 188 private static Collection<HBaseProtos.NameBytesPair> 189 buildNameBytesPairs(Map<String, byte[]> attributes) { 190 if (attributes == null) { 191 return Collections.emptySet(); 192 } 193 return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder() 194 .setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build()) 195 .collect(Collectors.toSet()); 196 } 197 198 @Override 199 public boolean clearNamedQueue() { 200 if (!isOnlineLogProviderEnabled) { 201 return false; 202 } 203 LOG.debug("Received request to clean up online slowlog buffer."); 204 slowLogQueue.clear(); 205 return true; 206 } 207 208 @Override 209 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 210 if (!isOnlineLogProviderEnabled) { 211 return null; 212 } 213 final AdminProtos.SlowLogResponseRequest slowLogResponseRequest = 214 request.getSlowLogResponseRequest(); 215 final List<TooSlowLog.SlowLogPayload> slowLogPayloads; 216 if ( 217 AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG 218 .equals(slowLogResponseRequest.getLogType()) 219 ) { 220 slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest); 221 } else { 222 slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest); 223 } 224 NamedQueueGetResponse response = new NamedQueueGetResponse(); 225 response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); 226 response.setSlowLogPayloads(slowLogPayloads); 227 return response; 228 } 229 230 private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { 231 final boolean isSlowLog = rpcCallDetails.isSlowLog(); 232 final boolean isLargeLog = rpcCallDetails.isLargeLog(); 233 final TooSlowLog.SlowLogPayload.Type type; 234 if (!isSlowLog && !isLargeLog) { 235 LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}", 236 rpcCallDetails); 237 return null; 238 } 239 if (isSlowLog && isLargeLog) { 240 type = TooSlowLog.SlowLogPayload.Type.ALL; 241 } else if (isSlowLog) { 242 type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG; 243 } else { 244 type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG; 245 } 246 return type; 247 } 248 249 /** 250 * Add all slowLog events to system table. This is only for slowLog event's persistence on system 251 * table. 252 */ 253 @Override 254 public void persistAll(Connection connection) { 255 if (!isOnlineLogProviderEnabled) { 256 return; 257 } 258 if (slowLogPersistentService != null) { 259 slowLogPersistentService.addAllLogsToSysTable(connection); 260 } 261 } 262 263 private List<TooSlowLog.SlowLogPayload> 264 getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { 265 List<TooSlowLog.SlowLogPayload> slowLogPayloadList = 266 Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])) 267 .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL 268 || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG) 269 .collect(Collectors.toList()); 270 // latest slow logs first, operator is interested in latest records from in-memory buffer 271 Collections.reverse(slowLogPayloadList); 272 return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); 273 } 274 275 private List<TooSlowLog.SlowLogPayload> 276 getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) { 277 List< 278 TooSlowLog.SlowLogPayload> slowLogPayloadList = 279 Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])) 280 .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL 281 || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG) 282 .collect(Collectors.toList()); 283 // latest large logs first, operator is interested in latest records from in-memory buffer 284 Collections.reverse(slowLogPayloadList); 285 return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); 286 } 287 288}