001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import com.lmax.disruptor.BlockingWaitStrategy;
021import com.lmax.disruptor.RingBuffer;
022import com.lmax.disruptor.dsl.Disruptor;
023import com.lmax.disruptor.dsl.ProducerType;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
026import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
027import org.apache.hadoop.hbase.util.Threads;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030
031import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
032import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
033
034/**
035 * NamedQueue recorder that maintains various named queues. The service uses LMAX Disruptor to save
036 * queue records which are then consumed by a queue and based on the ring buffer size, the available
037 * records are then fetched from the queue in thread-safe manner.
038 */
039@InterfaceAudience.Private
040@InterfaceStability.Evolving
041public class NamedQueueRecorder {
042
043  private final Disruptor<RingBufferEnvelope> disruptor;
044  private final LogEventHandler logEventHandler;
045
046  private static NamedQueueRecorder namedQueueRecorder;
047  private static boolean isInit = false;
048  private static final Object LOCK = new Object();
049
050  /**
051   * Initialize disruptor with configurable ringbuffer size
052   */
053  private NamedQueueRecorder(Configuration conf) {
054
055    // This is the 'writer' -- a single threaded executor. This single thread consumes what is
056    // put on the ringbuffer.
057    final String hostingThreadName = Thread.currentThread().getName();
058
059    int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
060
061    // disruptor initialization with BlockingWaitStrategy
062    this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
063      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
064        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
065      ProducerType.MULTI, new BlockingWaitStrategy());
066    this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
067
068    // initialize ringbuffer event handler
069    this.logEventHandler = new LogEventHandler(conf);
070    this.disruptor.handleEventsWith(new LogEventHandler[] { this.logEventHandler });
071    this.disruptor.start();
072  }
073
074  public static NamedQueueRecorder getInstance(Configuration conf) {
075    if (namedQueueRecorder != null) {
076      return namedQueueRecorder;
077    }
078    synchronized (LOCK) {
079      if (!isInit) {
080        namedQueueRecorder = new NamedQueueRecorder(conf);
081        isInit = true;
082      }
083    }
084    return namedQueueRecorder;
085  }
086
087  // must be power of 2 for disruptor ringbuffer
088  private int getEventCount(int eventCount) {
089    Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
090    int floor = Integer.highestOneBit(eventCount);
091    if (floor == eventCount) {
092      return floor;
093    }
094    // max capacity is 1 << 30
095    if (floor >= 1 << 29) {
096      return 1 << 30;
097    }
098    return floor << 1;
099  }
100
101  /**
102   * Retrieve in memory queue records from ringbuffer
103   * @param request namedQueue request with event type
104   * @return queue records from ringbuffer after filter (if applied)
105   */
106  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
107    return this.logEventHandler.getNamedQueueRecords(request);
108  }
109
110  /**
111   * clears queue records from ringbuffer
112   * @param namedQueueEvent type of queue to clear
113   * @return true if slow log payloads are cleaned up or hbase.regionserver.slowlog.buffer.enabled
114   *         is not set to true, false if failed to clean up slow logs
115   */
116  public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
117    return this.logEventHandler.clearNamedQueue(namedQueueEvent);
118  }
119
120  /**
121   * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
122   * consumer of disruptor ringbuffer will have specific logic. This method is producer of disruptor
123   * ringbuffer which is initialized in NamedQueueRecorder constructor.
124   * @param namedQueuePayload namedQueue payload sent by client of ring buffer service
125   */
126  public void addRecord(NamedQueuePayload namedQueuePayload) {
127    RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
128    long seqId = ringBuffer.next();
129    try {
130      ringBuffer.get(seqId).load(namedQueuePayload);
131    } finally {
132      ringBuffer.publish(seqId);
133    }
134  }
135
136  /**
137   * Add all in memory queue records to system table. The implementors can use system table or
138   * direct HDFS file or ZK as persistence system.
139   */
140  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
141    if (this.logEventHandler != null) {
142      this.logEventHandler.persistAll(namedQueueEvent);
143    }
144  }
145
146}