001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.concurrent.CountDownLatch; 024import java.util.concurrent.atomic.AtomicLong; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 027import org.apache.hadoop.hbase.client.coprocessor.Batch; 028import org.apache.hadoop.hbase.testclassification.ClientTests; 029import org.apache.hadoop.hbase.testclassification.LargeTests; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.junit.After; 032import org.junit.Before; 033import org.junit.ClassRule; 034import org.junit.experimental.categories.Category; 035 036import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 037 038@Category({ LargeTests.class, ClientTests.class }) 039public class TestClientPushback extends ClientPushbackTestBase { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestClientPushback.class); 044 045 private ConnectionImplementation conn; 046 047 private BufferedMutatorImpl mutator; 048 049 @Before 050 public void setUp() throws IOException { 051 conn = (ConnectionImplementation) ConnectionFactory.createConnection(UTIL.getConfiguration()); 052 mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName); 053 } 054 055 @After 056 public void tearDown() throws IOException { 057 Closeables.close(mutator, true); 058 Closeables.close(conn, true); 059 } 060 061 @Override 062 protected ClientBackoffPolicy getBackoffPolicy() throws IOException { 063 return conn.getBackoffPolicy(); 064 } 065 066 @Override 067 protected ServerStatisticTracker getStatisticsTracker() throws IOException { 068 return conn.getStatisticsTracker(); 069 } 070 071 @Override 072 protected MetricsConnection getConnectionMetrics() throws IOException { 073 return conn.getConnectionMetrics(); 074 } 075 076 @Override 077 protected void mutate(Put put) throws IOException { 078 mutator.mutate(put); 079 mutator.flush(); 080 } 081 082 @Override 083 protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException { 084 // Reach into the connection and submit work directly to AsyncProcess so we can 085 // monitor how long the submission was delayed via a callback 086 List<Row> ops = new ArrayList<>(1); 087 ops.add(put); 088 Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> { 089 endTime.set(EnvironmentEdgeManager.currentTime()); 090 latch.countDown(); 091 }; 092 AsyncProcessTask<Result> task = 093 AsyncProcessTask.newBuilder(callback).setPool(mutator.getPool()).setTableName(tableName) 094 .setRowAccess(ops).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE) 095 .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout()) 096 .setRpcTimeout(60 * 1000).build(); 097 mutator.getAsyncProcess().submit(task); 098 } 099 100 @Override 101 protected void mutateRow(RowMutations mutations) throws IOException { 102 try (Table table = conn.getTable(tableName)) { 103 table.mutateRow(mutations); 104 } 105 } 106}