001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.namequeues; 021 022import com.lmax.disruptor.BlockingWaitStrategy; 023import com.lmax.disruptor.RingBuffer; 024import com.lmax.disruptor.dsl.Disruptor; 025import com.lmax.disruptor.dsl.ProducerType; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 029import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 030import org.apache.hadoop.hbase.util.Threads; 031import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.apache.yetus.audience.InterfaceStability; 034 035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 036 037/** 038 * NamedQueue recorder that maintains various named queues. 039 * The service uses LMAX Disruptor to save queue records which are then consumed by 040 * a queue and based on the ring buffer size, the available records are then fetched 041 * from the queue in thread-safe manner. 042 */ 043@InterfaceAudience.Private 044@InterfaceStability.Evolving 045public class NamedQueueRecorder { 046 047 private final Disruptor<RingBufferEnvelope> disruptor; 048 private final LogEventHandler logEventHandler; 049 050 private static NamedQueueRecorder namedQueueRecorder; 051 private static boolean isInit = false; 052 private static final Object LOCK = new Object(); 053 054 /** 055 * Initialize disruptor with configurable ringbuffer size 056 */ 057 private NamedQueueRecorder(Configuration conf) { 058 059 // This is the 'writer' -- a single threaded executor. This single thread consumes what is 060 // put on the ringbuffer. 061 final String hostingThreadName = Thread.currentThread().getName(); 062 063 int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); 064 065 // disruptor initialization with BlockingWaitStrategy 066 this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount), 067 new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d") 068 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 069 ProducerType.MULTI, new BlockingWaitStrategy()); 070 this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); 071 072 // initialize ringbuffer event handler 073 this.logEventHandler = new LogEventHandler(conf); 074 this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler}); 075 this.disruptor.start(); 076 } 077 078 public static NamedQueueRecorder getInstance(Configuration conf) { 079 if (namedQueueRecorder != null) { 080 return namedQueueRecorder; 081 } 082 synchronized (LOCK) { 083 if (!isInit) { 084 namedQueueRecorder = new NamedQueueRecorder(conf); 085 isInit = true; 086 } 087 } 088 return namedQueueRecorder; 089 } 090 091 // must be power of 2 for disruptor ringbuffer 092 private int getEventCount(int eventCount) { 093 Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0"); 094 int floor = Integer.highestOneBit(eventCount); 095 if (floor == eventCount) { 096 return floor; 097 } 098 // max capacity is 1 << 30 099 if (floor >= 1 << 29) { 100 return 1 << 30; 101 } 102 return floor << 1; 103 } 104 105 /** 106 * Retrieve in memory queue records from ringbuffer 107 * 108 * @param request namedQueue request with event type 109 * @return queue records from ringbuffer after filter (if applied) 110 */ 111 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 112 return this.logEventHandler.getNamedQueueRecords(request); 113 } 114 115 /** 116 * clears queue records from ringbuffer 117 * 118 * @param namedQueueEvent type of queue to clear 119 * @return true if slow log payloads are cleaned up or 120 * hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to 121 * clean up slow logs 122 */ 123 public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { 124 return this.logEventHandler.clearNamedQueue(namedQueueEvent); 125 } 126 127 /** 128 * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog), 129 * consumer of disruptor ringbuffer will have specific logic. 130 * This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder 131 * constructor. 132 * 133 * @param namedQueuePayload namedQueue payload sent by client of ring buffer 134 * service 135 */ 136 public void addRecord(NamedQueuePayload namedQueuePayload) { 137 RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer(); 138 long seqId = ringBuffer.next(); 139 try { 140 ringBuffer.get(seqId).load(namedQueuePayload); 141 } finally { 142 ringBuffer.publish(seqId); 143 } 144 } 145 146 /** 147 * Add all in memory queue records to system table. The implementors can use system table 148 * or direct HDFS file or ZK as persistence system. 149 */ 150 public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { 151 if (this.logEventHandler != null) { 152 this.logEventHandler.persistAll(namedQueueEvent); 153 } 154 } 155 156}