001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import com.google.protobuf.RpcCallback; 021import com.google.protobuf.RpcController; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.List; 028 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.CellUtil; 031import org.apache.hadoop.hbase.CoprocessorEnvironment; 032import org.apache.hadoop.hbase.client.Scan; 033import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService; 034import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest; 035import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse; 036import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 037import org.apache.hadoop.hbase.regionserver.InternalScanner; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * The aggregation implementation at a region. 044 */ 045public class ColumnAggregationEndpoint extends ColumnAggregationService 046 implements RegionCoprocessor { 047 private static final Logger LOG = LoggerFactory.getLogger(ColumnAggregationEndpoint.class); 048 private RegionCoprocessorEnvironment env = null; 049 050 @Override 051 public Iterable<Service> getServices() { 052 return Collections.singleton(this); 053 } 054 055 @Override 056 public void start(CoprocessorEnvironment env) throws IOException { 057 if (env instanceof RegionCoprocessorEnvironment) { 058 this.env = (RegionCoprocessorEnvironment)env; 059 return; 060 } 061 throw new CoprocessorException("Must be loaded on a table region!"); 062 } 063 064 @Override 065 public void stop(CoprocessorEnvironment env) throws IOException { 066 // Nothing to do. 067 } 068 069 @Override 070 public void sum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) { 071 // aggregate at each region 072 Scan scan = new Scan(); 073 // Family is required in pb. Qualifier is not. 074 byte [] family = request.getFamily().toByteArray(); 075 byte [] qualifier = request.hasQualifier()? request.getQualifier().toByteArray(): null; 076 if (request.hasQualifier()) { 077 scan.addColumn(family, qualifier); 078 } else { 079 scan.addFamily(family); 080 } 081 int sumResult = 0; 082 InternalScanner scanner = null; 083 try { 084 scanner = this.env.getRegion().getScanner(scan); 085 List<Cell> curVals = new ArrayList<>(); 086 boolean hasMore = false; 087 do { 088 curVals.clear(); 089 hasMore = scanner.next(curVals); 090 for (Cell kv : curVals) { 091 if (CellUtil.matchingQualifier(kv, qualifier)) { 092 sumResult += Bytes.toInt(kv.getValueArray(), kv.getValueOffset()); 093 } 094 } 095 } while (hasMore); 096 } catch (IOException e) { 097 CoprocessorRpcUtils.setControllerException(controller, e); 098 // Set result to -1 to indicate error. 099 sumResult = -1; 100 LOG.info("Setting sum result to -1 to indicate error", e); 101 } finally { 102 if (scanner != null) { 103 try { 104 scanner.close(); 105 } catch (IOException e) { 106 CoprocessorRpcUtils.setControllerException(controller, e); 107 sumResult = -1; 108 LOG.info("Setting sum result to -1 to indicate error", e); 109 } 110 } 111 } 112 LOG.info("Returning result " + sumResult); 113 done.run(SumResponse.newBuilder().setSum(sumResult).build()); 114 } 115}