001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues;
019
020import com.lmax.disruptor.EventHandler;
021import com.lmax.disruptor.RingBuffer;
022import java.lang.reflect.InvocationTargetException;
023import java.util.HashMap;
024import java.util.Map;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
027import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Event Handler run by disruptor ringbuffer consumer. Although this is generic implementation for
034 * namedQueue, it can have individual queue specific logic.
035 */
036@InterfaceAudience.Private
037class LogEventHandler implements EventHandler<RingBufferEnvelope> {
038
039  private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
040
041  // Map that binds namedQueues to corresponding queue service implementation.
042  // If NamedQueue of specific type is enabled, corresponding service will be used to
043  // insert and retrieve records.
044  // Individual queue sizes should be determined based on their individual configs within
045  // each service.
046  private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
047    new HashMap<>();
048
049  private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
050
051  LogEventHandler(final Configuration conf) {
052    for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
053      Class<?> clz;
054      try {
055        clz = Class.forName(implName);
056      } catch (ClassNotFoundException e) {
057        LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
058        continue;
059      }
060
061      if (!NamedQueueService.class.isAssignableFrom(clz)) {
062        LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
063        continue;
064      }
065
066      // add all service mappings here
067      try {
068        NamedQueueService namedQueueService =
069          (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
070        namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
071      } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
072        | InvocationTargetException e) {
073        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz);
074      }
075    }
076  }
077
078  /**
079   * Called when a publisher has published an event to the {@link RingBuffer}. This is generic
080   * consumer of disruptor ringbuffer and for each new namedQueue that we add, we should also
081   * provide specific consumer logic here.
082   * @param event      published to the {@link RingBuffer}
083   * @param sequence   of the event being processed
084   * @param endOfBatch flag to indicate if this is the last event in a batch from the
085   *                   {@link RingBuffer}
086   */
087  @Override
088  public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
089    final NamedQueuePayload namedQueuePayload = event.getPayload();
090    // consume ringbuffer payload based on event type
091    namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
092      .consumeEventFromDisruptor(namedQueuePayload);
093  }
094
095  /**
096   * Cleans up queues maintained by services.
097   * @param namedQueueEvent type of queue to clear
098   * @return true if queue is cleaned up, false otherwise
099   */
100  boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
101    return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
102  }
103
104  /**
105   * Add all in memory queue records to system table. The implementors can use system table or
106   * direct HDFS file or ZK as persistence system.
107   */
108  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
109    namedQueueServices.get(namedQueueEvent).persistAll();
110  }
111
112  /**
113   * Retrieve in memory queue records from ringbuffer
114   * @param request namedQueue request with event type
115   * @return queue records from ringbuffer after filter (if applied)
116   */
117  NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
118    return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
119  }
120
121}