001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store; 019 020import java.io.IOException; 021 022import org.apache.hadoop.hbase.procedure2.Procedure; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.apache.yetus.audience.InterfaceStability; 025 026/** 027 * The ProcedureStore is used by the executor to persist the state of each procedure execution. 028 * This allows to resume the execution of pending/in-progress procedures in case 029 * of machine failure or service shutdown. 030 */ 031@InterfaceAudience.Private 032@InterfaceStability.Evolving 033public interface ProcedureStore { 034 /** 035 * Store listener interface. 036 * <p/> 037 * The main process should register a listener and respond to the store events. 038 */ 039 public interface ProcedureStoreListener { 040 041 /** 042 * triggered when the store sync is completed. 043 */ 044 default void postSync() { 045 } 046 047 /** 048 * triggered when the store is not able to write out data. the main process should abort. 049 */ 050 default void abortProcess() { 051 } 052 053 /** 054 * Suggest that the upper layer should update the state of some procedures. Ignore this call 055 * will not effect correctness but performance. 056 * <p/> 057 * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file 058 * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If 059 * there are old procedures in a WAL file which are never deleted or updated, then we can not 060 * delete the WAL file and this will cause we hold lots of WAL file and slow down the master 061 * restarts. So here we introduce this method to tell the upper layer that please update the 062 * states of these procedures so that we can delete the old WAL file. 063 * @param procIds the id for the procedures 064 */ 065 default void forceUpdate(long[] procIds) { 066 } 067 } 068 069 /** 070 * An Iterator over a collection of Procedure 071 */ 072 public interface ProcedureIterator { 073 /** 074 * Reset the Iterator by seeking to the beginning of the list. 075 */ 076 void reset(); 077 078 /** 079 * Returns true if the iterator has more elements. 080 * (In other words, returns true if next() would return a Procedure 081 * rather than throwing an exception.) 082 * @return true if the iterator has more procedures 083 */ 084 boolean hasNext(); 085 086 /** 087 * Calling this method does not need to convert the protobuf message to the Procedure class, so 088 * if it returns true we can call {@link #skipNext()} to skip the procedure without 089 * deserializing. This could increase the performance. 090 * @return true if the iterator next element is a completed procedure. 091 */ 092 boolean isNextFinished(); 093 094 /** 095 * Skip the next procedure 096 * <p/> 097 * This method is used to skip the deserializing of the procedure to increase performance, as 098 * when calling next we need to convert the protobuf message to the Procedure class. 099 */ 100 void skipNext(); 101 102 /** 103 * Returns the next procedure in the iteration. 104 * @throws IOException if there was an error fetching/deserializing the procedure 105 * @return the next procedure in the iteration. 106 */ 107 @SuppressWarnings("rawtypes") 108 Procedure next() throws IOException; 109 } 110 111 /** 112 * Interface passed to the ProcedureStore.load() method to handle the store-load events. 113 */ 114 public interface ProcedureLoader { 115 /** 116 * Called by ProcedureStore.load() to notify about the maximum proc-id in the store. 117 * @param maxProcId the highest proc-id in the store 118 */ 119 void setMaxProcId(long maxProcId); 120 121 /** 122 * Called by the ProcedureStore.load() every time a set of procedures are ready to be executed. 123 * The ProcedureIterator passed to the method, has the procedure sorted in replay-order. 124 * @param procIter iterator over the procedures ready to be added to the executor. 125 */ 126 void load(ProcedureIterator procIter) throws IOException; 127 128 /** 129 * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to 130 * the executor, which probably means they are corrupted since some information/link is missing. 131 * @param procIter iterator over the procedures not ready to be added to the executor, corrupted 132 */ 133 void handleCorrupted(ProcedureIterator procIter) throws IOException; 134 } 135 136 /** 137 * Add the listener to the notification list. 138 * @param listener The AssignmentListener to register 139 */ 140 void registerListener(ProcedureStoreListener listener); 141 142 /** 143 * Remove the listener from the notification list. 144 * @param listener The AssignmentListener to unregister 145 * @return true if the listner was in the list and it was removed, otherwise false. 146 */ 147 boolean unregisterListener(ProcedureStoreListener listener); 148 149 /** 150 * Start/Open the procedure store 151 * @param numThreads number of threads to be used by the procedure store 152 */ 153 void start(int numThreads) throws IOException; 154 155 /** 156 * Stop/Close the procedure store 157 * @param abort true if the stop is an abort 158 */ 159 void stop(boolean abort); 160 161 /** 162 * @return true if the store is running, otherwise false. 163 */ 164 boolean isRunning(); 165 166 /** 167 * @return the number of threads/slots passed to start() 168 */ 169 int getNumThreads(); 170 171 /** 172 * Set the number of procedure running. 173 * This can be used, for example, by the store to know how long to wait before a sync. 174 * @return how many procedures are running (may not be same as <code>count</code>). 175 */ 176 int setRunningProcedureCount(int count); 177 178 /** 179 * Acquire the lease for the procedure store. 180 */ 181 void recoverLease() throws IOException; 182 183 /** 184 * Load the Procedures in the store. 185 * @param loader the ProcedureLoader that will handle the store-load events 186 */ 187 void load(ProcedureLoader loader) throws IOException; 188 189 /** 190 * When a procedure is submitted to the executor insert(proc, null) will be called. 191 * 'proc' has a 'RUNNABLE' state and the initial information required to start up. 192 * 193 * When a procedure is executed and it returns children insert(proc, subprocs) will be called. 194 * 'proc' has a 'WAITING' state and an update state. 195 * 'subprocs' are the children in 'RUNNABLE' state with the initial information. 196 * 197 * @param proc the procedure to serialize and write to the store. 198 * @param subprocs the newly created child of the proc. 199 */ 200 void insert(Procedure<?> proc, Procedure<?>[] subprocs); 201 202 /** 203 * Serialize a set of new procedures. 204 * These procedures are freshly submitted to the executor and each procedure 205 * has a 'RUNNABLE' state and the initial information required to start up. 206 * 207 * @param procs the procedures to serialize and write to the store. 208 */ 209 void insert(Procedure<?>[] procs); 210 211 /** 212 * The specified procedure was executed, 213 * and the new state should be written to the store. 214 * @param proc the procedure to serialize and write to the store. 215 */ 216 void update(Procedure<?> proc); 217 218 /** 219 * The specified procId was removed from the executor, 220 * due to completion, abort or failure. 221 * The store implementor should remove all the information about the specified procId. 222 * @param procId the ID of the procedure to remove. 223 */ 224 void delete(long procId); 225 226 /** 227 * The parent procedure completed. 228 * Update the state and mark all the child deleted. 229 * @param parentProc the parent procedure to serialize and write to the store. 230 * @param subProcIds the IDs of the sub-procedure to remove. 231 */ 232 void delete(Procedure<?> parentProc, long[] subProcIds); 233 234 /** 235 * The specified procIds were removed from the executor, 236 * due to completion, abort or failure. 237 * The store implementor should remove all the information about the specified procIds. 238 * @param procIds the IDs of the procedures to remove. 239 * @param offset the array offset from where to start to delete 240 * @param count the number of IDs to delete 241 */ 242 void delete(long[] procIds, int offset, int count); 243}