001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.namequeues;
021
022import com.lmax.disruptor.EventHandler;
023import com.lmax.disruptor.RingBuffer;
024
025import java.lang.reflect.InvocationTargetException;
026import java.util.HashMap;
027import java.util.Map;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
031import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Event Handler run by disruptor ringbuffer consumer.
038 * Although this is generic implementation for namedQueue, it can have individual queue specific
039 * logic.
040 */
041@InterfaceAudience.Private
042class LogEventHandler implements EventHandler<RingBufferEnvelope> {
043
044  private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
045
046  // Map that binds namedQueues to corresponding queue service implementation.
047  // If NamedQueue of specific type is enabled, corresponding service will be used to
048  // insert and retrieve records.
049  // Individual queue sizes should be determined based on their individual configs within
050  // each service.
051  private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
052    new HashMap<>();
053
054  private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
055
056  LogEventHandler(final Configuration conf) {
057    for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
058      Class<?> clz;
059      try {
060        clz = Class.forName(implName);
061      } catch (ClassNotFoundException e) {
062        LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
063        continue;
064      }
065
066      if (!NamedQueueService.class.isAssignableFrom(clz)) {
067        LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
068        continue;
069      }
070
071      // add all service mappings here
072      try {
073        NamedQueueService namedQueueService =
074          (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
075        namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
076      } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
077          | InvocationTargetException e) {
078        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
079          clz);
080      }
081    }
082  }
083
084  /**
085   * Called when a publisher has published an event to the {@link RingBuffer}.
086   * This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
087   * add, we should also provide specific consumer logic here.
088   *
089   * @param event published to the {@link RingBuffer}
090   * @param sequence of the event being processed
091   * @param endOfBatch flag to indicate if this is the last event in a batch from
092   *   the {@link RingBuffer}
093   */
094  @Override
095  public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
096    final NamedQueuePayload namedQueuePayload = event.getPayload();
097    // consume ringbuffer payload based on event type
098    namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
099      .consumeEventFromDisruptor(namedQueuePayload);
100  }
101
102  /**
103   * Cleans up queues maintained by services.
104   *
105   * @param namedQueueEvent type of queue to clear
106   * @return true if queue is cleaned up, false otherwise
107   */
108  boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
109    return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
110  }
111
112  /**
113   * Add all in memory queue records to system table. The implementors can use system table
114   * or direct HDFS file or ZK as persistence system.
115   */
116  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
117    namedQueueServices.get(namedQueueEvent).persistAll();
118  }
119
120  /**
121   * Retrieve in memory queue records from ringbuffer
122   *
123   * @param request namedQueue request with event type
124   * @return queue records from ringbuffer after filter (if applied)
125   */
126  NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
127    return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
128  }
129
130}