001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import java.io.IOException;
021import java.io.PrintWriter;
022import java.io.StringWriter;
023import java.security.PrivilegedExceptionAction;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Put;
027import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
031import org.apache.hadoop.util.StringUtils;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * MultiThreadedWriter that helps in testing ACL
038 */
039@InterfaceAudience.Private
040public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
041
042  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterWithACL.class);
043  private User userOwner;
044
045  public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
046    TableName tableName, User userOwner) throws IOException {
047    super(dataGen, conf, tableName);
048    this.userOwner = userOwner;
049  }
050
051  @Override
052  public void start(long startKey, long endKey, int numThreads) throws IOException {
053    super.start(startKey, endKey, numThreads);
054  }
055
056  @Override
057  protected void createWriterThreads(int numThreads) throws IOException {
058    for (int i = 0; i < numThreads; ++i) {
059      HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
060      writers.add(writer);
061    }
062  }
063
064  public class HBaseWriterThreadWithACL extends HBaseWriterThread {
065
066    private Table table;
067    private WriteAccessAction writerAction = new WriteAccessAction();
068
069    public HBaseWriterThreadWithACL(int writerId) throws IOException {
070      super(writerId);
071    }
072
073    @Override
074    protected Table createTable() throws IOException {
075      return null;
076    }
077
078    @Override
079    protected void closeHTable() {
080      if (table != null) {
081        try {
082          table.close();
083        } catch (Exception e) {
084          LOG.error("Error in closing the table " + table.getName(), e);
085        }
086      }
087    }
088
089    @Override
090    public void insert(final Table table, Put put, final long keyBase) {
091      final long start = EnvironmentEdgeManager.currentTime();
092      try {
093        put = (Put) dataGenerator.beforeMutate(keyBase, put);
094        writerAction.setPut(put);
095        writerAction.setKeyBase(keyBase);
096        writerAction.setStartTime(start);
097        userOwner.runAs(writerAction);
098      } catch (IOException e) {
099        recordFailure(table, put, keyBase, start, e);
100      } catch (InterruptedException e) {
101        failedKeySet.add(keyBase);
102      }
103    }
104
105    class WriteAccessAction implements PrivilegedExceptionAction<Object> {
106      private Put put;
107      private long keyBase;
108      private long start;
109
110      public WriteAccessAction() {
111      }
112
113      public void setPut(final Put put) {
114        this.put = put;
115      }
116
117      public void setKeyBase(final long keyBase) {
118        this.keyBase = keyBase;
119      }
120
121      public void setStartTime(final long start) {
122        this.start = start;
123      }
124
125      @Override
126      public Object run() throws Exception {
127        try {
128          if (table == null) {
129            table = connection.getTable(tableName);
130          }
131          table.put(put);
132        } catch (IOException e) {
133          recordFailure(table, put, keyBase, start, e);
134        }
135        return null;
136      }
137    }
138  }
139
140  private void recordFailure(final Table table, final Put put, final long keyBase, final long start,
141    IOException e) {
142    failedKeySet.add(keyBase);
143    String exceptionInfo;
144    if (e instanceof RetriesExhaustedWithDetailsException) {
145      RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
146      exceptionInfo = aggEx.getExhaustiveDescription();
147    } else {
148      StringWriter stackWriter = new StringWriter();
149      PrintWriter pw = new PrintWriter(stackWriter);
150      e.printStackTrace(pw);
151      pw.flush();
152      exceptionInfo = StringUtils.stringifyException(e);
153    }
154    LOG.error("Failed to insert: " + keyBase + " after "
155      + (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: "
156      + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: " + exceptionInfo);
157  }
158}