001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues; 019 020import com.lmax.disruptor.EventHandler; 021import com.lmax.disruptor.RingBuffer; 022import java.lang.reflect.InvocationTargetException; 023import java.util.HashMap; 024import java.util.Map; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 028import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Event Handler run by disruptor ringbuffer consumer. Although this is generic implementation for 035 * namedQueue, it can have individual queue specific logic. 036 */ 037@InterfaceAudience.Private 038class LogEventHandler implements EventHandler<RingBufferEnvelope> { 039 040 private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class); 041 042 // Map that binds namedQueues to corresponding queue service implementation. 043 // If NamedQueue of specific type is enabled, corresponding service will be used to 044 // insert and retrieve records. 045 // Individual queue sizes should be determined based on their individual configs within 046 // each service. 047 private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices = 048 new HashMap<>(); 049 050 private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes"; 051 052 LogEventHandler(final Configuration conf) { 053 for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) { 054 Class<?> clz; 055 try { 056 clz = Class.forName(implName); 057 } catch (ClassNotFoundException e) { 058 LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e); 059 continue; 060 } 061 062 if (!NamedQueueService.class.isAssignableFrom(clz)) { 063 LOG.warn("Class {} is not implementor of NamedQueueService.", clz); 064 continue; 065 } 066 067 // add all service mappings here 068 try { 069 NamedQueueService namedQueueService = 070 (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf); 071 namedQueueServices.put(namedQueueService.getEvent(), namedQueueService); 072 } catch (InstantiationException | IllegalAccessException | NoSuchMethodException 073 | InvocationTargetException e) { 074 LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz, 075 e); 076 } 077 } 078 } 079 080 /** 081 * Called when a publisher has published an event to the {@link RingBuffer}. This is generic 082 * consumer of disruptor ringbuffer and for each new namedQueue that we add, we should also 083 * provide specific consumer logic here. 084 * @param event published to the {@link RingBuffer} 085 * @param sequence of the event being processed 086 * @param endOfBatch flag to indicate if this is the last event in a batch from the 087 * {@link RingBuffer} 088 */ 089 @Override 090 public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) { 091 final NamedQueuePayload namedQueuePayload = event.getPayload(); 092 // consume ringbuffer payload based on event type 093 namedQueueServices.get(namedQueuePayload.getNamedQueueEvent()) 094 .consumeEventFromDisruptor(namedQueuePayload); 095 } 096 097 /** 098 * Cleans up queues maintained by services. 099 * @param namedQueueEvent type of queue to clear 100 * @return true if queue is cleaned up, false otherwise 101 */ 102 boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { 103 return namedQueueServices.get(namedQueueEvent).clearNamedQueue(); 104 } 105 106 /** 107 * Add all in memory queue records to system table. The implementors can use system table or 108 * direct HDFS file or ZK as persistence system. 109 */ 110 void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) { 111 namedQueueServices.get(namedQueueEvent).persistAll(connection); 112 } 113 114 /** 115 * Retrieve in memory queue records from ringbuffer 116 * @param request namedQueue request with event type 117 * @return queue records from ringbuffer after filter (if applied) 118 */ 119 NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 120 return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request); 121 } 122 123}