001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at 010 * docs/licenses/cddl.txt 011 * or http://www.opensource.org/licenses/cddl1.php. 012 * See the License for the specific language governing permissions 013 * and limitations under the License. 014 * 015 * When distributing Covered Code, include this CDDL HEADER in each 016 * file and include the License file at 017 * docs/licenses/cddl.txt. If applicable, 018 * add the following below this CDDL HEADER, with the fields enclosed 019 * by brackets "[]" replaced with your own identifying information: 020 * Portions Copyright [yyyy] [name of copyright owner] 021 * 022 * CDDL HEADER END 023 * 024 * 025 * Copyright 2011-2018 Ping Identity Corporation 026 */ 027package com.unboundid.directory.sdk.sync.api; 028 029 030import java.io.Serializable; 031import java.util.Collections; 032import java.util.Iterator; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Map; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.atomic.AtomicLong; 038 039import com.unboundid.directory.sdk.common.internal.Configurable; 040import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider; 041import com.unboundid.directory.sdk.common.internal.UnboundIDExtension; 042import com.unboundid.directory.sdk.sync.config.SyncSourceConfig; 043import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension; 044import com.unboundid.directory.sdk.sync.types.EndpointException; 045import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; 046import com.unboundid.directory.sdk.sync.types.SyncOperation; 047import com.unboundid.directory.sdk.sync.types.SyncServerContext; 048import com.unboundid.directory.sdk.sync.types.ChangeRecord; 049import com.unboundid.ldap.sdk.Entry; 050import com.unboundid.util.Extensible; 051import com.unboundid.util.args.ArgumentException; 052import com.unboundid.util.args.ArgumentParser; 053 054 055 056/** 057 * This class defines an API that must be implemented by extensions in order to 058 * synchronize data from a generic (non-LDAP and non-JDBC) endpoint. Since the 059 * UnboundID Synchronization Server is LDAP-centric, this API allows you to take 060 * generic content and convert it into LDAP entries which can then be processed 061 * by the Synchronization Server. The lifecycle of a sync operation is as 062 * follows: 063 * <ol> 064 * <li>Detect change at the synchronization source</li> 065 * <li>Fetch full source entry</li> 066 * <li>Perform any mappings and compute the equivalent destination entry</li> 067 * <li>Fetch full destination entry</li> 068 * <li>Diff the computed destination entry and actual (fetched) destination 069 * entry</li> 070 * <li>Apply the minimal set of changes at the destination to bring it in sync 071 * </li> 072 * </ol> 073 * This implies that the 074 * {@link #fetchEntry(SyncOperation)} method will be called once for every 075 * change that is returned by 076 * {@link #getNextBatchOfChanges(int, AtomicLong)}. 077 * <p> 078 * This is a generic interface and there is no protocol-specific connection 079 * management provided. It is expected that implementers will provide their own 080 * libraries for talking to the source endpoint and handle the connection 081 * lifecycle in the {@link #initializeSyncSource(SyncServerContext, 082 * SyncSourceConfig, ArgumentParser)} and {@link #finalizeSyncSource()} 083 * methods of this extension. 084 * <p> 085 * During realtime synchronization (i.e. when a Sync Pipe is running), there is 086 * a sliding window of changes being processed, and this API provides a 087 * distinction between some different points along that window: 088 * <ul> 089 * <li><b>Old changes</b>: These are changes that the Sync Server has 090 * processed and acknowledged back to the Sync Source. The Sync Source is 091 * under no obligation to re-detect these changes.</li> 092 * <li><b>Startpoint</b>: This marks where the Sync Source will start 093 * detecting changes if it is restarted.</li> 094 * <li><b>Detected but unacknowledged</b>: These changes have been returned by 095 * <code>getNextBatchOfChanges()</code> but not completely processed and 096 * acknowledged back to the Sync Source.</li> 097 * <li><b>Undetected changes</b>: The next call to 098 * <code>getNextBatchOfChanges()</code> should return the first changes 099 * that have not been detected. This should be somewhere at or ahead of 100 * the startpoint.</li> 101 * </ul> 102 * <p> 103 * Several of these methods throw {@link EndpointException}, which should be 104 * used in the case of any connection or endpoint error. For other types of 105 * errors, runtime exceptions may be used (IllegalStateException, 106 * NullPointerException, etc.). The Synchronization Server will automatically 107 * retry operations that fail, up to a configurable amount of attempts. The 108 * EndpointException class allows you to specify a retry policy as well. 109 * <BR> 110 * <H2>Configuring Sync Sources</H2> 111 * In order to configure a Sync Source based on this API and written in Java, 112 * use a command like: 113 * <PRE> 114 * dsconfig create-sync-source \ 115 * --source-name "<I>{source-name}</I>" \ 116 * --type third-party \ 117 * --set "extension-class:<I>{class-name}</I>" \ 118 * --set "extension-argument:<I>{name=value}</I>" 119 * </PRE> 120 * where "<I>{source-name}</I>" is the name to use for the Sync Source 121 * instance, "<I>{class-name}</I>" is the fully-qualified 122 * name of the Java class written using this API, and "<I>{name=value}</I>" 123 * represents name-value pairs for any arguments to provide to the sync 124 * source. If multiple arguments should be provided to the sync source, 125 * then the "<CODE>--set extension-argument:<I>{name=value}</I></CODE>" option 126 * should be provided multiple times. 127 */ 128@Extensible() 129@SynchronizationServerExtension(appliesToLocalContent=false, 130 appliesToSynchronizedContent=true) 131public abstract class SyncSource implements UnboundIDExtension, 132 Configurable, 133 ExampleUsageProvider 134{ 135 /** 136 * Creates a new instance of this Sync Source. All sync 137 * source implementations must include a default constructor, but any 138 * initialization should generally be done in the 139 * {@link #initializeSyncSource} method. 140 */ 141 public SyncSource() 142 { 143 // No implementation is required. 144 } 145 146 147 148 /** 149 * {@inheritDoc} 150 */ 151 public abstract String getExtensionName(); 152 153 154 155 /** 156 * {@inheritDoc} 157 */ 158 public abstract String[] getExtensionDescription(); 159 160 161 162 /** 163 * {@inheritDoc} 164 */ 165 public Map<List<String>,String> getExamplesArgumentSets() 166 { 167 return Collections.emptyMap(); 168 } 169 170 171 172 /** 173 * {@inheritDoc} 174 */ 175 public void defineConfigArguments(final ArgumentParser parser) 176 throws ArgumentException 177 { 178 // No arguments will be allowed by default. 179 } 180 181 182 183 /** 184 * This hook is called when a Sync Pipe first starts up, when the 185 * <i>resync</i> process first starts up, or when the set-startpoint 186 * subcommand is called from the <i>realtime-sync</i> command line tool. 187 * Any initialization of this sync source should be performed here. This 188 * method should generally store the {@link SyncServerContext} in a class 189 * member so that it can be used elsewhere in the implementation. 190 * <p> 191 * The default implementation is empty. 192 * 193 * @param serverContext A handle to the server context for the server in 194 * which this extension is running. 195 * @param config The general configuration for this sync source. 196 * @param parser The argument parser which has been initialized from 197 * the configuration for this sync source. 198 */ 199 public void initializeSyncSource(final SyncServerContext serverContext, 200 final SyncSourceConfig config, 201 final ArgumentParser parser) 202 { 203 // No initialization will be performed by default. 204 } 205 206 207 208 /** 209 * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> 210 * process shuts down, or when the set-startpoint subcommand (from the 211 * <i>realtime-sync</i> command line tool) is finished. Any clean up of this 212 * sync source should be performed here. 213 * <p> 214 * The default implementation is empty. 215 */ 216 public void finalizeSyncSource() 217 { 218 //No implementation required by default. 219 } 220 221 222 223 /** 224 * Return the URL or path identifying the source endpoint 225 * from which this extension is transmitting data. This is used for logging 226 * purposes only, so it could just be a server name or hostname and port, etc. 227 * 228 * @return the path to the source endpoint (should not return {@code null}) 229 */ 230 public abstract String getCurrentEndpointURL(); 231 232 233 234 /** 235 * This method should effectively set the starting point for synchronization 236 * to the place specified by the <code>options</code> parameter. This should 237 * cause all changes previous to the specified start point to be disregarded 238 * and only changes after that point to be returned by 239 * {@link #getNextBatchOfChanges(int, AtomicLong)}. 240 * <p> 241 * There are several different startpoint types (see 242 * {@link SetStartpointOptions}), and this implementation is not required to 243 * support them all. If the specified startpoint type is unsupported, this 244 * method should throw an {@link UnsupportedOperationException}. 245 * <p> 246 * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type 247 * must be supported by your implementation, because this is used when a Sync 248 * Pipe first starts up. The {@link Serializable} in this case is the same 249 * type that is returned by {@link #getStartpoint()}; the Sync Server persists 250 * it and passes it back in on on a restart. 251 * <p> 252 * This method can be called from two different contexts: 253 * <ul> 254 * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used 255 * (the Sync Pipe is required to be stopped in this context)</li> 256 * <li>Immediately after a Sync Pipe starts up and a connection is first 257 * established to the source server (e.g. before the first call to 258 * {@link #getNextBatchOfChanges(int, AtomicLong)})</li> 259 * </ul> 260 * 261 * @param options 262 * an object which indicates where exactly to start synchronizing 263 * (e.g. the end of the changelog, specific change number, a certain 264 * time ago, etc) 265 * @throws EndpointException 266 * if there is any error while setting the start point 267 */ 268 public abstract void setStartpoint(final SetStartpointOptions options) 269 throws EndpointException; 270 271 272 273 /** 274 * Gets the current value of the startpoint for change detection. This is the 275 * "bookmark" which indicates which changes have already been processed and 276 * which have not. In most cases, a change number is used to detect changes 277 * and is managed by the Synchronization Server, in which case this 278 * implementation needs only to return the latest acknowledged 279 * change number. In other cases, the return value may correspond to a 280 * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. 281 * In any case, this method should return the value that is updated by 282 * {@link #acknowledgeCompletedOps(LinkedList)}. 283 * <p> 284 * This method is called periodically and the return value is saved in the 285 * persistent state for the Sync Pipe that uses this extension as its Sync 286 * Source. 287 * <p> 288 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 289 * updated after a sync operation is acknowledged back to this script (via 290 * {@link #acknowledgeCompletedOps(LinkedList)}). 291 * Otherwise it will be possible for changes to be missed when the 292 * Synchronization Server is restarted or a connection error occurs. 293 * @return a value to store in the persistent state for the Sync Pipe. This is 294 * usually a change number, but if a changelog table is not used to 295 * detect changes, this value should represent some other token to 296 * pass into {@link #setStartpoint(SetStartpointOptions)} 297 * when the sync pipe starts up. 298 */ 299 public abstract Serializable getStartpoint(); 300 301 302 303 /** 304 * Return the next batch of change records from the source. Change records 305 * are usually just hints that a change happened; they do not include 306 * the full contents of the target entry. In an effort to never synchronize 307 * stale data, the Synchronization Server will go back and fetch the full 308 * target entry for each change record. 309 * <p> 310 * On the first invocation, this should return changes starting from the 311 * startpoint that was set by 312 * {@link #setStartpoint(SetStartpointOptions)}. This method is also 313 * responsible for updating the internal state such that subsequent 314 * invocations do not return duplicate changes. 315 * <p> 316 * The resulting list should be limited by <code>maxChanges</code>. The 317 * <code>numStillPending</code> reference should be set to the estimated 318 * number of changes that haven't yet been retrieved from the source endpoint 319 * when this method returns, or zero if all the current changes have been 320 * retrieved. 321 * <p> 322 * <b>IMPORTANT</b>: While this method needs to keep track of which changes 323 * have already been returned so that it does not return them again, it should 324 * <b>NOT</b> modify the official startpoint. The internal value for the 325 * startpoint should only be updated after a sync operation is acknowledged 326 * back to this script (via 327 * {@link #acknowledgeCompletedOps(LinkedList)}). 328 * Otherwise it will be possible for changes to be missed when the 329 * Synchronization Server is restarted or a connection error occurs. The 330 * startpoint should not change as a result of this method. 331 * <p> 332 * This method <b>does not need to be thread-safe</b>. It will be invoked 333 * repeatedly by a single thread, based on the polling interval set in the 334 * Sync Pipe configuration. 335 * 336 * @param maxChanges 337 * the maximum number of changes to retrieve 338 * @param numStillPending 339 * this should be set to the number of unretrieved changes that 340 * are still pending after this batch has been retrieved. This will 341 * be passed in 342 * as zero, and may be left that way if the actual value cannot be 343 * determined. 344 * @return a list of {@link ChangeRecord} instances, each 345 * corresponding to a single change at the source endpoint. 346 * If there are no new changes to return, this method should return 347 * an empty list. 348 * @throws EndpointException 349 * if there is any error while retrieving the next batch of changes 350 */ 351 public abstract List<ChangeRecord> getNextBatchOfChanges( 352 final int maxChanges, 353 final AtomicLong numStillPending) 354 throws EndpointException; 355 356 357 358 /** 359 * Return a full source entry (in LDAP form) from the source, corresponding 360 * to the {@link ChangeRecord} that is passed in through the 361 * {@link SyncOperation}. This method should perform any queries necessary to 362 * gather the latest values for all the attributes to be synchronized. 363 * <p> 364 * This method <b>must be thread safe</b>, as it will be called repeatedly and 365 * concurrently by each of the Sync Pipe worker threads as they process 366 * entries. 367 * <p> 368 * If the original ChangeRecord has the full entry already set on it (which 369 * can be done using <code>ChangeRecord.Builder#fullEntry(Entry)</code>, 370 * then this method will not get called, and the Sync Server will 371 * automatically use the full entry from the ChangeRecord. In this case, the 372 * implementation can always return {@code null}. 373 * 374 * @param operation 375 * the SyncOperation which identifies the source "entry" to 376 * fetch. The ChangeRecord can be obtained by calling 377 * <code>operation.getChangeRecord()</code>. 378 * These ChangeRecords are generated by 379 * {@link #getNextBatchOfChanges(int, AtomicLong)} 380 * or by 381 * {@link #listAllEntries(BlockingQueue)}. 382 * 383 * @return a full LDAP Entry, or null if no such entry exists. 384 * @throws EndpointException 385 * if there is an error fetching the entry 386 */ 387 public abstract Entry fetchEntry(final SyncOperation operation) 388 throws EndpointException; 389 390 391 392 /** 393 * Provides a way for the Synchronization Server to acknowledge back to the 394 * script which sync operations it has processed. This method should update 395 * the official startpoint which was set by 396 * {@link #setStartpoint(SetStartpointOptions)} and is 397 * returned by {@link #getStartpoint()}. 398 * <p> 399 * <b>IMPORTANT</b>: The internal value for the startpoint should only be 400 * updated after a sync operation is acknowledged back to this extension (via 401 * this method). Otherwise it will be possible for changes to be missed when 402 * the Synchronization Server is restarted or a connection error occurs. 403 * 404 * @param completedOps 405 * a list of {@link SyncOperation}s that have finished processing. 406 * The records are listed in the order they were first detected. 407 * @throws EndpointException 408 * if there is an error acknowledging the changes back to the 409 * database 410 */ 411 public abstract void acknowledgeCompletedOps( 412 final LinkedList<SyncOperation> completedOps) 413 throws EndpointException; 414 415 416 417 /** 418 * Gets a list of all the entries in the source endpoint. This is used by the 419 * 'resync' command line tool. The default implementation throws a 420 * {@link UnsupportedOperationException}; subclasses should override if the 421 * resync functionality is needed. 422 * <p> 423 * The <code>outputQueue</code> should contain {@link ChangeRecord} objects 424 * with the <code>ChangeType</code> set to <code>null</code> to indicate that 425 * these are resync operations. {@code ChangeRecords} should be added to 426 * the queue using the blocking {@code put()} method. 427 * <p> 428 * This method should not return until all the entries at the source 429 * have been added to the output queue. Separate threads will concurrently 430 * drain entries from the queue and process them. The queue typically should 431 * not contain full entries, but rather ChangeRecord objects which identify 432 * the full source entries. These objects are then individually passed in to 433 * {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure 434 * that the ChangeRecord instances contain enough identifiable information 435 * (e.g. primary keys) for each entry so that the entry can be found again. 436 * <p> 437 * The lifecycle of resync is similar to that of real-time sync, with a few 438 * differences: 439 * <ol> 440 * <li>Stream out a list of identifiers for the entries in the source 441 * endpoint, using a ChangeRecord as the identifier</li> 442 * <li>Fetch full source entry for a ChangeRecord</li> 443 * <li>Perform any mappings and compute the equivalent destination entry</li> 444 * <li>Fetch full destination entry</li> 445 * <li>Diff the computed destination entry and actual destination entry</li> 446 * <li>Apply the minimal set of changes at the destination to bring it in sync 447 * </li> 448 * </ol> 449 * <p> 450 * Alternatively, the full entry can be set on the ChangeRecord within this 451 * method, which will cause the "fetch full entry" step to be skipped. In this 452 * case the Sync Server will just use the entry specified on the ChangeRecord. 453 * <p> 454 * If the total set of entries is very large, it is fine to split up the work 455 * into multiple network queries within this method. The queue will not grow 456 * out of control because it blocks when it becomes full. The queue capacity 457 * is fixed at 1000. 458 * <p> 459 * @param outputQueue 460 * a queue of ChangeRecord objects which will be individually 461 * fetched via {@link #fetchEntry(SyncOperation)}. 462 * {@code ChangeRecords} should be added to the queue using the 463 * blocking {@code put()} method. 464 * @throws EndpointException 465 * if there is an error retrieving the list of entries to resync 466 */ 467 public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue) 468 throws EndpointException 469 { 470 throw new UnsupportedOperationException( 471 "The listAllEntries(BlockingQueue) " + 472 "method must be implemented in the '" + 473 getExtensionName() + "' extension (Java class " + 474 getClass().getName() + ")."); 475 } 476 477 478 479 /** 480 * Gets a list of all the entries in the source from a given file input. 481 * This is used by the 'resync' command line tool. The default implementation 482 * throws a {@link UnsupportedOperationException}; subclasses should override 483 * if the resync functionality is needed for specific records, which 484 * can be specified in the input file. 485 * <p> 486 * The format for the <code>inputLines</code> (e.g. the content of the file) 487 * is user-defined; it may be key/value pairs, primary keys, or full SQL 488 * statements, for example. The use of this method is triggered via the 489 * <i>--sourceInputFile</i> argument on the resync CLI. The 490 * <code>outputQueue</code> should contain {@link ChangeRecord} 491 * objects with the <code>ChangeType</code> set to <code>null</code> to 492 * indicate that these are resync operations. 493 * <p> 494 * This method should not return until all the entries specified by the input 495 * file have been added to the output queue. Separate threads will 496 * concurrently drain entries from the queue and process them. The queue 497 * typically should not contain full entries, but rather ChangeRecord 498 * objects which identify the full source entries. These objects are then 499 * individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore, 500 * it is important to make sure that the ChangeRecord instances 501 * contain enough identifiable information (e.g. primary keys) for each entry 502 * so that the entry can be found again. 503 * <p> 504 * The lifecycle of resync is similar to that of real-time sync, with a few 505 * differences: 506 * <ol> 507 * <li>Stream out a list of identifiers for entries in the source endpoint, 508 * using the given input file as the basis for which entries to resync 509 * </li> 510 * <li>Fetch full source entry for an identifier</li> 511 * <li>Perform any mappings and compute the equivalent destination entry</li> 512 * <li>Fetch full destination entry</li> 513 * <li>Diff the computed destination entry and actual destination entry</li> 514 * <li>Apply the minimal set of changes at the destination to bring it in sync 515 * </li> 516 * </ol> 517 * <p> 518 * Alternatively, the full entry can be set on the ChangeRecord within this 519 * method, which will cause the "fetch full entry" step to be skipped. In this 520 * case the Sync Server will just use the entry specified on the ChangeRecord. 521 * <p> 522 * If the total set of entries is very large, it is fine to split up the work 523 * into multiple network queries within this method. The queue will not grow 524 * out of control because it blocks when it becomes full. The queue capacity 525 * is fixed at 1000. 526 * <p> 527 * @param inputLines 528 * an Iterator containing the lines from the specified input file to 529 * resync (this is specified on the CLI for the resync command). 530 * These lines can be any format, for example a set of primary keys, 531 * a set of WHERE clauses, a set of full SQL queries, etc. 532 * @param outputQueue 533 * a queue of ChangeRecord objects which will be individually 534 * fetched via {@link #fetchEntry(SyncOperation)}. 535 * {@code ChangeRecords} should be added to the queue using the 536 * blocking {@code put()} method. 537 * @throws EndpointException 538 * if there is an error retrieving the list of entries to resync 539 */ 540 public void listAllEntries(final Iterator<String> inputLines, 541 final BlockingQueue<ChangeRecord> outputQueue) 542 throws EndpointException 543 { 544 throw new UnsupportedOperationException( 545 "The listAllEntries(Iterator,BlockingQueue) " + 546 "method must be implemented in the '" + 547 getExtensionName() + "' extension (Java class " + 548 getClass().getName() + ")."); 549 } 550}