This blog post assumes you have at least a basic understanding of Kafka – topics, partitioning, offsets.
If you have been in software for any decent amount of time, then you know the truth: the scope of any functionality is weighted heavily toward handling the error conditions rather than the happy path. Stream processing is no exception. In this post, we will dive deep into the transformation pattern of stream processing and discuss what makes it complicated, as well as some core components required to build services that support such transformations.
Let’s Transform Some Data
When we start talking about stream processing and transformation, it’s helpful to walk through the steps. Several of them are handled by the Kafka clients or the application framework being used, while others are your responsibility. For this discussion, we will use the term “system” to logically define an application or set of applications that produce or consume events. Events flow from a source system to a target system, but a system could also be a set of Kafka topics which house canonical messages designed to be consumed by multiple target systems in a fan-out pattern.
The transformation steps are as follows (a minimal code sketch follows the list):
- Receive the message from the source topic
- Deserialize the message into the source data structure
- Create target data structure based on the target topic
- Loop over all the necessary fields and map them from source data structure to the target data structure
- For each field, determine if the data value needs to be mapped from the source system value to the target system value. For example, if we are transforming a message from a source system into a canonical message and a particular source field contains a value that doesn’t match what our canonical message expects, then we will require some foundational support to look up what that value should be. More on this later.
- Validate the completed target data structure
- Serialize the target data structure into a message
- Produce the message onto the target topic
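To make the steps concrete, here is a minimal sketch of the loop in Java using the plain Kafka clients and Jackson. The topic names, field names, and the `mapSide` lookup are illustrative assumptions, not a prescription; a real transformer would pull its mappings from a service rather than a hard-coded switch (more on that below).

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransformLoop {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "transaction-transformer");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            consumer.subscribe(List.of("source.transactions"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // 1-2. Receive and deserialize the source message.
                    JsonNode source = MAPPER.readTree(record.value());

                    // 3-5. Create the target structure and map each field, translating
                    // values where the source and target systems disagree.
                    ObjectNode target = MAPPER.createObjectNode();
                    target.put("side", mapSide(source.path("side").asText()));
                    target.put("quantity", source.path("qty").asDouble());

                    // 6. Validate the completed target structure. (Later we will argue for
                    // collecting every error rather than failing on the first one.)
                    if (!target.hasNonNull("side")) {
                        throw new IllegalStateException("Unmappable side: " + source.path("side").asText());
                    }

                    // 7-8. Serialize the target structure and produce it onto the target topic.
                    producer.send(new ProducerRecord<>("canonical.transactions",
                            record.key(), MAPPER.writeValueAsString(target)));
                }
            }
        }
    }

    // Hypothetical enumerated mapping; a real implementation would be data-driven.
    private static String mapSide(String sourceSide) {
        return switch (sourceSide) {
            case "B", "BUY", "1" -> "Buy";
            case "S", "SELL", "2" -> "Sell";
            default -> null;
        };
    }
}
```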
In this discussion, we are going to focus on the transformation steps, the required supporting systems, and the error handling approaches for runtime transformation issues.
Data Transformation Patterns
The transformation patterns are the important part, as most of the other steps, such as serialization and message handling, are typically handled by existing frameworks or the Kafka client itself. There are four of them (a short sketch of the value-level patterns follows the list).
- Message Formats – the transformation process needs to know the structure of the source data and the structure of the target data. For example, if the streaming process is consuming output from Debezium and converting it into a canonical message for multiple downstream systems, then the transformation process may receive a JSON payload and need to produce a Protobuf target.
- Data Value Formats – this is transforming the same data into a possibly different format. Maybe the source value is an integer but the target expects a string. The source system may send an ALL CAPS value while the target system expects a CamelCase value. The best example might be timestamps, where the source system produces a custom timestamp format but the target system expects ISO-8601.
- Enumerated Data Transformations – these are the simple lookups for data normalization, such as Unit of Measure (UOM) or Currency. A source system may use a different value than the target system, but these transformations are simple mappings that rarely change. There are several notorious examples of this in energy trading. What is a barrel? BBL, BBl, bbl? What about the Side of a trade? BUY, Buy, B, or 1?
- Reference Data Transformations – these are the more complex transformations which require discrete contextual knowledge of the source and target system data. The best example is the transformation of foreign-key-based data relationships. If a transaction message references a customer by the source system’s customer identifier and the target system uses a different identifier for that same customer, then the transformation will need to look up the mapping and apply it.
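The three value-level patterns can be illustrated with a few small helpers. This is a sketch only; the timestamp pattern, the UOM table, and the customer mapping are assumed examples, and the reference data lookup is shown as a plain map even though in practice it would be served by a mapping service (discussed below).

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class ValueTransforms {

    // Data value format: a custom source timestamp rendered as ISO-8601.
    private static final DateTimeFormatter SOURCE_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");

    static String toIso8601(String sourceTimestamp) {
        return LocalDateTime.parse(sourceTimestamp, SOURCE_FORMAT)
                .atOffset(ZoneOffset.UTC)
                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
    }

    // Enumerated data: a small, rarely changing lookup (UOM normalization).
    private static final Map<String, String> UOM =
            Map.of("BBL", "bbl", "BBl", "bbl", "bbl", "bbl");

    static String normalizeUom(String sourceUom) {
        return UOM.get(sourceUom);
    }

    // Reference data: a lookup that depends on live cross-system knowledge,
    // e.g. source customer id -> target customer id. A null result is exactly
    // the "missing mapping" failure discussed in the rest of this post.
    static String mapCustomer(Map<String, String> customerMappings, String sourceCustomerId) {
        return customerMappings.get(sourceCustomerId);
    }
}
```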
What Makes This Complicated?
For the most part, the message formats are well known during development. Additionally, the requirements around data value formats and enumerated data transformations can also be documented and handled during development time. The complication in these transformations is rooted in the dynamic nature of the reference data transformations.
Using the example above, let’s say a transaction message is coming from a source system and the transformation process needs to transform the message into a canonical transaction format. However, the source transaction contains a reference to a new customer and no one has updated the mappings. In this scenario, we don’t want to send an invalid transaction message downstream, because then all the consuming system transformations would also fail. Ignoring the failure is obviously not an option, but we don’t want to use the “Stop on Error” pattern either, because subsequent transactions may pile up behind the “bad” one and the downstream systems may not be able to handle time-sensitive activities within the expected SLAs. The dead letter queue pattern only gets us a piece of the required functionality, because we still need to get the problem fixed and reprocess the message.
What Are Some Other Considerations?
The first implementation decision would be whether or not to embrace the retry topic pattern, where messages that are invalid because of missing dependent data, such as a valid value mapping, are placed onto a retry topic to be checked periodically and reprocessed once the data is valid. I don’t like this pattern for a few reasons. The reprocessing loop, for example, would loop over what? The entire topic? What if one mapping gets fixed but others aren’t? The retry topic would become fragmented, and the retry process would need to be smart about which retry messages still need to be retried.
There would also be some questions that would need to get fleshed out as they pertain to event ordering. Using the earlier example of the transaction that gets blocked in transformation due to a missing customer mapping, we would need to dive deep into the requirements around ordering dependencies, as the scenario where the same transaction is updated multiple times in the source system could create a definite need to retain order. Whatever reprocessing logic is implemented would not only need to identify messages which need to retain order, but also determine how to filter subsequent messages into the retry queue if there is no way for the target system to detect the out-of-order scenario.
A quick example: a transaction is created referencing a new customer with no value mapping, and then the same transaction is updated to reference a customer with a valid mapping. The second message would process normally, thereby “passing” the original message stuck in triage. If someone then adds the mapping for the new customer and reprocesses the original message successfully, the downstream topic will get those messages out of order.
What Would a Solution Look Like?
The core parts of the scenario are still required: the Kafka cluster, the topics, and of course the transformation process itself. In addition, we will need a data value mapping service that allows us to create and maintain the data value mappings between systems. The transformation process should be able to contact the data value mapping service to retrieve all the relevant mappings for the source and target system combination, and it should also receive real-time updates from the mapping service to avoid transformation errors associated with stale caches.
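One way to get those real-time updates, assuming the mapping service publishes its changes to a compacted Kafka topic, is to fold that change feed into an in-memory cache inside the transformer. The topic name and the key layout below are assumptions for the sake of the sketch.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MappingCache {
    // Key layout assumed here: "sourceSystem:targetSystem:field:sourceValue" -> targetValue.
    private final Map<String, String> mappings = new ConcurrentHashMap<>();

    public String lookup(String sourceSystem, String targetSystem, String field, String sourceValue) {
        return mappings.get(String.join(":", sourceSystem, targetSystem, field, sourceValue));
    }

    // Run on a background thread; each transformer instance reads the compacted
    // mappings topic from the beginning and then keeps consuming for live updates.
    public void run(Properties consumerProps) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("data-value-mappings"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    if (record.value() == null) {
                        mappings.remove(record.key());              // tombstone: mapping retired
                    } else {
                        mappings.put(record.key(), record.value()); // new or updated mapping
                    }
                }
            }
        }
    }
}
```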
We will also need an error management system, which would perform a couple of different activities. The application would have an endpoint that allows the transformation process to report the details of an error. The error event would need to include several parts. First, the message metadata, including the cluster information, topic name, partition, offset, key, and value. Second, it should include a list of the errors associated with processing the message, including the particulars about the field itself, the source value, and the target system it was attempting to map to. Once the event is pushed to the error management system, there should also be a notification to the appropriate users describing the error and the activities required to resolve it.
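As a sketch, the error event might look something like the record below. The field names are illustrative, not a spec; the important part is that the event carries both the message coordinates needed for reprocessing and the full list of field-level failures.

```java
import java.util.List;

public record TransformationErrorEvent(
        String cluster,
        String sourceTopic,
        int partition,
        long offset,
        String key,
        String value,                // original payload, kept for triage and reprocessing
        List<FieldError> fieldErrors) {

    public record FieldError(
            String fieldName,
            String sourceValue,
            String targetSystem,     // the system we failed to map the value into
            String description) {
    }
}
```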
The transformation process itself would need a couple of different capabilities. First, it would be very helpful if the process could identify the full set of transformation errors prior to sending the information to the error management system. If the process throws an exception on the first mapping error, but there happen to be three additional missing mappings in the message, then the administration staff will be required to go through the fix-and-reprocess loop four times instead of updating all the mappings in one batch.
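That suggests a mapping pass that accumulates failures instead of throwing on the first one. The sketch below builds on the `MappingCache` and `TransformationErrorEvent` sketches above; the shape of the field maps is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FieldMapper {

    // Map every reference-data field, collecting a FieldError for each miss so the
    // caller can report all of them to the error management system in one event.
    public List<TransformationErrorEvent.FieldError> mapReferenceFields(
            MappingCache cache,
            String sourceSystem,
            String targetSystem,
            Map<String, String> referenceFields,   // field name -> source value
            Map<String, String> target) {          // receives the mapped values

        List<TransformationErrorEvent.FieldError> errors = new ArrayList<>();
        for (Map.Entry<String, String> field : referenceFields.entrySet()) {
            String mapped = cache.lookup(sourceSystem, targetSystem, field.getKey(), field.getValue());
            if (mapped == null) {
                errors.add(new TransformationErrorEvent.FieldError(
                        field.getKey(), field.getValue(), targetSystem, "no mapping found"));
            } else {
                target.put(field.getKey(), mapped);
            }
        }
        return errors;   // empty list means the message is safe to produce downstream
    }
}
```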
The transformation process would also need a separate endpoint for reprocessing. Perhaps this could be a REST endpoint that allows administrators to click a button in the error management system’s UI, which submits the message metadata back to the transformation process so it can reprocess that single message from the source topic. Another option would be to store the message contents in the triage system itself and submit the stored value back to be reprocessed.
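Under the metadata-driven option, the reprocessing handler boils down to seeking to the exact partition and offset captured in the error event and running that one record back through the normal transformation path. A sketch, leaving out the REST framework wiring:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Reprocessor {

    // Fetch the single record identified by the error event's topic/partition/offset.
    public ConsumerRecord<String, String> fetchOne(Properties consumerProps,
                                                   String topic, int partition, long offset) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            TopicPartition tp = new TopicPartition(topic, partition);
            consumer.assign(List.of(tp));
            consumer.seek(tp, offset);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (record.offset() == offset) {
                    return record;   // hand this back to the normal transformation logic
                }
            }
            return null;             // record was compacted away or the offset is out of range
        }
    }
}
```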