001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * docs/licenses/cddl.txt 011 * or http://www.opensource.org/licenses/cddl1.php. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * docs/licenses/cddl.txt. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Portions Copyright 2010-2023 Ping Identity Corporation 026 */ 027package com.unboundid.directory.sdk.sync.api; 028 029 030 031import java.io.Serializable; 032import java.sql.SQLException; 033import java.util.Collections; 034import java.util.Iterator; 035import java.util.LinkedList; 036import java.util.List; 037import java.util.Map; 038import java.util.concurrent.BlockingQueue; 039import java.util.concurrent.atomic.AtomicLong; 040 041import com.unboundid.directory.sdk.common.internal.Configurable; 042import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider; 043import com.unboundid.directory.sdk.common.internal.UnboundIDExtension; 044import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig; 045import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension; 046import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord; 047import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; 048import com.unboundid.directory.sdk.sync.types.SyncOperation; 049import com.unboundid.directory.sdk.sync.types.SyncServerContext; 050import com.unboundid.directory.sdk.sync.types.TransactionContext; 051import com.unboundid.ldap.sdk.Entry; 052import com.unboundid.util.Extensible; 053import com.unboundid.util.ThreadSafety; 054import com.unboundid.util.ThreadSafetyLevel; 055import com.unboundid.util.args.ArgumentException; 056import com.unboundid.util.args.ArgumentParser; 057 058 059 060/** 061 * This class defines an API that must be implemented by extensions 062 * in order to synchronize data out of a relational database. Since the 063 * Ping Identity Data Sync Server is LDAP-centric, 064 * this API allows you to take database content and convert it into LDAP 065 * entries which can then be processed by the Data Sync Server. The 066 * lifecycle of 067 * a sync operation is as follows: 068 * <ol> 069 * <li>Detect change at the synchronization source</li> 070 * <li>Fetch full source entry</li> 071 * <li>Perform any mappings and compute the equivalent destination entry</li> 072 * <li>Fetch full destination entry</li> 073 * <li>Diff the computed destination entry and actual (fetched) destination 074 * entry</li> 075 * <li>Apply the minimal set of changes at the destination to bring it in sync 076 * </li> 077 * </ol> 078 * This implies that the 079 * {@link #fetchEntry(TransactionContext, SyncOperation)} method will be 080 * called once for every change that is returned by 081 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}. 082 * <p> 083 * During realtime synchronization (i.e. when a Sync Pipe is running), there is 084 * a sliding window of changes being processed, and this API provides a 085 * distinction between some different points along that window: 086 * <ul> 087 * <li><b>Old changes</b>: These are changes that the Sync Server has 088 * processed and acknowledged back to the Sync Source. The Sync Source is 089 * under no obligation to re-detect these changes.</li> 090 * <li><b>Startpoint</b>: This marks where the Sync Source will start 091 * detecting changes if it is restarted.</li> 092 * <li><b>Detected but unacknowledged</b>: These changes have been returned by 093 * <code>getNextBatchOfChanges()</code> but not completely processed and 094 * acknowledged back to the Sync Source.</li> 095 * <li><b>Undetected changes</b>: The next call to 096 * <code>getNextBatchOfChanges()</code> should return the first changes 097 * that have not been detected. This should be somewhere at or ahead of 098 * the startpoint.</li> 099 * </ul> 100 * <p> 101 * In several places a {@link TransactionContext} is provided, which allows 102 * controlled access to the target database. By default, methods in this class 103 * are always provided with a fresh connection (i.e. a new transaction), and the 104 * Data Sync Server will always commit or rollback the transaction 105 * automatically, depending on how the method returns. If a method call returns 106 * successfully, then the transaction will be committed. If an exception is 107 * thrown, then the transaction will be rolled back. In rare situations, it 108 * might be necessary for an implementation to perform its own commit or 109 * rollback of transactions by calling methods on {@link TransactionContext}. 110 * <p> 111 * Several of these methods throw {@link SQLException}, which should be used in 112 * the case of any database access error. For other types of errors, runtime 113 * exceptions may be used (IllegalStateException, NullPointerException, etc.). 114 * The Data Sync Server will automatically retry operations that fail, 115 * up to a configurable amount of attempts. The exception to this rule is if a 116 * SQLException is thrown with a SQL state string beginning with "08"; this 117 * indicates a connection error, and in this case the operation is retried 118 * indefinitely. For these reasons implementers should refrain from handling 119 * or wrapping any {@link SQLException} and instead let it be handled by the 120 * calling code. 121 * <BR> 122 * <H2>Configuring JDBC Sync Sources</H2> 123 * In order to configure a JDBC sync source based on this API and 124 * written in Java, use a command like: 125 * <PRE> 126 * dsconfig create-sync-source \ 127 * --source-name "<I>{source-name}</I>" \ 128 * --type third-party-jdbc \ 129 * --set "server:{server-name}" \ 130 * --set "extension-class:<I>{class-name}</I>" \ 131 * --set "extension-argument:<I>{name=value}</I>" 132 * </PRE> 133 * where "<I>{source-name}</I>" is the name to use for the JDBC sync source 134 * instance, "<I>{server-name}</I>" is the name of the JDBC external server that 135 * will be used as the sync source, "<I>{class-name}</I>" is the fully-qualified 136 * name of the Java class written using this API, and "<I>{name=value}</I>" 137 * represents name-value pairs for any arguments to provide to the JDBC sync 138 * source. If multiple arguments should be provided to the JDBC sync source, 139 * then the "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" option 140 * should be provided multiple times. 141 */ 142@Extensible() 143@SynchronizationServerExtension(appliesToLocalContent=false, 144 appliesToSynchronizedContent=true) 145public abstract class JDBCSyncSource 146 implements UnboundIDExtension, 147 Configurable, 148 ExampleUsageProvider 149{ 150 /** 151 * Creates a new instance of this JDBC Sync Source. All sync 152 * source implementations must include a default constructor, but any 153 * initialization should generally be done in the 154 * {@link #initializeJDBCSyncSource} method. 155 */ 156 public JDBCSyncSource() 157 { 158 // No implementation is required. 159 } 160 161 162 163 /** 164 * {@inheritDoc} 165 */ 166 public abstract String getExtensionName(); 167 168 169 170 /** 171 * {@inheritDoc} 172 */ 173 public abstract String[] getExtensionDescription(); 174 175 176 177 /** 178 * {@inheritDoc} 179 */ 180 public Map<List<String>,String> getExamplesArgumentSets() 181 { 182 return Collections.emptyMap(); 183 } 184 185 186 187 /** 188 * {@inheritDoc} 189 */ 190 public void defineConfigArguments(final ArgumentParser parser) 191 throws ArgumentException 192 { 193 // No arguments will be allowed by default. 194 } 195 196 197 198 /** 199 * This hook is called when a Sync Pipe first starts up, when the 200 * <i>resync</i> process first starts up, or when the set-startpoint 201 * subcommand is called from the <i>realtime-sync</i> command line tool. 202 * Any initialization of this sync source should be performed here. This 203 * method should generally store the {@link SyncServerContext} in a class 204 * member so that it can be used elsewhere in the implementation. 205 * <p> 206 * A {@link TransactionContext} is provided, which allows 207 * controlled access to the target database. The context will contain a fresh 208 * fresh connection (i.e. a new transaction), and the Data Sync Server 209 * will always commit or rollback the transaction automatically, depending on 210 * whether this method returns normally or throws an exception. See the class 211 * level documentation for warnings and additional details. 212 * <p> 213 * The default implementation is empty. 214 * 215 * @param ctx 216 * a TransactionContext which provides a valid JDBC connection to the 217 * database. 218 * @param serverContext A handle to the server context for the server in 219 * which this extension is running. 220 * @param config The general configuration for this sync source. 221 * @param parser The argument parser which has been initialized from 222 * the configuration for this JDBC sync source. 223 */ 224 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 225 public void initializeJDBCSyncSource(final TransactionContext ctx, 226 final SyncServerContext serverContext, 227 final JDBCSyncSourceConfig config, 228 final ArgumentParser parser) 229 { 230 // No initialization will be performed by default. 231 } 232 233 234 235 /** 236 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> 237 * process shuts down, or when the set-startpoint subcommand (from the 238 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this 239 * sync source should be performed here. 240 * <p> 241 * A {@link TransactionContext} is provided, which allows 242 * controlled access to the target database. The context will contain a fresh 243 * fresh connection (i.e. a new transaction), and the Data Sync Server 244 * will always commit or rollback the transaction automatically, depending on 245 * whether this method returns normally or throws an exception. See the class 246 * level documentation for warnings and additional details. 247 * <p> 248 * The default implementation is empty. 249 * 250 * @param ctx 251 * a TransactionContext which provides a valid JDBC connection to the 252 * database. 253 */ 254 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 255 public void finalizeJDBCSyncSource(final TransactionContext ctx) 256 { 257 //No implementation required by default. 258 } 259 260 261 262 /** 263 * This method should effectively set the starting point for synchronization 264 * to the place specified by the <code>options</code> parameter. This should 265 * cause all changes previous to the specified start point to be disregarded 266 * and only changes after that point to be returned by 267 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}. 268 * <p> 269 * There are several different startpoint types (see 270 * {@link SetStartpointOptions}), and this implementation is not required to 271 * support them all. If the specified startpoint type is unsupported, this 272 * method should throw an {@link UnsupportedOperationException}. 273 * <p> 274 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type 275 * must be supported by your implementation, because this is used when a Sync 276 * Pipe first starts up. 277 * <p> 278 * This method can be called from two different contexts: 279 * <ul> 280 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used 281 * (the Sync Pipe is required to be stopped in this context)</li> 282 * <li>Immediately after a Sync Pipe starts up and a connection is first 283 * established to the source server (e.g. before the first call to 284 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li> 285 * </ul> 286 * <p> 287 * A {@link TransactionContext} is provided, which allows 288 * controlled access to the target database. The context will contain a fresh 289 * fresh connection (i.e. a new transaction), and the Data Sync Server 290 * will always commit or rollback the transaction automatically, depending on 291 * whether this method returns normally or throws an exception. See the class 292 * level documentation for warnings and additional details. 293 * 294 * @param ctx 295 * a TransactionContext which provides a valid JDBC connection to the 296 * database. 297 * @param options 298 * an object which indicates where exactly to start synchronizing 299 * (e.g. 300 * the end of the changelog, specific change number, a certain time 301 * ago, etc) 302 * @throws SQLException 303 * if there is any error while setting the start point 304 */ 305 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 306 public abstract void setStartpoint(final TransactionContext ctx, 307 final SetStartpointOptions options) 308 throws SQLException; 309 310 311 312 /** 313 * Gets the current value of the startpoint for change detection. This is the 314 * "bookmark" which indicates which changes have already been processed and 315 * which have not. In most cases, a change number is used to detect changes 316 * and is managed by the Data Sync Server, in which case this 317 * implementation needs only to return the latest acknowledged 318 * change number. In other cases, the return value may correspond to a 319 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. 320 * In any case, this method should return the value that is updated by 321 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}. 322 * <p> 323 * This method is called periodically and the return value is saved in the 324 * persistent state for the Sync Pipe that uses this extension as its Sync 325 * Source. 326 * <p> 327 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 328 * updated after a sync operation is acknowledged back to this extension (via 329 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). 330 * Otherwise it will be possible for changes to be missed when the 331 * Data Sync Server is restarted or a connection error occurs. 332 * @return a value to store in the persistent state for the Sync Pipe. This is 333 * usually 334 * a change number, but if a changelog table is not used to detect 335 * changes, 336 * this value should represent some other token to pass into 337 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} 338 * when the sync pipe starts up. 339 */ 340 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 341 public abstract Serializable getStartpoint(); 342 343 344 345 /** 346 * Return the next batch of change records from the database. Change records 347 * are just hints that a change happened; they do not include the actual data 348 * of the change. In an effort to never synchronize stale data, the 349 * Data Sync Server will go back and fetch the full source entry for 350 * each change record. 351 * <p> 352 * On the first invocation, this should return changes starting from the 353 * startpoint that was set by 354 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This 355 * method is responsible for updating the internal state such that subsequent 356 * invocations do not return duplicate changes. 357 * <p> 358 * The resulting list should be limited by <code>maxChanges</code>. The 359 * <code>numStillPending</code> reference should be set to the estimated 360 * number of changes that haven't yet been retrieved from the changelog table 361 * when this method returns, or zero if all the current changes have been 362 * retrieved. 363 * <p> 364 * <b>IMPORTANT</b>: While this method needs to keep track of which changes 365 * have already been returned so that it does not return them again, it should 366 * <b>NOT</b> modify the official startpoint. The internal value for the 367 * startpoint should only be updated after a sync operation is acknowledged 368 * back to this extension (via 369 * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). 370 * Otherwise it will be possible for changes to be missed when the 371 * Data Sync Server is restarted or a connection error occurs. The 372 * startpoint should not change as a result of this method. 373 * <p> 374 * A {@link TransactionContext} is provided, which allows 375 * controlled access to the target database. The context will contain a fresh 376 * fresh connection (i.e. a new transaction), and the Data Sync Server 377 * will always commit or rollback the transaction automatically, depending on 378 * whether this method returns normally or throws an exception. See the class 379 * level documentation for warnings and additional details. 380 * <p> 381 * This method <b>does not need to be thread-safe</b>. It will be invoked 382 * repeatedly by a single thread, based on the polling interval set in the 383 * Sync Pipe configuration. 384 * @param ctx 385 * a TransactionContext which provides a valid JDBC connection to the 386 * database. 387 * @param maxChanges 388 * the maximum number of changes to retrieve 389 * @param numStillPending 390 * this should be set to the number of unretrieved changes that 391 * are still pending after this batch has been retrieved. This will 392 * be passed in 393 * as zero, and may be left that way if the actual value cannot be 394 * determined. 395 * @return a list of {@link DatabaseChangeRecord} instances, each 396 * corresponding 397 * to a row in the changelog table (or the equivalent if some other 398 * change 399 * tracking mechanism is being used). If there are no new changes to 400 * return, this 401 * method should return an empty list. 402 * @throws SQLException 403 * if there is any error while retrieving the next batch of changes 404 */ 405 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 406 public abstract List<DatabaseChangeRecord> getNextBatchOfChanges( 407 final TransactionContext ctx, 408 final int maxChanges, 409 final AtomicLong numStillPending) 410 throws SQLException; 411 412 413 414 /** 415 * Return a full source entry (in LDAP form) from the database, corresponding 416 * to the {@link DatabaseChangeRecord} that is passed in through the 417 * {@link SyncOperation}. This method should perform any queries necessary to 418 * gather the latest values for all the attributes to be synchronized. 419 * <p> 420 * A {@link TransactionContext} is provided, which allows 421 * controlled access to the target database. The context will contain a fresh 422 * fresh connection (i.e. a new transaction), and the Data Sync Server 423 * will always commit or rollback the transaction automatically, depending on 424 * whether this method returns normally or throws an exception. See the class 425 * level documentation for warnings and additional details. 426 * <p> 427 * This method <b>must be thread safe</b>, as it will be called repeatedly and 428 * concurrently by each of the Sync Pipe worker threads as they process 429 * entries. 430 * @param ctx 431 * a TransactionContext which provides a valid JDBC connection to the 432 * database. 433 * @param operation 434 * the SyncOperation which identifies the database "entry" to 435 * fetch. The DatabaseChangeRecord can be obtained by calling 436 * <code>operation.getDatabaseChangeRecord()</code>. 437 * This is returned by 438 * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)} 439 * or by 440 * {@link #listAllEntries(TransactionContext, String, BlockingQueue)} 441 * . 442 * @return a full LDAP Entry, or null if no such entry exists. 443 * @throws SQLException 444 * if there is an error fetching the entry 445 */ 446 @ThreadSafety(level = ThreadSafetyLevel.METHOD_THREADSAFE) 447 public abstract Entry fetchEntry(final TransactionContext ctx, 448 final SyncOperation operation) 449 throws SQLException; 450 451 452 453 /** 454 * Provides a way for the Data Sync Server to acknowledge back to the 455 * extension which sync operations it has processed. This method should update 456 * the official startpoint which was set by 457 * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is 458 * returned by {@link #getStartpoint()}. 459 * <p> 460 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 461 * updated after a sync operation is acknowledged back to this extension (via 462 * this method). Otherwise it will be possible for changes to be missed when 463 * the Data Sync Server is restarted or a connection error occurs. 464 * <p> 465 * A {@link TransactionContext} is provided in case the acknowledgment needs 466 * to make it all the way back to the database itself (for example if you were 467 * using Oracle's Change Data Capture). The context will contain a fresh 468 * fresh connection (i.e. a new transaction), and the Data Sync Server 469 * will always commit or rollback the transaction automatically, depending on 470 * whether this method returns normally or throws an exception. See the class 471 * level documentation for warnings and additional details. 472 * 473 * @param ctx 474 * a TransactionContext which provides a valid JDBC connection to the 475 * database. 476 * @param completedOps 477 * a list of {@link SyncOperation}s that have finished processing. 478 * The records are listed in the order they were first detected. 479 * @throws SQLException 480 * if there is an error acknowledging the changes back to the 481 * database 482 */ 483 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 484 public abstract void acknowledgeCompletedOps( 485 final TransactionContext ctx, 486 final LinkedList<SyncOperation> completedOps) 487 throws SQLException; 488 489 490 491 /** 492 * Performs a cleanup of the changelog table (if desired). There is a 493 * background thread that periodically invokes this method. It should remove 494 * any rows in the changelog table that are more than 495 * <code>maxAgeMillis</code> milliseconds old. 496 * <p> 497 * <b>NOTE:</b> If the system clock on the database server is not in sync with 498 * the system clock on the Data Sync Server, this method should query 499 * the database for its current time in order to determine the cut-off point 500 * for deleting changelog records. 501 * <p> 502 * A {@link TransactionContext} is provided, which allows 503 * controlled access to the target database. The context will contain a fresh 504 * fresh connection (i.e. a new transaction), and the Data Sync Server 505 * will always commit or rollback the transaction automatically, depending on 506 * whether this method returns normally or throws an exception. See the class 507 * level documentation for warnings and additional details. 508 * <p> 509 * If a separate mechanism will be used to manage the changelog table, this 510 * method may be implemented as a no-op and always return zero. This is how 511 * the default implementation behaves. 512 * @param ctx 513 * a TransactionContext which provides a valid JDBC connection to the 514 * database. 515 * @param maxAgeMillis 516 * the period of time (in milliseconds) after which a changelog table 517 * record should be deleted 518 * @return the number of rows that were deleted from the changelog table 519 * @throws SQLException 520 * if there is an error purging records from the changelog table 521 */ 522 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 523 public int cleanupChangelog(final TransactionContext ctx, 524 final long maxAgeMillis) 525 throws SQLException 526 { 527 //no implementation provided by default; this is an opt-in feature. 528 return 0; 529 } 530 531 532 533 /** 534 * Gets a list of all the entries in the database for a given entry type. This 535 * is used by the 'resync' command line tool. The default implementation 536 * throws a {@link UnsupportedOperationException}; subclasses should override 537 * if the resync functionality is needed. 538 * <p> 539 * The <code>entryType</code> is user-defined; it will be 540 * passed in on the command line for resync. The <code>outputQueue</code> 541 * should contain {@link DatabaseChangeRecord} objects with the 542 * <code>ChangeType</code> set to <i>resync</i>. 543 * <p> 544 * This method should not return until all the entries of the given entryType 545 * have been added to the output queue. Separate threads will concurrently 546 * drain entries from the queue and process them. The queue should not 547 * actually contain full entries, but rather DatabaseChangeRecord objects 548 * which identify the full database entries. These objects are then 549 * individually passed in to 550 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, 551 * it is important to make sure that the DatabaseChangeRecord instances 552 * contain enough identifiable information (e.g. primary keys) for each entry 553 * so that the entry can be found again. 554 * <p> 555 * The lifecycle of resync is similar to that of real-time sync, with a few 556 * differences: 557 * <ol> 558 * <li>Stream out a list of all IDs in the database (for a given entryType) 559 * </li> 560 * <li>Fetch full source entry for an ID</li> 561 * <li>Perform any mappings and compute the equivalent destination entry</li> 562 * <li>Fetch full destination entry</li> 563 * <li>Diff the computed destination entry and actual destination entry</li> 564 * <li>Apply the minimal set of changes at the destination to bring it in sync 565 * </li> 566 * </ol> 567 * If the total set of entries is very large, it is fine to split up the work 568 * into multiple database queries within this method. The queue will not grow 569 * out of control because it blocks when it becomes full. The queue capacity 570 * is fixed at 1000. 571 * <p> 572 * A {@link TransactionContext} is provided, which allows 573 * controlled access to the target database. The context will contain a fresh 574 * fresh connection (i.e. a new transaction), and the Data Sync Server 575 * will always commit or rollback the transaction automatically, depending on 576 * whether this method returns normally or throws an exception. See the class 577 * level documentation for warnings and additional details. 578 * 579 * @param ctx 580 * a TransactionContext which provides a valid JDBC connection to the 581 * database. 582 * @param entryType 583 * the type of database entry to be fetched (this is specified 584 * on the CLI for the resync command) 585 * @param outputQueue 586 * a queue of DatabaseChangeRecord objects which will be individually 587 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} 588 * @throws SQLException 589 * if there is an error retrieving the list of entries to resync 590 */ 591 @ThreadSafety(level = ThreadSafetyLevel.METHOD_NOT_THREADSAFE) 592 public void listAllEntries(final TransactionContext ctx, 593 final String entryType, 594 final BlockingQueue<DatabaseChangeRecord> 595 outputQueue) throws SQLException 596 { 597 throw new UnsupportedOperationException( 598 "The listAllEntries(TransactionContext,String,BlockingQueue) " + 599 "method must be implemented in the '" + 600 getExtensionName() + "' extension (Java class " + 601 getClass().getName() + ")."); 602 } 603 604 605 606 /** 607 * Note: This method is deprecated and may be removed in a future release. 608 * All new and existing code should be changed to use the version of this 609 * method which includes the <code>entryType</code> parameter. 610 * <p> 611 * Gets a list of all the entries in the database from a given file input. 612 * This is used by the 'resync' command line tool. The default implementation 613 * throws a {@link UnsupportedOperationException}; subclasses should override 614 * if the resync functionality is needed for specific database records, which 615 * can be specified in the input file. 616 * <p> 617 * The format for the <code>inputLines</code> (e.g. the content of the file) 618 * is user-defined; it may be key/value pairs, primary keys, or full SQL 619 * statements, for example. The use of this method is triggered via the 620 * <i>--sourceInputFile</i> argument on the resync CLI. The 621 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord} 622 * objects with the <code>ChangeType</code> set to <i>resync</i>. 623 * <p> 624 * This method should not return until all the entries specified by the input 625 * file have been added to the output queue. Separate threads will 626 * concurrently drain entries from the queue and process them. The queue 627 * should not actually contain full entries, but rather DatabaseChangeRecord 628 * objects which identify the full database entries. These objects are then 629 * individually passed in to 630 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, 631 * it is important to make sure that the DatabaseChangeRecord instances 632 * contain enough identifiable information (e.g. primary keys) for each entry 633 * so that the entry can be found again. 634 * <p> 635 * The lifecycle of resync is similar to that of real-time sync, with a few 636 * differences: 637 * <ol> 638 * <li>Stream out a list of all IDs in the database (using the given input 639 * file)</li> 640 * <li>Fetch full source entry for an ID</li> 641 * <li>Perform any mappings and compute the equivalent destination entry</li> 642 * <li>Fetch full destination entry</li> 643 * <li>Diff the computed destination entry and actual destination entry</li> 644 * <li>Apply the minimal set of changes at the destination to bring it in sync 645 * </li> 646 * </ol> 647 * If the total set of entries is very large, it is fine to split up the work 648 * into multiple database queries within this method. The queue will not grow 649 * out of control because it blocks when it becomes full. The queue capacity 650 * is fixed at 1000. 651 * <p> 652 * A {@link TransactionContext} is provided, which allows 653 * controlled access to the target database. The context will contain a fresh 654 * fresh connection (i.e. a new transaction), and the Data Sync Server 655 * will always commit or rollback the transaction automatically, depending on 656 * whether this method returns normally or throws an exception. See the class 657 * level documentation for warnings and additional details. 658 * 659 * @param ctx 660 * a TransactionContext which provides a valid JDBC connection to the 661 * database. 662 * @param inputLines 663 * an Iterator containing the lines from the specified input file to 664 * resync (this is specified on the CLI for the resync command). 665 * These lines can be any format, for example a set of primary keys, 666 * a set of WHERE clauses, a set of full SQL queries, etc. 667 * @param outputQueue 668 * a queue of DatabaseChangeRecord objects which will be individually 669 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} 670 * @throws SQLException 671 * if there is an error retrieving the list of entries to resync 672 */ 673 @Deprecated() 674 public void listAllEntries(final TransactionContext ctx, 675 final Iterator<String> inputLines, 676 final BlockingQueue<DatabaseChangeRecord> 677 outputQueue) throws SQLException 678 { 679 throw new UnsupportedOperationException( 680 "The listAllEntries(TransactionContext,Iterator,BlockingQueue) " + 681 "method must be implemented in the '" + 682 getExtensionName() + "' extension (Java class " + 683 getClass().getName() + ")."); 684 } 685 686 687 /** 688 * Gets a list of all the entries in the database from a given file input. 689 * This is used by the 'resync' command line tool. The default implementation 690 * throws a {@link UnsupportedOperationException}; subclasses should override 691 * if the resync functionality is needed for specific database records, which 692 * can be specified in the input file. 693 * <p> 694 * The format for the <code>inputLines</code> (e.g. the content of the file) 695 * is user-defined; it may be key/value pairs, primary keys, or full SQL 696 * statements, for example. The use of this method is triggered via the 697 * <i>--sourceInputFile</i> argument on the resync CLI. The 698 * <code>outputQueue</code> should contain {@link DatabaseChangeRecord} 699 * objects with the <code>ChangeType</code> set to <i>resync</i>. 700 * <p> 701 * This method should not return until all the entries specified by the input 702 * file have been added to the output queue. Separate threads will 703 * concurrently drain entries from the queue and process them. The queue 704 * should not actually contain full entries, but rather DatabaseChangeRecord 705 * objects which identify the full database entries. These objects are then 706 * individually passed in to 707 * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, 708 * it is important to make sure that the DatabaseChangeRecord instances 709 * contain enough identifiable information (e.g. primary keys) for each entry 710 * so that the entry can be found again. 711 * <p> 712 * The lifecycle of resync is similar to that of real-time sync, with a few 713 * differences: 714 * <ol> 715 * <li>Stream out a list of all IDs in the database (using the given input 716 * file)</li> 717 * <li>Fetch full source entry for an ID</li> 718 * <li>Perform any mappings and compute the equivalent destination entry</li> 719 * <li>Fetch full destination entry</li> 720 * <li>Diff the computed destination entry and actual destination entry</li> 721 * <li>Apply the minimal set of changes at the destination to bring it in sync 722 * </li> 723 * </ol> 724 * If the total set of entries is very large, it is fine to split up the work 725 * into multiple database queries within this method. The queue will not grow 726 * out of control because it blocks when it becomes full. The queue capacity 727 * is fixed at 1000. 728 * <p> 729 * A {@link TransactionContext} is provided, which allows 730 * controlled access to the target database. The context will contain a fresh 731 * fresh connection (i.e. a new transaction), and the Data Sync Server 732 * will always commit or rollback the transaction automatically, depending on 733 * whether this method returns normally or throws an exception. See the class 734 * level documentation for warnings and additional details. 735 * 736 * @param ctx 737 * a TransactionContext which provides a valid JDBC connection to the 738 * database. 739 * @param entryType 740 * the type of database entry to be fetched (this is specified 741 * on the CLI for the resync command) 742 * @param inputLines 743 * an Iterator containing the lines from the specified input file to 744 * resync (this is specified on the CLI for the resync command). 745 * These lines can be any format, for example a set of primary keys, 746 * a set of WHERE clauses, a set of full SQL queries, etc. 747 * @param outputQueue 748 * a queue of DatabaseChangeRecord objects which will be individually 749 * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} 750 * @throws SQLException 751 * if there is an error retrieving the list of entries to resync 752 */ 753 public void listAllEntries(final TransactionContext ctx, 754 final String entryType, 755 final Iterator<String> inputLines, 756 final BlockingQueue<DatabaseChangeRecord> 757 outputQueue) throws SQLException 758 { 759 throw new UnsupportedOperationException( 760 "The listAllEntries(TransactionContext,String,Iterator,BlockingQueue) "+ 761 "method must be implemented in the '" + 762 getExtensionName() + "' extension (Java class " + 763 getClass().getName() + ")."); 764 } 765}