001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * docs/licenses/cddl.txt
011 * or http://www.opensource.org/licenses/cddl1.php.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * docs/licenses/cddl.txt.  If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 *      Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 *      Portions Copyright 2018-2023 Ping Identity Corporation
026 */
027package com.unboundid.directory.sdk.sync.api;
028
029import com.unboundid.directory.sdk.common.internal.ExampleUsageProvider;
030import com.unboundid.directory.sdk.common.internal.Reconfigurable;
031import com.unboundid.directory.sdk.common.internal.UnboundIDExtension;
032import com.unboundid.directory.sdk.sync.config.ChangeDetectorConfig;
033import com.unboundid.directory.sdk.sync.internal.SynchronizationServerExtension;
034import com.unboundid.directory.sdk.sync.types.ChangeRecord;
035import com.unboundid.directory.sdk.sync.types.EndpointException;
036import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
037import com.unboundid.directory.sdk.sync.types.SyncOperation;
038import com.unboundid.directory.sdk.sync.types.SyncServerContext;
039import com.unboundid.directory.sdk.sync.types.SyncSourceContext;
040import com.unboundid.ldap.sdk.ResultCode;
041import com.unboundid.util.Extensible;
042import com.unboundid.util.args.ArgumentException;
043import com.unboundid.util.args.ArgumentParser;
044
045import java.io.Serializable;
046import java.util.Collections;
047import java.util.LinkedList;
048import java.util.List;
049import java.util.Map;
050import java.util.concurrent.atomic.AtomicLong;
051
052/**
053 * This class defines an API that must be implemented by extensions that
054 * detect changes for an LDAP based Sync Source.
055 * A Change Detector can be used to
056 * <ul>
057 * <li>Process logs or other flat files for changes.</li>
058 * <li>Process changes from a Queue (Kafka, RabbitMQ, etc)</li>
059 * <li>Override the standard cn=changelog
060 * based approach for detecting changes.</li>
061 * </ul>
062 * <br>
063 * <h2>Configuring Change Detectors</h2>
064 * In order to configure a Change Detector created using this API, use a
065 * command like:
066 * <pre>
067 *    dsconfig create-change-detector \
068 *          --detector-name "<i>{detector-name}</i>" \
069 *          --type third-party \
070 *          --set "extension-class:<i>{class-name}</i>" \
 *          --set "extension-argument:<i>{name=value}</i>"
072 * </pre>
 * where "<i>{detector-name}</i>" is the name to use for the Change Detector
074 * instance, "<i>{class-name}</i>" is the fully-qualified name of the Java
075 * class that extends
076 * {@code com.unboundid.directory.sdk.sync.api.ChangeDetector},
077 * and "<i>{name=value}</i>" represents name-value pairs for any arguments to
078 * provide to the Change Detector. If multiple arguments should be provided
079 * to the Change Detector, then the
080 * "<CODE>--set extension-argument:<I>{name=value}</I></CODE>"
081 * option should be provided multiple times.
082 */
083@Extensible()
084@SynchronizationServerExtension(appliesToLocalContent = false,
085        appliesToSynchronizedContent = true)
086public abstract class ChangeDetector
087        implements UnboundIDExtension,
088        Reconfigurable<ChangeDetectorConfig>,
089        ExampleUsageProvider {
090
091  /**
092   * Creates a new instance of this LDAP Change Detector.  All Change
093   * Detector implementations must include a default constructor, but any
094   * initialization should generally be done in the
095   * {@code initializeChangeDetector} method.
096   */
097  public ChangeDetector() {
098    // No implementation is required.
099  }
100
101  /**
102   * {@inheritDoc}
103   */
104  @Override
105  public abstract String getExtensionName();
106
107  /**
108   * {@inheritDoc}
109   */
110  @Override
111  public abstract String[] getExtensionDescription();
112
113  /**
114   * {@inheritDoc}
115   */
116  @Override
117  public Map<List<String>, String> getExamplesArgumentSets() {
118    return Collections.emptyMap();
119  }
120
121  /**
122   * {@inheritDoc}
123   */
124  @Override
125  public void defineConfigArguments(final ArgumentParser parser)
126          throws ArgumentException {
127    // No arguments will be allowed by default.
128  }
129
130  /**
131   * This hook is called when a Sync Pipe first starts up  or when the
132   * set-startpoint subcommand is called from the <i>realtime-sync</i> command
133   * line tool. Any initialization of this change detector should be performed
134   * here. This method should generally store the {@link SyncServerContext}
135   * and {@link SyncSourceContext} in a class member so that it can be used
136   * elsewhere in the implementation.
137   * <p>
138   * The default implementation is empty.
139   *
140   * @param serverContext     A handle to the server context for the server in
141   *                          which this extension is running.
142   * @param syncSourceContext An interface for interacting with the Sync
143   *                          Source that owns this Change Detector or
144   *                          {@code null} if the Change Detector is only
145   *                          being initialized to validate its configuration.
146   * @param parser            The argument parser which has been initialized
147   *                          from the configuration for this sync source.
148   */
149  public void initializeChangeDetector(
150          final SyncServerContext serverContext,
151          final SyncSourceContext syncSourceContext,
152          final ArgumentParser parser) {
153    // No initialization will be performed by default.
154  }
155
156  /**
157   * This hook is called when a Sync Pipe shuts down or when the set-startpoint
158   * subcommand (from the <i>realtime-sync</i> command line tool) is finished.
159   * Any clean up of this change detector should be performed here.
160   * <p>
161   * The default implementation is empty.
162   */
163  public void finalizeChangeDetector() {
164    //No implementation required by default.
165  }
166
167  /**
168   * {@inheritDoc}
169   */
170  public boolean isConfigurationAcceptable(
171          final ChangeDetectorConfig config,
172          final ArgumentParser parser,
173          final List<String> unacceptableReasons) {
174    // No implementation required by default.
175    return true;
176  }
177
178  /**
179   * {@inheritDoc}
180   */
181  public ResultCode applyConfiguration(final ChangeDetectorConfig config,
182                                       final ArgumentParser parser,
183                                       final List<String> adminActionsRequired,
184                                       final List<String> messages) {
185    // No implementation required by default.
186    return ResultCode.SUCCESS;
187  }
188
189  /**
190   * This method should effectively set the starting point for synchronization
191   * to the place specified by the <code>options</code> parameter. This should
192   * cause all changes previous to the specified start point to be disregarded
193   * and only changes after that point to be returned by
194   * {@link #getNextBatchOfChanges(int, AtomicLong)}.
195   * <p>
196   * There are several different startpoint types (see
197   * {@link SetStartpointOptions}), and this implementation is not required to
198   * support them all. If the specified startpoint type is unsupported, this
199   * method should throw an {@link UnsupportedOperationException}.
200   *
201   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
202   * must be supported by your implementation, because this is used when a Sync
203   * Pipe first starts up. The {@link Serializable} in this case is the same
204   * type that is returned by {@link #getStartpoint()}; the Sync Server persists
205   * it and passes it back in on a restart.
206   * <p>
207   * This method can be called from two different contexts:
208   * <ul>
209   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
210   * (the Sync Pipe is required to be stopped in this context)</li>
211   * <li>Immediately after a Sync Pipe starts up and a connection is first
212   * established to the source server (e.g. before the first call to
213   * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
214   * </ul>
215   *
216   * @param options an object which indicates where exactly to start
217   *                synchronizing (e.g. the end of the changelog, specific
218   *                change number, a certain time ago, etc)
219   * @throws EndpointException if there is any error while setting the
220   *                           start point
221   */
222  public abstract void setStartpoint(final SetStartpointOptions options)
223          throws EndpointException;
224
225
226  /**
227   * Gets the current value of the startpoint for change detection. This is the
228   * "bookmark" which indicates which changes have already been processed and
229   * which have not. In most cases, a change number is used to detect changes
230   * and is managed by the Data Sync Server, in which case this
231   * implementation needs only to return the latest acknowledged
232   * change number. In other cases, the return value may correspond to a
233   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
234   * In any case, this method should return the value that is updated by
235   * {@link #acknowledgeCompletedOps(LinkedList)}.
236   * <p>
237   * This method is called periodically and the return value is saved in the
238   * persistent state for the Sync Pipe that uses this extension as its Sync
239   * Source.
240   *
241   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
242   * updated after a sync operation is acknowledged back to this extension (via
243   * {@link #acknowledgeCompletedOps(LinkedList)}).
244   * Otherwise it will be possible for changes to be missed when the
245   * Data Sync Server is restarted or a connection error occurs.
246   *
247   * @return a value to store in the persistent state for the Sync Pipe. This is
248   * usually a change number, but if a changelog table is not used to
249   * detect changes, this value should represent some other token to
250   * pass into {@link #setStartpoint(SetStartpointOptions)}
251   * when the sync pipe starts up.
252   */
253  public abstract Serializable getStartpoint();
254
255  /**
256   * Return the next batch of change records from the source. Change records
257   * are usually just hints that a change happened; they do not include
258   * the full contents of the target entry. In an effort to never synchronize
259   * stale data, the Data Sync Server will go back and fetch the full
260   * target entry for each change record.
261   * <p>
262   * On the first invocation, this should return changes starting from the
263   * startpoint that was set by
264   * {@link #setStartpoint(SetStartpointOptions)}. This method is also
265   * responsible for updating the internal state such that subsequent
266   * invocations do not return duplicate changes.
267   * <p>
268   * The resulting list should be limited by <code>maxChanges</code>. The
269   * <code>numStillPending</code> reference should be set to the estimated
270   * number of changes that haven't yet been retrieved from the source endpoint
271   * when this method returns, or zero if all the current changes have been
272   * retrieved.
273   *
274   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
275   * have already been returned so that it does not return them again, it should
276   * <b>NOT</b> modify the official startpoint. The internal value for the
277   * startpoint should only be updated after a sync operation is acknowledged
278   * back to this extension (via
279   * {@link #acknowledgeCompletedOps(LinkedList)}).
280   * Otherwise it will be possible for changes to be missed when the
281   * Data Sync Server is restarted or a connection error occurs. The
282   * startpoint should not change as a result of this method.
283   * <p>
284   * This method <b>does not need to be thread-safe</b>. It will be invoked
285   * repeatedly by a single thread, based on the polling interval set in the
286   * Sync Pipe configuration.
287   *
288   * @param maxChanges      the maximum number of changes to retrieve
289   * @param numStillPending this should be set to the number of unretrieved
290   *                        changes that are still pending after this batch has
291   *                        been retrieved. This will be passed in as zero, and
292   *                        may be left that way if the actual value cannot be
293   *                        determined.
294   * @return a list of {@link ChangeRecord} instances, each
295   * corresponding to a single change at the source endpoint.
296   * If there are no new changes to return, this method should return
297   * an empty list.
298   * @throws EndpointException if there is any error while retrieving the
299   *                           next batch of changes
300   */
301  public abstract List<ChangeRecord> getNextBatchOfChanges(
302          final int maxChanges,
303          final AtomicLong numStillPending)
304          throws EndpointException;
305
306  /**
307   * Provides a way for the Data Sync Server to acknowledge back to the
308   * extension which sync operations it has processed. This method should update
309   * the official startpoint which was set by
310   * {@link #setStartpoint(SetStartpointOptions)} and is
311   * returned by {@link #getStartpoint()}.
312   *
313   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
314   * updated after a sync operation is acknowledged back to this extension (via
315   * this method). Otherwise it will be possible for changes to be missed when
316   * the Data Sync Server is restarted or a connection error occurs.
317   *
318   * @param completedOps a list of {@link SyncOperation}s that have finished
319   *                     processing. The records are listed in the order they
320   *                     were first detected.
321   * @throws EndpointException if there is an error acknowledging the changes
322   *                           back to the source
323   */
324  public abstract void acknowledgeCompletedOps(
325          final LinkedList<SyncOperation> completedOps)
326          throws EndpointException;
327}