001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.io.StringWriter;
023import java.security.PrivilegedExceptionAction;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Put;
027import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
031import org.apache.hadoop.util.StringUtils;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * MultiThreadedWriter that helps in testing ACL
037 */
038public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
039
040  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterWithACL.class);
041  private User userOwner;
042
043  public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
044    TableName tableName, User userOwner) throws IOException {
045    super(dataGen, conf, tableName);
046    this.userOwner = userOwner;
047  }
048
049  @Override
050  public void start(long startKey, long endKey, int numThreads) throws IOException {
051    super.start(startKey, endKey, numThreads);
052  }
053
054  @Override
055  protected void createWriterThreads(int numThreads) throws IOException {
056    for (int i = 0; i < numThreads; ++i) {
057      HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
058      writers.add(writer);
059    }
060  }
061
062  public class HBaseWriterThreadWithACL extends HBaseWriterThread {
063
064    private Table table;
065    private WriteAccessAction writerAction = new WriteAccessAction();
066
067    public HBaseWriterThreadWithACL(int writerId) throws IOException {
068      super(writerId);
069    }
070
071    @Override
072    protected Table createTable() throws IOException {
073      return null;
074    }
075
076    @Override
077    protected void closeHTable() {
078      if (table != null) {
079        try {
080          table.close();
081        } catch (Exception e) {
082          LOG.error("Error in closing the table " + table.getName(), e);
083        }
084      }
085    }
086
087    @Override
088    public void insert(final Table table, Put put, final long keyBase) {
089      final long start = EnvironmentEdgeManager.currentTime();
090      try {
091        put = (Put) dataGenerator.beforeMutate(keyBase, put);
092        writerAction.setPut(put);
093        writerAction.setKeyBase(keyBase);
094        writerAction.setStartTime(start);
095        userOwner.runAs(writerAction);
096      } catch (IOException e) {
097        recordFailure(table, put, keyBase, start, e);
098      } catch (InterruptedException e) {
099        failedKeySet.add(keyBase);
100      }
101    }
102
103    class WriteAccessAction implements PrivilegedExceptionAction<Object> {
104      private Put put;
105      private long keyBase;
106      private long start;
107
108      public WriteAccessAction() {
109      }
110
111      public void setPut(final Put put) {
112        this.put = put;
113      }
114
115      public void setKeyBase(final long keyBase) {
116        this.keyBase = keyBase;
117      }
118
119      public void setStartTime(final long start) {
120        this.start = start;
121      }
122
123      @Override
124      public Object run() throws Exception {
125        try {
126          if (table == null) {
127            table = connection.getTable(tableName);
128          }
129          table.put(put);
130        } catch (IOException e) {
131          recordFailure(table, put, keyBase, start, e);
132        }
133        return null;
134      }
135    }
136  }
137
138  private void recordFailure(final Table table, final Put put, final long keyBase, final long start,
139    IOException e) {
140    failedKeySet.add(keyBase);
141    String exceptionInfo;
142    if (e instanceof RetriesExhaustedWithDetailsException) {
143      RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
144      exceptionInfo = aggEx.getExhaustiveDescription();
145    } else {
146      StringWriter stackWriter = new StringWriter();
147      PrintWriter pw = new PrintWriter(stackWriter);
148      e.printStackTrace(pw);
149      pw.flush();
150      exceptionInfo = StringUtils.stringifyException(e);
151    }
152    LOG.error("Failed to insert: " + keyBase + " after "
153      + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
154      + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo);
155  }
156}