/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.slowlog;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;

/**
 * Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer.
 * The service uses LMAX Disruptor to save slow records which are then consumed by
 * a queue and based on the ring buffer size, the available records are then fetched
 * from the queue in thread-safe manner.
 * <p>
 * When the provider is disabled through configuration, no disruptor or event handler is
 * created and every public method of this class becomes a no-op (or returns an empty result).
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SlowLogRecorder {

  private final Disruptor<RingBufferEnvelope> disruptor;
  private final LogEventHandler logEventHandler;
  private final int eventCount;
  private final boolean isOnlineLogProviderEnabled;

  private static final String SLOW_LOG_RING_BUFFER_SIZE =
    "hbase.regionserver.slowlog.ringbuffer.size";

  /**
   * Initialize disruptor with configurable ringbuffer size
   *
   * @param conf configuration used to decide whether the provider is enabled, to read the
   *   requested ring buffer size and to initialize the event handler
   * @throws IllegalArgumentException if the provider is enabled and the configured ring
   *   buffer size is not a positive number
   */
  public SlowLogRecorder(Configuration conf) {
    isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);

    if (!isOnlineLogProviderEnabled) {
      // Provider disabled: leave everything unset, the public methods guard against this.
      this.disruptor = null;
      this.logEventHandler = null;
      this.eventCount = 0;
      return;
    }

    this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
      HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);

    // This is the 'writer' -- a single threaded executor. This single thread consumes what is
    // put on the ringbuffer.
    final String hostingThreadName = Thread.currentThread().getName();

    // disruptor initialization with BlockingWaitStrategy
    this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
      getEventCount(),
      Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
      ProducerType.MULTI,
      new BlockingWaitStrategy());
    this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());

    // initialize ringbuffer event handler
    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
    this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
    this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
    this.disruptor.start();
  }

  /**
   * Compute the ring buffer size handed to the disruptor from the configured event count.
   * The configured value is rounded up to the next power of 2 (it is returned unchanged when
   * it already is one) and is capped at {@code 1 << 30}.
   *
   * @return a power of 2 that is at least the configured event count, unless capped
   * @throws IllegalArgumentException if the configured event count is not positive
   */
  // must be power of 2 for disruptor ringbuffer
  private int getEventCount() {
    // Zero must be rejected as well: Integer.highestOneBit(0) is 0, which would otherwise be
    // returned below and passed on as the ring buffer size.
    Preconditions.checkArgument(eventCount > 0,
      SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
    int floor = Integer.highestOneBit(eventCount);
    if (floor == eventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }

  /**
   * Retrieve online slow logs from ringbuffer
   *
   * @param request slow log request parameters
   * @return online slow logs from ringbuffer, or an empty list if the provider is disabled
   */
  public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
    return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request)
      : Collections.emptyList();
  }

  /**
   * Retrieve online large logs from ringbuffer
   *
   * @param request large log request parameters
   * @return online large logs from ringbuffer, or an empty list if the provider is disabled
   */
  public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
    return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request)
      : Collections.emptyList();
  }

  /**
   * clears slow log payloads from ringbuffer
   *
   * @return true if slow log payloads are cleaned up or
   *   hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
   *   clean up slow logs
   */
  public boolean clearSlowLogPayloads() {
    if (!isOnlineLogProviderEnabled) {
      return true;
    }
    return this.logEventHandler.clearSlowLogs();
  }

  /**
   * Add slow log rpcCall details to ringbuffer
   *
   * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
   *   consumers
   */
  public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
    if (!isOnlineLogProviderEnabled) {
      return;
    }
    RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
    long seqId = ringBuffer.next();
    try {
      ringBuffer.get(seqId).load(rpcLogDetails);
    } finally {
      // Always publish the claimed sequence, even if loading the envelope failed.
      ringBuffer.publish(seqId);
    }
  }

  /**
   * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
   * <p>
   * Does nothing when no event handler was created (i.e. the provider is disabled).
   */
  public void addAllLogsToSysTable() {
    if (this.logEventHandler != null) {
      this.logEventHandler.addAllLogsToSysTable();
    }
  }

}