001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver.slowlog;
021
022import com.lmax.disruptor.BlockingWaitStrategy;
023import com.lmax.disruptor.RingBuffer;
024import com.lmax.disruptor.dsl.Disruptor;
025import com.lmax.disruptor.dsl.ProducerType;
026
027import java.util.Collections;
028import java.util.List;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.yetus.audience.InterfaceStability;
036
037import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
041
042/**
043 * Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer.
044 * The service uses LMAX Disruptor to save slow records which are then consumed by
045 * a queue and based on the ring buffer size, the available records are then fetched
046 * from the queue in thread-safe manner.
047 */
048@InterfaceAudience.Private
049@InterfaceStability.Evolving
050public class SlowLogRecorder {
051
052  private final Disruptor<RingBufferEnvelope> disruptor;
053  private final LogEventHandler logEventHandler;
054  private final int eventCount;
055  private final boolean isOnlineLogProviderEnabled;
056
057  private static final String SLOW_LOG_RING_BUFFER_SIZE =
058    "hbase.regionserver.slowlog.ringbuffer.size";
059
060  /**
061   * Initialize disruptor with configurable ringbuffer size
062   */
063  public SlowLogRecorder(Configuration conf) {
064    isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
065      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
066
067    if (!isOnlineLogProviderEnabled) {
068      this.disruptor = null;
069      this.logEventHandler = null;
070      this.eventCount = 0;
071      return;
072    }
073
074    this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
075      HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
076
077    // This is the 'writer' -- a single threaded executor. This single thread consumes what is
078    // put on the ringbuffer.
079    final String hostingThreadName = Thread.currentThread().getName();
080
081    // disruptor initialization with BlockingWaitStrategy
082    this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
083      getEventCount(),
084      Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
085      ProducerType.MULTI,
086      new BlockingWaitStrategy());
087    this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
088
089    // initialize ringbuffer event handler
090    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
091      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
092    this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
093    this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
094    this.disruptor.start();
095  }
096
097  // must be power of 2 for disruptor ringbuffer
098  private int getEventCount() {
099    Preconditions.checkArgument(eventCount >= 0,
100      SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
101    int floor = Integer.highestOneBit(eventCount);
102    if (floor == eventCount) {
103      return floor;
104    }
105    // max capacity is 1 << 30
106    if (floor >= 1 << 29) {
107      return 1 << 30;
108    }
109    return floor << 1;
110  }
111
112  /**
113   * Retrieve online slow logs from ringbuffer
114   *
115   * @param request slow log request parameters
116   * @return online slow logs from ringbuffer
117   */
118  public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
119    return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request)
120      : Collections.emptyList();
121  }
122
123  /**
124   * Retrieve online large logs from ringbuffer
125   *
126   * @param request large log request parameters
127   * @return online large logs from ringbuffer
128   */
129  public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
130    return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request)
131      : Collections.emptyList();
132  }
133
134  /**
135   * clears slow log payloads from ringbuffer
136   *
137   * @return true if slow log payloads are cleaned up or
138   *   hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
139   *   clean up slow logs
140   */
141  public boolean clearSlowLogPayloads() {
142    if (!isOnlineLogProviderEnabled) {
143      return true;
144    }
145    return this.logEventHandler.clearSlowLogs();
146  }
147
148  /**
149   * Add slow log rpcCall details to ringbuffer
150   *
151   * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
152   *   consumers
153   */
154  public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
155    if (!isOnlineLogProviderEnabled) {
156      return;
157    }
158    RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
159    long seqId = ringBuffer.next();
160    try {
161      ringBuffer.get(seqId).load(rpcLogDetails);
162    } finally {
163      ringBuffer.publish(seqId);
164    }
165  }
166
167  /**
168   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
169   */
170  public void addAllLogsToSysTable() {
171    if (this.logEventHandler != null) {
172      this.logEventHandler.addAllLogsToSysTable();
173    }
174  }
175
176}