001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import com.lmax.disruptor.BlockingWaitStrategy; 021import com.lmax.disruptor.RingBuffer; 022import com.lmax.disruptor.dsl.Disruptor; 023import com.lmax.disruptor.dsl.ProducerType; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 026import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 027import org.apache.hadoop.hbase.util.Threads; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.yetus.audience.InterfaceStability; 030 031import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 032import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 033 034/** 035 * NamedQueue recorder that maintains various named queues. The service uses LMAX Disruptor to save 036 * queue records which are then consumed by a queue and based on the ring buffer size, the available 037 * records are then fetched from the queue in thread-safe manner. 038 */ 039@InterfaceAudience.Private 040@InterfaceStability.Evolving 041public class NamedQueueRecorder { 042 043 private final Disruptor<RingBufferEnvelope> disruptor; 044 private final LogEventHandler logEventHandler; 045 046 private static NamedQueueRecorder namedQueueRecorder; 047 private static boolean isInit = false; 048 private static final Object LOCK = new Object(); 049 050 /** 051 * Initialize disruptor with configurable ringbuffer size 052 */ 053 private NamedQueueRecorder(Configuration conf) { 054 055 // This is the 'writer' -- a single threaded executor. This single thread consumes what is 056 // put on the ringbuffer. 057 final String hostingThreadName = Thread.currentThread().getName(); 058 059 int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); 060 061 // disruptor initialization with BlockingWaitStrategy 062 this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount), 063 new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d") 064 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 065 ProducerType.MULTI, new BlockingWaitStrategy()); 066 this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); 067 068 // initialize ringbuffer event handler 069 this.logEventHandler = new LogEventHandler(conf); 070 this.disruptor.handleEventsWith(new LogEventHandler[] { this.logEventHandler }); 071 this.disruptor.start(); 072 } 073 074 public static NamedQueueRecorder getInstance(Configuration conf) { 075 if (namedQueueRecorder != null) { 076 return namedQueueRecorder; 077 } 078 synchronized (LOCK) { 079 if (!isInit) { 080 namedQueueRecorder = new NamedQueueRecorder(conf); 081 isInit = true; 082 } 083 } 084 return namedQueueRecorder; 085 } 086 087 // must be power of 2 for disruptor ringbuffer 088 private int getEventCount(int eventCount) { 089 Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0"); 090 int floor = Integer.highestOneBit(eventCount); 091 if (floor == eventCount) { 092 return floor; 093 } 094 // max capacity is 1 << 30 095 if (floor >= 1 << 29) { 096 return 1 << 30; 097 } 098 return floor << 1; 099 } 100 101 /** 102 * Retrieve in memory queue records from ringbuffer 103 * @param request namedQueue request with event type 104 * @return queue records from ringbuffer after filter (if applied) 105 */ 106 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 107 return this.logEventHandler.getNamedQueueRecords(request); 108 } 109 110 /** 111 * clears queue records from ringbuffer 112 * @param namedQueueEvent type of queue to clear 113 * @return true if slow log payloads are cleaned up or hbase.regionserver.slowlog.buffer.enabled 114 * is not set to true, false if failed to clean up slow logs 115 */ 116 public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { 117 return this.logEventHandler.clearNamedQueue(namedQueueEvent); 118 } 119 120 /** 121 * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog), 122 * consumer of disruptor ringbuffer will have specific logic. This method is producer of disruptor 123 * ringbuffer which is initialized in NamedQueueRecorder constructor. 124 * @param namedQueuePayload namedQueue payload sent by client of ring buffer service 125 */ 126 public void addRecord(NamedQueuePayload namedQueuePayload) { 127 RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer(); 128 long seqId = ringBuffer.next(); 129 try { 130 ringBuffer.get(seqId).load(namedQueuePayload); 131 } finally { 132 ringBuffer.publish(seqId); 133 } 134 } 135 136 /** 137 * Add all in memory queue records to system table. The implementors can use system table or 138 * direct HDFS file or ZK as persistence system. 139 */ 140 public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { 141 if (this.logEventHandler != null) { 142 this.logEventHandler.persistAll(namedQueueEvent); 143 } 144 } 145 146}