001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver.slowlog;
021
022import com.lmax.disruptor.EventHandler;
023import com.lmax.disruptor.RingBuffer;
024
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.List;
029import java.util.Queue;
030import java.util.concurrent.locks.ReentrantLock;
031import java.util.stream.Collectors;
032
033import org.apache.commons.lang3.StringUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.client.SlowLogParams;
036import org.apache.hadoop.hbase.ipc.RpcCall;
037import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
043import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
044import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
045import org.apache.hbase.thirdparty.com.google.protobuf.Message;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
051
052/**
053 * Event Handler run by disruptor ringbuffer consumer
054 */
055@InterfaceAudience.Private
056class LogEventHandler implements EventHandler<RingBufferEnvelope> {
057
058  private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
059
060  private static final String SYS_TABLE_QUEUE_SIZE =
061    "hbase.regionserver.slowlog.systable.queue.size";
062  private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
063  private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
064
065  private final Queue<SlowLogPayload> queueForRingBuffer;
066  private final Queue<SlowLogPayload> queueForSysTable;
067  private final boolean isSlowLogTableEnabled;
068
069  private Configuration configuration;
070
071  private static final ReentrantLock LOCK = new ReentrantLock();
072
073  LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
074    this.configuration = conf;
075    EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
076    queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
077    this.isSlowLogTableEnabled = isSlowLogTableEnabled;
078    if (isSlowLogTableEnabled) {
079      int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
080      EvictingQueue<SlowLogPayload> evictingQueueForTable =
081        EvictingQueue.create(sysTableQueueSize);
082      queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
083    } else {
084      queueForSysTable = null;
085    }
086  }
087
088  /**
089   * Called when a publisher has published an event to the {@link RingBuffer}
090   *
091   * @param event published to the {@link RingBuffer}
092   * @param sequence of the event being processed
093   * @param endOfBatch flag to indicate if this is the last event in a batch from
094   *   the {@link RingBuffer}
095   * @throws Exception if the EventHandler would like the exception handled further up the chain
096   */
097  @Override
098  public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
099      throws Exception {
100    final RpcLogDetails rpcCallDetails = event.getPayload();
101    final RpcCall rpcCall = rpcCallDetails.getRpcCall();
102    final String clientAddress = rpcCallDetails.getClientAddress();
103    final long responseSize = rpcCallDetails.getResponseSize();
104    final String className = rpcCallDetails.getClassName();
105    final SlowLogPayload.Type type = getLogType(rpcCallDetails);
106    if (type == null) {
107      return;
108    }
109    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
110    Message param = rpcCallDetails.getParam();
111    long receiveTime = rpcCall.getReceiveTime();
112    long startTime = rpcCall.getStartTime();
113    long endTime = System.currentTimeMillis();
114    int processingTime = (int) (endTime - startTime);
115    int qTime = (int) (startTime - receiveTime);
116    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
117    int numGets = 0;
118    int numMutations = 0;
119    int numServiceCalls = 0;
120    if (param instanceof ClientProtos.MultiRequest) {
121      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
122      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
123        for (ClientProtos.Action action : regionAction.getActionList()) {
124          if (action.hasMutation()) {
125            numMutations++;
126          }
127          if (action.hasGet()) {
128            numGets++;
129          }
130          if (action.hasServiceCall()) {
131            numServiceCalls++;
132          }
133        }
134      }
135    }
136    final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
137    final String methodDescriptorName =
138      methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
139    SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
140      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
141      .setClientAddress(clientAddress)
142      .setMethodName(methodDescriptorName)
143      .setMultiGets(numGets)
144      .setMultiMutations(numMutations)
145      .setMultiServiceCalls(numServiceCalls)
146      .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
147      .setProcessingTime(processingTime)
148      .setQueueTime(qTime)
149      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
150      .setResponseSize(responseSize)
151      .setServerClass(className)
152      .setStartTime(startTime)
153      .setType(type)
154      .setUserName(userName)
155      .build();
156    queueForRingBuffer.add(slowLogPayload);
157    if (isSlowLogTableEnabled) {
158      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
159        queueForSysTable.add(slowLogPayload);
160      }
161    }
162  }
163
164  private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
165    final boolean isSlowLog = rpcCallDetails.isSlowLog();
166    final boolean isLargeLog = rpcCallDetails.isLargeLog();
167    final SlowLogPayload.Type type;
168    if (!isSlowLog && !isLargeLog) {
169      LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
170        rpcCallDetails);
171      return null;
172    }
173    if (isSlowLog && isLargeLog) {
174      type = SlowLogPayload.Type.ALL;
175    } else if (isSlowLog) {
176      type = SlowLogPayload.Type.SLOW_LOG;
177    } else {
178      type = SlowLogPayload.Type.LARGE_LOG;
179    }
180    return type;
181  }
182
183  /**
184   * Cleans up slow log payloads
185   *
186   * @return true if slow log payloads are cleaned up, false otherwise
187   */
188  boolean clearSlowLogs() {
189    if (LOG.isDebugEnabled()) {
190      LOG.debug("Received request to clean up online slowlog buffer..");
191    }
192    queueForRingBuffer.clear();
193    return true;
194  }
195
196  /**
197   * Retrieve list of slow log payloads
198   *
199   * @param request slow log request parameters
200   * @return list of slow log payloads
201   */
202  List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
203    List<SlowLogPayload> slowLogPayloadList =
204      Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
205        .filter(e -> e.getType() == SlowLogPayload.Type.ALL
206          || e.getType() == SlowLogPayload.Type.SLOW_LOG)
207        .collect(Collectors.toList());
208
209    // latest slow logs first, operator is interested in latest records from in-memory buffer
210    Collections.reverse(slowLogPayloadList);
211
212    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
213  }
214
215  /**
216   * Retrieve list of large log payloads
217   *
218   * @param request large log request parameters
219   * @return list of large log payloads
220   */
221  List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
222    List<SlowLogPayload> slowLogPayloadList =
223      Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
224        .filter(e -> e.getType() == SlowLogPayload.Type.ALL
225          || e.getType() == SlowLogPayload.Type.LARGE_LOG)
226        .collect(Collectors.toList());
227
228    // latest large logs first, operator is interested in latest records from in-memory buffer
229    Collections.reverse(slowLogPayloadList);
230
231    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
232  }
233
234  /**
235   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
236   */
237  void addAllLogsToSysTable() {
238    if (queueForSysTable == null) {
239      // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
240      return;
241    }
242    if (LOCK.isLocked()) {
243      return;
244    }
245    LOCK.lock();
246    try {
247      List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
248      int i = 0;
249      while (!queueForSysTable.isEmpty()) {
250        slowLogPayloads.add(queueForSysTable.poll());
251        i++;
252        if (i == SYSTABLE_PUT_BATCH_SIZE) {
253          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
254          slowLogPayloads.clear();
255          i = 0;
256        }
257      }
258      if (slowLogPayloads.size() > 0) {
259        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
260      }
261    } finally {
262      LOCK.unlock();
263    }
264  }
265
266}