/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2010-2013 UnboundID Corp.
 */
package com.unboundid.directory.sdk.sync.api;



import java.io.Serializable;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import com.unboundid.directory.sdk.common.internal.Configurable;
import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider;
import com.unboundid.directory.sdk.common.internal.UnboundIDExtension;
import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig;
import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import
com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.TransactionContext;
import com.unboundid.ldap.sdk.Entry;
import com.unboundid.util.Extensible;
import com.unboundid.util.ThreadSafety;
import com.unboundid.util.ThreadSafetyLevel;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;



/**
 * This class defines an API that must be implemented by extensions
 * in order to synchronize data out of a relational database. Since the
 * UnboundID Synchronization Server is LDAP-centric,
 * this API allows you to take database content and convert it into LDAP
 * entries which can then be processed by the Synchronization Server. The
 * lifecycle of a sync operation is as follows:
 * <ol>
 * <li>Detect change at the synchronization source</li>
 * <li>Fetch full source entry</li>
 * <li>Perform any mappings and compute the equivalent destination entry</li>
 * <li>Fetch full destination entry</li>
 * <li>Diff the computed destination entry and actual (fetched) destination
 * entry</li>
 * <li>Apply the minimal set of changes at the destination to bring it in sync
 * </li>
 * </ol>
 * This implies that the
 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be
 * called once for every change that is returned by
 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
 * <p>
 * During realtime synchronization (i.e. when a Sync Pipe is running), there is
 * a sliding window of changes being processed, and this API provides a
 * distinction between some different points along that window:
 * <ul>
 * <li><b>Old changes</b>: These are changes that the Sync Server has
 *     processed and acknowledged back to the Sync Source.
 *     The Sync Source is under no obligation to re-detect these changes.</li>
 * <li><b>Startpoint</b>: This marks where the Sync Source will start
 *     detecting changes if it is restarted.</li>
 * <li><b>Detected but unacknowledged</b>: These changes have been returned by
 *     <code>getNextBatchOfChanges()</code> but not completely processed and
 *     acknowledged back to the Sync Source.</li>
 * <li><b>Undetected changes</b>: The next call to
 *     <code>getNextBatchOfChanges()</code> should return the first changes
 *     that have not been detected. This should be somewhere at or ahead of
 *     the startpoint.</li>
 * </ul>
 * <p>
 * In several places a {@link TransactionContext} is provided, which allows
 * controlled access to the target database. By default, methods in this class
 * are always provided with a fresh connection (i.e. a new transaction), and the
 * Synchronization Server will always commit or roll back the transaction
 * automatically, depending on whether the method returned normally or threw an
 * exception. Implementers may optionally perform their own transaction
 * management within these methods if necessary.
 * <p>
 * Several of these methods throw {@link SQLException}, which should be used in
 * the case of any database access error. For other types of errors, runtime
 * exceptions may be used (IllegalStateException, NullPointerException, etc.).
 * The Synchronization Server will automatically retry operations that fail, up
 * to a configurable number of attempts. The exception to this rule is if a
 * SQLException is thrown with a SQL state string beginning with "08"; this
 * indicates a connection error, and in this case the operation is retried
 * indefinitely.
 * <BR>
 * <H2>Configuring JDBC Sync Sources</H2>
 * In order to configure a JDBC sync source based on this API and
 * written in Java, use a command like:
 * <PRE>
 *      dsconfig create-sync-source \
 *           --source-name "<I>{source-name}</I>" \
 *           --type third-party-jdbc \
 *           --set "server:<I>{server-name}</I>" \
 *           --set "extension-class:<I>{class-name}</I>" \
 *           --set "extension-argument:<I>{name=value}</I>"
 * </PRE>
 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source
 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that
 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified
 * name of the Java class written using this API, and "<I>{name=value}</I>"
 * represents name-value pairs for any arguments to provide to the JDBC sync
 * source. If multiple arguments should be provided to the JDBC sync source,
 * then the "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" option
 * should be provided multiple times.
 */
@Extensible()
@SynchronizationServerExtension(appliesToLocalContent=false,
                                appliesToSynchronizedContent=true)
public abstract class JDBCSyncSource
       implements UnboundIDExtension,
                  Configurable,
                  ExampleUsageProvider
{
  /**
   * Creates a new instance of this JDBC Sync Source. All sync
   * source implementations must include a default constructor, but any
   * initialization should generally be done in the
   * {@link #initializeJDBCSyncSource} method.
   */
  public JDBCSyncSource()
  {
    // No implementation is required.
  }



  /**
   * {@inheritDoc}
   */
  public abstract String getExtensionName();



  /**
   * {@inheritDoc}
   */
  public abstract String[] getExtensionDescription();



  /**
   * {@inheritDoc}
   */
  public Map<List<String>,String> getExamplesArgumentSets()
  {
    return Collections.emptyMap();
  }



  /**
   * {@inheritDoc}
   */
  public void defineConfigArguments(final ArgumentParser parser)
         throws ArgumentException
  {
    // No arguments will be allowed by default.
  }



  /**
   * This hook is called when a Sync Pipe first starts up, when the
   * <i>resync</i> process first starts up, or when the set-startpoint
   * subcommand is called from the <i>realtime-sync</i> command line tool.
   * Any initialization of this sync source should be performed here. This
   * method should generally store the {@link SyncServerContext} in a class
   * member so that it can be used elsewhere in the implementation.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * The default implementation is empty.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param  serverContext  A handle to the server context for the server in
   *                        which this extension is running.
   * @param  config         The general configuration for this sync source.
   * @param  parser         The argument parser which has been initialized from
   *                        the configuration for this JDBC sync source.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void initializeJDBCSyncSource(final TransactionContext ctx,
                                       final SyncServerContext serverContext,
                                       final JDBCSyncSourceConfig config,
                                       final ArgumentParser parser)
  {
    // No initialization will be performed by default.
  }



  /**
   * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
   * process shuts down, or when the set-startpoint subcommand (from the
   * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
   * sync source should be performed here.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * The default implementation is empty.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void finalizeJDBCSyncSource(final TransactionContext ctx)
  {
    // No implementation required by default.
  }



  /**
   * This method should effectively set the starting point for synchronization
   * to the place specified by the <code>options</code> parameter. This should
   * cause all changes previous to the specified start point to be disregarded
   * and only changes after that point to be returned by
   * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
   * <p>
   * There are several different startpoint types (see
   * {@link SetStartpointOptions}), and this implementation is not required to
   * support them all. If the specified startpoint type is unsupported, this
   * method should throw an {@link UnsupportedOperationException}.
   * <p>
   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
   * must be supported by your implementation, because this is used when a Sync
   * Pipe first starts up.
   * <p>
   * This method can be called from two different contexts:
   * <ul>
   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
   * (the Sync Pipe is required to be stopped in this context)</li>
   * <li>Immediately after a Sync Pipe starts up and a connection is first
   * established to the source server (e.g. before the first call to
   * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li>
   * </ul>
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param options
   *          an object which indicates where exactly to start synchronizing
   *          (e.g.
   *          the end of the changelog, specific change number, a certain time
   *          ago, etc)
   * @throws SQLException
   *           if there is any error while setting the start point
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract void setStartpoint(final TransactionContext ctx,
                                     final SetStartpointOptions options)
                                         throws SQLException;



  /**
   * Gets the current value of the startpoint for change detection. This is the
   * "bookmark" which indicates which changes have already been processed and
   * which have not. In most cases, a change number is used to detect changes
   * and is managed by the Synchronization Server, in which case this
   * implementation needs only to return the latest acknowledged
   * change number. In other cases, the return value may correspond to a
   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
   * In any case, this method should return the value that is updated by
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}.
   * <p>
   * This method is called periodically and the return value is saved in the
   * persistent state for the Sync Pipe that uses this extension as its Sync
   * Source.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Synchronization Server is restarted or a connection error occurs.
   * @return a value to store in the persistent state for the Sync Pipe. This is
   *         usually a change number, but if a changelog table is not used to
   *         detect changes, this value should represent some other token to
   *         pass into
   *         {@link #setStartpoint(TransactionContext, SetStartpointOptions)}
   *         when the sync pipe starts up.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract Serializable getStartpoint();



  /**
   * Return the next batch of change records from the database. Change records
   * are just hints that a change happened; they do not include the actual data
   * of the change. In an effort to never synchronize stale data, the
   * Synchronization Server will go back and fetch the full source entry for
   * each change record.
   * <p>
   * On the first invocation, this should return changes starting from the
   * startpoint that was set by
   * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
   * method is responsible for updating the internal state such that subsequent
   * invocations do not return duplicate changes.
   * <p>
   * The resulting list should be limited by <code>maxChanges</code>. The
   * <code>numStillPending</code> reference should be set to the estimated
   * number of changes that haven't yet been retrieved from the changelog table
   * when this method returns, or zero if all the current changes have been
   * retrieved.
   * <p>
   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
   * have already been returned so that it does not return them again, it should
   * <b>NOT</b> modify the official startpoint. The internal value for the
   * startpoint should only be updated after a sync operation is acknowledged
   * back to this extension (via
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Synchronization Server is restarted or a connection error occurs. The
   * startpoint should not change as a result of this method.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e.
   * a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * This method <b>does not need to be thread-safe</b>. It will be invoked
   * repeatedly by a single thread, based on the polling interval set in the
   * Sync Pipe configuration.
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param maxChanges
   *          the maximum number of changes to retrieve
   * @param numStillPending
   *          this should be set to the number of unretrieved changes that
   *          are still pending after this batch has been retrieved. This will
   *          be passed in as zero, and may be left that way if the actual
   *          value cannot be determined.
   * @return a list of {@link DatabaseChangeRecord} instances, each
   *         corresponding to a row in the changelog table (or the equivalent
   *         if some other change tracking mechanism is being used). If there
   *         are no new changes to return, this method should return an empty
   *         list.
   * @throws SQLException
   *           if there is any error while retrieving the next batch of changes
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract List<DatabaseChangeRecord> getNextBatchOfChanges(
                                               final TransactionContext ctx,
                                               final int maxChanges,
                                               final AtomicLong numStillPending)
                                                   throws SQLException;



  /**
   * Return a full source entry (in LDAP form) from the database, corresponding
   * to the {@link DatabaseChangeRecord} that is passed in through the
   * {@link SyncOperation}. This method should perform any queries necessary to
   * gather the latest values for all the attributes to be synchronized.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * This method <b>must be thread safe</b>, as it will be called repeatedly and
   * concurrently by each of the Sync Pipe worker threads as they process
   * entries.
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param operation
   *          the SyncOperation which identifies the database "entry" to
   *          fetch. The DatabaseChangeRecord can be obtained by calling
   *          <code>operation.getDatabaseChangeRecord()</code>.
   *          This is returned by
   *        {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
   *          or by
   *        {@link #listAllEntries(TransactionContext, String, BlockingQueue)}.
   * @return a full LDAP Entry, or null if no such entry exists.
   * @throws SQLException
   *           if there is an error fetching the entry
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_THREADSAFE)
  public abstract Entry fetchEntry(final TransactionContext ctx,
                                   final SyncOperation operation)
                                      throws SQLException;



  /**
   * Provides a way for the Synchronization Server to acknowledge back to the
   * extension which sync operations it has processed. This method should update
   * the official startpoint which was set by
   * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is
   * returned by {@link #getStartpoint()}.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * this method). Otherwise it will be possible for changes to be missed when
   * the Synchronization Server is restarted or a connection error occurs.
   * <p>
   * A {@link TransactionContext} is provided in case the acknowledgment needs
   * to make it all the way back to the database itself (for example if you were
   * using Oracle's Change Data Capture). The context will contain a
   * fresh connection (i.e. a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param completedOps
   *          a list of {@link SyncOperation}s that have finished processing.
   *          The records are listed in the order they were first detected.
   * @throws SQLException
   *           if there is an error acknowledging the changes back to the
   *           database
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract void acknowledgeCompletedOps(
                                   final TransactionContext ctx,
                                   final LinkedList<SyncOperation> completedOps)
                                      throws SQLException;



  /**
   * Performs a cleanup of the changelog table (if desired). There is a
   * background thread that periodically invokes this method. It should remove
   * any rows in the changelog table that are more than
   * <code>maxAgeMillis</code> milliseconds old.
   * <p>
   * <b>NOTE:</b> If the system clock on the database server is not in sync with
   * the system clock on the Synchronization Server, this method should query
   * the database for its current time in order to determine the cut-off point
   * for deleting changelog records.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e. a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   * <p>
   * If a separate mechanism will be used to manage the changelog table, this
   * method may be implemented as a no-op and always return zero. This is how
   * the default implementation behaves.
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param maxAgeMillis
   *          the period of time (in milliseconds) after which a changelog table
   *          record should be deleted
   * @return the number of rows that were deleted from the changelog table
   * @throws SQLException
   *           if there is an error purging records from the changelog table
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public int cleanupChangelog(final TransactionContext ctx,
                              final long maxAgeMillis)
                                  throws SQLException
  {
    // No implementation is provided by default; this is an opt-in feature.
    return 0;
  }



  /**
   * Gets a list of all the entries in the database for a given entry type. This
   * is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed.
   * <p>
   * The <code>entryType</code> is user-defined; it will be
   * passed in on the command line for resync. The <code>outputQueue</code>
   * should contain {@link DatabaseChangeRecord} objects with the
   * <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries of the given entryType
   * have been added to the output queue. Separate threads will concurrently
   * drain entries from the queue and process them. The queue should not
   * actually contain full entries, but rather DatabaseChangeRecord objects
   * which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (for a given entryType)
   * </li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e.
   * a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param entryType
   *          the type of database entry to be fetched (this is specified
   *          on the CLI for the resync command)
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void listAllEntries(final TransactionContext ctx,
                             final String entryType,
                             final BlockingQueue<DatabaseChangeRecord>
                                outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException();
  }



  /**
   * Note: This method is deprecated and may be removed in a future release.
   * All new and existing code should be changed to use the version of this
   * method which includes the <code>entryType</code> parameter.
   * <p>
   * Gets a list of all the entries in the database from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific database records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (i.e. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example.
   * The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI. The
   * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
   * objects with the <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue. Separate threads will
   * concurrently drain entries from the queue and process them. The queue
   * should not actually contain full entries, but rather DatabaseChangeRecord
   * objects which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (using the given input
   *  file)</li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e.
   * a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  @Deprecated()
  public void listAllEntries(final TransactionContext ctx,
                             final Iterator<String> inputLines,
                             final BlockingQueue<DatabaseChangeRecord>
                                outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException();
  }


  /**
   * Gets a list of all the entries in the database from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific database records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (i.e. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example. The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI.
   * The <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
   * objects with the <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue. Separate threads will
   * concurrently drain entries from the queue and process them. The queue
   * should not actually contain full entries, but rather DatabaseChangeRecord
   * objects which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (using the given input
   *  file)</li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows
   * controlled access to the target database. The context will contain a
   * fresh connection (i.e.
   * a new transaction), and the Synchronization Server
   * will always commit or roll back the transaction automatically, depending
   * on whether this method returns normally or throws an exception.
   * Implementers may optionally perform their own transaction management
   * within this method if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param entryType
   *          the type of database entry to be fetched (this is specified
   *          on the CLI for the resync command)
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  public void listAllEntries(final TransactionContext ctx,
                             final String entryType,
                             final Iterator<String> inputLines,
                             final BlockingQueue<DatabaseChangeRecord>
                                outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException();
  }
}
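
/*
 * Illustrative sketch only; it is not part of the Server SDK API.
 *
 * The class Javadoc of JDBCSyncSource distinguishes the startpoint from
 * changes that have been detected but not yet acknowledged. One way an
 * implementation might keep the two values apart is sketched here. The class
 * name ExampleStartpointTracker and all of its methods are hypothetical. The
 * sketch assumes that changes are identified by increasing change numbers and
 * that they are acknowledged in the order in which they were detected.
 *
 * A subclass could call markDetected() from getNextBatchOfChanges(),
 * markAcknowledged() from acknowledgeCompletedOps(), resetTo() from
 * setStartpoint(), and return getStartpoint() from its own getStartpoint().
 */
final class ExampleStartpointTracker
{
  // Highest acknowledged change number (the persisted "bookmark").
  private long startpoint;

  // Highest change number already handed out for processing.
  private long lastDetected;

  ExampleStartpointTracker(final long initialStartpoint)
  {
    startpoint = initialStartpoint;
    lastDetected = initialStartpoint;
  }

  // Moves both markers, as a setStartpoint() implementation might do.
  synchronized void resetTo(final long changeNumber)
  {
    startpoint = changeNumber;
    lastDetected = changeNumber;
  }

  // Records a detected change. The startpoint is deliberately left untouched.
  synchronized void markDetected(final long changeNumber)
  {
    lastDetected = Math.max(lastDetected, changeNumber);
  }

  // Records an acknowledged change. Only this call advances the startpoint.
  synchronized void markAcknowledged(final long changeNumber)
  {
    startpoint = Math.max(startpoint, changeNumber);
  }

  synchronized long getStartpoint()
  {
    return startpoint;
  }

  synchronized long getLastDetected()
  {
    return lastDetected;
  }
}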