/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.namequeues;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event Handler run by disruptor ringbuffer consumer.
 * Although this is generic implementation for namedQueue, it can have individual queue specific
 * logic.
 * <p>
 * Every operation is dispatched to the {@link NamedQueueService} registered for the event type
 * of the payload/request. Services are registered once, in the constructor, from the classes
 * listed under the {@code hbase.namedqueue.provider.classes} configuration key. An event type
 * without a registered service is tolerated: the operation is skipped rather than failing with
 * a {@link NullPointerException}.
 */
@InterfaceAudience.Private
class LogEventHandler implements EventHandler<RingBufferEnvelope> {

  private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);

  // Map that binds namedQueues to corresponding queue service implementation.
  // If NamedQueue of specific type is enabled, corresponding service will be used to
  // insert and retrieve records.
  // Individual queue sizes should be determined based on their individual configs within
  // each service.
  // The map is only written by the constructor; all other methods read from it.
  private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
    new HashMap<>();

  private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";

  /**
   * Loads and registers every {@link NamedQueueService} implementation named under
   * {@code hbase.namedqueue.provider.classes}.
   * <p>
   * A class that cannot be found, that does not implement {@link NamedQueueService}, or that
   * cannot be instantiated through a public constructor taking a {@link Configuration} is
   * logged and skipped; the remaining providers are still registered. If two providers report
   * the same event type, the one listed later replaces the earlier one.
   *
   * @param conf configuration used to look up the provider class names; it is also passed to
   *   the constructor of each provider
   */
  LogEventHandler(final Configuration conf) {
    for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
      Class<?> clz;
      try {
        clz = Class.forName(implName);
      } catch (ClassNotFoundException e) {
        LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
        continue;
      }

      if (!NamedQueueService.class.isAssignableFrom(clz)) {
        LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
        continue;
      }

      // add all service mappings here
      try {
        NamedQueueService namedQueueService =
          (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
        namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
      } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
        | InvocationTargetException e) {
        // Pass the exception as the last argument so the cause and stack trace are logged.
        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
          clz, e);
      }
    }
  }

  /**
   * Called when a publisher has published an event to the {@link RingBuffer}.
   * This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
   * add, we should also provide specific consumer logic here.
   * <p>
   * If no service is registered for the payload's event type, the event is dropped.
   *
   * @param event published to the {@link RingBuffer}
   * @param sequence of the event being processed
   * @param endOfBatch flag to indicate if this is the last event in a batch from
   *   the {@link RingBuffer}
   */
  @Override
  public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
    final NamedQueuePayload namedQueuePayload = event.getPayload();
    // consume ringbuffer payload based on event type
    final NamedQueueService service =
      namedQueueServices.get(namedQueuePayload.getNamedQueueEvent());
    if (service == null) {
      // An exception thrown here would escape into the ringbuffer consumer, so drop the event
      // instead. Logged at debug because this runs once per published event.
      LOG.debug("No NamedQueueService registered for event type {}, dropping event.",
        namedQueuePayload.getNamedQueueEvent());
      return;
    }
    service.consumeEventFromDisruptor(namedQueuePayload);
  }

  /**
   * Cleans up queues maintained by services.
   *
   * @param namedQueueEvent type of queue to clear
   * @return true if queue is cleaned up, false otherwise (including when no service is
   *   registered for the given event type)
   */
  boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
    final NamedQueueService service = namedQueueServices.get(namedQueueEvent);
    if (service == null) {
      LOG.warn("No NamedQueueService registered for event type {}, nothing to clear.",
        namedQueueEvent);
      return false;
    }
    return service.clearNamedQueue();
  }

  /**
   * Add all in memory queue records to system table. The implementors can use system table
   * or direct HDFS file or ZK as persistence system.
   * <p>
   * Does nothing if no service is registered for the given event type.
   *
   * @param namedQueueEvent type of queue whose records should be persisted
   */
  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
    final NamedQueueService service = namedQueueServices.get(namedQueueEvent);
    if (service == null) {
      LOG.warn("No NamedQueueService registered for event type {}, nothing to persist.",
        namedQueueEvent);
      return;
    }
    service.persistAll();
  }

  /**
   * Retrieve in memory queue records from ringbuffer
   *
   * @param request namedQueue request with event type
   * @return queue records from ringbuffer after filter (if applied), or {@code null} if no
   *   service is registered for the requested event type; callers must handle {@code null}
   */
  NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
    final NamedQueueService service = namedQueueServices.get(request.getNamedQueueEvent());
    if (service == null) {
      LOG.warn("No NamedQueueService registered for event type {}, no records to return.",
        request.getNamedQueueEvent());
      return null;
    }
    return service.getNamedQueueRecords(request);
  }

}