001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * docs/licenses/cddl.txt 011 * or http://www.opensource.org/licenses/cddl1.php. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * docs/licenses/cddl.txt. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Portions Copyright 2011-2023 Ping Identity Corporation 026 */ 027package com.unboundid.directory.sdk.sync.scripting; 028 029 030import java.io.Serializable; 031import java.util.Iterator; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.concurrent.BlockingQueue; 035import java.util.concurrent.atomic.AtomicLong; 036 037import com.unboundid.directory.sdk.common.internal.Configurable; 038import com.unboundid.directory.sdk.sync.config.SyncSourceConfig; 039import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension; 040import com.unboundid.directory.sdk.sync.types.EndpointException; 041import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; 042import com.unboundid.directory.sdk.sync.types.SyncOperation; 043import com.unboundid.directory.sdk.sync.types.SyncServerContext; 044import com.unboundid.directory.sdk.sync.types.ChangeRecord; 045import com.unboundid.ldap.sdk.Entry; 046import com.unboundid.util.Extensible; 047import com.unboundid.util.args.ArgumentException; 048import com.unboundid.util.args.ArgumentParser; 049 050 051 052/** 053 * This class defines an API that must be implemented by extensions in order to 054 * synchronize data from a generic (non-LDAP and non-JDBC) endpoint. Since the 055 * Ping Identity Data Sync Server is LDAP-centric, this API allows you to take 056 * generic content and convert it into LDAP entries which can then be processed 057 * by the Data Sync Server. The lifecycle of a sync operation is as 058 * follows: 059 * <ol> 060 * <li>Detect change at the synchronization source</li> 061 * <li>Fetch full source entry</li> 062 * <li>Perform any mappings and compute the equivalent destination entry</li> 063 * <li>Fetch full destination entry</li> 064 * <li>Diff the computed destination entry and actual (fetched) destination 065 * entry</li> 066 * <li>Apply the minimal set of changes at the destination to bring it in sync 067 * </li> 068 * </ol> 069 * This implies that the 070 * {@link #fetchEntry(SyncOperation)} method will be called once for every 071 * change that is returned by 072 * {@link #getNextBatchOfChanges(int, AtomicLong)}. 073 * <p> 074 * This is a generic interface and there is no protocol-specific connection 075 * management provided. It is expected that implementers will provide their own 076 * libraries for talking to the source endpoint and handle the connection 077 * lifecycle in the {@link #initializeSyncSource(SyncServerContext, 078 * SyncSourceConfig, ArgumentParser)} and {@link #finalizeSyncSource()} 079 * methods of this extension. 080 * <p> 081 * During realtime synchronization (i.e. when a Sync Pipe is running), there is 082 * a sliding window of changes being processed, and this API provides a 083 * distinction between some different points along that window: 084 * <ul> 085 * <li><b>Old changes</b>: These are changes that the Sync Server has 086 * processed and acknowledged back to the Sync Source. The Sync Source is 087 * under no obligation to re-detect these changes.</li> 088 * <li><b>Startpoint</b>: This marks where the Sync Source will start 089 * detecting changes if it is restarted.</li> 090 * <li><b>Detected but unacknowledged</b>: These changes have been returned by 091 * <code>getNextBatchOfChanges()</code> but not completely processed and 092 * acknowledged back to the Sync Source.</li> 093 * <li><b>Undetected changes</b>: The next call to 094 * <code>getNextBatchOfChanges()</code> should return the first changes 095 * that have not been detected. This should be somewhere at or ahead of 096 * the startpoint.</li> 097 * </ul> 098 * <p> 099 * Several of these methods throw {@link EndpointException}, which should be 100 * used in the case of any connection or endpoint error. For other types of 101 * errors, runtime exceptions may be used (IllegalStateException, 102 * NullPointerException, etc.). The Data Sync Server will automatically 103 * retry operations that fail, up to a configurable amount of attempts. The 104 * EndpointException class allows you to specify a retry policy as well. 105 * <BR> 106 * <H2>Configuring Groovy Scripted Sync Sources</H2> 107 * In order to configure a Sync Source based on this API and written in Groovy, 108 * use a command like: 109 * <PRE> 110 * dsconfig create-sync-source \ 111 * --source-name "<I>{source-name}</I>" \ 112 * --type groovy-scripted \ 113 * --set "script-class:<I>{class-name}</I>" \ 114 * --set "script-argument:<I>{name=value}</I>" 115 * </PRE> 116 * where "<I>{source-name}</I>" is the name to use for the Sync Source 117 * instance, "<I>{class-name}</I>" is the fully-qualified 118 * name of the Groovy class written using this API, and "<I>{name=value}</I>" 119 * represents name-value pairs for any arguments to provide to the sync 120 * source. If multiple arguments should be provided to the sync source, 121 * then the "<CODE>--set script-argument:<I>{name=value}</I></CODE>" option 122 * should be provided multiple times. 123 */ 124@Extensible() 125@SynchronizationServerExtension(appliesToLocalContent=false, 126 appliesToSynchronizedContent=true) 127public abstract class ScriptedSyncSource implements Configurable 128{ 129 /** 130 * {@inheritDoc} 131 */ 132 public void defineConfigArguments(final ArgumentParser parser) 133 throws ArgumentException 134 { 135 // No arguments will be allowed by default. 136 } 137 138 139 140 /** 141 * This hook is called when a Sync Pipe first starts up, when the 142 * <i>resync</i> process first starts up, or when the set-startpoint 143 * subcommand is called from the <i>realtime-sync</i> command line tool. 144 * Any initialization of this sync source should be performed here. This 145 * method should generally store the {@link SyncServerContext} in a class 146 * member so that it can be used elsewhere in the implementation. 147 * <p> 148 * The default implementation is empty. 149 * 150 * @param serverContext A handle to the server context for the server in 151 * which this extension is running. 152 * @param config The general configuration for this sync source. 153 * @param parser The argument parser which has been initialized from 154 * the configuration for this JDBC sync source. 155 */ 156 public void initializeSyncSource(final SyncServerContext serverContext, 157 final SyncSourceConfig config, 158 final ArgumentParser parser) 159 { 160 // No initialization will be performed by default. 161 } 162 163 164 165 /** 166 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> 167 * process shuts down, or when the set-startpoint subcommand (from the 168 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this 169 * sync source should be performed here. 170 * <p> 171 * The default implementation is empty. 172 */ 173 public void finalizeSyncSource() 174 { 175 //No implementation required by default. 176 } 177 178 179 180 /** 181 * Return the URL or path identifying the source endpoint 182 * from which this extension is transmitting data. This is used for logging 183 * purposes only, so it could just be a server name or hostname and port, etc. 184 * 185 * @return the path to the source endpoint 186 */ 187 public abstract String getCurrentEndpointURL(); 188 189 190 191 /** 192 * This method should effectively set the starting point for synchronization 193 * to the place specified by the <code>options</code> parameter. This should 194 * cause all changes previous to the specified start point to be disregarded 195 * and only changes after that point to be returned by 196 * {@link #getNextBatchOfChanges(int, AtomicLong)}. 197 * <p> 198 * There are several different startpoint types (see 199 * {@link SetStartpointOptions}), and this implementation is not required to 200 * support them all. If the specified startpoint type is unsupported, this 201 * method should throw an {@link UnsupportedOperationException}. 202 * <p> 203 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type 204 * must be supported by your implementation, because this is used when a Sync 205 * Pipe first starts up. The {@link Serializable} in this case is the same 206 * type that is returned by {@link #getStartpoint()}; the Sync Server persists 207 * it and passes it back in on a restart. 208 * <p> 209 * This method can be called from two different contexts: 210 * <ul> 211 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used 212 * (the Sync Pipe is required to be stopped in this context)</li> 213 * <li>Immediately after a Sync Pipe starts up and a connection is first 214 * established to the source server (e.g. before the first call to 215 * {@link #getNextBatchOfChanges(int, AtomicLong)})</li> 216 * </ul> 217 * 218 * @param options 219 * an object which indicates where exactly to start synchronizing 220 * (e.g. the end of the changelog, specific change number, a certain 221 * time ago, etc) 222 * @throws EndpointException 223 * if there is any error while setting the start point 224 */ 225 public abstract void setStartpoint(final SetStartpointOptions options) 226 throws EndpointException; 227 228 229 230 /** 231 * Gets the current value of the startpoint for change detection. This is the 232 * "bookmark" which indicates which changes have already been processed and 233 * which have not. In most cases, a change number is used to detect changes 234 * and is managed by the Data Sync Server, in which case this 235 * implementation needs only to return the latest acknowledged 236 * change number. In other cases, the return value may correspond to a 237 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. 238 * In any case, this method should return the value that is updated by 239 * {@link #acknowledgeCompletedOps(LinkedList)}. 240 * <p> 241 * This method is called periodically and the return value is saved in the 242 * persistent state for the Sync Pipe that uses this extension as its Sync 243 * Source. 244 * <p> 245 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 246 * updated after a sync operation is acknowledged back to this script (via 247 * {@link #acknowledgeCompletedOps(LinkedList)}). 248 * Otherwise it will be possible for changes to be missed when the 249 * Data Sync Server is restarted or a connection error occurs. 250 * @return a value to store in the persistent state for the Sync Pipe. This is 251 * usually a change number, but if a changelog table is not used to 252 * detect changes, this value should represent some other token to 253 * pass into {@link #setStartpoint(SetStartpointOptions)} 254 * when the sync pipe starts up. 255 */ 256 public abstract Serializable getStartpoint(); 257 258 259 260 /** 261 * Return the next batch of change records from the source. Change records 262 * are usually just hints that a change happened; they do not include 263 * the full contents of the target entry. In an effort to never synchronize 264 * stale data, the Data Sync Server will go back and fetch the full 265 * target entry for each change record. 266 * <p> 267 * On the first invocation, this should return changes starting from the 268 * startpoint that was set by 269 * {@link #setStartpoint(SetStartpointOptions)}. This method is also 270 * responsible for updating the internal state such that subsequent 271 * invocations do not return duplicate changes. 272 * <p> 273 * The resulting list should be limited by <code>maxChanges</code>. The 274 * <code>numStillPending</code> reference should be set to the estimated 275 * number of changes that haven't yet been retrieved from the source endpoint 276 * when this method returns, or zero if all the current changes have been 277 * retrieved. 278 * <p> 279 * <b>IMPORTANT</b>: While this method needs to keep track of which changes 280 * have already been returned so that it does not return them again, it should 281 * <b>NOT</b> modify the official startpoint. The internal value for the 282 * startpoint should only be updated after a sync operation is acknowledged 283 * back to this script (via 284 * {@link #acknowledgeCompletedOps(LinkedList)}). 285 * Otherwise it will be possible for changes to be missed when the 286 * Data Sync Server is restarted or a connection error occurs. The 287 * startpoint should not change as a result of this method. 288 * <p> 289 * This method <b>does not need to be thread-safe</b>. It will be invoked 290 * repeatedly by a single thread, based on the polling interval set in the 291 * Sync Pipe configuration. 292 * 293 * @param maxChanges 294 * the maximum number of changes to retrieve 295 * @param numStillPending 296 * this should be set to the number of unretrieved changes that 297 * are still pending after this batch has been retrieved. This will 298 * be passed in 299 * as zero, and may be left that way if the actual value cannot be 300 * determined. 301 * @return a list of {@link ChangeRecord} instances, each 302 * corresponding to a single change at the source endpoint. 303 * If there are no new changes to return, this method should return 304 * an empty list. 305 * @throws EndpointException 306 * if there is any error while retrieving the next batch of changes 307 */ 308 public abstract List<ChangeRecord> getNextBatchOfChanges( 309 final int maxChanges, 310 final AtomicLong numStillPending) 311 throws EndpointException; 312 313 314 315 /** 316 * Return a full source entry (in LDAP form) from the source, corresponding 317 * to the {@link ChangeRecord} that is passed in through the 318 * {@link SyncOperation}. This method should perform any queries necessary to 319 * gather the latest values for all the attributes to be synchronized. 320 * <p> 321 * This method <b>must be thread safe</b>, as it will be called repeatedly and 322 * concurrently by each of the Sync Pipe worker threads as they process 323 * entries. 324 * <p> 325 * If the original ChangeRecord has the full entry already set on it (which 326 * can be done using <code>ChangeRecord.Builder#fullEntry(Entry)</code>, 327 * then this method will not get called, and the Sync Server will 328 * automatically use the full entry from the ChangeRecord. In this case, the 329 * implementation can always return {@code null}. 330 * 331 * @param operation 332 * the SyncOperation which identifies the source "entry" to 333 * fetch. The ChangeRecord can be obtained by calling 334 * <code>operation.getChangeRecord()</code>. 335 * These ChangeRecords are generated by 336 * {@link #getNextBatchOfChanges(int, AtomicLong)} 337 * or by 338 * {@link #listAllEntries(BlockingQueue)}. 339 * 340 * @return a full LDAP Entry, or null if no such entry exists. 341 * @throws EndpointException 342 * if there is an error fetching the entry 343 */ 344 public abstract Entry fetchEntry(final SyncOperation operation) 345 throws EndpointException; 346 347 348 349 /** 350 * Provides a way for the Data Sync Server to acknowledge back to the 351 * script which sync operations it has processed. This method should update 352 * the official startpoint which was set by 353 * {@link #setStartpoint(SetStartpointOptions)} and is 354 * returned by {@link #getStartpoint()}. 355 * <p> 356 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 357 * updated after a sync operation is acknowledged back to this extension (via 358 * this method). Otherwise it will be possible for changes to be missed when 359 * the Data Sync Server is restarted or a connection error occurs. 360 * 361 * @param completedOps 362 * a list of {@link SyncOperation}s that have finished processing. 363 * The records are listed in the order they were first detected. 364 * @throws EndpointException 365 * if there is an error acknowledging the changes back to the 366 * database 367 */ 368 public abstract void acknowledgeCompletedOps( 369 final LinkedList<SyncOperation> completedOps) 370 throws EndpointException; 371 372 373 374 /** 375 * Gets a list of all the entries in the source endpoint. This is used by the 376 * 'resync' command line tool. The default implementation throws a 377 * {@link UnsupportedOperationException}; subclasses should override if the 378 * resync functionality is needed. 379 * <p> 380 * The <code>outputQueue</code> should contain {@link ChangeRecord} objects 381 * with the <code>ChangeType</code> set to <code>null</code> to indicate that 382 * these are resync operations. 383 * <p> 384 * This method should not return until all the entries at the source 385 * have been added to the output queue. Separate threads will concurrently 386 * drain entries from the queue and process them. The queue typically should 387 * not contain full entries, but rather ChangeRecord objects which identify 388 * the full source entries. These objects are then individually passed in to 389 * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure 390 * that the ChangeRecord instances contain enough identifiable information 391 * (e.g. primary keys) for each entry so that the entry can be found again. 392 * <p> 393 * The lifecycle of resync is similar to that of real-time sync, with a few 394 * differences: 395 * <ol> 396 * <li>Stream out a list of identifiers for the entries in the source 397 * endpoint, using a ChangeRecord as the identifier</li> 398 * <li>Fetch full source entry for a ChangeRecord</li> 399 * <li>Perform any mappings and compute the equivalent destination entry</li> 400 * <li>Fetch full destination entry</li> 401 * <li>Diff the computed destination entry and actual destination entry</li> 402 * <li>Apply the minimal set of changes at the destination to bring it in sync 403 * </li> 404 * </ol> 405 * <p> 406 * Alternatively, the full entry can be set on the ChangeRecord within this 407 * method, which will cause the "fetch full entry" step to be skipped. In this 408 * case the Sync Server will just use the entry specified on the ChangeRecord. 409 * <p> 410 * If the total set of entries is very large, it is fine to split up the work 411 * into multiple network queries within this method. The queue will not grow 412 * out of control because it blocks when it becomes full. The queue capacity 413 * is fixed at 1000. 414 * <p> 415 * @param outputQueue 416 * a queue of ChangeRecord objects which will be individually 417 * fetched via {@link #fetchEntry(SyncOperation)} 418 * @throws EndpointException 419 * if there is an error retrieving the list of entries to resync 420 */ 421 public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue) 422 throws EndpointException 423 { 424 throw new UnsupportedOperationException( 425 "The listAllEntries(BlockingQueue) " + 426 "method must be implemented in the " + 427 getClass().getName() + " extension."); 428 } 429 430 431 432 /** 433 * Gets a list of all the entries in the source from a given file input. 434 * This is used by the 'resync' command line tool. The default implementation 435 * throws a {@link UnsupportedOperationException}; subclasses should override 436 * if the resync functionality is needed for specific records, which 437 * can be specified in the input file. 438 * <p> 439 * The format for the <code>inputLines</code> (e.g. the content of the file) 440 * is user-defined; it may be key/value pairs, primary keys, or full SQL 441 * statements, for example. The use of this method is triggered via the 442 * <i>--sourceInputFile</i> argument on the resync CLI. The 443 * <code>outputQueue</code> should contain {@link ChangeRecord} 444 * objects with the <code>ChangeType</code> set to <code>null</code> to 445 * indicate that these are resync operations. 446 * <p> 447 * This method should not return until all the entries specified by the input 448 * file have been added to the output queue. Separate threads will 449 * concurrently drain entries from the queue and process them. The queue 450 * typically should not contain full entries, but rather ChangeRecord 451 * objects which identify the full source entries. These objects are then 452 * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore, 453 * it is important to make sure that the ChangeRecord instances 454 * contain enough identifiable information (e.g. primary keys) for each entry 455 * so that the entry can be found again. 456 * <p> 457 * The lifecycle of resync is similar to that of real-time sync, with a few 458 * differences: 459 * <ol> 460 * <li>Stream out a list of identifiers for entries in the source endpoint, 461 * using the given input file as the basis for which entries to resync 462 * </li> 463 * <li>Fetch full source entry for an identifier</li> 464 * <li>Perform any mappings and compute the equivalent destination entry</li> 465 * <li>Fetch full destination entry</li> 466 * <li>Diff the computed destination entry and actual destination entry</li> 467 * <li>Apply the minimal set of changes at the destination to bring it in sync 468 * </li> 469 * </ol> 470 * <p> 471 * Alternatively, the full entry can be set on the ChangeRecord within this 472 * method, which will cause the "fetch full entry" step to be skipped. In this 473 * case the Sync Server will just use the entry specified on the ChangeRecord. 474 * <p> 475 * If the total set of entries is very large, it is fine to split up the work 476 * into multiple network queries within this method. The queue will not grow 477 * out of control because it blocks when it becomes full. The queue capacity 478 * is fixed at 1000. 479 * <p> 480 * @param inputLines 481 * an Iterator containing the lines from the specified input file to 482 * resync (this is specified on the CLI for the resync command). 483 * These lines can be any format, for example a set of primary keys, 484 * a set of WHERE clauses, a set of full SQL queries, etc. 485 * @param outputQueue 486 * a queue of ChangeRecord objects which will be individually 487 * fetched via {@link #fetchEntry(SyncOperation)} 488 * @throws EndpointException 489 * if there is an error retrieving the list of entries to resync 490 */ 491 public void listAllEntries(final Iterator<String> inputLines, 492 final BlockingQueue<ChangeRecord> outputQueue) 493 throws EndpointException 494 { 495 throw new UnsupportedOperationException( 496 "The listAllEntries(Iterator,BlockingQueue) " + 497 "method must be implemented in the " + 498 getClass().getName() + " extension."); 499 } 500}