/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.namequeues.impl;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BalancerRejection;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueService;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
import org.apache.hbase.thirdparty.com.google.common.collect.Queues;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;

/**
 * In-memory Queue service provider for Balancer Rejection events.
 * <p>
 * Records are held in a bounded {@link EvictingQueue} wrapped with
 * {@link Queues#synchronizedQueue(Queue)}. Recording is switched on or off by
 * {@link BaseLoadBalancer#BALANCER_REJECTION_BUFFER_ENABLED}; when it is off, no queue is
 * allocated and every operation of this service returns early.
 */
@InterfaceAudience.Private
public class BalancerRejectionQueueService implements NamedQueueService {

  private static final Logger LOG = LoggerFactory.getLogger(BalancerRejectionQueueService.class);

  /** Configuration key for the maximum number of rejection records retained. */
  private static final String BALANCER_REJECTION_QUEUE_SIZE =
    "hbase.master.balancer.rejection.queue.size";
  /** Queue capacity used when {@link #BALANCER_REJECTION_QUEUE_SIZE} is not configured. */
  private static final int DEFAULT_BALANCER_REJECTION_QUEUE_SIZE = 250;

  /** Whether balancer rejection recording is enabled for this service instance. */
  private final boolean isBalancerRejectionRecording;

  /** Backing store of recorded rejections; {@code null} when recording is disabled. */
  private final Queue<RecentLogs.BalancerRejection> balancerRejectionQueue;

  /**
   * Creates the service, allocating the backing queue only when recording is enabled.
   * @param conf configuration supplying the enabled flag and the queue capacity
   */
  public BalancerRejectionQueueService(Configuration conf) {
    isBalancerRejectionRecording =
      conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
        BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
    if (!isBalancerRejectionRecording) {
      // Nothing will ever be recorded, so do not allocate a queue at all.
      balancerRejectionQueue = null;
      return;
    }
    final int queueSize =
      conf.getInt(BALANCER_REJECTION_QUEUE_SIZE, DEFAULT_BALANCER_REJECTION_QUEUE_SIZE);
    final EvictingQueue<RecentLogs.BalancerRejection> evictingQueue =
      EvictingQueue.create(queueSize);
    balancerRejectionQueue = Queues.synchronizedQueue(evictingQueue);
  }

  /**
   * @return the event type served by this queue,
   *         {@link NamedQueuePayload.NamedQueueEvent#BALANCE_REJECTION}
   */
  @Override
  public NamedQueuePayload.NamedQueueEvent getEvent() {
    return NamedQueuePayload.NamedQueueEvent.BALANCE_REJECTION;
  }

  /**
   * Converts the given payload into its protobuf form and appends it to the queue.
   * <p>
   * The call is a no-op when recording is disabled. A payload that is not a
   * {@link BalancerRejectionDetails} is dropped after logging a warning.
   * @param namedQueuePayload the event payload to record
   */
  @Override
  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
    if (!isBalancerRejectionRecording) {
      return;
    }
    if (!(namedQueuePayload instanceof BalancerRejectionDetails)) {
      LOG.warn("BalancerRejectionQueueService: NamedQueuePayload is not of type"
        + " BalancerRejectionDetails.");
      return;
    }
    BalancerRejectionDetails balancerRejectionDetails =
      (BalancerRejectionDetails) namedQueuePayload;
    BalancerRejection balancerRejectionRecord = balancerRejectionDetails.getBalancerRejection();
    // Named distinctly from the client-side BalancerRejection type imported by this file.
    RecentLogs.BalancerRejection balancerRejectionProto =
      RecentLogs.BalancerRejection.newBuilder().setReason(balancerRejectionRecord.getReason())
        .addAllCostFuncInfo(balancerRejectionRecord.getCostFuncInfoList()).build();
    balancerRejectionQueue.add(balancerRejectionProto);
  }

  /**
   * Removes every record currently held in the queue.
   * @return {@code true} if the queue was cleared, {@code false} if recording is disabled
   */
  @Override
  public boolean clearNamedQueue() {
    if (!isBalancerRejectionRecording) {
      return false;
    }
    LOG.debug("Received request to clean up balancer rejection queue.");
    balancerRejectionQueue.clear();
    return true;
  }

  /**
   * Returns a copy of the recorded rejections, most recently added first, truncated to the
   * limit carried by the request when one is set.
   * @param request the request whose balancer rejections sub-request may carry a limit
   * @return the response holding the selected records, or {@code null} if recording is disabled
   */
  @Override
  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
    if (!isBalancerRejectionRecording) {
      return null;
    }
    // Copy the queue contents into a standalone list before reordering them.
    List<RecentLogs.BalancerRejection> balancerRejections =
      Arrays.stream(balancerRejectionQueue.toArray(new RecentLogs.BalancerRejection[0]))
        .collect(Collectors.toList());
    // latest records should be displayed first, hence reverse order sorting
    Collections.reverse(balancerRejections);
    int limit = balancerRejections.size();
    if (request.getBalancerRejectionsRequest().hasLimit()) {
      // NOTE(review): a negative limit would make subList below throw; presumably the
      // limit is always non-negative - confirm against the request definition.
      limit =
        Math.min(request.getBalancerRejectionsRequest().getLimit(), balancerRejections.size());
    }
    // filter limit if provided
    balancerRejections = balancerRejections.subList(0, limit);
    final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse();
    namedQueueGetResponse.setNamedQueueEvent(BalancerRejectionDetails.BALANCER_REJECTION_EVENT);
    namedQueueGetResponse.setBalancerRejections(balancerRejections);
    return namedQueueGetResponse;
  }

  /**
   * Persistence hook; this in-memory service currently does nothing here.
   */
  @Override
  public void persistAll() {
    // no-op for now
  }

}