001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues.impl;
019
020import java.util.Arrays;
021import java.util.Collections;
022import java.util.List;
023import java.util.Queue;
024import java.util.stream.Collectors;
025import org.apache.commons.lang3.StringUtils;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.client.SlowLogParams;
029import org.apache.hadoop.hbase.ipc.RpcCall;
030import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
031import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
032import org.apache.hadoop.hbase.namequeues.NamedQueueService;
033import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
034import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
035import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
036import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
043import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
044import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
045import org.apache.hbase.thirdparty.com.google.protobuf.Message;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
051
052/**
053 * In-memory Queue service provider for Slow/LargeLog events
054 */
055@InterfaceAudience.Private
056public class SlowLogQueueService implements NamedQueueService {
057
058  private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
059
060  private static final String SLOW_LOG_RING_BUFFER_SIZE =
061    "hbase.regionserver.slowlog.ringbuffer.size";
062
063  private final boolean isOnlineLogProviderEnabled;
064  private final boolean isSlowLogTableEnabled;
065  private final SlowLogPersistentService slowLogPersistentService;
066  private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
067
068  public SlowLogQueueService(Configuration conf) {
069    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
070      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
071
072    if (!isOnlineLogProviderEnabled) {
073      this.isSlowLogTableEnabled = false;
074      this.slowLogPersistentService = null;
075      this.slowLogQueue = null;
076      return;
077    }
078
079    // Initialize SlowLog Queue
080    int slowLogQueueSize =
081      conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
082
083    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue = EvictingQueue.create(slowLogQueueSize);
084    slowLogQueue = Queues.synchronizedQueue(evictingQueue);
085
086    this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
087      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
088    if (isSlowLogTableEnabled) {
089      slowLogPersistentService = new SlowLogPersistentService(conf);
090    } else {
091      slowLogPersistentService = null;
092    }
093  }
094
095  @Override
096  public NamedQueuePayload.NamedQueueEvent getEvent() {
097    return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
098  }
099
100  /**
101   * This implementation is specific to slowLog event. This consumes slowLog event from disruptor
102   * and inserts records to EvictingQueue.
103   * @param namedQueuePayload namedQueue payload from disruptor ring buffer
104   */
105  @Override
106  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
107    if (!isOnlineLogProviderEnabled) {
108      return;
109    }
110    if (!(namedQueuePayload instanceof RpcLogDetails)) {
111      LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
112      return;
113    }
114    final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
115    final RpcCall rpcCall = rpcLogDetails.getRpcCall();
116    final String clientAddress = rpcLogDetails.getClientAddress();
117    final long responseSize = rpcLogDetails.getResponseSize();
118    final String className = rpcLogDetails.getClassName();
119    final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
120    if (type == null) {
121      return;
122    }
123    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
124    Message param = rpcLogDetails.getParam();
125    long receiveTime = rpcCall.getReceiveTime();
126    long startTime = rpcCall.getStartTime();
127    long endTime = EnvironmentEdgeManager.currentTime();
128    int processingTime = (int) (endTime - startTime);
129    int qTime = (int) (startTime - receiveTime);
130    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
131    int numGets = 0;
132    int numMutations = 0;
133    int numServiceCalls = 0;
134    if (param instanceof ClientProtos.MultiRequest) {
135      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
136      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
137        for (ClientProtos.Action action : regionAction.getActionList()) {
138          if (action.hasMutation()) {
139            numMutations++;
140          }
141          if (action.hasGet()) {
142            numGets++;
143          }
144          if (action.hasServiceCall()) {
145            numServiceCalls++;
146          }
147        }
148      }
149    }
150    final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
151    final String methodDescriptorName =
152      methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
153    TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
154      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
155      .setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets)
156      .setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls)
157      .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
158      .setProcessingTime(processingTime).setQueueTime(qTime)
159      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
160      .setResponseSize(responseSize).setServerClass(className).setStartTime(startTime).setType(type)
161      .setUserName(userName).build();
162    slowLogQueue.add(slowLogPayload);
163    if (isSlowLogTableEnabled) {
164      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
165        slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
166      }
167    }
168  }
169
170  @Override
171  public boolean clearNamedQueue() {
172    if (!isOnlineLogProviderEnabled) {
173      return false;
174    }
175    LOG.debug("Received request to clean up online slowlog buffer.");
176    slowLogQueue.clear();
177    return true;
178  }
179
180  @Override
181  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
182    if (!isOnlineLogProviderEnabled) {
183      return null;
184    }
185    final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
186      request.getSlowLogResponseRequest();
187    final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
188    if (
189      AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
190        .equals(slowLogResponseRequest.getLogType())
191    ) {
192      slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
193    } else {
194      slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
195    }
196    NamedQueueGetResponse response = new NamedQueueGetResponse();
197    response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
198    response.setSlowLogPayloads(slowLogPayloads);
199    return response;
200  }
201
202  private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
203    final boolean isSlowLog = rpcCallDetails.isSlowLog();
204    final boolean isLargeLog = rpcCallDetails.isLargeLog();
205    final TooSlowLog.SlowLogPayload.Type type;
206    if (!isSlowLog && !isLargeLog) {
207      LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
208        rpcCallDetails);
209      return null;
210    }
211    if (isSlowLog && isLargeLog) {
212      type = TooSlowLog.SlowLogPayload.Type.ALL;
213    } else if (isSlowLog) {
214      type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
215    } else {
216      type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
217    }
218    return type;
219  }
220
221  /**
222   * Add all slowLog events to system table. This is only for slowLog event's persistence on system
223   * table.
224   */
225  @Override
226  public void persistAll() {
227    if (!isOnlineLogProviderEnabled) {
228      return;
229    }
230    if (slowLogPersistentService != null) {
231      slowLogPersistentService.addAllLogsToSysTable();
232    }
233  }
234
235  private List<TooSlowLog.SlowLogPayload>
236    getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
237    List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
238      Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0]))
239        .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
240          || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG)
241        .collect(Collectors.toList());
242    // latest slow logs first, operator is interested in latest records from in-memory buffer
243    Collections.reverse(slowLogPayloadList);
244    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
245  }
246
247  private List<TooSlowLog.SlowLogPayload>
248    getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
249    List<
250      TooSlowLog.SlowLogPayload> slowLogPayloadList =
251        Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0]))
252          .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
253            || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG)
254          .collect(Collectors.toList());
255    // latest large logs first, operator is interested in latest records from in-memory buffer
256    Collections.reverse(slowLogPayloadList);
257    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
258  }
259
260}