001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.io.StringWriter;
023import java.security.PrivilegedExceptionAction;
024
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
032import org.apache.hadoop.util.StringUtils;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * MultiThreadedWriter that helps in testing ACL
038 */
039public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
040
041  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterWithACL.class);
042  private User userOwner;
043
044  public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
045      TableName tableName, User userOwner) throws IOException {
046    super(dataGen, conf, tableName);
047    this.userOwner = userOwner;
048  }
049
050  @Override
051  public void start(long startKey, long endKey, int numThreads) throws IOException {
052    super.start(startKey, endKey, numThreads);
053  }
054
055  @Override
056  protected void createWriterThreads(int numThreads) throws IOException {
057    for (int i = 0; i < numThreads; ++i) {
058      HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
059      writers.add(writer);
060    }
061  }
062
063  public class HBaseWriterThreadWithACL extends HBaseWriterThread {
064
065    private Table table;
066    private WriteAccessAction writerAction = new WriteAccessAction();
067
068    public HBaseWriterThreadWithACL(int writerId) throws IOException {
069      super(writerId);
070    }
071
072    @Override
073    protected Table createTable() throws IOException {
074      return null;
075    }
076
077    @Override
078    protected void closeHTable() {
079      if (table != null) {
080        try {
081          table.close();
082        } catch (Exception e) {
083          LOG.error("Error in closing the table "+table.getName(), e);
084        }
085      }
086    }
087
088    @Override
089    public void insert(final Table table, Put put, final long keyBase) {
090      final long start = System.currentTimeMillis();
091      try {
092        put = (Put) dataGenerator.beforeMutate(keyBase, put);
093        writerAction.setPut(put);
094        writerAction.setKeyBase(keyBase);
095        writerAction.setStartTime(start);
096        userOwner.runAs(writerAction);
097      } catch (IOException e) {
098        recordFailure(table, put, keyBase, start, e);
099      } catch (InterruptedException e) {
100        failedKeySet.add(keyBase);
101      }
102    }
103
104    class WriteAccessAction implements PrivilegedExceptionAction<Object> {
105      private Put put;
106      private long keyBase;
107      private long start;
108
109      public WriteAccessAction() {
110      }
111
112      public void setPut(final Put put) {
113        this.put = put;
114      }
115
116      public void setKeyBase(final long keyBase) {
117        this.keyBase = keyBase;
118      }
119
120      public void setStartTime(final long start) {
121        this.start = start;
122      }
123
124      @Override
125      public Object run() throws Exception {
126        try {
127          if (table == null) {
128            table = connection.getTable(tableName);
129          }
130          table.put(put);
131        } catch (IOException e) {
132          recordFailure(table, put, keyBase, start, e);
133        }
134        return null;
135      }
136    }
137  }
138
139  private void recordFailure(final Table table, final Put put, final long keyBase,
140      final long start, IOException e) {
141    failedKeySet.add(keyBase);
142    String exceptionInfo;
143    if (e instanceof RetriesExhaustedWithDetailsException) {
144      RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
145      exceptionInfo = aggEx.getExhaustiveDescription();
146    } else {
147      StringWriter stackWriter = new StringWriter();
148      PrintWriter pw = new PrintWriter(stackWriter);
149      e.printStackTrace(pw);
150      pw.flush();
151      exceptionInfo = StringUtils.stringifyException(e);
152    }
153    LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
154        + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
155        + exceptionInfo);
156  }
157}