/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2010-2012 UnboundID Corp.
 */
package com.unboundid.directory.sdk.sync.scripting;

import java.io.Serializable;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

import com.unboundid.ldap.sdk.Entry;
import com.unboundid.util.Extensible;
import com.unboundid.util.ThreadSafety;
import com.unboundid.util.ThreadSafetyLevel;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.directory.sdk.common.internal.Configurable;
import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig;
import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import
com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.TransactionContext;

/**
 * This class defines an API that must be implemented by scripted extensions
 * in order to synchronize data out of a relational database. Since the
 * UnboundID Synchronization Server is LDAP-centric, this API allows you to
 * take database content and convert it into LDAP entries which can then be
 * processed by the Synchronization Server. The lifecycle of a sync operation
 * is as follows:
 * <ol>
 * <li>Detect change at the synchronization source</li>
 * <li>Fetch full source entry</li>
 * <li>Perform any mappings and compute the equivalent destination entry</li>
 * <li>Fetch full destination entry</li>
 * <li>Diff the computed destination entry and actual (fetched) destination
 * entry</li>
 * <li>Apply the minimal set of changes at the destination to bring it in sync
 * </li>
 * </ol>
 * This implies that the
 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be
 * called once for every change that is returned by
 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
 * <p>
 * In several places a {@link TransactionContext} is provided, which allows
 * controlled access to the target database. By default, methods in this class
 * are always provided with a fresh connection (i.e. a new transaction), and
 * the Synchronization Server will always commit or roll back the transaction
 * automatically, depending on whether the method returned normally or threw
 * an exception. Implementers may optionally perform their own transaction
 * management within these methods if necessary.
 * <p>
 * Several of these methods throw {@link SQLException}, which should be used in
 * the case of any database access error.
 * For other types of errors, runtime
 * exceptions may be used (IllegalStateException, NullPointerException, etc.).
 * The Synchronization Server will automatically retry operations that fail, up
 * to a configurable number of attempts. The exception to this rule is if a
 * SQLException is thrown with a SQL state string beginning with "08"; this
 * indicates a connection error, and in this case the operation is retried
 * indefinitely.
 * <BR>
 * <H2>Configuring Groovy-Scripted JDBC Sync Sources</H2>
 * In order to configure a scripted JDBC sync source based on this API and
 * written in the Groovy scripting language, use a command like:
 * <PRE>
 *      dsconfig create-sync-source \
 *           --source-name "<I>{source-name}</I>" \
 *           --type groovy-scripted-jdbc \
 *           --set "server:<I>{server-name}</I>" \
 *           --set "script-class:<I>{class-name}</I>" \
 *           --set "script-argument:<I>{name=value}</I>"
 * </PRE>
 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source
 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that
 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified
 * name of the Groovy class written using this API, and "<I>{name=value}</I>"
 * represents name-value pairs for any arguments to provide to the JDBC sync
 * source. If multiple arguments should be provided to the JDBC sync source,
 * then the "<CODE>--set script-argument:<I>{name=value}</I></CODE>" option
 * should be provided multiple times.
 */
@Extensible()
@SynchronizationServerExtension(appliesToLocalContent=false,
                                appliesToSynchronizedContent=true)
public abstract class ScriptedJDBCSyncSource implements Configurable
{
  /**
   * {@inheritDoc}
   */
  public void defineConfigArguments(final ArgumentParser parser)
         throws ArgumentException
  {
    // No arguments will be allowed by default.
  }

  /**
   * This hook is called when a Sync Pipe first starts up, when the
   * <i>resync</i> process first starts up, or when the set-startpoint
   * subcommand is called from the <i>realtime-sync</i> command line tool.
   * Any initialization of this sync source should be performed here. This
   * method should generally store the {@link SyncServerContext} in a class
   * member so that it can be used elsewhere in the implementation.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   * <p>
   * The default implementation is empty.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param  serverContext  A handle to the server context for the server in
   *                        which this extension is running.
   * @param  config         The general configuration for this sync source.
   * @param  parser         The argument parser which has been initialized from
   *                        the configuration for this JDBC sync source.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void initializeJDBCSyncSource(final TransactionContext ctx,
                                       final SyncServerContext serverContext,
                                       final JDBCSyncSourceConfig config,
                                       final ArgumentParser parser)
  {
    // No initialization will be performed by default.
  }

  /**
   * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
   * process shuts down, or when the set-startpoint subcommand (from the
   * <i>realtime-sync</i> command line tool) is finished.
   * Any cleanup of this sync source should be performed here.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   * <p>
   * The default implementation is empty.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void finalizeJDBCSyncSource(final TransactionContext ctx)
  {
    //no implementation required
  }

  /**
   * This method should effectively set the starting point for synchronization
   * to the place specified by the <code>options</code> parameter. This should
   * cause all changes previous to the specified start point to be disregarded
   * and only changes after that point to be returned by
   * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
   * <p>
   * There are several different startpoint types (see
   * {@link SetStartpointOptions}), and this implementation is not required to
   * support them all. If the specified startpoint type is unsupported, this
   * method should throw an {@link IllegalArgumentException}.
   * <p>
   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
   * must be supported by your implementation, because this is used when a Sync
   * Pipe first starts up.
   * <p>
   * This method can be called from two different contexts:
   * <ul>
   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
   * (the Sync Pipe is required to be stopped in this context)</li>
   * <li>Immediately after a Sync Pipe starts up and a connection is first
   * established to the source server (i.e. before the first call to
   * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li>
   * </ul>
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param options
   *          an object which indicates where exactly to start synchronizing
   *          (e.g. the end of the changelog, a specific change number, a
   *          certain time ago, etc.)
   * @throws SQLException
   *           if there is any error while setting the start point
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract void setStartpoint(final TransactionContext ctx,
                                     final SetStartpointOptions options)
                                     throws SQLException;

  /**
   * Gets the current value of the startpoint for change detection. This is the
   * "bookmark" which indicates which changes have already been processed and
   * which have not. In most cases, a change number is used to detect changes
   * and is managed by the Synchronization Server, in which case this
   * implementation needs only to return the latest acknowledged
   * change number.
   * In other cases, the return value may correspond to a
   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
   * In any case, this method should return the value that is updated by
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}.
   * <p>
   * This method is called periodically and the return value is saved in the
   * persistent state for the Sync Pipe that uses this script as its Sync
   * Source.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this script (via
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Synchronization Server is restarted or a connection error occurs.
   *
   * @return a value to store in the persistent state for the Sync Pipe. This is
   *         usually a change number, but if a changelog table is not used to
   *         detect changes, this value should represent some other token to
   *         pass into
   *         {@link #setStartpoint(TransactionContext, SetStartpointOptions)}
   *         when the sync pipe starts up.
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract Serializable getStartpoint();

  /**
   * Return the next batch of change records from the database. Change records
   * are just hints that a change happened; they do not include the actual data
   * of the change. In an effort to never synchronize stale data, the
   * Synchronization Server will go back and fetch the full source entry for
   * each change record.
   * <p>
   * On the first invocation, this should return changes starting from the
   * startpoint that was set by
   * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
   * method is responsible for updating the internal state such that subsequent
   * invocations do not return duplicate changes.
   * <p>
   * The resulting list should be limited by <code>maxChanges</code>. The
   * <code>numStillPending</code> reference should be set to the estimated
   * number of changes that haven't yet been retrieved from the changelog table
   * when this method returns, or zero if all the current changes have been
   * retrieved.
   * <p>
   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
   * have already been returned so that it does not return them again, it should
   * <b>NOT</b> modify the official startpoint. The internal value for the
   * startpoint should only be updated after a sync operation is acknowledged
   * back to this script (via
   * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Synchronization Server is restarted or a connection error occurs. The
   * startpoint should not change as a result of this method.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   * <p>
   * This method <b>does not need to be thread-safe</b>. It will be invoked
   * repeatedly by a single thread, based on the polling interval set in the
   * Sync Pipe configuration.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param maxChanges
   *          the maximum number of changes to retrieve
   * @param numStillPending
   *          this should be set to the number of unretrieved changes that
   *          are still pending after this batch has been retrieved.
   *          This will be passed in as zero, and may be left that way if the
   *          actual value cannot be determined.
   * @return a list of {@link DatabaseChangeRecord} instances, each
   *         corresponding to a row in the changelog table (or the equivalent
   *         if some other change tracking mechanism is being used). If there
   *         are no new changes to return, this method should return an empty
   *         list.
   * @throws SQLException
   *           if there is any error while retrieving the next batch of changes
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract List<DatabaseChangeRecord> getNextBatchOfChanges(
                                               final TransactionContext ctx,
                                               final int maxChanges,
                                               final AtomicLong numStillPending)
                                               throws SQLException;

  /**
   * Return a full source entry (in LDAP form) from the database, corresponding
   * to the {@link DatabaseChangeRecord} that is passed in through the
   * {@link SyncOperation}. This method should perform any queries necessary to
   * gather the latest values for all the attributes to be synchronized.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   * <p>
   * This method <b>must be thread safe</b>, as it will be called repeatedly and
   * concurrently by each of the Sync Pipe worker threads as they process
   * entries.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param operation
   *          the SyncOperation which identifies the database "entry" to
   *          fetch.
   *          The DatabaseChangeRecord can be obtained by calling
   *          <code>operation.getDatabaseChangeRecord()</code>.
   *          This is returned by
   *          {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
   *          or by
   *          {@link #listAllEntries(TransactionContext, String, BlockingQueue)}.
   * @return a full LDAP Entry, or null if no such entry exists.
   * @throws SQLException
   *           if there is an error fetching the entry
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_THREADSAFE)
  public abstract Entry fetchEntry(final TransactionContext ctx,
                                   final SyncOperation operation)
                                   throws SQLException;

  /**
   * Provides a way for the Synchronization Server to acknowledge back to the
   * script which sync operations it has processed. This method should update
   * the official startpoint which was set by
   * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is
   * returned by {@link #getStartpoint()}.
   * <p>
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this script (via
   * this method). Otherwise it will be possible for changes to be missed when
   * the Synchronization Server is restarted or a connection error occurs.
   * <p>
   * A {@link TransactionContext} is provided in case the acknowledgment needs
   * to make it all the way back to the database itself (for example if you were
   * using Oracle's Change Data Capture). The context will contain a fresh
   * connection (i.e. a new transaction), and the Synchronization Server will
   * always commit or roll back the transaction automatically, depending on
   * whether this method returns normally or throws an exception. Implementers
   * may optionally perform their own transaction management within this method
   * if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param completedOps
   *          a list of {@link SyncOperation}s that have finished processing.
   *          The records are listed in the order they were first detected.
   * @throws SQLException
   *           if there is an error acknowledging the changes back to the
   *           database
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public abstract void acknowledgeCompletedOps(
                                   final TransactionContext ctx,
                                   final LinkedList<SyncOperation> completedOps)
                                   throws SQLException;

  /**
   * Performs a cleanup of the changelog table (if desired). There is a
   * background thread that periodically invokes this method. It should remove
   * any rows in the changelog table that are more than
   * <code>maxAgeMillis</code> milliseconds old.
   * <p>
   * <b>NOTE:</b> If the system clock on the database server is not in sync with
   * the system clock on the Synchronization Server, this method should query
   * the database for its current time in order to determine the cut-off point
   * for deleting changelog records.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   * <p>
   * If a separate mechanism will be used to manage the changelog table, this
   * method may be implemented as a no-op and always return zero. This is how
   * the default implementation behaves.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param maxAgeMillis
   *          the period of time (in milliseconds) after which a changelog table
   *          record should be deleted
   * @return the number of rows that were deleted from the changelog table
   * @throws SQLException
   *           if there is an error purging records from the changelog table
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public int cleanupChangelog(final TransactionContext ctx,
                              final long maxAgeMillis)
                              throws SQLException
  {
    //no implementation provided by default; this is an opt-in feature.
    return 0;
  }

  /**
   * Gets a list of all the entries in the database for a given entry type. This
   * is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed.
   * <p>
   * The <code>entryType</code> is user-defined; it will be
   * passed in on the command line for resync. The <code>outputQueue</code>
   * should contain {@link DatabaseChangeRecord} objects with the
   * <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries of the given entryType
   * have been added to the output queue. Separate threads will concurrently
   * drain entries from the queue and process them. The queue should not
   * actually contain full entries, but rather DatabaseChangeRecord objects
   * which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (for a given entryType)
   * </li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param entryType
   *          the type of database entry to be fetched (this is specified
   *          on the CLI for the resync command)
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via
   *          {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void listAllEntries(final TransactionContext ctx,
                             final String entryType,
                             final BlockingQueue<DatabaseChangeRecord>
                               outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Note: This method is deprecated and may be removed in a future release.
   * All new and existing code should be changed to use the version of this
   * method which includes the <code>entryType</code> parameter.
   * <p>
   * Gets a list of all the entries in the database from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific database records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (i.e. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example. The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI. The
   * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
   * objects with the <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue. Separate threads will
   * concurrently drain entries from the queue and process them.
   * The queue
   * should not actually contain full entries, but rather DatabaseChangeRecord
   * objects which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (using the given input
   * file)</li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  @Deprecated
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void listAllEntries(final TransactionContext ctx,
                             final Iterator<String> inputLines,
                             final BlockingQueue<DatabaseChangeRecord>
                               outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Gets a list of all the entries in the database from a given file input.
   * This is used by the 'resync' command line tool. The default implementation
   * throws an {@link UnsupportedOperationException}; subclasses should override
   * if the resync functionality is needed for specific database records, which
   * can be specified in the input file.
   * <p>
   * The format for the <code>inputLines</code> (i.e. the content of the file)
   * is user-defined; it may be key/value pairs, primary keys, or full SQL
   * statements, for example. The use of this method is triggered via the
   * <i>--sourceInputFile</i> argument on the resync CLI. The
   * <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
   * objects with the <code>ChangeType</code> set to <i>resync</i>.
   * <p>
   * This method should not return until all the entries specified by the input
   * file have been added to the output queue. Separate threads will
   * concurrently drain entries from the queue and process them.
   * The queue
   * should not actually contain full entries, but rather DatabaseChangeRecord
   * objects which identify the full database entries. These objects are then
   * individually passed in to
   * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
   * it is important to make sure that the DatabaseChangeRecord instances
   * contain enough identifiable information (e.g. primary keys) for each entry
   * so that the entry can be found again.
   * <p>
   * The lifecycle of resync is similar to that of real-time sync, with a few
   * differences:
   * <ol>
   * <li>Stream out a list of all IDs in the database (using the given input
   * file)</li>
   * <li>Fetch full source entry for an ID</li>
   * <li>Perform any mappings and compute the equivalent destination entry</li>
   * <li>Fetch full destination entry</li>
   * <li>Diff the computed destination entry and actual destination entry</li>
   * <li>Apply the minimal set of changes at the destination to bring it in sync
   * </li>
   * </ol>
   * If the total set of entries is very large, it is fine to split up the work
   * into multiple database queries within this method. The queue will not grow
   * out of control because it blocks when it becomes full. The queue capacity
   * is fixed at 1000.
   * <p>
   * A {@link TransactionContext} is provided, which allows controlled access
   * to the target database. The context will contain a fresh connection
   * (i.e. a new transaction), and the Synchronization Server will always
   * commit or roll back the transaction automatically, depending on whether
   * this method returns normally or throws an exception. Implementers may
   * optionally perform their own transaction management within this method
   * if necessary.
   *
   * @param ctx
   *          a TransactionContext which provides a valid JDBC connection to the
   *          database.
   * @param entryType
   *          the type of database entry to be fetched (this is specified
   *          on the CLI for the resync command)
   * @param inputLines
   *          an Iterator containing the lines from the specified input file to
   *          resync (this is specified on the CLI for the resync command).
   *          These lines can be any format, for example a set of primary keys,
   *          a set of WHERE clauses, a set of full SQL queries, etc.
   * @param outputQueue
   *          a queue of DatabaseChangeRecord objects which will be individually
   *          fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
   * @throws SQLException
   *           if there is an error retrieving the list of entries to resync
   */
  @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE)
  public void listAllEntries(final TransactionContext ctx,
                             final String entryType,
                             final Iterator<String> inputLines,
                             final BlockingQueue<DatabaseChangeRecord>
                               outputQueue) throws SQLException
  {
    throw new UnsupportedOperationException();
  }
}
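
The startpoint rules documented for getStartpoint, getNextBatchOfChanges and acknowledgeCompletedOps can be easier to follow with a small model. The sketch below is not part of the SDK and does not use any SDK types: the class name `StartpointBookkeeping`, its methods, and the use of plain change numbers in an in-memory list are all assumptions made for illustration. It keeps two values apart, a read cursor that advances when a batch is handed out and the official startpoint that advances only on acknowledgment.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the startpoint contract; not an SDK class. */
final class StartpointBookkeeping {
  // Official startpoint: the highest change number acknowledged so far.
  private long acknowledgedStartpoint;
  // Read cursor: the highest change number already handed out in a batch.
  private long lastReturned;

  /** Models setStartpoint: both values begin at the requested position. */
  void setStartpoint(long changeNumber) {
    acknowledgedStartpoint = changeNumber;
    lastReturned = changeNumber;
  }

  /** Models getStartpoint: only acknowledged progress is reported. */
  long getStartpoint() {
    return acknowledgedStartpoint;
  }

  /**
   * Models getNextBatchOfChanges over a changelog sorted in ascending order.
   * Advances the read cursor but leaves the official startpoint untouched.
   */
  List<Long> nextBatch(List<Long> changelog, int maxChanges) {
    List<Long> batch = new ArrayList<>();
    for (long changeNumber : changelog) {
      if (changeNumber > lastReturned && batch.size() < maxChanges) {
        batch.add(changeNumber);
      }
    }
    if (!batch.isEmpty()) {
      lastReturned = batch.get(batch.size() - 1);
    }
    return batch;
  }

  /** Models acknowledgeCompletedOps: the only place the startpoint moves. */
  void acknowledge(List<Long> completedChangeNumbers) {
    for (long changeNumber : completedChangeNumbers) {
      acknowledgedStartpoint = Math.max(acknowledgedStartpoint, changeNumber);
    }
  }
}
```

If the process stopped after `nextBatch` but before `acknowledge`, a restart would resume from the value of `getStartpoint()`, so the unacknowledged changes would be detected again rather than lost. A real implementation would extend ScriptedJDBCSyncSource and read the change records from the database through the provided TransactionContext.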