001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.CountDownLatch;
024import java.util.concurrent.atomic.AtomicLong;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
027import org.apache.hadoop.hbase.client.coprocessor.Batch;
028import org.apache.hadoop.hbase.testclassification.ClientTests;
029import org.apache.hadoop.hbase.testclassification.LargeTests;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.junit.After;
032import org.junit.Before;
033import org.junit.ClassRule;
034import org.junit.experimental.categories.Category;
035
036import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
037
038@Category({ LargeTests.class, ClientTests.class })
039public class TestClientPushback extends ClientPushbackTestBase {
040
041  @ClassRule
042  public static final HBaseClassTestRule CLASS_RULE =
043    HBaseClassTestRule.forClass(TestClientPushback.class);
044
045  private ConnectionImplementation conn;
046
047  private BufferedMutatorImpl mutator;
048
049  @Before
050  public void setUp() throws IOException {
051    conn = (ConnectionImplementation) ConnectionFactory.createConnection(UTIL.getConfiguration());
052    mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
053  }
054
055  @After
056  public void tearDown() throws IOException {
057    Closeables.close(mutator, true);
058    Closeables.close(conn, true);
059  }
060
061  @Override
062  protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
063    return conn.getBackoffPolicy();
064  }
065
066  @Override
067  protected ServerStatisticTracker getStatisticsTracker() throws IOException {
068    return conn.getStatisticsTracker();
069  }
070
071  @Override
072  protected MetricsConnection getConnectionMetrics() throws IOException {
073    return conn.getConnectionMetrics();
074  }
075
076  @Override
077  protected void mutate(Put put) throws IOException {
078    mutator.mutate(put);
079    mutator.flush();
080  }
081
082  @Override
083  protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) throws IOException {
084    // Reach into the connection and submit work directly to AsyncProcess so we can
085    // monitor how long the submission was delayed via a callback
086    List<Row> ops = new ArrayList<>(1);
087    ops.add(put);
088    Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> {
089      endTime.set(EnvironmentEdgeManager.currentTime());
090      latch.countDown();
091    };
092    AsyncProcessTask<Result> task =
093      AsyncProcessTask.newBuilder(callback).setPool(mutator.getPool()).setTableName(tableName)
094        .setRowAccess(ops).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
095        .setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
096        .setRpcTimeout(60 * 1000).build();
097    mutator.getAsyncProcess().submit(task);
098  }
099
100  @Override
101  protected void mutateRow(RowMutations mutations) throws IOException {
102    try (Table table = conn.getTable(tableName)) {
103      table.mutateRow(mutations);
104    }
105  }
106}