/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.slowlog;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; 051 052/** 053 * Event Handler run by disruptor ringbuffer consumer 054 */ 055@InterfaceAudience.Private 056class LogEventHandler implements EventHandler<RingBufferEnvelope> { 057 058 private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); 059 060 private static final String SYS_TABLE_QUEUE_SIZE = 061 "hbase.regionserver.slowlog.systable.queue.size"; 062 private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000; 063 private static final int SYSTABLE_PUT_BATCH_SIZE = 100; 064 065 private final Queue<SlowLogPayload> queueForRingBuffer; 066 private final Queue<SlowLogPayload> queueForSysTable; 067 private final boolean isSlowLogTableEnabled; 068 069 private Configuration configuration; 070 071 private static final ReentrantLock LOCK = new ReentrantLock(); 072 073 LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) { 074 this.configuration = conf; 075 EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount); 076 queueForRingBuffer = Queues.synchronizedQueue(evictingQueue); 077 this.isSlowLogTableEnabled = isSlowLogTableEnabled; 078 if (isSlowLogTableEnabled) { 079 int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE); 080 EvictingQueue<SlowLogPayload> evictingQueueForTable = 081 EvictingQueue.create(sysTableQueueSize); 082 queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable); 083 } else { 084 queueForSysTable = null; 085 } 086 } 087 088 /** 089 * Called when a publisher has published an event to the {@link RingBuffer} 090 * 091 * @param event published to the {@link RingBuffer} 092 * @param sequence of the event being processed 093 * @param endOfBatch flag to indicate if this is the last event in a batch from 
094 * the {@link RingBuffer} 095 * @throws Exception if the EventHandler would like the exception handled further up the chain 096 */ 097 @Override 098 public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) 099 throws Exception { 100 final RpcLogDetails rpcCallDetails = event.getPayload(); 101 final RpcCall rpcCall = rpcCallDetails.getRpcCall(); 102 final String clientAddress = rpcCallDetails.getClientAddress(); 103 final long responseSize = rpcCallDetails.getResponseSize(); 104 final String className = rpcCallDetails.getClassName(); 105 final SlowLogPayload.Type type = getLogType(rpcCallDetails); 106 if (type == null) { 107 return; 108 } 109 Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod(); 110 Message param = rpcCallDetails.getParam(); 111 long receiveTime = rpcCall.getReceiveTime(); 112 long startTime = rpcCall.getStartTime(); 113 long endTime = System.currentTimeMillis(); 114 int processingTime = (int) (endTime - startTime); 115 int qTime = (int) (startTime - receiveTime); 116 final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param); 117 int numGets = 0; 118 int numMutations = 0; 119 int numServiceCalls = 0; 120 if (param instanceof ClientProtos.MultiRequest) { 121 ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param; 122 for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) { 123 for (ClientProtos.Action action : regionAction.getActionList()) { 124 if (action.hasMutation()) { 125 numMutations++; 126 } 127 if (action.hasGet()) { 128 numGets++; 129 } 130 if (action.hasServiceCall()) { 131 numServiceCalls++; 132 } 133 } 134 } 135 } 136 final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY); 137 final String methodDescriptorName = 138 methodDescriptor != null ? 
methodDescriptor.getName() : StringUtils.EMPTY; 139 SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder() 140 .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")") 141 .setClientAddress(clientAddress) 142 .setMethodName(methodDescriptorName) 143 .setMultiGets(numGets) 144 .setMultiMutations(numMutations) 145 .setMultiServiceCalls(numServiceCalls) 146 .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY) 147 .setProcessingTime(processingTime) 148 .setQueueTime(qTime) 149 .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) 150 .setResponseSize(responseSize) 151 .setServerClass(className) 152 .setStartTime(startTime) 153 .setType(type) 154 .setUserName(userName) 155 .build(); 156 queueForRingBuffer.add(slowLogPayload); 157 if (isSlowLogTableEnabled) { 158 if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) { 159 queueForSysTable.add(slowLogPayload); 160 } 161 } 162 } 163 164 private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) { 165 final boolean isSlowLog = rpcCallDetails.isSlowLog(); 166 final boolean isLargeLog = rpcCallDetails.isLargeLog(); 167 final SlowLogPayload.Type type; 168 if (!isSlowLog && !isLargeLog) { 169 LOG.error("slowLog and largeLog both are false. Ignoring the event. 
rpcCallDetails: {}", 170 rpcCallDetails); 171 return null; 172 } 173 if (isSlowLog && isLargeLog) { 174 type = SlowLogPayload.Type.ALL; 175 } else if (isSlowLog) { 176 type = SlowLogPayload.Type.SLOW_LOG; 177 } else { 178 type = SlowLogPayload.Type.LARGE_LOG; 179 } 180 return type; 181 } 182 183 /** 184 * Cleans up slow log payloads 185 * 186 * @return true if slow log payloads are cleaned up, false otherwise 187 */ 188 boolean clearSlowLogs() { 189 if (LOG.isDebugEnabled()) { 190 LOG.debug("Received request to clean up online slowlog buffer.."); 191 } 192 queueForRingBuffer.clear(); 193 return true; 194 } 195 196 /** 197 * Retrieve list of slow log payloads 198 * 199 * @param request slow log request parameters 200 * @return list of slow log payloads 201 */ 202 List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) { 203 List<SlowLogPayload> slowLogPayloadList = 204 Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) 205 .filter(e -> e.getType() == SlowLogPayload.Type.ALL 206 || e.getType() == SlowLogPayload.Type.SLOW_LOG) 207 .collect(Collectors.toList()); 208 209 // latest slow logs first, operator is interested in latest records from in-memory buffer 210 Collections.reverse(slowLogPayloadList); 211 212 return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); 213 } 214 215 /** 216 * Retrieve list of large log payloads 217 * 218 * @param request large log request parameters 219 * @return list of large log payloads 220 */ 221 List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) { 222 List<SlowLogPayload> slowLogPayloadList = 223 Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0])) 224 .filter(e -> e.getType() == SlowLogPayload.Type.ALL 225 || e.getType() == SlowLogPayload.Type.LARGE_LOG) 226 .collect(Collectors.toList()); 227 228 // latest large logs first, operator is interested in latest records from in-memory buffer 229 
Collections.reverse(slowLogPayloadList); 230 231 return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList); 232 } 233 234 /** 235 * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch 236 */ 237 void addAllLogsToSysTable() { 238 if (queueForSysTable == null) { 239 // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting. 240 return; 241 } 242 if (LOCK.isLocked()) { 243 return; 244 } 245 LOCK.lock(); 246 try { 247 List<SlowLogPayload> slowLogPayloads = new ArrayList<>(); 248 int i = 0; 249 while (!queueForSysTable.isEmpty()) { 250 slowLogPayloads.add(queueForSysTable.poll()); 251 i++; 252 if (i == SYSTABLE_PUT_BATCH_SIZE) { 253 SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); 254 slowLogPayloads.clear(); 255 i = 0; 256 } 257 } 258 if (slowLogPayloads.size() > 0) { 259 SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); 260 } 261 } finally { 262 LOCK.unlock(); 263 } 264 } 265 266}