001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.Modifier;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.util.NonceKey;
028import org.apache.hadoop.hbase.util.RetryCounter;
029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
034import org.apache.hbase.thirdparty.com.google.protobuf.Any;
035import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
037import org.apache.hbase.thirdparty.com.google.protobuf.Message;
038import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
043
044/**
045 * Helper to convert to/from ProcedureProtos
046 */
047@InterfaceAudience.Private
048public final class ProcedureUtil {
049  private ProcedureUtil() { }
050
051  // ==========================================================================
052  //  Reflection helpers to create/validate a Procedure object
053  // ==========================================================================
054  private static Procedure<?> newProcedure(String className) throws BadProcedureException {
055    try {
056      Class<?> clazz = Class.forName(className);
057      if (!Modifier.isPublic(clazz.getModifiers())) {
058        throw new Exception("the " + clazz + " class is not public");
059      }
060
061      @SuppressWarnings("rawtypes")
062      Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
063      assert ctor != null : "no constructor found";
064      if (!Modifier.isPublic(ctor.getModifiers())) {
065        throw new Exception("the " + clazz + " constructor is not public");
066      }
067      return ctor.newInstance();
068    } catch (Exception e) {
069      throw new BadProcedureException(
070        "The procedure class " + className + " must be accessible and have an empty constructor",
071        e);
072    }
073  }
074
075  static void validateClass(Procedure<?> proc) throws BadProcedureException {
076    try {
077      Class<?> clazz = proc.getClass();
078      if (!Modifier.isPublic(clazz.getModifiers())) {
079        throw new Exception("the " + clazz + " class is not public");
080      }
081
082      Constructor<?> ctor = clazz.getConstructor();
083      assert ctor != null;
084      if (!Modifier.isPublic(ctor.getModifiers())) {
085        throw new Exception("the " + clazz + " constructor is not public");
086      }
087    } catch (Exception e) {
088      throw new BadProcedureException("The procedure class " + proc.getClass().getName() +
089        " must be accessible and have an empty constructor", e);
090    }
091  }
092
093  // ==========================================================================
094  //  convert to and from Procedure object
095  // ==========================================================================
096
097  /**
098   * A serializer for our Procedures. Instead of the previous serializer, it
099   * uses the stateMessage list to store the internal state of the Procedures.
100   */
101  private static class StateSerializer implements ProcedureStateSerializer {
102    private final ProcedureProtos.Procedure.Builder builder;
103    private int deserializeIndex;
104
105    public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
106      this.builder = builder;
107    }
108
109    @Override
110    public void serialize(Message message) throws IOException {
111      Any packedMessage = Any.pack(message);
112      builder.addStateMessage(packedMessage);
113    }
114
115    @Override
116    public <M extends Message> M deserialize(Class<M> clazz)
117        throws IOException {
118      if (deserializeIndex >= builder.getStateMessageCount()) {
119        throw new IOException("Invalid state message index: " + deserializeIndex);
120      }
121
122      try {
123        Any packedMessage = builder.getStateMessage(deserializeIndex++);
124        return packedMessage.unpack(clazz);
125      } catch (InvalidProtocolBufferException e) {
126        throw e.unwrapIOException();
127      }
128    }
129  }
130
131  /**
132   * A serializer (deserializer) for those Procedures which were serialized
133   * before this patch. It deserializes the old, binary stateData field.
134   */
135  private static class CompatStateSerializer implements ProcedureStateSerializer {
136    private InputStream inputStream;
137
138    public CompatStateSerializer(InputStream inputStream) {
139      this.inputStream = inputStream;
140    }
141
142    @Override
143    public void serialize(Message message) throws IOException {
144      throw new UnsupportedOperationException();
145    }
146
147    @SuppressWarnings("unchecked")
148    @Override
149    public <M extends Message> M deserialize(Class<M> clazz)
150        throws IOException {
151      Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
152      try {
153        return parser.parseDelimitedFrom(inputStream);
154      } catch (InvalidProtocolBufferException e) {
155        throw e.unwrapIOException();
156      }
157    }
158  }
159
160  /**
161   * Helper to convert the procedure to protobuf.
162   * <p/>
163   * Used by ProcedureStore implementations.
164   */
165  public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
166      throws IOException {
167    Preconditions.checkArgument(proc != null);
168    validateClass(proc);
169
170    final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
171      .setClassName(proc.getClass().getName())
172      .setProcId(proc.getProcId())
173      .setState(proc.getState())
174      .setSubmittedTime(proc.getSubmittedTime())
175      .setLastUpdate(proc.getLastUpdate());
176
177    if (proc.hasParent()) {
178      builder.setParentId(proc.getParentProcId());
179    }
180
181    if (proc.hasTimeout()) {
182      builder.setTimeout(proc.getTimeout());
183    }
184
185    if (proc.hasOwner()) {
186      builder.setOwner(proc.getOwner());
187    }
188
189    final int[] stackIds = proc.getStackIndexes();
190    if (stackIds != null) {
191      for (int i = 0; i < stackIds.length; ++i) {
192        builder.addStackId(stackIds[i]);
193      }
194    }
195
196    if (proc.hasException()) {
197      RemoteProcedureException exception = proc.getException();
198      builder.setException(
199        RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
200    }
201
202    final byte[] result = proc.getResult();
203    if (result != null) {
204      builder.setResult(UnsafeByteOperations.unsafeWrap(result));
205    }
206
207    ProcedureStateSerializer serializer = new StateSerializer(builder);
208    proc.serializeStateData(serializer);
209
210    if (proc.getNonceKey() != null) {
211      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
212      builder.setNonce(proc.getNonceKey().getNonce());
213    }
214
215    if (proc.hasLock()) {
216      builder.setLocked(true);
217    }
218
219    if (proc.isBypass()) {
220      builder.setBypass(true);
221    }
222    return builder.build();
223  }
224
225  /**
226   * Helper to convert the protobuf procedure.
227   * <p/>
228   * Used by ProcedureStore implementations.
229   * <p/>
230   * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
231   * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
232   * it by storing the data only on insert().
233   */
234  public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
235      throws IOException {
236    // Procedure from class name
237    Procedure<?> proc = newProcedure(proto.getClassName());
238
239    // set fields
240    proc.setProcId(proto.getProcId());
241    proc.setState(proto.getState());
242    proc.setSubmittedTime(proto.getSubmittedTime());
243    proc.setLastUpdate(proto.getLastUpdate());
244
245    if (proto.hasParentId()) {
246      proc.setParentProcId(proto.getParentId());
247    }
248
249    if (proto.hasOwner()) {
250      proc.setOwner(proto.getOwner());
251    }
252
253    if (proto.hasTimeout()) {
254      proc.setTimeout(proto.getTimeout());
255    }
256
257    if (proto.getStackIdCount() > 0) {
258      proc.setStackIndexes(proto.getStackIdList());
259    }
260
261    if (proto.hasException()) {
262      assert proc.getState() == ProcedureProtos.ProcedureState.FAILED ||
263             proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK :
264             "The procedure must be failed (waiting to rollback) or rolledback";
265      proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
266    }
267
268    if (proto.hasResult()) {
269      proc.setResult(proto.getResult().toByteArray());
270    }
271
272    if (proto.getNonce() != HConstants.NO_NONCE) {
273      proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
274    }
275
276    if (proto.getLocked()) {
277      proc.lockedWhenLoading();
278    }
279
280    if (proto.getBypass()) {
281      proc.bypass(null);
282    }
283
284    ProcedureStateSerializer serializer = null;
285
286    if (proto.getStateMessageCount() > 0) {
287      serializer = new StateSerializer(proto.toBuilder());
288    } else if (proto.hasStateData()) {
289      InputStream inputStream = proto.getStateData().newInput();
290      serializer = new CompatStateSerializer(inputStream);
291    }
292
293    if (serializer != null) {
294      proc.deserializeStateData(serializer);
295    }
296
297    return proc;
298  }
299
300  // ==========================================================================
301  //  convert from LockedResource object
302  // ==========================================================================
303
304  public static LockServiceProtos.LockedResourceType convertToProtoResourceType(
305      LockedResourceType resourceType) {
306    return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
307  }
308
309  public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
310    return LockServiceProtos.LockType.valueOf(lockType.name());
311  }
312
313  public static LockServiceProtos.LockedResource convertToProtoLockedResource(
314      LockedResource lockedResource) throws IOException {
315    LockServiceProtos.LockedResource.Builder builder =
316        LockServiceProtos.LockedResource.newBuilder();
317
318    builder
319        .setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
320        .setResourceName(lockedResource.getResourceName())
321        .setLockType(convertToProtoLockType(lockedResource.getLockType()));
322
323    Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
324
325    if (exclusiveLockOwnerProcedure != null) {
326      ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
327          convertToProtoProcedure(exclusiveLockOwnerProcedure);
328      builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
329    }
330
331    builder.setSharedLockCount(lockedResource.getSharedLockCount());
332
333    for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
334      ProcedureProtos.Procedure waitingProcedureProto =
335          convertToProtoProcedure(waitingProcedure);
336      builder.addWaitingProcedures(waitingProcedureProto);
337    }
338
339    return builder.build();
340  }
341
342  public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
343    "hbase.procedure.retry.sleep.interval.ms";
344
345  // default to 1 second
346  public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
347
348  public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
349    "hbase.procedure.retry.max.sleep.time.ms";
350
351  // default to 10 minutes
352  public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
353    TimeUnit.MINUTES.toMillis(10);
354
355  /**
356   * Get a retry counter for getting the backoff time. We will use the
357   * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
358   * is 10 minutes by default.
359   * <p/>
360   * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
361   * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
362   * timeout.
363   */
364  public static RetryCounter createRetryCounter(Configuration conf) {
365    long sleepIntervalMs =
366      conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
367    long maxSleepTimeMs =
368      conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
369    RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
370      .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
371    return new RetryCounter(retryConfig);
372  }
373}