/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2010-2023 Ping Identity Corporation
 */
package com.unboundid.directory.sdk.sync.scripting;

import java.io.Serializable;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import com.unboundid.ldap.sdk.Entry;
import com.unboundid.util.Extensible;
import com.unboundid.util.ThreadSafety;
import com.unboundid.util.ThreadSafetyLevel;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.directory.sdk.common.internal.Configurable;
import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig;
import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import
    com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.TransactionContext;

/**
 * This class defines an API that must be implemented by scripted extensions
 * in order to synchronize data out of a relational database. Since the
 * Ping Identity Data Sync Server is LDAP-centric, this API allows you to
 * take database content and convert it into LDAP entries which can then be
 * processed by the Data Sync Server. The lifecycle of a sync operation is as
 * follows:
 * <ol>
 *   <li>Detect change at the synchronization source</li>
 *   <li>Fetch full source entry</li>
 *   <li>Perform any mappings and compute the equivalent destination entry</li>
 *   <li>Fetch full destination entry</li>
 *   <li>Diff the computed destination entry and actual (fetched) destination
 *       entry</li>
 *   <li>Apply the minimal set of changes at the destination to bring it in
 *       sync</li>
 * </ol>
 * This implies that the
 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be
 * called once for every change that is returned by
 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
 * <p>
 * During realtime synchronization (i.e. when a Sync Pipe is running), there is
 * a sliding window of changes being processed, and this API provides a
 * distinction between some different points along that window:
 * <ul>
 *   <li><b>Old changes</b>: These are changes that the Sync Server has
 *       processed and acknowledged back to the Sync Source.
 *       The Sync Source is under no obligation to re-detect these
 *       changes.</li>
 *   <li><b>Startpoint</b>: This marks where the Sync Source will start
 *       detecting changes if it is restarted.</li>
 *   <li><b>Detected but unacknowledged</b>: These changes have been returned
 *       by <code>getNextBatchOfChanges()</code> but not completely processed
 *       and acknowledged back to the Sync Source.</li>
 *   <li><b>Undetected changes</b>: The next call to
 *       <code>getNextBatchOfChanges()</code> should return the first changes
 *       that have not been detected. This should be somewhere at or ahead of
 *       the startpoint.</li>
 * </ul>
 * <p>
 * In several places a {@link TransactionContext} is provided, which allows
 * controlled access to the target database. By default, methods in this class
 * are always provided with a fresh connection (i.e. a new transaction), and
 * the Data Sync Server will always commit or roll back the transaction
 * automatically, depending on whether the method returned normally or threw an
 * exception. Implementers may optionally perform their own transaction
 * management within these methods if necessary.
 * <p>
 * Several of these methods throw {@link SQLException}, which should be used in
 * the case of any database access error. For other types of errors, runtime
 * exceptions may be used (IllegalStateException, NullPointerException, etc.).
 * The Data Sync Server will automatically retry operations that fail,
 * up to a configurable number of attempts. The exception to this rule is a
 * SQLException thrown with a SQL state string beginning with "08"; this
 * indicates a connection error, and in this case the operation is retried
 * indefinitely.
 * <BR>
 * <H2>Configuring Groovy Scripted JDBC Sync Sources</H2>
 * In order to configure a scripted JDBC sync source based on this API and
 * written in the Groovy scripting language, use a command like:
 * <PRE>
 *      dsconfig create-sync-source \
 *           --source-name "<I>{source-name}</I>" \
 *           --type groovy-scripted-jdbc \
 *           --set "server:<I>{server-name}</I>" \
 *           --set "script-class:<I>{class-name}</I>" \
 *           --set "script-argument:<I>{name=value}</I>"
 * </PRE>
 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source
 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that
 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified
 * name of the Groovy class written using this API, and "<I>{name=value}</I>"
 * represents name-value pairs for any arguments to provide to the JDBC sync
 * source.  If multiple arguments should be provided to the JDBC sync source,
 * then the "<CODE>--set script-argument:<I>{name=value}</I></CODE>" option
 * should be provided multiple times.
 */
@Extensible()
@SynchronizationServerExtension(appliesToLocalContent=false,
                                appliesToSynchronizedContent=true)
public abstract class ScriptedJDBCSyncSource implements Configurable
{
  /**
   * {@inheritDoc}
   */
  public void defineConfigArguments(final ArgumentParser parser)
         throws ArgumentException
  {
    // No arguments will be allowed by default.
  }

  /**
   * This hook is called when a Sync Pipe first starts up, when the
   * <i>resync</i> process first starts up, or when the set-startpoint
   * subcommand is called from the <i>realtime-sync</i> command line tool.
   * Any initialization of this sync source should be performed here. This
   * method should generally store the {@link SyncServerContext} in a class
   * member so that it can be used elsewhere in the implementation.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * The default implementation is empty.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param  serverContext  A handle to the server context for the server in
   *                        which this extension is running.
   * @param  config         The general configuration for this sync source.
   * @param  parser         The argument parser which has been initialized from
   *                        the configuration for this JDBC sync source.
   */
  public void initializeJDBCSyncSource(final TransactionContext ctx,
                                       final SyncServerContext serverContext,
                                       final JDBCSyncSourceConfig config,
                                       final ArgumentParser parser)
  {
    // No initialization will be performed by default.
  }

  /**
   * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
   * process shuts down, or when the set-startpoint subcommand (from the
   * <i>realtime-sync</i> command line tool) is finished. Any cleanup of this
   * sync source should be performed here.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * The default implementation is empty.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   */
  public void finalizeJDBCSyncSource(final TransactionContext ctx)
  {
    // No cleanup will be performed by default.
  }

  /**
   * This method should effectively set the starting point for synchronization
   * to the place specified by the <code>options</code> parameter. This should
   * cause all changes previous to the specified start point to be disregarded
   * and only changes after that point to be returned by
   * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
   * <p>
   * There are several different startpoint types (see
   * {@link SetStartpointOptions}), and this implementation is not required to
   * support them all. If the specified startpoint type is unsupported, this
   * method should throw an {@link UnsupportedOperationException}.
   * <p>
   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
   * must be supported by your implementation, because this is used when a Sync
   * Pipe first starts up.
   * <p>
   * This method can be called from two different contexts:
   * <ul>
   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
   * (the Sync Pipe is required to be stopped in this context)</li>
   * <li>Immediately after a Sync Pipe starts up and a connection is first
   * established to the source server (i.e. before the first call to
   * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li>
   * </ul>
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a fresh
   * connection (i.e.
   * a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param options
   *          an object which indicates where exactly to start synchronizing
   *          (e.g. the end of the changelog, a specific change number, a
   *          certain time ago, etc.)
   * @throws SQLException
   *           if there is any error while setting the start point
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract void setStartpoint(final TransactionContext ctx,
                                     final SetStartpointOptions options)
                                throws SQLException;

  /**
   * Gets the current value of the startpoint for change detection. This is the
   * "bookmark" which indicates which changes have already been processed and
   * which have not. In most cases, a change number is used to detect changes
   * and is managed by the Data Sync Server, in which case this
   * implementation needs only to return the latest acknowledged
   * change number. In other cases, the return value may correspond to a
   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
   * In any case, this method should return the value that is updated by
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}.
   * <p>
   * This method is called periodically and the return value is saved in the
   * persistent state for the Sync Pipe that uses this script as its Sync
   * Source.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this script (via
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs.
   * @return a value to store in the persistent state for the Sync Pipe. This is
   *         usually a change number, but if a changelog table is not used to
   *         detect changes, this value should represent some other token to
   *         pass into
   *         {@link #setStartpoint(TransactionContext, SetStartpointOptions)}
   *         when the sync pipe starts up.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract Serializable getStartpoint();

  /**
   * Return the next batch of change records from the database. Change records
   * are just hints that a change happened; they do not include the actual data
   * of the change. In an effort to never synchronize stale data, the
   * Data Sync Server will go back and fetch the full source entry for
   * each change record.
   * <p>
   * On the first invocation, this should return changes starting from the
   * startpoint that was set by
   * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
   * method is responsible for updating the internal state such that subsequent
   * invocations do not return duplicate changes.
   * <p>
   * The resulting list should be limited by <code>maxChanges</code>. The
   * <code>numStillPending</code> reference should be set to the estimated
   * number of changes that haven't yet been retrieved from the changelog table
   * when this method returns, or zero if all the current changes have been
   * retrieved.
   * <p>
   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
   * have already been returned so that it does not return them again, it should
   * <b>NOT</b> modify the official startpoint.
   * The internal value for the startpoint should only be updated after a sync
   * operation is acknowledged back to this script (via
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs. The
   * startpoint should not change as a result of this method.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * This method <b>does not need to be thread-safe</b>. It will be invoked
   * repeatedly by a single thread, based on the polling interval set in the
   * Sync Pipe configuration.
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param maxChanges
   *          the maximum number of changes to retrieve
   * @param numStillPending
   *          this should be set to the number of unretrieved changes that
   *          are still pending after this batch has been retrieved. This will
   *          be passed in as zero, and may be left that way if the actual
   *          value cannot be determined.
   * @return a list of {@link DatabaseChangeRecord} instances, each
   *         corresponding to a row in the changelog table (or the equivalent
   *         if some other change tracking mechanism is being used). If there
   *         are no new changes to return, this method should return an empty
   *         list.
   * @throws SQLException
   *           if there is any error while retrieving the next batch of changes
   */
  public abstract List<DatabaseChangeRecord> getNextBatchOfChanges(
                                              final TransactionContext ctx,
                                              final int maxChanges,
                                              final AtomicLong numStillPending)
                                                      throws SQLException;

  /**
   * Return a full source entry (in LDAP form) from the database, corresponding
   * to the {@link DatabaseChangeRecord} that is passed in through the
   * {@link SyncOperation}. This method should perform any queries necessary to
   * gather the latest values for all the attributes to be synchronized.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * This method <b>must be thread safe</b>, as it will be called repeatedly and
   * concurrently by each of the Sync Pipe worker threads as they process
   * entries.
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param operation
   *          the SyncOperation which identifies the database "entry" to
   *          fetch. The DatabaseChangeRecord can be obtained by calling
   *          <code>operation.getDatabaseChangeRecord()</code>.
   *          This is returned by
   *          {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
   *          or by
   *          {@link #listAllEntries(TransactionContext, String, BlockingQueue)}.
   * @return a full LDAP Entry, or null if no such entry exists.
   * @throws SQLException
   *           if there is an error fetching the entry
   */
  public abstract Entry fetchEntry(final TransactionContext ctx,
                                   final SyncOperation operation)
                                      throws SQLException;

  /**
   * Provides a way for the Data Sync Server to acknowledge back to the
   * script which sync operations it has processed. This method should update
   * the official startpoint which was set by
   * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is
   * returned by {@link #getStartpoint()}.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this script (via
   * this method). Otherwise it will be possible for changes to be missed when
   * the Data Sync Server is restarted or a connection error occurs.
   * <p>
   * A {@link TransactionContext} is provided in case the acknowledgment needs
   * to make it all the way back to the database itself (for example if you were
   * using Oracle's Change Data Capture). The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param completedOps
   *          a list of {@link SyncOperation}s that have finished processing.
   *          The records are listed in the order they were
   *          first detected.
   * @throws SQLException
   *           if there is an error acknowledging the changes back to the
   *           database
   */
  public abstract void acknowledgeCompletedOps(
                                   final TransactionContext ctx,
                                   final LinkedList<SyncOperation> completedOps)
                                      throws SQLException;

  /**
   * Performs a cleanup of the changelog table (if desired). There is a
   * background thread that periodically invokes this method. It should remove
   * any rows in the changelog table that are more than
   * <code>maxAgeMillis</code> milliseconds old.
   * <p>
   * <b>NOTE:</b> If the system clock on the database server is not in sync with
   * the system clock on the Data Sync Server, this method should query
   * the database for its current time in order to determine the cut-off point
   * for deleting changelog records.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * If a separate mechanism will be used to manage the changelog table, this
   * method may be implemented as a no-op and always return zero. This is how
   * the default implementation behaves.
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param maxAgeMillis
   *          the period of time (in milliseconds) after which a changelog table
   *          record should be deleted
   * @return the number of rows that were deleted from the changelog table
   * @throws SQLException
   *           if there is an error purging records from the changelog table
   */
  public int cleanupChangelog(final TransactionContext ctx,
                              final long maxAgeMillis)
                                  throws SQLException
  {
    // No implementation is provided by default; this is an opt-in feature.
    return 0;
  }

  /**
   * Gets a list of all the entries in the database for a given entry type. This
   * is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed.
   * <p>
   * The <code>entryType</code> is user-defined; it will be
   * passed in on the command line for resync. The <code>outputQueue</code>
   * should contain {@link DatabaseChangeRecord} objects with the
   * <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries of the given entryType
   * have been added to the output queue. Separate threads will concurrently
   * drain entries from the queue and process them. The queue should not
   * actually contain full entries, but rather DatabaseChangeRecord objects
   * which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (for a given entryType)
   * </li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param entryType
   *          the type of database entry to be fetched (this is specified
   *          on the CLI for the resync command)
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via
   *          {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  public void listAllEntries(final TransactionContext ctx,
                             final String entryType,
                             final BlockingQueue<DatabaseChangeRecord>
                                       outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException(
            "The listAllEntries(TransactionContext,String,BlockingQueue) " +
            "method must be implemented in the " +
            getClass().getName() + " extension.");
  }

  /**
   * Note: This method is deprecated and may be removed in a future release.
   * All new and existing code should be changed to use the version of this
   * method which includes the <code>entryType</code> parameter.
   * <p>
   * Gets a list of all the entries in the database from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific database records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (i.e. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example. The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI. The
   * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
   * objects with the <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue.
   * Separate threads will concurrently drain entries from the queue and
   * process them. The queue should not actually contain full entries, but
   * rather DatabaseChangeRecord objects which identify the full database
   * entries. These objects are then individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (using the given input
   *  file)</li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   * @deprecated Use
   *   {@link #listAllEntries(TransactionContext, String, Iterator, BlockingQueue)}
   *   instead, which includes the <code>entryType</code> parameter.
   */
  @Deprecated
  public void listAllEntries(final TransactionContext ctx,
                             final Iterator<String> inputLines,
                             final BlockingQueue<DatabaseChangeRecord>
                                      outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException(
            "The listAllEntries(TransactionContext,Iterator,BlockingQueue) " +
            "method must be implemented in the " +
            getClass().getName() + " extension.");
  }

  /**
   * Gets a list of all the entries in the database from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific database records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (i.e. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example. The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI. The
   * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
   * objects with the <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue. Separate threads will
   * concurrently drain entries from the queue and process them. The queue
   * should not actually contain full entries, but rather DatabaseChangeRecord
   * objects which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (using the given input
   *  file)</li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Data Sync Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param entryType
   *          the type of database entry to be fetched (this is specified
   *          on the CLI for the resync command)
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  public void listAllEntries(final TransactionContext ctx,
                             final String entryType,
                             final Iterator<String> inputLines,
                             final BlockingQueue<DatabaseChangeRecord>
                                      outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException(
            "The listAllEntries(TransactionContext,String,Iterator,BlockingQueue) "+
            "method must be implemented in the " +
            getClass().getName() + " extension.");
  }
}
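
/*
 * Illustrative sketch only. This helper is not part of the Server SDK, and
 * its names (ExampleStartpointTracker, nextBatch, acknowledge) are
 * hypothetical. It shows one possible way to keep apart the two positions
 * that the documentation above distinguishes: an in-memory "last detected"
 * cursor that advances when a batch of changes is handed out, and the
 * official startpoint, which advances only when changes are acknowledged.
 *
 * Assumptions: changes are identified by ascending numeric change numbers,
 * a plain list of numbers stands in for the changelog table, and the highest
 * acknowledged change number is an acceptable bookmark. Fully-qualified type
 * names are used so that the sketch does not depend on any import.
 */
final class ExampleStartpointTracker
{
  // Highest change number already returned by nextBatch(). Never persisted.
  private long lastDetected;

  // Highest acknowledged change number. This is the value to persist.
  private long startpoint;

  ExampleStartpointTracker(final long initialStartpoint)
  {
    startpoint = initialStartpoint;
    lastDetected = initialStartpoint;
  }

  // Returns up to maxChanges change numbers beyond the detection cursor and
  // reports how many further changes remain. The startpoint is not touched.
  java.util.List<Long> nextBatch(
       final java.util.List<Long> changelog,
       final int maxChanges,
       final java.util.concurrent.atomic.AtomicLong numStillPending)
  {
    final java.util.List<Long> batch = new java.util.ArrayList<Long>();
    long pending = 0L;
    for (final long changeNumber : changelog)
    {
      if (changeNumber <= lastDetected)
      {
        continue; // Already detected (or older than the startpoint).
      }

      if (batch.size() < maxChanges)
      {
        batch.add(changeNumber);
      }
      else
      {
        pending++;
      }
    }

    if (! batch.isEmpty())
    {
      lastDetected = batch.get(batch.size() - 1);
    }
    numStillPending.set(pending);
    return batch;
  }

  // Advances the official startpoint to the highest acknowledged change.
  void acknowledge(final java.util.List<Long> completedChangeNumbers)
  {
    for (final long changeNumber : completedChangeNumbers)
    {
      startpoint = Math.max(startpoint, changeNumber);
    }
  }

  // The bookmark to save in the persistent state.
  long getStartpoint()
  {
    return startpoint;
  }
}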