/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * docs/licenses/cddl.txt * or http://www.opensource.org/licenses/cddl1.php. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * docs/licenses/cddl.txt. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2019-2023 Ping Identity Corporation */ package com.unboundid.directory.sdk.examples; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.unboundid.directory.sdk.common.types.LogSeverity; import com.unboundid.directory.sdk.sync.api.KafkaSyncDestinationPlugin; import com.unboundid.directory.sdk.sync.config.KafkaSyncDestinationPluginConfig; import com.unboundid.directory.sdk.sync.types.KafkaMessage; import com.unboundid.directory.sdk.sync.types.PreStepResult; import com.unboundid.directory.sdk.sync.types.SyncOperation; import com.unboundid.directory.sdk.sync.types.SyncServerContext; import com.unboundid.ldap.sdk.ResultCode; import com.unboundid.util.args.ArgumentException; import com.unboundid.util.args.ArgumentParser; import com.unboundid.util.args.StringArgument; import java.io.IOException; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; /** * This class provides a simple example of a Kafka sync destination plugin * which filters out test data and removes passwords from Kafka messages. * Currently, a single API is provided for manipulating messages prior to * their being published to a Kafka topic. * <UL> * <LI> * {@link #prePublish(KafkaMessage, SyncOperation)} } -- Allows for the * transformation or filtering of messages prior to publishing. * </LI> * </UL> * * It takes the following argument: * <UL> * <LI>filter-attribute -- One or more attribute names which will * be removed from any messages prior to * publishing.</LI> * </UL> * * Note: This extension is provided as an example of the capabilities * of a Kafka sync destination plugin. In a production environment, attribute * modification and filtering would be handled by attribute mapping and * do not require the use of any plugins. * */ public class ExampleKafkaSyncDestinationPlugin extends KafkaSyncDestinationPlugin { private static final ObjectMapper MAPPER = new ObjectMapper(); private static final String ARG_NAME_FILTER_ATTRIBUTES = "filter-attribute"; private SyncServerContext serverContext; private volatile List<String> filterAttributes; /** * Retrieves a human-readable name for this extension. * * @return A human-readable name for this extension. */ @Override public String getExtensionName() { return "Example Kafka Sync Destination Plugin"; } /** * Retrieves a human-readable description for this extension. Each element * of the array that is returned will be considered a separate paragraph in * generated documentation. * * @return A human-readable description for this extension, or {@code null} * or an empty array if no description should be available. */ @Override public String[] getExtensionDescription() { return new String[]{ "This is a simple example of a Kafka sync destination plugin " + "which will filter out any messages intended to be " + "test data. Additionally, it can be configured to filter " + "out individual attributes on messages. For example, " + "passwords or other sensitive attributes." }; } /** * Updates the provided argument parser to define any configuration arguments * which may be used by this sync pipe plugin. The argument parser may * also be updated to define relationships between arguments (e.g., to specify * required, exclusive, or dependent argument sets). * * @param parser The argument parser to be updated with the configuration * arguments which may be used by this sync pipe plugin. * * @throws ArgumentException If a problem is encountered while updating the * provided argument parser. */ @Override public void defineConfigArguments(final ArgumentParser parser) throws ArgumentException { // Add an argument that allows for the specification of a list // of attribute names to filter out of Kafka Messages. Character shortIdentifier = null; String longIdentifier = ARG_NAME_FILTER_ATTRIBUTES; boolean required = false; int maxOccurences = -1; // multi-valued String placeHolder = "{attr-name}"; String description = "The name of the attribute (post mapping) whose" + " value should be filtered out of any messages."; StringArgument arg = new StringArgument( shortIdentifier, longIdentifier, required, maxOccurences, placeHolder, description ); parser.addArgument(arg); } /** * Initialize this Kafka sync destination plugin. This method will be called * before any other methods in this class. * * @param serverContext A handle to the server context for the * Data Sync Server in which this extension is * running. Extensions should typically store this * in a class member. * @param config The general configuration for this plugin. * @param parser The argument parser which has been initialized from * the configuration for this Kafka sync destination * plugin. */ @Override public void initializeKafkaSyncDestinationPlugin( final SyncServerContext serverContext, final KafkaSyncDestinationPluginConfig config, final ArgumentParser parser) { this.serverContext = serverContext; this.filterAttributes = parser.getStringArgument(ARG_NAME_FILTER_ATTRIBUTES).getValues(); } /** * Attempts to apply the configuration contained in the provided argument * parser. * * @param config The general configuration for this Kafka sync * destination plugin. * @param parser The argument parser which has been * initialized with the new configuration. * @param adminActionsRequired A list that can be updated with information * about any administrative actions that may be * required before one or more of the * configuration changes will be applied. * @param messages A list that can be updated with information * about the result of applying the new * configuration. * * @return A result code that provides information about the result of * attempting to apply the configuration change. */ @Override public ResultCode applyConfiguration( final KafkaSyncDestinationPluginConfig config, final ArgumentParser parser, final List<String> adminActionsRequired, final List<String> messages ) { this.filterAttributes = parser.getStringArgument(ARG_NAME_FILTER_ATTRIBUTES).getValues(); return ResultCode.SUCCESS; } /** * Retrieves a map containing examples of configurations that may be used for * this extension. The map key should be a list of sample arguments, and the * corresponding value should be a description of the behavior that will be * exhibited by the extension when used with that configuration. * * @return A map containing examples of configurations that may be used for * this extension. It may be {@code null} or empty if there should * not be any example argument sets. */ @Override public Map<List<String>, String> getExamplesArgumentSets() { final LinkedHashMap<List<String>, String> exampleMap = new LinkedHashMap<>(1); exampleMap.put( Arrays.asList( ARG_NAME_FILTER_ATTRIBUTES + "=userPassword", ARG_NAME_FILTER_ATTRIBUTES + "=authPassword" ), "Filter password attributes." ); return exampleMap; } /** * Provides the ability to alter or suppress Kafka messages. * This method is called before a message is published to the * configured Kafka Topic. * * @param message The message to be published to the * Kafka topic. * @param operation The synchronization operation for this * change. * @return The result of the plugin processing. */ @Override public PreStepResult prePublish(final KafkaMessage message, final SyncOperation operation) { // Skip any operations which contain test data. if (message.getKey().contains("test-data")) { operation.logInfo("Not sending Kafka message for test data."); return PreStepResult.ABORT_OPERATION; } // Retrieve the current message. String currentMessageValueString = message.getValue(); try { // Parse it into JSON. JsonNode currentMessageValueJSON = MAPPER.readTree(currentMessageValueString); // Strip out sensitive fields // Note: The removal of sensitive attributes can also be // handled by attribute mapping. for (String attrName : filterAttributes) { for (String entryField : Arrays.asList("current", "previous")) { if (currentMessageValueJSON.has(entryField)) { if (currentMessageValueJSON.get(entryField).has(attrName)) { operation.logDebug("Removing sensitive attribute " + attrName + " from the operation."); ((ObjectNode) currentMessageValueJSON.get(entryField)) .remove(attrName); } } } } // Save the filtered JSON back into the message. message.setValue(currentMessageValueJSON.toString()); // Return thumbs up. return PreStepResult.CONTINUE; } catch (IOException e) { serverContext.debugCaught(e); serverContext.logMessage(LogSeverity.FATAL_ERROR, "Unable to parse Kafka Message as JSON! Exception: " + e.getLocalizedMessage()); // Try again a few times in case of a transient issue. return PreStepResult.RETRY_OPERATION_LIMITED; } } /** * Appends a string representation of this Kafka sync destination plugin to * the provided buffer. * * @param buffer The buffer to which the string representation should be * appended. */ @Override public void toString(final StringBuilder buffer) { buffer.append("ExampleKafkaSyncDestinationPlugin("); if (filterAttributes.size() > 0) { for (int i = 0; i < filterAttributes.size(); i++) { buffer.append(ARG_NAME_FILTER_ATTRIBUTES + "='") .append(filterAttributes.get(i)).append("'"); if (i != filterAttributes.size() - 1) { buffer.append(", "); } } } buffer.append(")"); } }