001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.namequeues;
021
022import com.lmax.disruptor.BlockingWaitStrategy;
023import com.lmax.disruptor.RingBuffer;
024import com.lmax.disruptor.dsl.Disruptor;
025import com.lmax.disruptor.dsl.ProducerType;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
029import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
030import org.apache.hadoop.hbase.util.Threads;
031import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.yetus.audience.InterfaceStability;
034
035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
036
037/**
038 * NamedQueue recorder that maintains various named queues.
039 * The service uses LMAX Disruptor to save queue records which are then consumed by
040 * a queue and based on the ring buffer size, the available records are then fetched
041 * from the queue in thread-safe manner.
042 */
043@InterfaceAudience.Private
044@InterfaceStability.Evolving
045public class NamedQueueRecorder {
046
047  private final Disruptor<RingBufferEnvelope> disruptor;
048  private final LogEventHandler logEventHandler;
049
050  private static NamedQueueRecorder namedQueueRecorder;
051  private static boolean isInit = false;
052  private static final Object LOCK = new Object();
053
054  /**
055   * Initialize disruptor with configurable ringbuffer size
056   */
057  private NamedQueueRecorder(Configuration conf) {
058
059    // This is the 'writer' -- a single threaded executor. This single thread consumes what is
060    // put on the ringbuffer.
061    final String hostingThreadName = Thread.currentThread().getName();
062
063    int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
064
065    // disruptor initialization with BlockingWaitStrategy
066    this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
067      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
068        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
069      ProducerType.MULTI, new BlockingWaitStrategy());
070    this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
071
072    // initialize ringbuffer event handler
073    this.logEventHandler = new LogEventHandler(conf);
074    this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
075    this.disruptor.start();
076  }
077
078  public static NamedQueueRecorder getInstance(Configuration conf) {
079    if (namedQueueRecorder != null) {
080      return namedQueueRecorder;
081    }
082    synchronized (LOCK) {
083      if (!isInit) {
084        namedQueueRecorder = new NamedQueueRecorder(conf);
085        isInit = true;
086      }
087    }
088    return namedQueueRecorder;
089  }
090
091  // must be power of 2 for disruptor ringbuffer
092  private int getEventCount(int eventCount) {
093    Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
094    int floor = Integer.highestOneBit(eventCount);
095    if (floor == eventCount) {
096      return floor;
097    }
098    // max capacity is 1 << 30
099    if (floor >= 1 << 29) {
100      return 1 << 30;
101    }
102    return floor << 1;
103  }
104
105  /**
106   * Retrieve in memory queue records from ringbuffer
107   *
108   * @param request namedQueue request with event type
109   * @return queue records from ringbuffer after filter (if applied)
110   */
111  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
112    return this.logEventHandler.getNamedQueueRecords(request);
113  }
114
115  /**
116   * clears queue records from ringbuffer
117   *
118   * @param namedQueueEvent type of queue to clear
119   * @return true if slow log payloads are cleaned up or
120   *   hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
121   *   clean up slow logs
122   */
123  public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
124    return this.logEventHandler.clearNamedQueue(namedQueueEvent);
125  }
126
127  /**
128   * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
129   * consumer of disruptor ringbuffer will have specific logic.
130   * This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder
131   * constructor.
132   *
133   * @param namedQueuePayload namedQueue payload sent by client of ring buffer
134   *   service
135   */
136  public void addRecord(NamedQueuePayload namedQueuePayload) {
137    RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
138    long seqId = ringBuffer.next();
139    try {
140      ringBuffer.get(seqId).load(namedQueuePayload);
141    } finally {
142      ringBuffer.publish(seqId);
143    }
144  }
145
146  /**
147   * Add all in memory queue records to system table. The implementors can use system table
148   * or direct HDFS file or ZK as persistence system.
149   */
150  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
151    if (this.logEventHandler != null) {
152      this.logEventHandler.persistAll(namedQueueEvent);
153    }
154  }
155
156}