001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client.example; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.TimeoutException; 031import org.apache.hadoop.conf.Configured; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.BufferedMutator; 034import org.apache.hadoop.hbase.client.BufferedMutatorParams; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.util.Tool; 041import org.apache.hadoop.util.ToolRunner; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * An example of using the {@link BufferedMutator} interface. 048 */ 049@InterfaceAudience.Private 050public class BufferedMutatorExample extends Configured implements Tool { 051 052 private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorExample.class); 053 054 private static final int POOL_SIZE = 10; 055 private static final int TASK_COUNT = 100; 056 private static final TableName TABLE = TableName.valueOf("foo"); 057 private static final byte[] FAMILY = Bytes.toBytes("f"); 058 059 @Override 060 public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException { 061 062 /** a callback invoked when an asynchronous write fails. */ 063 final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { 064 @Override 065 public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { 066 for (int i = 0; i < e.getNumExceptions(); i++) { 067 LOG.info("Failed to sent put " + e.getRow(i) + "."); 068 } 069 } 070 }; 071 BufferedMutatorParams params = new BufferedMutatorParams(TABLE) 072 .listener(listener); 073 074 // 075 // step 1: create a single Connection and a BufferedMutator, shared by all worker threads. 076 // 077 try (final Connection conn = ConnectionFactory.createConnection(getConf()); 078 final BufferedMutator mutator = conn.getBufferedMutator(params)) { 079 080 /** worker pool that operates on BufferedTable instances */ 081 final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE); 082 List<Future<Void>> futures = new ArrayList<>(TASK_COUNT); 083 084 for (int i = 0; i < TASK_COUNT; i++) { 085 futures.add(workerPool.submit(new Callable<Void>() { 086 @Override 087 public Void call() throws Exception { 088 // 089 // step 2: each worker sends edits to the shared BufferedMutator instance. They all use 090 // the same backing buffer, call-back "listener", and RPC executor pool. 091 // 092 Put p = new Put(Bytes.toBytes("someRow")); 093 p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value")); 094 mutator.mutate(p); 095 // do work... maybe you want to call mutator.flush() after many edits to ensure any of 096 // this worker's edits are sent before exiting the Callable 097 return null; 098 } 099 })); 100 } 101 102 // 103 // step 3: clean up the worker pool, shut down. 104 // 105 for (Future<Void> f : futures) { 106 f.get(5, TimeUnit.MINUTES); 107 } 108 workerPool.shutdown(); 109 } catch (IOException e) { 110 // exception while creating/destroying Connection or BufferedMutator 111 LOG.info("exception while creating/destroying Connection or BufferedMutator", e); 112 } // BufferedMutator.close() ensures all work is flushed. Could be the custom listener is 113 // invoked from here. 114 return 0; 115 } 116 117 public static void main(String[] args) throws Exception { 118 ToolRunner.run(new BufferedMutatorExample(), args); 119 } 120}