001 /* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * docs/licenses/cddl.txt 011 * or http://www.opensource.org/licenses/cddl1.php. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * docs/licenses/cddl.txt. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2010-2012 UnboundID Corp. 026 */ 027 package com.unboundid.directory.sdk.sync.api; 028 029 030 031 import java.io.Serializable; 032 import java.sql.SQLException; 033 import java.util.Collections; 034 import java.util.Iterator; 035 import java.util.LinkedList; 036 import java.util.List; 037 import java.util.Map; 038 import java.util.concurrent.BlockingQueue; 039 import java.util.concurrent.atomic.AtomicLong; 040 041 import com.unboundid.directory.sdk.common.internal.Configurable; 042 import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider; 043 import com.unboundid.directory.sdk.common.internal.UnboundIDExtension; 044 import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig; 045 import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension; 046 import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord; 047 import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; 048 import com.unboundid.directory.sdk.sync.types.SyncOperation; 049 import com.unboundid.directory.sdk.sync.types.SyncServerContext; 050 import com.unboundid.directory.sdk.sync.types.TransactionContext; 051 import com.unboundid.ldap.sdk.Entry; 052 import com.unboundid.util.Extensible; 053 import com.unboundid.util.ThreadSafety; 054 import com.unboundid.util.ThreadSafetyLevel; 055 import com.unboundid.util.args.ArgumentException; 056 import com.unboundid.util.args.ArgumentParser; 057 058 059 060 /** 061 * This class defines an API that must be implemented by extensions 062 * in order to synchronize data out of a relational database. Since the 063 * UnboundID Synchronization Server is LDAP-centric, 064 * this API allows you to take database content and convert it into LDAP 065 * entries which can then be processed by the Synchronization Server. The 066 * lifecycle of 067 * a sync operation is as follows: 068 * <ol> 069 * <li>Detect change at the synchronization source</li> 070 * <li>Fetch full source entry</li> 071 * <li>Perform any mappings and compute the equivalent destination entry</li> 072 * <li>Fetch full destination entry</li> 073 * <li>Diff the computed destination entry and actual (fetched) destination 074 * entry</li> 075 * <li>Apply the minimal set of changes at the destination to bring it in sync 076 * </li> 077 * </ol> 078 * This implies that the 079 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be 080 * called once for every change that is returned by 081 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}. 082 * <p> 083 * In several places a {@link TransactionContext} is provided, which allows 084 * controlled access to the target database. By default, methods in this class 085 * are always provided with a fresh connection (i.e. a new transaction), and the 086 * Synchronization Server will always commit or rollback the transaction 087 * automatically, depending on whether the method returned normally or threw an 088 * exception. Implementers may optionally perform their own transaction 089 * management within these methods if necessary. 090 * <p> 091 * Several of these methods throw {@link SQLException}, which should be used in 092 * the case of any database access error. For other types of errors, runtime 093 * exceptions may be used (IllegalStateException, NullPointerException, etc.). 094 * The Synchronization Server will automatically retry operations that fail, up 095 * to a configurable amount of attempts. The exception to this rule is if a 096 * SQLException is thrown with a SQL state string beginning with "08"; this 097 * indicates a connection error, and in this case the operation is retried 098 * indefinitely. 099 * <BR> 100 * <H2>Configuring JDBC Sync Sources</H2> 101 * In order to configure a JDBC sync source based on this API and 102 * written in Java, use a command like: 103 * <PRE> 104 * dsconfig create-sync-source \ 105 * --source-name "<I>{source-name}</I>" \ 106 * --type third-party-jdbc \ 107 * --set "server:{server-name}" \ 108 * --set "extension-class:<I>{class-name}</I>" \ 109 * --set "extension-argument:<I>{name=value}</I>" 110 * </PRE> 111 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source 112 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that 113 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified 114 * name of the Java class written using this API, and "<I>{name=value}</I>" 115 * represents name-value pairs for any arguments to provide to the JDBC sync 116 * source. If multiple arguments should be provided to the JDBC sync source, 117 * then the "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" option 118 * should be provided multiple times. 119 */ 120 @Extensible() 121 @SynchronizationServerExtension(appliesToLocalContent=false, 122 appliesToSynchronizedContent=true) 123 public abstract class JDBCSyncSource 124 implements UnboundIDExtension, 125 Configurable, 126 ExampleUsageProvider 127 { 128 /** 129 * Creates a new instance of this JDBC Sync Source. All sync 130 * source implementations must include a default constructor, but any 131 * initialization should generally be done in the 132 * {@link #initializeJDBCSyncSource} method. 133 */ 134 public JDBCSyncSource() 135 { 136 // No implementation is required. 137 } 138 139 140 141 /** 142 * {@inheritDoc} 143 */ 144 public abstract String getExtensionName(); 145 146 147 148 /** 149 * {@inheritDoc} 150 */ 151 public abstract String[] getExtensionDescription(); 152 153 154 155 /** 156 * {@inheritDoc} 157 */ 158 public Map<List<String>,String> getExamplesArgumentSets() 159 { 160 return Collections.emptyMap(); 161 } 162 163 164 165 /** 166 * {@inheritDoc} 167 */ 168 public void defineConfigArguments(final ArgumentParser parser) 169 throws ArgumentException 170 { 171 // No arguments will be allowed by default. 172 } 173 174 175 176 /** 177 * This hook is called when a Sync Pipe first starts up, when the 178 * <i>resync</i> process first starts up, or when the set-startpoint 179 * subcommand is called from the <i>realtime-sync</i> command line tool. 180 * Any initialization of this sync source should be performed here. This 181 * method should generally store the {@link SyncServerContext} in a class 182 * member so that it can be used elsewhere in the implementation. 183 * <p> 184 * A {@link TransactionContext} is provided, which allows 185 * controlled access to the target database. The context will contain a fresh 186 * fresh connection (i.e. a new transaction), and the Synchronization Server 187 * will always commit or rollback the transaction automatically, depending on 188 * whether this method returns normally or throws an exception. Implementers 189 * may optionally perform their own transaction management within this method 190 * if necessary. 191 * <p> 192 * The default implementation is empty. 193 * 194 * @param ctx 195 * a TransactionContext which provides a valid JDBC connection to the 196 * database. 197 * @param serverContext A handle to the server context for the server in 198 * which this extension is running. 199 * @param config The general configuration for this sync source. 200 * @param parser The argument parser which has been initialized from 201 * the configuration for this JDBC sync source. 202 */ 203 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 204 public void initializeJDBCSyncSource(final TransactionContext ctx, 205 final SyncServerContext serverContext, 206 final JDBCSyncSourceConfig config, 207 final ArgumentParser parser) 208 { 209 // No initialization will be performed by default. 210 } 211 212 213 214 /** 215 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> 216 * process shuts down, or when the set-startpoint subcommand (from the 217 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this 218 * sync source should be performed here. 219 * <p> 220 * A {@link TransactionContext} is provided, which allows 221 * controlled access to the target database. The context will contain a fresh 222 * fresh connection (i.e. a new transaction), and the Synchronization Server 223 * will always commit or rollback the transaction automatically, depending on 224 * whether this method returns normally or throws an exception. Implementers 225 * may optionally perform their own transaction management within this method 226 * if necessary. 227 * <p> 228 * The default implementation is empty. 229 * 230 * @param ctx 231 * a TransactionContext which provides a valid JDBC connection to the 232 * database. 233 */ 234 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 235 public void finalizeJDBCSyncSource(final TransactionContext ctx) 236 { 237 //No implementation required by default. 238 } 239 240 241 242 /** 243 * This method should effectively set the starting point for synchronization 244 * to the place specified by the <code>options</code> parameter. This should 245 * cause all changes previous to the specified start point to be disregarded 246 * and only changes after that point to be returned by 247 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}. 248 * <p> 249 * There are several different startpoint types (see 250 * {@link SetStartpointOptions}), and this implementation is not required to 251 * support them all. If the specified startpoint type is unsupported, this 252 * method should throw an {@link IllegalArgumentException}. 253 * <p> 254 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type 255 * must be supported by your implementation, because this is used when a Sync 256 * Pipe first starts up. 257 * <p> 258 * This method can be called from two different contexts: 259 * <ul> 260 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used 261 * (the Sync Pipe is required to be stopped in this context)</li> 262 * <li>Immediately after a Sync Pipe starts up and a connection is first 263 * established to the source server (e.g. before the first call to 264 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li> 265 * </ul> 266 * <p> 267 * A {@link TransactionContext} is provided, which allows 268 * controlled access to the target database. The context will contain a fresh 269 * fresh connection (i.e. a new transaction), and the Synchronization Server 270 * will always commit or rollback the transaction automatically, depending on 271 * whether this method returns normally or throws an exception. Implementers 272 * may optionally perform their own transaction management within this method 273 * if necessary. 274 * 275 * @param ctx 276 * a TransactionContext which provides a valid JDBC connection to the 277 * database. 278 * @param options 279 * an object which indicates where exactly to start synchronizing 280 * (e.g. 281 * the end of the changelog, specific change number, a certain time 282 * ago, etc) 283 * @throws SQLException 284 * if there is any error while setting the start point 285 */ 286 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 287 public abstract void setStartpoint(final TransactionContext ctx, 288 final SetStartpointOptions options) 289 throws SQLException; 290 291 292 293 /** 294 * Gets the current value of the startpoint for change detection. This is the 295 * "bookmark" which indicates which changes have already been processed and 296 * which have not. In most cases, a change number is used to detect changes 297 * and is managed by the Synchronization Server, in which case this 298 * implementation needs only to return the latest acknowledged 299 * change number. In other cases, the return value may correspond to a 300 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. 301 * In any case, this method should return the value that is updated by 302 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}. 303 * <p> 304 * This method is called periodically and the return value is saved in the 305 * persistent state for the Sync Pipe that uses this script as its Sync 306 * Source. 307 * <p> 308 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 309 * updated after a sync operation is acknowledged back to this script (via 310 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). 311 * Otherwise it will be possible for changes to be missed when the 312 * Synchronization Server is restarted or a connection error occurs. 313 * @return a value to store in the persistent state for the Sync Pipe. This is 314 * usually 315 * a change number, but if a changelog table is not used to detect 316 * changes, 317 * this value should represent some other token to pass into 318 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} 319 * when the sync pipe starts up. 320 */ 321 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 322 public abstract Serializable getStartpoint(); 323 324 325 326 /** 327 * Return the next batch of change records from the database. Change records 328 * are just hints that a change happened; they do not include the actual data 329 * of the change. In an effort to never synchronize stale data, the 330 * Synchronization Server will go back and fetch the full source entry for 331 * each change record. 332 * <p> 333 * On the first invocation, this should return changes starting from the 334 * startpoint that was set by 335 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This 336 * method is responsible for updating the internal state such that subsequent 337 * invocations do not return duplicate changes. 338 * <p> 339 * The resulting list should be limited by <code>maxChanges</code>. The 340 * <code>numStillPending</code> reference should be set to the estimated 341 * number of changes that haven't yet been retrieved from the changelog table 342 * when this method returns, or zero if all the current changes have been 343 * retrieved. 344 * <p> 345 * <b>IMPORTANT</b>: While this method needs to keep track of which changes 346 * have already been returned so that it does not return them again, it should 347 * <b>NOT</b> modify the official startpoint. The internal value for the 348 * startpoint should only be updated after a sync operation is acknowledged 349 * back to this script (via 350 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). 351 * Otherwise it will be possible for changes to be missed when the 352 * Synchronization Server is restarted or a connection error occurs. The 353 * startpoint should not change as a result of this method. 354 * <p> 355 * A {@link TransactionContext} is provided, which allows 356 * controlled access to the target database. The context will contain a fresh 357 * fresh connection (i.e. a new transaction), and the Synchronization Server 358 * will always commit or rollback the transaction automatically, depending on 359 * whether this method returns normally or throws an exception. Implementers 360 * may optionally perform their own transaction management within this method 361 * if necessary. 362 * <p> 363 * This method <b>does not need to be thread-safe</b>. It will be invoked 364 * repeatedly by a single thread, based on the polling interval set in the 365 * Sync Pipe configuration. 366 * @param ctx 367 * a TransactionContext which provides a valid JDBC connection to the 368 * database. 369 * @param maxChanges 370 * the maximum number of changes to retrieve 371 * @param numStillPending 372 * this should be set to the number of unretrieved changes that 373 * are still pending after this batch has been retrieved. This will 374 * be passed in 375 * as zero, and may be left that way if the actual value cannot be 376 * determined. 377 * @return a list of {@link DatabaseChangeRecord} instances, each 378 * corresponding 379 * to a row in the changelog table (or the equivalent if some other 380 * change 381 * tracking mechanism is being used). If there are no new changes to 382 * return, this 383 * method should return an empty list. 384 * @throws SQLException 385 * if there is any error while retrieving the next batch of changes 386 */ 387 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 388 public abstract List<DatabaseChangeRecord> getNextBatchOfChanges( 389 final TransactionContext ctx, 390 final int maxChanges, 391 final AtomicLong numStillPending) 392 throws SQLException; 393 394 395 396 /** 397 * Return a full source entry (in LDAP form) from the database, corresponding 398 * to the {@link DatabaseChangeRecord} that is passed in through the 399 * {@link SyncOperation}. This method should perform any queries necessary to 400 * gather the latest values for all the attributes to be synchronized. 401 * <p> 402 * A {@link TransactionContext} is provided, which allows 403 * controlled access to the target database. The context will contain a fresh 404 * fresh connection (i.e. a new transaction), and the Synchronization Server 405 * will always commit or rollback the transaction automatically, depending on 406 * whether this method returns normally or throws an exception. Implementers 407 * may optionally perform their own transaction management within this method 408 * if necessary. 409 * <p> 410 * This method <b>must be thread safe</b>, as it will be called repeatedly and 411 * concurrently by each of the Sync Pipe worker threads as they process 412 * entries. 413 * @param ctx 414 * a TransactionContext which provides a valid JDBC connection to the 415 * database. 416 * @param operation 417 * the SyncOperation which identifies the database "entry" to 418 * fetch. The DatabaseChangeRecord can be obtained by calling 419 * <code>operation.getDatabaseChangeRecord()</code>. 420 * This is returned by 421 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)} 422 * or by 423 * {@link #listAllEntries(TransactionContext, String, BlockingQueue)} 424 * . 425 * @return a full LDAP Entry, or null if no such entry exists. 426 * @throws SQLException 427 * if there is an error fetching the entry 428 */ 429 @ThreadSafety(level = ThreadSafetyLevel.METHOD_THREADSAFE) 430 public abstract Entry fetchEntry(final TransactionContext ctx, 431 final SyncOperation operation) 432 throws SQLException; 433 434 435 436 /** 437 * Provides a way for the Synchronization Server to acknowledge back to the 438 * script which sync operations it has processed. This method should update 439 * the official startpoint which was set by 440 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is 441 * returned by {@link #getStartpoint()}. 442 * <p> 443 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 444 * updated after a sync operation is acknowledged back to this script (via 445 * this method). Otherwise it will be possible for changes to be missed when 446 * the Synchronization Server is restarted or a connection error occurs. 447 * <p> 448 * A {@link TransactionContext} is provided in case the acknowledgment needs 449 * to make it all the way back to the database itself (for example if you were 450 * using Oracle's Change Data Capture). The context will contain a fresh 451 * fresh connection (i.e. a new transaction), and the Synchronization Server 452 * will always commit or rollback the transaction automatically, depending on 453 * whether this method returns normally or throws an exception. Implementers 454 * may optionally perform their own transaction management within this method 455 * if necessary. 456 * 457 * @param ctx 458 * a TransactionContext which provides a valid JDBC connection to the 459 * database. 460 * @param completedOps 461 * a list of {@link SyncOperation}s that have finished processing. 462 * The records are listed in the order they were first detected. 463 * @throws SQLException 464 * if there is an error acknowledging the changes back to the 465 * database 466 */ 467 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 468 public abstract void acknowledgeCompletedOps( 469 final TransactionContext ctx, 470 final LinkedList<SyncOperation> completedOps) 471 throws SQLException; 472 473 474 475 /** 476 * Performs a cleanup of the changelog table (if desired). There is a 477 * background thread that periodically invokes this method. It should remove 478 * any rows in the changelog table that are more than 479 * <code>maxAgeMillis</code> milliseconds old. 480 * <p> 481 * <b>NOTE:</b> If the system clock on the database server is not in sync with 482 * the system clock on the Synchronization Server, this method should query 483 * the database for its current time in order to determine the cut-off point 484 * for deleting changelog records. 485 * <p> 486 * A {@link TransactionContext} is provided, which allows 487 * controlled access to the target database. The context will contain a fresh 488 * fresh connection (i.e. a new transaction), and the Synchronization Server 489 * will always commit or rollback the transaction automatically, depending on 490 * whether this method returns normally or throws an exception. Implementers 491 * may optionally perform their own transaction management within this method 492 * if necessary. 493 * <p> 494 * If a separate mechanism will be used to manage the changelog table, this 495 * method may be implemented as a no-op and always return zero. This is how 496 * the default implementation behaves. 497 * @param ctx 498 * a TransactionContext which provides a valid JDBC connection to the 499 * database. 500 * @param maxAgeMillis 501 * the period of time (in milliseconds) after which a changelog table 502 * record should be deleted 503 * @return the number of rows that were deleted from the changelog table 504 * @throws SQLException 505 * if there is an error purging records from the changelog table 506 */ 507 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 508 public int cleanupChangelog(final TransactionContext ctx, 509 final long maxAgeMillis) 510 throws SQLException 511 { 512 //no implementation provided by default; this is an opt-in feature. 513 return 0; 514 } 515 516 517 518 /** 519 * Gets a list of all the entries in the database for a given entry type. This 520 * is used by the 'resync' command line tool. The default implementation 521 * throws a {@link UnsupportedOperationException}; subclasses should override 522 * if the resync functionality is needed. 523 * <p> 524 * The <code>entryType</code> is user-defined; it will be 525 * passed in on the command line for resync. The <code>outputQueue</code> 526 * should contain {@link DatabaseChangeRecord} objects with the 527 * <code>ChangeType</code> set to <i>resync</i>. 528 * <p> 529 * This method should not return until all the entries of the given entryType 530 * have been added to the output queue. Separate threads will concurrently 531 * drain entries from the queue and process them. The queue should not 532 * actually contain full entries, but rather DatabaseChangeRecord objects 533 * which identify the full database entries. These objects are then 534 * individually passed in to 535 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, 536 * it is important to make sure that the DatabaseChangeRecord instances 537 * contain enough identifiable information (e.g. primary keys) for each entry 538 * so that the entry can be found again. 539 * <p> 540 * The lifecycle of resync is similar to that of real-time sync, with a few 541 * differences: 542 * <ol> 543 * <li>Stream out a list of all IDs in the database (for a given entryType) 544 * </li> 545 * <li>Fetch full source entry for an ID</li> 546 * <li>Perform any mappings and compute the equivalent destination entry</li> 547 * <li>Fetch full destination entry</li> 548 * <li>Diff the computed destination entry and actual destination entry</li> 549 * <li>Apply the minimal set of changes at the destination to bring it in sync 550 * </li> 551 * </ol> 552 * If the total set of entries is very large, it is fine to split up the work 553 * into multiple database queries within this method. The queue will not grow 554 * out of control because it blocks when it becomes full. The queue capacity 555 * is fixed at 1000. 556 * <p> 557 * A {@link TransactionContext} is provided, which allows 558 * controlled access to the target database. The context will contain a fresh 559 * fresh connection (i.e. a new transaction), and the Synchronization Server 560 * will always commit or rollback the transaction automatically, depending on 561 * whether this method returns normally or throws an exception. Implementers 562 * may optionally perform their own transaction management within this method 563 * if necessary. 564 * 565 * @param ctx 566 * a TransactionContext which provides a valid JDBC connection to the 567 * database. 568 * @param entryType 569 * the type of database entry to be fetched (this is specified 570 * on the CLI for the resync command) 571 * @param outputQueue 572 * a queue of DatabaseChangeRecord objects which will be individually 573 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} 574 * @throws SQLException 575 * if there is an error retrieving the list of entries to resync 576 */ 577 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 578 public void listAllEntries(final TransactionContext ctx, 579 final String entryType, 580 final BlockingQueue<DatabaseChangeRecord> 581 outputQueue) throws SQLException 582 { 583 throw new UnsupportedOperationException(); 584 } 585 586 587 588 /** 589 * Note: This method is deprecated and may be removed in a future release. 590 * All new and existing code should be changed to use the version of this 591 * method which includes the <code>entryType</code> parameter. 592 * <p> 593 * Gets a list of all the entries in the database from a given file input. 594 * This is used by the 'resync' command line tool. The default implementation 595 * throws a {@link UnsupportedOperationException}; subclasses should override 596 * if the resync functionality is needed for specific database records, which 597 * can be specified in the input file. 598 * <p> 599 * The format for the <code>inputLines</code> (e.g. the content of the file) 600 * is user-defined; it may be key/value pairs, primary keys, or full SQL 601 * statements, for example. The use of this method is triggered via the 602 * <i>--sourceInputFile</i> argument on the resync CLI. The 603 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord} 604 * objects with the <code>ChangeType</code> set to <i>resync</i>. 605 * <p> 606 * This method should not return until all the entries specified by the input 607 * file have been added to the output queue. Separate threads will 608 * concurrently drain entries from the queue and process them. The queue 609 * should not actually contain full entries, but rather DatabaseChangeRecord 610 * objects which identify the full database entries. These objects are then 611 * individually passed in to 612 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, 613 * it is important to make sure that the DatabaseChangeRecord instances 614 * contain enough identifiable information (e.g. primary keys) for each entry 615 * so that the entry can be found again. 616 * <p> 617 * The lifecycle of resync is similar to that of real-time sync, with a few 618 * differences: 619 * <ol> 620 * <li>Stream out a list of all IDs in the database (using the given input 621 * file)</li> 622 * <li>Fetch full source entry for an ID</li> 623 * <li>Perform any mappings and compute the equivalent destination entry</li> 624 * <li>Fetch full destination entry</li> 625 * <li>Diff the computed destination entry and actual destination entry</li> 626 * <li>Apply the minimal set of changes at the destination to bring it in sync 627 * </li> 628 * </ol> 629 * If the total set of entries is very large, it is fine to split up the work 630 * into multiple database queries within this method. The queue will not grow 631 * out of control because it blocks when it becomes full. The queue capacity 632 * is fixed at 1000. 633 * <p> 634 * A {@link TransactionContext} is provided, which allows 635 * controlled access to the target database. The context will contain a fresh 636 * fresh connection (i.e. a new transaction), and the Synchronization Server 637 * will always commit or rollback the transaction automatically, depending on 638 * whether this method returns normally or throws an exception. Implementers 639 * may optionally perform their own transaction management within this method 640 * if necessary. 641 * 642 * @param ctx 643 * a TransactionContext which provides a valid JDBC connection to the 644 * database. 645 * @param inputLines 646 * an Iterator containing the lines from the specified input file to 647 * resync (this is specified on the CLI for the resync command). 648 * These lines can be any format, for example a set of primary keys, 649 * a set of WHERE clauses, a set of full SQL queries, etc. 650 * @param outputQueue 651 * a queue of DatabaseChangeRecord objects which will be individually 652 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} 653 * @throws SQLException 654 * if there is an error retrieving the list of entries to resync 655 */ 656 @Deprecated() 657 public void listAllEntries(final TransactionContext ctx, 658 final Iterator<String> inputLines, 659 final BlockingQueue<DatabaseChangeRecord> 660 outputQueue) throws SQLException 661 { 662 throw new UnsupportedOperationException(); 663 } 664 665 666 /** 667 * Gets a list of all the entries in the database from a given file input. 668 * This is used by the 'resync' command line tool. The default implementation 669 * throws a {@link UnsupportedOperationException}; subclasses should override 670 * if the resync functionality is needed for specific database records, which 671 * can be specified in the input file. 672 * <p> 673 * The format for the <code>inputLines</code> (e.g. the content of the file) 674 * is user-defined; it may be key/value pairs, primary keys, or full SQL 675 * statements, for example. The use of this method is triggered via the 676 * <i>--sourceInputFile</i> argument on the resync CLI. The 677 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord} 678 * objects with the <code>ChangeType</code> set to <i>resync</i>. 679 * <p> 680 * This method should not return until all the entries specified by the input 681 * file have been added to the output queue. Separate threads will 682 * concurrently drain entries from the queue and process them. The queue 683 * should not actually contain full entries, but rather DatabaseChangeRecord 684 * objects which identify the full database entries. These objects are then 685 * individually passed in to 686 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, 687 * it is important to make sure that the DatabaseChangeRecord instances 688 * contain enough identifiable information (e.g. primary keys) for each entry 689 * so that the entry can be found again. 690 * <p> 691 * The lifecycle of resync is similar to that of real-time sync, with a few 692 * differences: 693 * <ol> 694 * <li>Stream out a list of all IDs in the database (using the given input 695 * file)</li> 696 * <li>Fetch full source entry for an ID</li> 697 * <li>Perform any mappings and compute the equivalent destination entry</li> 698 * <li>Fetch full destination entry</li> 699 * <li>Diff the computed destination entry and actual destination entry</li> 700 * <li>Apply the minimal set of changes at the destination to bring it in sync 701 * </li> 702 * </ol> 703 * If the total set of entries is very large, it is fine to split up the work 704 * into multiple database queries within this method. The queue will not grow 705 * out of control because it blocks when it becomes full. The queue capacity 706 * is fixed at 1000. 707 * <p> 708 * A {@link TransactionContext} is provided, which allows 709 * controlled access to the target database. The context will contain a fresh 710 * fresh connection (i.e. a new transaction), and the Synchronization Server 711 * will always commit or rollback the transaction automatically, depending on 712 * whether this method returns normally or throws an exception. Implementers 713 * may optionally perform their own transaction management within this method 714 * if necessary. 715 * 716 * @param ctx 717 * a TransactionContext which provides a valid JDBC connection to the 718 * database. 719 * @param entryType 720 * the type of database entry to be fetched (this is specified 721 * on the CLI for the resync command) 722 * @param inputLines 723 * an Iterator containing the lines from the specified input file to 724 * resync (this is specified on the CLI for the resync command). 725 * These lines can be any format, for example a set of primary keys, 726 * a set of WHERE clauses, a set of full SQL queries, etc. 727 * @param outputQueue 728 * a queue of DatabaseChangeRecord objects which will be individually 729 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} 730 * @throws SQLException 731 * if there is an error retrieving the list of entries to resync 732 */ 733 public void listAllEntries(final TransactionContext ctx, 734 final String entryType, 735 final Iterator<String> inputLines, 736 final BlockingQueue<DatabaseChangeRecord> 737 outputQueue) throws SQLException 738 { 739 throw new UnsupportedOperationException(); 740 } 741 }