001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.concurrent.ExecutionException; 023import java.util.stream.Collectors; 024import java.util.stream.LongStream; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestingUtility; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient; 030import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; 031import org.apache.hadoop.hbase.coprocessor.AggregateImplementation; 032import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 033import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.junit.AfterClass; 037import org.junit.BeforeClass; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041 042@Category({ MediumTests.class, CoprocessorTests.class }) 043public class TestAsyncAggregationClient { 044 045 @ClassRule 046 public static final HBaseClassTestRule CLASS_RULE = 047 HBaseClassTestRule.forClass(TestAsyncAggregationClient.class); 048 049 private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 050 051 private static TableName TABLE_NAME = TableName.valueOf("TestAsyncAggregationClient"); 052 053 private static byte[] CF = Bytes.toBytes("CF"); 054 055 private static byte[] CQ = Bytes.toBytes("CQ"); 056 057 private static byte[] CQ2 = Bytes.toBytes("CQ2"); 058 059 private static int COUNT = 1000; 060 061 private static AsyncConnection CONN; 062 063 private static AsyncTable<AdvancedScanResultConsumer> TABLE; 064 065 @BeforeClass 066 public static void setUp() throws Exception { 067 Configuration conf = UTIL.getConfiguration(); 068 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 069 AggregateImplementation.class.getName()); 070 UTIL.startMiniCluster(3); 071 byte[][] splitKeys = new byte[8][]; 072 for (int i = 111; i < 999; i += 111) { 073 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 074 } 075 UTIL.createTable(TABLE_NAME, CF, splitKeys); 076 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 077 TABLE = CONN.getTable(TABLE_NAME); 078 TABLE.putAll(LongStream.range(0, COUNT) 079 .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l))) 080 .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l))) 081 .collect(Collectors.toList())).get(); 082 } 083 084 @AfterClass 085 public static void tearDown() throws Exception { 086 CONN.close(); 087 UTIL.shutdownMiniCluster(); 088 } 089 090 @Test 091 public void testMax() throws InterruptedException, ExecutionException { 092 assertEquals(COUNT - 1, AsyncAggregationClient 093 .max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue()); 094 } 095 096 @Test 097 public void testMin() throws InterruptedException, ExecutionException { 098 assertEquals(0, AsyncAggregationClient 099 .min(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue()); 100 } 101 102 @Test 103 public void testRowCount() throws InterruptedException, ExecutionException { 104 assertEquals(COUNT, 105 AsyncAggregationClient 106 .rowCount(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get() 107 .longValue()); 108 } 109 110 @Test 111 public void testSum() throws InterruptedException, ExecutionException { 112 assertEquals(COUNT * (COUNT - 1) / 2, AsyncAggregationClient 113 .sum(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue()); 114 } 115 116 private static final double DELTA = 1E-3; 117 118 @Test 119 public void testAvg() throws InterruptedException, ExecutionException { 120 assertEquals((COUNT - 1) / 2.0, AsyncAggregationClient 121 .avg(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(), 122 DELTA); 123 } 124 125 @Test 126 public void testStd() throws InterruptedException, ExecutionException { 127 double avgSq = 128 LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() 129 / (double) COUNT; 130 double avg = (COUNT - 1) / 2.0; 131 double std = Math.sqrt(avgSq - avg * avg); 132 assertEquals(std, AsyncAggregationClient 133 .std(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(), 134 DELTA); 135 } 136 137 @Test 138 public void testMedian() throws InterruptedException, ExecutionException { 139 long halfSum = COUNT * (COUNT - 1) / 4; 140 long median = 0L; 141 long sum = 0L; 142 for (int i = 0; i < COUNT; i++) { 143 sum += i; 144 if (sum > halfSum) { 145 median = i - 1; 146 break; 147 } 148 } 149 assertEquals(median, 150 AsyncAggregationClient 151 .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get() 152 .longValue()); 153 } 154 155 @Test 156 public void testMedianWithWeight() throws InterruptedException, ExecutionException { 157 long halfSum = 158 LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2; 159 long median = 0L; 160 long sum = 0L; 161 for (int i = 0; i < COUNT; i++) { 162 sum += i * i; 163 if (sum > halfSum) { 164 median = i - 1; 165 break; 166 } 167 } 168 assertEquals(median, AsyncAggregationClient 169 .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ).addColumn(CF, CQ2)) 170 .get().longValue()); 171 } 172}