001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues.impl;
019
020import java.util.Arrays;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.Queue;
026import java.util.stream.Collectors;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.SlowLogParams;
032import org.apache.hadoop.hbase.ipc.RpcCall;
033import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
034import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
035import org.apache.hadoop.hbase.namequeues.NamedQueueService;
036import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
037import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
038import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
039import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
046import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
047import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
049import org.apache.hbase.thirdparty.com.google.protobuf.Message;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
056
057/**
058 * In-memory Queue service provider for Slow/LargeLog events
059 */
060@InterfaceAudience.Private
061public class SlowLogQueueService implements NamedQueueService {
062
063  private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
064
065  private static final String SLOW_LOG_RING_BUFFER_SIZE =
066    "hbase.regionserver.slowlog.ringbuffer.size";
067
068  private final boolean isOnlineLogProviderEnabled;
069  private final boolean isSlowLogTableEnabled;
070  private final SlowLogPersistentService slowLogPersistentService;
071  private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
072  private final boolean slowLogScanPayloadEnabled;
073
074  public SlowLogQueueService(Configuration conf) {
075    this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
076      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
077    this.slowLogScanPayloadEnabled = conf.getBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED,
078      HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT);
079
080    if (!isOnlineLogProviderEnabled) {
081      this.isSlowLogTableEnabled = false;
082      this.slowLogPersistentService = null;
083      this.slowLogQueue = null;
084      return;
085    }
086
087    // Initialize SlowLog Queue
088    int slowLogQueueSize =
089      conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
090
091    EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue = EvictingQueue.create(slowLogQueueSize);
092    slowLogQueue = Queues.synchronizedQueue(evictingQueue);
093
094    this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
095      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
096    if (isSlowLogTableEnabled) {
097      slowLogPersistentService = new SlowLogPersistentService(conf);
098    } else {
099      slowLogPersistentService = null;
100    }
101  }
102
103  @Override
104  public NamedQueuePayload.NamedQueueEvent getEvent() {
105    return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
106  }
107
108  /**
109   * This implementation is specific to slowLog event. This consumes slowLog event from disruptor
110   * and inserts records to EvictingQueue.
111   * @param namedQueuePayload namedQueue payload from disruptor ring buffer
112   */
113  @Override
114  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
115    if (!isOnlineLogProviderEnabled) {
116      return;
117    }
118    if (!(namedQueuePayload instanceof RpcLogDetails)) {
119      LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
120      return;
121    }
122    final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
123    final RpcCall rpcCall = rpcLogDetails.getRpcCall();
124    final String clientAddress = rpcLogDetails.getClientAddress();
125    final long responseSize = rpcLogDetails.getResponseSize();
126    final long blockBytesScanned = rpcLogDetails.getBlockBytesScanned();
127    final long fsReadTime = rpcLogDetails.getFsReadTime();
128    final String className = rpcLogDetails.getClassName();
129    final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
130    if (type == null) {
131      return;
132    }
133    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
134    Message param = rpcLogDetails.getParam();
135    long receiveTime = rpcCall.getReceiveTime();
136    long startTime = rpcCall.getStartTime();
137    long endTime = EnvironmentEdgeManager.currentTime();
138    int processingTime = (int) (endTime - startTime);
139    int qTime = (int) (startTime - receiveTime);
140    final SlowLogParams slowLogParams =
141      ProtobufUtil.getSlowLogParams(param, slowLogScanPayloadEnabled);
142    int numGets = 0;
143    int numMutations = 0;
144    int numServiceCalls = 0;
145    if (param instanceof ClientProtos.MultiRequest) {
146      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
147      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
148        for (ClientProtos.Action action : regionAction.getActionList()) {
149          if (action.hasMutation()) {
150            numMutations++;
151          }
152          if (action.hasGet()) {
153            numGets++;
154          }
155          if (action.hasServiceCall()) {
156            numServiceCalls++;
157          }
158        }
159      }
160    }
161    final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
162    final String methodDescriptorName =
163      methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
164    TooSlowLog.SlowLogPayload.Builder slowLogPayloadBuilder = TooSlowLog.SlowLogPayload.newBuilder()
165      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
166      .setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets)
167      .setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls)
168      .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
169      .setProcessingTime(processingTime).setQueueTime(qTime)
170      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
171      .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
172      .setFsReadTime(fsReadTime).setServerClass(className).setStartTime(startTime).setType(type)
173      .setUserName(userName)
174      .addAllRequestAttribute(buildNameBytesPairs(rpcLogDetails.getRequestAttributes()))
175      .addAllConnectionAttribute(buildNameBytesPairs(rpcLogDetails.getConnectionAttributes()));
176    if (slowLogParams != null && slowLogParams.getScan() != null) {
177      slowLogPayloadBuilder.setScan(slowLogParams.getScan());
178    }
179    TooSlowLog.SlowLogPayload slowLogPayload = slowLogPayloadBuilder.build();
180    slowLogQueue.add(slowLogPayload);
181    if (isSlowLogTableEnabled) {
182      if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
183        slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
184      }
185    }
186  }
187
188  private static Collection<HBaseProtos.NameBytesPair>
189    buildNameBytesPairs(Map<String, byte[]> attributes) {
190    if (attributes == null) {
191      return Collections.emptySet();
192    }
193    return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder()
194      .setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build())
195      .collect(Collectors.toSet());
196  }
197
198  @Override
199  public boolean clearNamedQueue() {
200    if (!isOnlineLogProviderEnabled) {
201      return false;
202    }
203    LOG.debug("Received request to clean up online slowlog buffer.");
204    slowLogQueue.clear();
205    return true;
206  }
207
208  @Override
209  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
210    if (!isOnlineLogProviderEnabled) {
211      return null;
212    }
213    final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
214      request.getSlowLogResponseRequest();
215    final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
216    if (
217      AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
218        .equals(slowLogResponseRequest.getLogType())
219    ) {
220      slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
221    } else {
222      slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
223    }
224    NamedQueueGetResponse response = new NamedQueueGetResponse();
225    response.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT);
226    response.setSlowLogPayloads(slowLogPayloads);
227    return response;
228  }
229
230  private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
231    final boolean isSlowLog = rpcCallDetails.isSlowLog();
232    final boolean isLargeLog = rpcCallDetails.isLargeLog();
233    final TooSlowLog.SlowLogPayload.Type type;
234    if (!isSlowLog && !isLargeLog) {
235      LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
236        rpcCallDetails);
237      return null;
238    }
239    if (isSlowLog && isLargeLog) {
240      type = TooSlowLog.SlowLogPayload.Type.ALL;
241    } else if (isSlowLog) {
242      type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
243    } else {
244      type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
245    }
246    return type;
247  }
248
249  /**
250   * Add all slowLog events to system table. This is only for slowLog event's persistence on system
251   * table.
252   */
253  @Override
254  public void persistAll(Connection connection) {
255    if (!isOnlineLogProviderEnabled) {
256      return;
257    }
258    if (slowLogPersistentService != null) {
259      slowLogPersistentService.addAllLogsToSysTable(connection);
260    }
261  }
262
263  private List<TooSlowLog.SlowLogPayload>
264    getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
265    List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
266      Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0]))
267        .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
268          || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG)
269        .collect(Collectors.toList());
270    // latest slow logs first, operator is interested in latest records from in-memory buffer
271    Collections.reverse(slowLogPayloadList);
272    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
273  }
274
275  private List<TooSlowLog.SlowLogPayload>
276    getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
277    List<
278      TooSlowLog.SlowLogPayload> slowLogPayloadList =
279        Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0]))
280          .filter(e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
281            || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG)
282          .collect(Collectors.toList());
283    // latest large logs first, operator is interested in latest records from in-memory buffer
284    Collections.reverse(slowLogPayloadList);
285    return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
286  }
287
288}