001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.example; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.Callable; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import java.util.concurrent.Future; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.TimeoutException; 030import org.apache.hadoop.conf.Configured; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.BufferedMutator; 033import org.apache.hadoop.hbase.client.BufferedMutatorParams; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.util.Tool; 040import org.apache.hadoop.util.ToolRunner; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045/** 046 * An example of using the {@link BufferedMutator} interface. 047 */ 048@InterfaceAudience.Private 049public class BufferedMutatorExample extends Configured implements Tool { 050 051 private static final Logger LOG = LoggerFactory.getLogger(BufferedMutatorExample.class); 052 053 private static final int POOL_SIZE = 10; 054 private static final int TASK_COUNT = 100; 055 private static final TableName TABLE = TableName.valueOf("foo"); 056 private static final byte[] FAMILY = Bytes.toBytes("f"); 057 058 @Override 059 public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException { 060 061 /** a callback invoked when an asynchronous write fails. */ 062 final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { 063 @Override 064 public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { 065 for (int i = 0; i < e.getNumExceptions(); i++) { 066 LOG.info("Failed to sent put " + e.getRow(i) + "."); 067 } 068 } 069 }; 070 BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener); 071 072 // 073 // step 1: create a single Connection and a BufferedMutator, shared by all worker threads. 074 // 075 try (final Connection conn = ConnectionFactory.createConnection(getConf()); 076 final BufferedMutator mutator = conn.getBufferedMutator(params)) { 077 078 /** worker pool that operates on BufferedTable instances */ 079 final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE); 080 List<Future<Void>> futures = new ArrayList<>(TASK_COUNT); 081 082 for (int i = 0; i < TASK_COUNT; i++) { 083 futures.add(workerPool.submit(new Callable<Void>() { 084 @Override 085 public Void call() throws Exception { 086 // 087 // step 2: each worker sends edits to the shared BufferedMutator instance. They all use 088 // the same backing buffer, call-back "listener", and RPC executor pool. 089 // 090 Put p = new Put(Bytes.toBytes("someRow")); 091 p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value")); 092 mutator.mutate(p); 093 // do work... maybe you want to call mutator.flush() after many edits to ensure any of 094 // this worker's edits are sent before exiting the Callable 095 return null; 096 } 097 })); 098 } 099 100 // 101 // step 3: clean up the worker pool, shut down. 102 // 103 for (Future<Void> f : futures) { 104 f.get(5, TimeUnit.MINUTES); 105 } 106 workerPool.shutdown(); 107 } catch (IOException e) { 108 // exception while creating/destroying Connection or BufferedMutator 109 LOG.info("exception while creating/destroying Connection or BufferedMutator", e); 110 } // BufferedMutator.close() ensures all work is flushed. Could be the custom listener is 111 // invoked from here. 112 return 0; 113 } 114 115 public static void main(String[] args) throws Exception { 116 ToolRunner.run(new BufferedMutatorExample(), args); 117 } 118}