001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import com.lmax.disruptor.EventHandler;
021import com.lmax.disruptor.RingBuffer;
022import java.lang.reflect.InvocationTargetException;
023import java.util.HashMap;
024import java.util.Map;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
028import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Event Handler run by disruptor ringbuffer consumer. Although this is generic implementation for
035 * namedQueue, it can have individual queue specific logic.
036 */
037@InterfaceAudience.Private
038class LogEventHandler implements EventHandler<RingBufferEnvelope> {
039
040  private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
041
042  // Map that binds namedQueues to corresponding queue service implementation.
043  // If NamedQueue of specific type is enabled, corresponding service will be used to
044  // insert and retrieve records.
045  // Individual queue sizes should be determined based on their individual configs within
046  // each service.
047  private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
048    new HashMap<>();
049
050  private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
051
052  LogEventHandler(final Configuration conf) {
053    for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
054      Class<?> clz;
055      try {
056        clz = Class.forName(implName);
057      } catch (ClassNotFoundException e) {
058        LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
059        continue;
060      }
061
062      if (!NamedQueueService.class.isAssignableFrom(clz)) {
063        LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
064        continue;
065      }
066
067      // add all service mappings here
068      try {
069        NamedQueueService namedQueueService =
070          (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
071        namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
072      } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
073        | InvocationTargetException e) {
074        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz,
075          e);
076      }
077    }
078  }
079
080  /**
081   * Called when a publisher has published an event to the {@link RingBuffer}. This is generic
082   * consumer of disruptor ringbuffer and for each new namedQueue that we add, we should also
083   * provide specific consumer logic here.
084   * @param event      published to the {@link RingBuffer}
085   * @param sequence   of the event being processed
086   * @param endOfBatch flag to indicate if this is the last event in a batch from the
087   *                   {@link RingBuffer}
088   */
089  @Override
090  public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
091    final NamedQueuePayload namedQueuePayload = event.getPayload();
092    // consume ringbuffer payload based on event type
093    namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
094      .consumeEventFromDisruptor(namedQueuePayload);
095  }
096
097  /**
098   * Cleans up queues maintained by services.
099   * @param namedQueueEvent type of queue to clear
100   * @return true if queue is cleaned up, false otherwise
101   */
102  boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
103    return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
104  }
105
106  /**
107   * Add all in memory queue records to system table. The implementors can use system table or
108   * direct HDFS file or ZK as persistence system.
109   */
110  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
111    namedQueueServices.get(namedQueueEvent).persistAll(connection);
112  }
113
114  /**
115   * Retrieve in memory queue records from ringbuffer
116   * @param request namedQueue request with event type
117   * @return queue records from ringbuffer after filter (if applied)
118   */
119  NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
120    return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
121  }
122
123}