001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.namequeues.impl;
021
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.List;
025import java.util.Queue;
026import java.util.stream.Collectors;
027
028import org.apache.commons.lang3.StringUtils;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.client.SlowLogParams;
032import org.apache.hadoop.hbase.ipc.RpcCall;
033import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
034import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
035import org.apache.hadoop.hbase.namequeues.NamedQueueService;
036import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
037import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
038import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
039import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
045import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
046import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
047import org.apache.hbase.thirdparty.com.google.protobuf.Message;
048
049import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
053
054/**
055 * In-memory Queue service provider for Slow/LargeLog events
056 */
057@InterfaceAudience.Private
058public class SlowLogQueueService implements NamedQueueService {
059
060  private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
061
062  private static final String SLOW_LOG_RING_BUFFER_SIZE =
063    "hbase.regionserver.slowlog.ringbuffer.size";
064
065  private final boolean isOnlineLogProviderEnabled;
066  private final boolean isSlowLogTableEnabled;
067  private final SlowLogPersistentService slowLogPersistentService;
068  private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
069
070  public SlowLogQueueService(Configuration conf) {
071    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
072      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
073
074    if (!isOnlineLogProviderEnabled) {
075      this.isSlowLogTableEnabled = false;
076      this.slowLogPersistentService = null;
077      this.slowLogQueue = null;
078      return;
079    }
080
081    // Initialize SlowLog Queue
082    int slowLogQueueSize =
083      conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
084
085    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
086      EvictingQueue.create(slowLogQueueSize);
087    slowLogQueue = Queues.synchronizedQueue(evictingQueue);
088
089    this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
090      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
091    if (isSlowLogTableEnabled) {
092      slowLogPersistentService = new SlowLogPersistentService(conf);
093    } else {
094      slowLogPersistentService = null;
095    }
096  }
097
098  @Override
099  public NamedQueuePayload.NamedQueueEvent getEvent() {
100    return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
101  }
102
103  /**
104   * This implementation is specific to slowLog event. This consumes slowLog event from
105   * disruptor and inserts records to EvictingQueue.
106   *
107   * @param namedQueuePayload namedQueue payload from disruptor ring buffer
108   */
109  @Override
110  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
111    if (!isOnlineLogProviderEnabled) {
112      return;
113    }
114    if (!(namedQueuePayload instanceof RpcLogDetails)) {
115      LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
116      return;
117    }
118    final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
119    final RpcCall rpcCall = rpcLogDetails.getRpcCall();
120    final String clientAddress = rpcLogDetails.getClientAddress();
121    final long responseSize = rpcLogDetails.getResponseSize();
122    final String className = rpcLogDetails.getClassName();
123    final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
124    if (type == null) {
125      return;
126    }
127    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
128    Message param = rpcLogDetails.getParam();
129    long receiveTime = rpcCall.getReceiveTime();
130    long startTime = rpcCall.getStartTime();
131    long endTime = System.currentTimeMillis();
132    int processingTime = (int) (endTime - startTime);
133    int qTime = (int) (startTime - receiveTime);
134    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
135    int numGets = 0;
136    int numMutations = 0;
137    int numServiceCalls = 0;
138    if (param instanceof ClientProtos.MultiRequest) {
139      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
140      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
141        for (ClientProtos.Action action : regionAction.getActionList()) {
142          if (action.hasMutation()) {
143            numMutations++;
144          }
145          if (action.hasGet()) {
146            numGets++;
147          }
148          if (action.hasServiceCall()) {
149            numServiceCalls++;
150          }
151        }
152      }
153    }
154    final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
155    final String methodDescriptorName =
156      methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
157    TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
158      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
159      .setClientAddress(clientAddress)
160      .setMethodName(methodDescriptorName)
161      .setMultiGets(numGets)
162      .setMultiMutations(numMutations)
163      .setMultiServiceCalls(numServiceCalls)
164      .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
165      .setProcessingTime(processingTime)
166      .setQueueTime(qTime)
167      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
168      .setResponseSize(responseSize)
169      .setServerClass(className)
170      .setStartTime(startTime)
171      .setType(type)
172      .setUserName(userName)
173      .build();
174    slowLogQueue.add(slowLogPayload);
175    if (isSlowLogTableEnabled) {
176      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
177        slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
178      }
179    }
180  }
181
182  @Override
183  public boolean clearNamedQueue() {
184    if (!isOnlineLogProviderEnabled) {
185      return false;
186    }
187    LOG.debug("Received request to clean up online slowlog buffer.");
188    slowLogQueue.clear();
189    return true;
190  }
191
192  @Override
193  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
194    if (!isOnlineLogProviderEnabled) {
195      return null;
196    }
197    final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
198      request.getSlowLogResponseRequest();
199    final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
200    if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
201        .equals(slowLogResponseRequest.getLogType())) {
202      slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
203    } else {
204      slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
205    }
206    NamedQueueGetResponse response = new NamedQueueGetResponse();
207    response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
208    response.setSlowLogPayloads(slowLogPayloads);
209    return response;
210  }
211
212  private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
213    final boolean isSlowLog = rpcCallDetails.isSlowLog();
214    final boolean isLargeLog = rpcCallDetails.isLargeLog();
215    final TooSlowLog.SlowLogPayload.Type type;
216    if (!isSlowLog && !isLargeLog) {
217      LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
218        rpcCallDetails);
219      return null;
220    }
221    if (isSlowLog && isLargeLog) {
222      type = TooSlowLog.SlowLogPayload.Type.ALL;
223    } else if (isSlowLog) {
224      type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
225    } else {
226      type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
227    }
228    return type;
229  }
230
231  /**
232   * Add all slowLog events to system table. This is only for slowLog event's persistence on
233   * system table.
234   */
235  @Override
236  public void persistAll() {
237    if (!isOnlineLogProviderEnabled) {
238      return;
239    }
240    if (slowLogPersistentService != null) {
241      slowLogPersistentService.addAllLogsToSysTable();
242    }
243  }
244
245  private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
246      final AdminProtos.SlowLogResponseRequest request) {
247    List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
248      Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
249        e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
250          || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList());
251    // latest slow logs first, operator is interested in latest records from in-memory buffer
252    Collections.reverse(slowLogPayloadList);
253    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
254  }
255
256  private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads(
257      final AdminProtos.SlowLogResponseRequest request) {
258    List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
259      Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
260        e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
261          || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList());
262    // latest large logs first, operator is interested in latest records from in-memory buffer
263    Collections.reverse(slowLogPayloadList);
264    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
265  }
266
267}