View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.avro;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.HashMap;
24  
25  import org.apache.avro.Schema;
26  import org.apache.avro.generic.GenericArray;
27  import org.apache.avro.generic.GenericData;
28  import org.apache.avro.ipc.HttpServer;
29  import org.apache.avro.ipc.specific.SpecificResponder;
30  import org.apache.avro.util.Utf8;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.MasterNotRunningException;
37  import org.apache.hadoop.hbase.TableExistsException;
38  import org.apache.hadoop.hbase.TableNotFoundException;
39  import org.apache.hadoop.hbase.avro.generated.AClusterStatus;
40  import org.apache.hadoop.hbase.avro.generated.ADelete;
41  import org.apache.hadoop.hbase.avro.generated.AFamilyDescriptor;
42  import org.apache.hadoop.hbase.avro.generated.AGet;
43  import org.apache.hadoop.hbase.avro.generated.AIOError;
44  import org.apache.hadoop.hbase.avro.generated.AIllegalArgument;
45  import org.apache.hadoop.hbase.avro.generated.AMasterNotRunning;
46  import org.apache.hadoop.hbase.avro.generated.APut;
47  import org.apache.hadoop.hbase.avro.generated.AResult;
48  import org.apache.hadoop.hbase.avro.generated.AScan;
49  import org.apache.hadoop.hbase.avro.generated.ATableDescriptor;
50  import org.apache.hadoop.hbase.avro.generated.ATableExists;
51  import org.apache.hadoop.hbase.avro.generated.HBase;
52  import org.apache.hadoop.hbase.client.HBaseAdmin;
53  import org.apache.hadoop.hbase.client.HTableInterface;
54  import org.apache.hadoop.hbase.client.HTablePool;
55  import org.apache.hadoop.hbase.client.ResultScanner;
56  import org.apache.hadoop.hbase.client.Scan;
57  import org.apache.hadoop.hbase.util.Bytes;
58  
59  /**
60   * Start an Avro server
61   */
62  public class AvroServer {
63  
64    /**
65     * The HBaseImpl is a glue object that connects Avro RPC calls to the
66     * HBase client API primarily defined in the HBaseAdmin and HTable objects.
67     */
68    public static class HBaseImpl implements HBase {
69      //
70      // PROPERTIES
71      //
72      protected Configuration conf = null;
73      protected HBaseAdmin admin = null;
74      protected HTablePool htablePool = null;
75      protected final Log LOG = LogFactory.getLog(this.getClass().getName());
76  
77      // nextScannerId and scannerMap are used to manage scanner state
78      protected int nextScannerId = 0;
79      protected HashMap<Integer, ResultScanner> scannerMap = null;
80  
81      //
82      // UTILITY METHODS
83      //
84  
85      /**
86       * Assigns a unique ID to the scanner and adds the mapping to an internal
87       * hash-map.
88       *
89       * @param scanner
90       * @return integer scanner id
91       */
92      protected synchronized int addScanner(ResultScanner scanner) {
93        int id = nextScannerId++;
94        scannerMap.put(id, scanner);
95        return id;
96      }
97  
98      /**
99       * Returns the scanner associated with the specified ID.
100      *
101      * @param id
102      * @return a Scanner, or null if ID was invalid.
103      */
104     protected synchronized ResultScanner getScanner(int id) {
105       return scannerMap.get(id);
106     }
107 
108     /**
109      * Removes the scanner associated with the specified ID from the internal
110      * id->scanner hash-map.
111      *
112      * @param id
113      * @return a Scanner, or null if ID was invalid.
114      */
115     protected synchronized ResultScanner removeScanner(int id) {
116       return scannerMap.remove(id);
117     }
118 
119     //
120     // CTOR METHODS
121     //
122 
123     // TODO(hammer): figure out appropriate setting of maxSize for htablePool
124     /**
125      * Constructs an HBaseImpl object.
126      * @throws IOException 
127      */
128     HBaseImpl() throws IOException {
129       this(HBaseConfiguration.create());
130     }
131 
132     HBaseImpl(final Configuration c) throws IOException {
133       conf = c;
134       admin = new HBaseAdmin(conf);
135       htablePool = new HTablePool(conf, 10);
136       scannerMap = new HashMap<Integer, ResultScanner>();
137     }
138 
139     //
140     // SERVICE METHODS
141     //
142 
143     // TODO(hammer): Investigate use of the Command design pattern
144 
145     //
146     // Cluster metadata
147     //
148 
149     public Utf8 getHBaseVersion() throws AIOError {
150       try {
151 	return new Utf8(admin.getClusterStatus().getHBaseVersion());
152       } catch (IOException e) {
153 	AIOError ioe = new AIOError();
154 	ioe.message = new Utf8(e.getMessage());
155         throw ioe;
156       }
157     }
158 
159     public AClusterStatus getClusterStatus() throws AIOError {
160       try {
161 	return AvroUtil.csToACS(admin.getClusterStatus());
162       } catch (IOException e) {
163 	AIOError ioe = new AIOError();
164 	ioe.message = new Utf8(e.getMessage());
165         throw ioe;
166       }
167     }
168 
169     public GenericArray<ATableDescriptor> listTables() throws AIOError {
170       try {
171         HTableDescriptor[] tables = admin.listTables();
172 	Schema atdSchema = Schema.createArray(ATableDescriptor.SCHEMA$);
173         GenericData.Array<ATableDescriptor> result = null;
174 	result = new GenericData.Array<ATableDescriptor>(tables.length, atdSchema);
175         for (HTableDescriptor table : tables) {
176 	  result.add(AvroUtil.htdToATD(table));
177 	}
178         return result;
179       } catch (IOException e) {
180 	AIOError ioe = new AIOError();
181 	ioe.message = new Utf8(e.getMessage());
182         throw ioe;
183       }
184     }
185 
186     //
187     // Table metadata
188     //
189 
190     // TODO(hammer): Handle the case where the table does not exist explicitly?
191     public ATableDescriptor describeTable(ByteBuffer table) throws AIOError {
192       try {
193 	return AvroUtil.htdToATD(admin.getTableDescriptor(Bytes.toBytes(table)));
194       } catch (TableNotFoundException e) {
195         return null;
196       } catch (IOException e) {
197         AIOError ioe = new AIOError();
198         ioe.message = new Utf8(e.getMessage());
199         throw ioe;
200       }
201     }
202 
203     public boolean isTableEnabled(ByteBuffer table) throws AIOError {
204       try {
205 	return admin.isTableEnabled(Bytes.toBytes(table));
206       } catch (IOException e) {
207 	AIOError ioe = new AIOError();
208 	ioe.message = new Utf8(e.getMessage());
209         throw ioe;
210       }
211     }
212 
213     public boolean tableExists(ByteBuffer table) throws AIOError {
214       try {
215 	return admin.tableExists(Bytes.toBytes(table));
216       } catch (IOException e) {
217 	AIOError ioe = new AIOError();
218 	ioe.message = new Utf8(e.getMessage());
219         throw ioe;
220       }
221     }
222 
223     //
224     // Family metadata
225     //
226 
227     // TODO(hammer): Handle the case where the family does not exist explicitly?
228     public AFamilyDescriptor describeFamily(ByteBuffer table, ByteBuffer family) throws AIOError {
229       try {
230 	HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(table));
231 	return AvroUtil.hcdToAFD(htd.getFamily(Bytes.toBytes(family)));
232       } catch (IOException e) {
233         AIOError ioe = new AIOError();
234         ioe.message = new Utf8(e.getMessage());
235         throw ioe;
236       }
237     }
238 
239     //
240     // Table admin
241     //
242 
243     public Void createTable(ATableDescriptor table) throws AIOError, 
244                                                            AIllegalArgument,
245                                                            ATableExists,
246                                                            AMasterNotRunning {
247       try {
248         admin.createTable(AvroUtil.atdToHTD(table));
249 	return null;
250       } catch (IllegalArgumentException e) {
251 	AIllegalArgument iae = new AIllegalArgument();
252 	iae.message = new Utf8(e.getMessage());
253         throw iae;
254       } catch (TableExistsException e) {
255 	ATableExists tee = new ATableExists();
256 	tee.message = new Utf8(e.getMessage());
257         throw tee;
258       } catch (MasterNotRunningException e) {
259 	AMasterNotRunning mnre = new AMasterNotRunning();
260 	mnre.message = new Utf8(e.getMessage());
261         throw mnre;
262       } catch (IOException e) {
263 	AIOError ioe = new AIOError();
264 	ioe.message = new Utf8(e.getMessage());
265         throw ioe;
266       }
267     }
268 
269     // Note that disable, flush and major compaction of .META. needed in client
270     // TODO(hammer): more selective cache dirtying than flush?
271     public Void deleteTable(ByteBuffer table) throws AIOError {
272       try {
273 	admin.deleteTable(Bytes.toBytes(table));
274 	return null;
275       } catch (IOException e) {
276 	AIOError ioe = new AIOError();
277 	ioe.message = new Utf8(e.getMessage());
278         throw ioe;
279       }
280     }
281 
282     // NB: Asynchronous operation
283     public Void modifyTable(ByteBuffer tableName, ATableDescriptor tableDescriptor) throws AIOError {
284       try {
285 	admin.modifyTable(Bytes.toBytes(tableName),
286                           AvroUtil.atdToHTD(tableDescriptor));
287 	return null;
288       } catch (IOException e) {
289 	AIOError ioe = new AIOError();
290 	ioe.message = new Utf8(e.getMessage());
291         throw ioe;
292       }
293     }
294 
295     public Void enableTable(ByteBuffer table) throws AIOError {
296       try {
297 	admin.enableTable(Bytes.toBytes(table));
298 	return null;
299       } catch (IOException e) {
300 	AIOError ioe = new AIOError();
301 	ioe.message = new Utf8(e.getMessage());
302         throw ioe;
303       }
304     }
305     
306     public Void disableTable(ByteBuffer table) throws AIOError {
307       try {
308 	admin.disableTable(Bytes.toBytes(table));
309 	return null;
310       } catch (IOException e) {
311 	AIOError ioe = new AIOError();
312 	ioe.message = new Utf8(e.getMessage());
313         throw ioe;
314       }
315     }
316     
317     // NB: Asynchronous operation
318     public Void flush(ByteBuffer table) throws AIOError {
319       try {
320 	admin.flush(Bytes.toBytes(table));
321 	return null;
322       } catch (InterruptedException e) {
323 	AIOError ioe = new AIOError();
324 	ioe.message = new Utf8(e.getMessage());
325         throw ioe;
326       } catch (IOException e) {
327 	AIOError ioe = new AIOError();
328 	ioe.message = new Utf8(e.getMessage());
329         throw ioe;
330       }
331     }
332 
333     // NB: Asynchronous operation
334     public Void split(ByteBuffer table) throws AIOError {
335       try {
336 	admin.split(Bytes.toBytes(table));
337 	return null;
338       } catch (InterruptedException e) {
339 	AIOError ioe = new AIOError();
340 	ioe.message = new Utf8(e.getMessage());
341         throw ioe;
342       } catch (IOException e) {
343 	AIOError ioe = new AIOError();
344 	ioe.message = new Utf8(e.getMessage());
345         throw ioe;
346       }
347     }
348 
349     //
350     // Family admin
351     //
352 
353     public Void addFamily(ByteBuffer table, AFamilyDescriptor family) throws AIOError {
354       try {
355 	admin.addColumn(Bytes.toBytes(table), 
356                         AvroUtil.afdToHCD(family));
357 	return null;
358       } catch (IOException e) {
359 	AIOError ioe = new AIOError();
360 	ioe.message = new Utf8(e.getMessage());
361         throw ioe;
362       }
363     }
364 
365     // NB: Asynchronous operation
366     public Void deleteFamily(ByteBuffer table, ByteBuffer family) throws AIOError {
367       try {
368 	admin.deleteColumn(Bytes.toBytes(table), Bytes.toBytes(family));
369 	return null;
370       } catch (IOException e) {
371 	AIOError ioe = new AIOError();
372 	ioe.message = new Utf8(e.getMessage());
373         throw ioe;
374       }
375     }
376 
377     // NB: Asynchronous operation
378     public Void modifyFamily(ByteBuffer table, ByteBuffer familyName, AFamilyDescriptor familyDescriptor) throws AIOError {
379       try {
380 	admin.modifyColumn(Bytes.toBytes(table), AvroUtil.afdToHCD(familyDescriptor));
381 	return null;
382       } catch (IOException e) {
383 	AIOError ioe = new AIOError();
384 	ioe.message = new Utf8(e.getMessage());
385         throw ioe;
386       }
387     }
388 
389     //
390     // Single-row DML
391     //
392 
393     // TODO(hammer): Java with statement for htablepool concision?
394     // TODO(hammer): Can Get have timestamp and timerange simultaneously?
395     // TODO(hammer): Do I need to catch the RuntimeException of getTable?
396     // TODO(hammer): Handle gets with no results
397     // TODO(hammer): Uses exists(Get) to ensure columns exist
398     public AResult get(ByteBuffer table, AGet aget) throws AIOError {
399       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
400       try {
401         return AvroUtil.resultToAResult(htable.get(AvroUtil.agetToGet(aget)));
402       } catch (IOException e) {
403     	AIOError ioe = new AIOError();
404 	ioe.message = new Utf8(e.getMessage());
405         throw ioe;
406       } finally {
407         try {
408           htablePool.putTable(htable);
409         } catch (IOException e) {
410           AIOError ioe = new AIOError();
411           ioe.message = new Utf8(e.getMessage());
412           throw ioe;
413         }
414       }
415     }
416 
417     public boolean exists(ByteBuffer table, AGet aget) throws AIOError {
418       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
419       try {
420         return htable.exists(AvroUtil.agetToGet(aget));
421       } catch (IOException e) {
422     	AIOError ioe = new AIOError();
423 	ioe.message = new Utf8(e.getMessage());
424         throw ioe;
425       } finally {
426         try {
427           htablePool.putTable(htable);
428         } catch (IOException e) {
429           AIOError ioe = new AIOError();
430           ioe.message = new Utf8(e.getMessage());
431           throw ioe;
432         } 
433       }
434     }
435 
436     public Void put(ByteBuffer table, APut aput) throws AIOError {
437       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
438       try {
439 	htable.put(AvroUtil.aputToPut(aput));
440         return null;
441       } catch (IOException e) {
442         AIOError ioe = new AIOError();
443         ioe.message = new Utf8(e.getMessage());
444         throw ioe;
445       } finally {
446         try {
447           htablePool.putTable(htable);
448         } catch (IOException e) {
449           AIOError ioe = new AIOError();
450           ioe.message = new Utf8(e.getMessage());
451           throw ioe;
452         }
453       }
454     }
455 
456     public Void delete(ByteBuffer table, ADelete adelete) throws AIOError {
457       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
458       try {
459         htable.delete(AvroUtil.adeleteToDelete(adelete));
460         return null;
461       } catch (IOException e) {
462     	AIOError ioe = new AIOError();
463 	ioe.message = new Utf8(e.getMessage());
464         throw ioe;
465       } finally {
466         try {
467           htablePool.putTable(htable);
468         } catch (IOException e) {
469           AIOError ioe = new AIOError();
470           ioe.message = new Utf8(e.getMessage());
471           throw ioe;
472         }
473       }
474     }
475 
476     public long incrementColumnValue(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, long amount, boolean writeToWAL) throws AIOError {
477       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
478       try {
479 	return htable.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier), amount, writeToWAL);
480       } catch (IOException e) {
481         AIOError ioe = new AIOError();
482         ioe.message = new Utf8(e.getMessage());
483         throw ioe;
484       } finally {
485         try {
486           htablePool.putTable(htable);
487         } catch (IOException e) {
488           AIOError ioe = new AIOError();
489           ioe.message = new Utf8(e.getMessage());
490           throw ioe;
491         }
492       }
493     }
494 
495     //
496     // Multi-row DML
497     //
498 
499     public int scannerOpen(ByteBuffer table, AScan ascan) throws AIOError {
500       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
501       try {
502         Scan scan = AvroUtil.ascanToScan(ascan);
503         return addScanner(htable.getScanner(scan));
504       } catch (IOException e) {
505     	AIOError ioe = new AIOError();
506 	ioe.message = new Utf8(e.getMessage());
507         throw ioe;
508       } finally {
509         try {
510           htablePool.putTable(htable);
511         } catch (IOException e) {
512           AIOError ioe = new AIOError();
513           ioe.message = new Utf8(e.getMessage());
514           throw ioe;
515         }
516       }
517     }
518 
519     public Void scannerClose(int scannerId) throws AIOError, AIllegalArgument {
520       try {
521         ResultScanner scanner = getScanner(scannerId);
522         if (scanner == null) {
523       	  AIllegalArgument aie = new AIllegalArgument();
524 	  aie.message = new Utf8("scanner ID is invalid: " + scannerId);
525           throw aie;
526         }
527         scanner.close();
528         removeScanner(scannerId);
529         return null;
530       } catch (IOException e) {
531     	AIOError ioe = new AIOError();
532 	ioe.message = new Utf8(e.getMessage());
533         throw ioe;
534       }
535     }
536 
537     public GenericArray<AResult> scannerGetRows(int scannerId, int numberOfRows) throws AIOError, AIllegalArgument {
538       try {
539         ResultScanner scanner = getScanner(scannerId);
540         if (scanner == null) {
541       	  AIllegalArgument aie = new AIllegalArgument();
542 	  aie.message = new Utf8("scanner ID is invalid: " + scannerId);
543           throw aie;
544         }
545         return AvroUtil.resultsToAResults(scanner.next(numberOfRows));
546       } catch (IOException e) {
547     	AIOError ioe = new AIOError();
548 	ioe.message = new Utf8(e.getMessage());
549         throw ioe;
550       }
551     }
552   }
553 
554   //
555   // MAIN PROGRAM
556   //
557 
558   private static void printUsageAndExit() {
559     printUsageAndExit(null);
560   }
561   
562   private static void printUsageAndExit(final String message) {
563     if (message != null) {
564       System.err.println(message);
565     }
566     System.out.println("Usage: java org.apache.hadoop.hbase.avro.AvroServer " +
567       "--help | [--port=PORT] start");
568     System.out.println("Arguments:");
569     System.out.println(" start Start Avro server");
570     System.out.println(" stop  Stop Avro server");
571     System.out.println("Options:");
572     System.out.println(" port  Port to listen on. Default: 9090");
573     System.out.println(" help  Print this message and exit");
574     System.exit(0);
575   }
576 
577   protected static void doMain(final String[] args) throws Exception {
578     if (args.length < 1) {
579       printUsageAndExit();
580     }
581     int port = 9090;
582     final String portArgKey = "--port=";
583     for (String cmd: args) {
584       if (cmd.startsWith(portArgKey)) {
585         port = Integer.parseInt(cmd.substring(portArgKey.length()));
586         continue;
587       } else if (cmd.equals("--help") || cmd.equals("-h")) {
588         printUsageAndExit();
589       } else if (cmd.equals("start")) {
590         continue;
591       } else if (cmd.equals("stop")) {
592         printUsageAndExit("To shutdown the Avro server run " +
593           "bin/hbase-daemon.sh stop avro or send a kill signal to " +
594           "the Avro server pid");
595       }
596       
597       // Print out usage if we get to here.
598       printUsageAndExit();
599     }
600     Log LOG = LogFactory.getLog("AvroServer");
601     LOG.info("starting HBase Avro server on port " + Integer.toString(port));
602     SpecificResponder r = new SpecificResponder(HBase.class, new HBaseImpl());
603     HttpServer server = new HttpServer(r, port);
604     server.start();
605     server.join();
606   }
607 
608   // TODO(hammer): Look at Cassandra's daemonization and integration with JSVC
609   // TODO(hammer): Don't eat it after a single exception
610   // TODO(hammer): Figure out why we do doMain()
611   // TODO(hammer): Figure out if we want String[] or String [] syntax
612   public static void main(String[] args) throws Exception {
613     doMain(args);
614   }
615 }