/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2018-2020 Ping Identity Corporation
 */
package com.unboundid.directory.sdk.sync.api;

import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider;
import com.unboundid.directory.sdk.common.internal.Reconfigurable;
import com.unboundid.directory.sdk.common.internal.UnboundIDExtension;
import com.unboundid.directory.sdk.sync.config.ChangeDetectorConfig;
import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
import com.unboundid.directory.sdk.sync.types.ChangeRecord;
import com.unboundid.directory.sdk.sync.types.EndpointException;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SyncSourceContext;
import com.unboundid.ldap.sdk.ResultCode;
import com.unboundid.util.Extensible;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;

045import java.io.Serializable; 046import java.util.Collections; 047import java.util.LinkedList; 048import java.util.List; 049import java.util.Map; 050import java.util.concurrent.atomic.AtomicLong; 051 052/** 053 * This class defines an API that must be implemented by extensions that 054 * detect changes for an LDAP based Sync Source. 055 * A Change Detector can be used to 056 * <ul> 057 * <li>Process logs or other flat files for changes.</li> 058 * <li>Process changes from a Queue (Kafka, RabbitMQ, etc)</li> 059 * <li>Override the standard cn=changelog 060 * based approach for detecting changes.</li> 061 * </ul> 062 * <br> 063 * <h2>Configuring Change Detectors</h2> 064 * In order to configure a Change Detector created using this API, use a 065 * command like: 066 * <pre> 067 * dsconfig create-change-detector \ 068 * --detector-name "<i>{detector-name}</i>" \ 069 * --type third-party \ 070 * --set "extension-class:<i>{class-name}</i>" \ 071 * --set "extension-argument:<i>{name=vale}</i>" 072 * </pre> 073 * where "<i>{plugin-name}</i>" is the name to use for the Change Detector 074 * instance, "<i>{class-name}</i>" is the fully-qualified name of the Java 075 * class that extends 076 * {@code com.unboundid.directory.sdk.sync.api.ChangeDetector}, 077 * and "<i>{name=value}</i>" represents name-value pairs for any arguments to 078 * provide to the Change Detector. If multiple arguments should be provided 079 * to the Change Detector, then the 080 * "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" 081 * option should be provided multiple times. 082 */ 083@Extensible() 084@SynchronizationServerExtension(appliesToLocalContent = false, 085 appliesToSynchronizedContent = true) 086public abstract class ChangeDetector 087 implements UnboundIDExtension, 088 Reconfigurable<ChangeDetectorConfig>, 089 ExampleUsageProvider { 090 091 /** 092 * Creates a new instance of this LDAP Change Detector. 
All Change 093 * Detector implementations must include a default constructor, but any 094 * initialization should generally be done in the 095 * {@code initializeChangeDetector} method. 096 */ 097 public ChangeDetector() { 098 // No implementation is required. 099 } 100 101 /** 102 * {@inheritDoc} 103 */ 104 @Override 105 public abstract String getExtensionName(); 106 107 /** 108 * {@inheritDoc} 109 */ 110 @Override 111 public abstract String[] getExtensionDescription(); 112 113 /** 114 * {@inheritDoc} 115 */ 116 @Override 117 public Map<List<String>, String> getExamplesArgumentSets() { 118 return Collections.emptyMap(); 119 } 120 121 /** 122 * {@inheritDoc} 123 */ 124 @Override 125 public void defineConfigArguments(final ArgumentParser parser) 126 throws ArgumentException { 127 // No arguments will be allowed by default. 128 } 129 130 /** 131 * This hook is called when a Sync Pipe first starts up or when the 132 * set-startpoint subcommand is called from the <i>realtime-sync</i> command 133 * line tool. Any initialization of this change detector should be performed 134 * here. This method should generally store the {@link SyncServerContext} 135 * and {@link SyncSourceContext} in a class member so that it can be used 136 * elsewhere in the implementation. 137 * <p> 138 * The default implementation is empty. 139 * 140 * @param serverContext A handle to the server context for the server in 141 * which this extension is running. 142 * @param syncSourceContext An interface for interacting with the Sync 143 * Source that owns this Change Detector or 144 * {@code null} if the Change Detector is only 145 * being initialized to validate its configuration. 146 * @param parser The argument parser which has been initialized 147 * from the configuration for this sync source. 
148 */ 149 public void initializeChangeDetector( 150 final SyncServerContext serverContext, 151 final SyncSourceContext syncSourceContext, 152 final ArgumentParser parser) { 153 // No initialization will be performed by default. 154 } 155 156 /** 157 * This hook is called when a Sync Pipe shuts down or when the set-startpoint 158 * subcommand (from the <i>realtime-sync</i> command line tool) is finished. 159 * Any clean up of this change detector should be performed here. 160 * <p> 161 * The default implementation is empty. 162 */ 163 public void finalizeChangeDetector() { 164 //No implementation required by default. 165 } 166 167 /** 168 * {@inheritDoc} 169 */ 170 public boolean isConfigurationAcceptable( 171 final ChangeDetectorConfig config, 172 final ArgumentParser parser, 173 final List<String> unacceptableReasons) { 174 // No implementation required by default. 175 return true; 176 } 177 178 /** 179 * {@inheritDoc} 180 */ 181 public ResultCode applyConfiguration(final ChangeDetectorConfig config, 182 final ArgumentParser parser, 183 final List<String> adminActionsRequired, 184 final List<String> messages) { 185 // No implementation required by default. 186 return ResultCode.SUCCESS; 187 } 188 189 /** 190 * This method should effectively set the starting point for synchronization 191 * to the place specified by the <code>options</code> parameter. This should 192 * cause all changes previous to the specified start point to be disregarded 193 * and only changes after that point to be returned by 194 * {@link #getNextBatchOfChanges(int, AtomicLong)}. 195 * <p> 196 * There are several different startpoint types (see 197 * {@link SetStartpointOptions}), and this implementation is not required to 198 * support them all. If the specified startpoint type is unsupported, this 199 * method should throw an {@link UnsupportedOperationException}. 
200 * 201 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type 202 * must be supported by your implementation, because this is used when a Sync 203 * Pipe first starts up. The {@link Serializable} in this case is the same 204 * type that is returned by {@link #getStartpoint()}; the Sync Server persists 205 * it and passes it back in on a restart. 206 * <p> 207 * This method can be called from two different contexts: 208 * <ul> 209 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used 210 * (the Sync Pipe is required to be stopped in this context)</li> 211 * <li>Immediately after a Sync Pipe starts up and a connection is first 212 * established to the source server (e.g. before the first call to 213 * {@link #getNextBatchOfChanges(int, AtomicLong)})</li> 214 * </ul> 215 * 216 * @param options an object which indicates where exactly to start 217 * synchronizing (e.g. the end of the changelog, specific 218 * change number, a certain time ago, etc) 219 * @throws EndpointException if there is any error while setting the 220 * start point 221 */ 222 public abstract void setStartpoint(final SetStartpointOptions options) 223 throws EndpointException; 224 225 226 /** 227 * Gets the current value of the startpoint for change detection. This is the 228 * "bookmark" which indicates which changes have already been processed and 229 * which have not. In most cases, a change number is used to detect changes 230 * and is managed by the Data Sync Server, in which case this 231 * implementation needs only to return the latest acknowledged 232 * change number. In other cases, the return value may correspond to a 233 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. 234 * In any case, this method should return the value that is updated by 235 * {@link #acknowledgeCompletedOps(LinkedList)}. 
236 * <p> 237 * This method is called periodically and the return value is saved in the 238 * persistent state for the Sync Pipe that uses this extension as its Sync 239 * Source. 240 * 241 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 242 * updated after a sync operation is acknowledged back to this extension (via 243 * {@link #acknowledgeCompletedOps(LinkedList)}). 244 * Otherwise it will be possible for changes to be missed when the 245 * Data Sync Server is restarted or a connection error occurs. 246 * 247 * @return a value to store in the persistent state for the Sync Pipe. This is 248 * usually a change number, but if a changelog table is not used to 249 * detect changes, this value should represent some other token to 250 * pass into {@link #setStartpoint(SetStartpointOptions)} 251 * when the sync pipe starts up. 252 */ 253 public abstract Serializable getStartpoint(); 254 255 /** 256 * Return the next batch of change records from the source. Change records 257 * are usually just hints that a change happened; they do not include 258 * the full contents of the target entry. In an effort to never synchronize 259 * stale data, the Data Sync Server will go back and fetch the full 260 * target entry for each change record. 261 * <p> 262 * On the first invocation, this should return changes starting from the 263 * startpoint that was set by 264 * {@link #setStartpoint(SetStartpointOptions)}. This method is also 265 * responsible for updating the internal state such that subsequent 266 * invocations do not return duplicate changes. 267 * <p> 268 * The resulting list should be limited by <code>maxChanges</code>. The 269 * <code>numStillPending</code> reference should be set to the estimated 270 * number of changes that haven't yet been retrieved from the source endpoint 271 * when this method returns, or zero if all the current changes have been 272 * retrieved. 
273 * 274 * <b>IMPORTANT</b>: While this method needs to keep track of which changes 275 * have already been returned so that it does not return them again, it should 276 * <b>NOT</b> modify the official startpoint. The internal value for the 277 * startpoint should only be updated after a sync operation is acknowledged 278 * back to this extension (via 279 * {@link #acknowledgeCompletedOps(LinkedList)}). 280 * Otherwise it will be possible for changes to be missed when the 281 * Data Sync Server is restarted or a connection error occurs. The 282 * startpoint should not change as a result of this method. 283 * <p> 284 * This method <b>does not need to be thread-safe</b>. It will be invoked 285 * repeatedly by a single thread, based on the polling interval set in the 286 * Sync Pipe configuration. 287 * 288 * @param maxChanges the maximum number of changes to retrieve 289 * @param numStillPending this should be set to the number of unretrieved 290 * changes that are still pending after this batch has 291 * been retrieved. This will be passed in as zero, and 292 * may be left that way if the actual value cannot be 293 * determined. 294 * @return a list of {@link ChangeRecord} instances, each 295 * corresponding to a single change at the source endpoint. 296 * If there are no new changes to return, this method should return 297 * an empty list. 298 * @throws EndpointException if there is any error while retrieving the 299 * next batch of changes 300 */ 301 public abstract List<ChangeRecord> getNextBatchOfChanges( 302 final int maxChanges, 303 final AtomicLong numStillPending) 304 throws EndpointException; 305 306 /** 307 * Provides a way for the Data Sync Server to acknowledge back to the 308 * extension which sync operations it has processed. This method should update 309 * the official startpoint which was set by 310 * {@link #setStartpoint(SetStartpointOptions)} and is 311 * returned by {@link #getStartpoint()}. 
312 * 313 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 314 * updated after a sync operation is acknowledged back to this extension (via 315 * this method). Otherwise it will be possible for changes to be missed when 316 * the Data Sync Server is restarted or a connection error occurs. 317 * 318 * @param completedOps a list of {@link SyncOperation}s that have finished 319 * processing. The records are listed in the order they 320 * were first detected. 321 * @throws EndpointException if there is an error acknowledging the changes 322 * back to the source 323 */ 324 public abstract void acknowledgeCompletedOps( 325 final LinkedList<SyncOperation> completedOps) 326 throws EndpointException; 327}