View Javadoc

1   /*
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.NavigableSet;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.client.Scan;
31  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
32  import org.apache.hadoop.hbase.ipc.ProtocolSignature;
33  import org.apache.hadoop.hbase.regionserver.InternalScanner;
34  import org.apache.hadoop.hbase.util.Pair;
35  
36  /**
37   * A concrete AggregateProtocol implementation. It is a system-level coprocessor
38   * that computes the aggregate functions at a region level.
39   */
40  public class AggregateImplementation extends BaseEndpointCoprocessor implements
41      AggregateProtocol {
42    protected static Log log = LogFactory.getLog(AggregateImplementation.class);
43  
44    @Override
45    public ProtocolSignature getProtocolSignature(
46        String protocol, long version, int clientMethodsHashCode)
47    throws IOException {
48      if (AggregateProtocol.class.getName().equals(protocol)) {
49        return new ProtocolSignature(AggregateProtocol.VERSION, null);
50      }
51      throw new IOException("Unknown protocol: " + protocol);
52    }
53  
54    @Override
55    public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
56        throws IOException {
57      T temp;
58      T max = null;
59      InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
60          .getRegion().getScanner(scan);
61      List<KeyValue> results = new ArrayList<KeyValue>();
62      byte[] colFamily = scan.getFamilies()[0];
63      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
64      // qualifier can be null.
65      try {
66        boolean hasMoreRows = false;
67        do {
68          hasMoreRows = scanner.next(results);
69          for (KeyValue kv : results) {
70            temp = ci.getValue(colFamily, qualifier, kv);
71            max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;
72          }
73          results.clear();
74        } while (hasMoreRows);
75      } finally {
76        scanner.close();
77      }
78      log.info("Maximum from this region is "
79          + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
80              .getRegionNameAsString() + ": " + max);
81      return max;
82    }
83  
84    @Override
85    public <T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan)
86        throws IOException {
87      T min = null;
88      T temp;
89      InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
90          .getRegion().getScanner(scan);
91      List<KeyValue> results = new ArrayList<KeyValue>();
92      byte[] colFamily = scan.getFamilies()[0];
93      byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
94      try {
95        boolean hasMoreRows = false;
96        do {
97          hasMoreRows = scanner.next(results);
98          for (KeyValue kv : results) {
99            temp = ci.getValue(colFamily, qualifier, kv);
100           min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min;
101         }
102         results.clear();
103       } while (hasMoreRows);
104     } finally {
105       scanner.close();
106     }
107     log.info("Minimum from this region is "
108         + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
109             .getRegionNameAsString() + ": " + min);
110     return min;
111   }
112 
113   @Override
114   public <T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan)
115       throws IOException {
116     long sum = 0l;
117     S sumVal = null;
118     T temp;
119     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
120         .getRegion().getScanner(scan);
121     byte[] colFamily = scan.getFamilies()[0];
122     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
123     List<KeyValue> results = new ArrayList<KeyValue>();
124     try {
125       boolean hasMoreRows = false;
126       do {
127         hasMoreRows = scanner.next(results);
128         for (KeyValue kv : results) {
129           temp = ci.getValue(colFamily, qualifier, kv);
130           if (temp != null)
131             sumVal = ci.add(sumVal, ci.castToReturnType(temp));
132         }
133         results.clear();
134       } while (hasMoreRows);
135     } finally {
136       scanner.close();
137     }
138     log.debug("Sum from this region is "
139         + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
140             .getRegionNameAsString() + ": " + sum);
141     return sumVal;
142   }
143 
144   @Override
145   public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
146       throws IOException {
147     long counter = 0l;
148     List<KeyValue> results = new ArrayList<KeyValue>();
149     byte[] colFamily = scan.getFamilies()[0];
150     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
151     if (scan.getFilter() == null && qualifier == null)
152       scan.setFilter(new FirstKeyOnlyFilter());
153     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
154         .getRegion().getScanner(scan);
155     try {
156       boolean hasMoreRows = false;
157       do {
158         hasMoreRows = scanner.next(results);
159         if (results.size() > 0) {
160           counter++;
161         }
162         results.clear();
163       } while (hasMoreRows);
164     } finally {
165       scanner.close();
166     }
167     log.info("Row counter from this region is "
168         + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
169             .getRegionNameAsString() + ": " + counter);
170     return counter;
171   }
172 
173   @Override
174   public <T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
175       throws IOException {
176     S sumVal = null;
177     Long rowCountVal = 0l;
178     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
179         .getRegion().getScanner(scan);
180     byte[] colFamily = scan.getFamilies()[0];
181     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
182     List<KeyValue> results = new ArrayList<KeyValue>();
183     boolean hasMoreRows = false;
184     try {
185       do {
186         results.clear();
187         hasMoreRows = scanner.next(results);
188         for (KeyValue kv : results) {
189           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
190               qualifier, kv)));
191         }
192         rowCountVal++;
193       } while (hasMoreRows);
194     } finally {
195       scanner.close();
196     }
197     Pair<S, Long> pair = new Pair<S, Long>(sumVal, rowCountVal);
198     return pair;
199   }
200 
201   @Override
202   public <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
203       throws IOException {
204     S sumVal = null, sumSqVal = null, tempVal = null;
205     long rowCountVal = 0l;
206     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
207         .getRegion().getScanner(scan);
208     byte[] colFamily = scan.getFamilies()[0];
209     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
210     List<KeyValue> results = new ArrayList<KeyValue>();
211 
212     boolean hasMoreRows = false;
213     try {
214       do {
215         tempVal = null;
216         hasMoreRows = scanner.next(results);
217         for (KeyValue kv : results) {
218           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
219               qualifier, kv)));
220         }
221         results.clear();
222         sumVal = ci.add(sumVal, tempVal);
223         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
224         rowCountVal++;
225       } while (hasMoreRows);
226     } finally {
227       scanner.close();
228     }
229     List<S> l = new ArrayList<S>();
230     l.add(sumVal);
231     l.add(sumSqVal);
232     Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
233     return p;
234   }
235 
236   @Override
237   public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
238   throws IOException {
239     S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
240 
241     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
242     .getRegion().getScanner(scan);
243     byte[] colFamily = scan.getFamilies()[0];
244     NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
245     byte[] valQualifier = quals.pollFirst();
246     // if weighted median is requested, get qualifier for the weight column
247     byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
248     List<KeyValue> results = new ArrayList<KeyValue>();
249 
250     boolean hasMoreRows = false;
251     try {
252       do {
253         tempVal = null;
254         tempWeight = null;
255         hasMoreRows = scanner.next(results);
256         for (KeyValue kv : results) {
257           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
258               valQualifier, kv)));
259           if (weightQualifier != null) {
260             tempWeight = ci.add(tempWeight,
261                 ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
262           }
263         }
264         results.clear();
265         sumVal = ci.add(sumVal, tempVal);
266         sumWeights = ci.add(sumWeights, tempWeight);
267       } while (hasMoreRows);
268     } finally {
269       scanner.close();
270     }
271     List<S> l = new ArrayList<S>();
272     l.add(sumVal);
273     l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
274     return l;
275   }
276   
277 }