The Listener Config table
The Listener Config table in Cinchy allows you to configure stream sources for real-time data synchronization. Access these settings through the Connections UI on the Data Sync Source page.
As of Cinchy version 5.7, manage your listener configurations on the Sources tab within the Listener section of the Connections UI. For multiple listener requirements, add additional configurations in the Listener Config table.
The Listener Config table is also where you review and manage existing configurations. Most columns are standard across all Stream Sources, with exceptions noted as needed. Detailed parameters and descriptions are provided in the subsequent tables.
Listener parameters
The following column parameters can be found in the Listener Config table and should be filled in when setting up a real-time sync:
Parameter | Description | Example |
---|---|---|
Name | Mandatory. Provide a name for your listener config. | CDC Real-Time Sync |
Event Connector Type | Mandatory. Select your Connector type from the drop-down menu. | Cinchy CDC |
Topic | Mandatory. This field is expecting a JSON formatted value specific to the connector type you are configuring. | See the Topic tab. |
Connection Attributes | Mandatory. This field is expecting a JSON formatted value specific to the connector type you are configuring. | See the Connection Attributes tab. |
Status | Mandatory. Set to "Enabled" to activate the listener. Leave on "Disabled" until you are ready to start syncing. | Enabled / Disabled |
Active | Managed by User/System. Indicates whether the listener is set to retry after failure ("Yes") or has stopped attempting to sync and requires user intervention ("No"). | Yes / No |
Data Sync Config | Mandatory. This drop-down will list all the data syncs on your platform. Select the one that you want to use for your real-time sync. | CDC Data Sync |
Subscription Expires On | Salesforce Stream Sources only. This field is a timestamp that's auto-populated when it has successfully subscribed to a topic. | |
Message | Auto-populated. This field reports errors that occur during the sync. | |
Auto Offset Reset | Earliest, Latest, or None. Determines where to start reading events if there is no last message ID or if it's invalid. Can be adjusted post-configuration. | Earliest / Latest / None |
Assigned Topic | (v5.12) This column allows you to set individual listener configs to route messages to a designated Kafka Topic of various sizes. For more information on Kafka Topic Isolation, review Appendix B. | |
Listener status management (v5.9)
This section has details on status management for real-time sync sources.
Basic Workflow
The Enabled/Disabled setting controls the initiation and halting of data synchronization activities. When set to Enabled, the system begins the synchronization process, moving through a lifecycle with the following phases:
- Starting: The system attempts to establish a connection.
- Running: The system is actively synchronizing data.
- Failed: The system encountered an error during synchronization.
- Disabled: The user changed listener config Status to Disabled.
In case of a synchronization failure, a detailed error message is logged in the Message column and the State column shows Failed. Despite the failure, the synchronization remains Enabled and will retry after 60 seconds. This retry process continues until the synchronization is manually set to Disabled.
The event listener will automatically set itself to Disabled in two scenarios:
- An invalid configuration is detected, such as an erroneous Topic JSON.
- A validation error occurs during synchronization. For example, a missing mandatory field in the Topic JSON.
Initial States
- Enabled/Disabled: Toggle to initiate or halt synchronization.
Status
- Starting: Connection attempts begin.
- Running: Active data synchronization.
- Failed: Synchronization has stopped due to an error, will retry after 60 seconds.
Appendix A - Filtering
You can add filters to your listener configuration when setting up real-time data syncs. Much like source and destination filters, a Listener Filter uses Cinchy Query Language to define specific subsets of data that you want to listen in on.
A filter can be defined in the Connections UI Source tab (Image 1), or in the Topic column of the [Cinchy].[Listener Config] table.
When using the Connections UI, the filter syntax is as follows:
[Status] = 'Ready for Testing'
When using the Listener Config table Topic column, the filter syntax is as follows (note the addition of "filter":):
"filter":"[Status] = 'Ready for Testing'"
Filter Examples
The following examples use the syntax as you would input it into the Connections UI.
Example 1: Listening to records where the [Status] column equals 'Ready for Testing'.
[Status] = 'Ready for Testing'
Example 2: Listening to records where the [Ticket Type] column does not equal 'Bug Fix'.
[Ticket Type] != 'Bug Fix'
Example 3: Listening to records created on or after December 1st, 2023.
[Created] >= 'Dec 1 2023'
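Conditions can also be combined. As a hedged example (this assumes standard Cinchy Query Language boolean operators are available in listener filters), the following listens only to records that are 'Ready for Testing' and were created on or after December 1st, 2023:
[Status] = 'Ready for Testing' AND [Created] >= 'Dec 1 2023'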
Old vs New Filter
The Cinchy Event Broker/CDC Stream Source has the unique capability to use "Old" and "New" parameters when filtering data. This filter can be a powerful tool for ensuring that you sync only the specific data that you want.
The "New" and "Old" parameters are based on updates to single records, not columns/rows.
"New" Example:
In the below filter, we only want to sync data where the [Approval State] of a record is newly 'Approved'. For example, if a record was changed from 'Draft' to 'Approved', the filter would sync the record.
Due to internal logic, newly created records will be tagged as both "New" and "Old".
"filter": "New.[Approval State] = 'Approved'
"Old" Example:
In the below filter, we only want to sync data where the [Status] of a record was 'In Progress' but has since been updated to any other [Status]. For example, if a record was changed from 'In Progress' to 'Done', the filter would sync the record.
Due to internal logic, newly created records will be tagged as both "New" and "Old".
"filter": "Old.[Status] = 'In Progress'
Appendix B - Kafka Topic Isolation
This feature is available from Cinchy v5.12.0 onwards, where Kafka (not SQL Server Service Broker) is used for real-time messaging. This is the default for Cinchy in Kubernetes.
Kafka Topic Isolation is a feature designed to optimize the performance of designated real-time syncs. Users can assign custom topics to any listener config, essentially creating dedicated queues to 'fast track' the processing of associated data. When configured appropriately, high priority listener configs will benefit from dedicated resources, while lower priority listener configs will continue to share resources. This provides a mechanism to improve the throughput of critical or high volume workloads, preserving the default behaviour for most workloads.
Default behaviour
When Kafka is used for real-time messaging, the default behaviour is as follows.
- The listener pushes messages to a single Kafka topic, which has 100 partitions.
- The messages are distributed across those partitions.
- The partitions are allocated to worker threads. Workers typically host 12 worker threads each.
- Assuming 2 workers, the 24 worker threads would each service approximately 4 partitions: 100 partitions / (2 workers * 12 threads) = 4.16… partitions per worker thread.
- Worker threads consume messages from their associated partitions.
This provides a balanced strategy which scales out based on the number of workers, with the following caveats:
- Listener configs share available resources equally, so higher priority syncs are not treated as such.
- While the relatively large number of partitions allows the system as a whole to scale (up to a point), it also means that data for a given sync is spread across a large number of partitions, resulting in smaller batches in each partition.
- Smaller batch sizes reduce the throughput of individual syncs, as 'per batch' processing overhead becomes more significant.
- Overall throughput is ultimately constrained by the target system/database, so adding workers to increase concurrency has a practical upper limit. Going beyond this may negatively impact performance.
- Data for different syncs will share partitions, with no control over grouping or relative priority.
Behaviour with topic isolation
Topic Isolation provides a way to dedicate resources to specific syncs, control the sharing of partitions, and implicitly increase the average batch size for higher throughput. It can be used to prioritize specific syncs while limiting the total number of syncs concurrently writing to a given target. It provides a means to tune the system based on available resources, priority and expected volumes.
The optimal settings depend on the specific requirements and performance characteristics of your environment. Generally, an ideal configuration would isolate a smaller number of higher priority/volume syncs, and allow a larger number of lower priority/volume syncs to share resources.
Prerequisites & considerations
- Both Kafka and Redis must be provisioned.
- By default, only admin users can manage the topics in the User Defined Topics table, so that topic creation can be aligned with the provisioning of worker threads.
- The total number of worker threads must be greater than the number of custom topics created. This is because each worker thread can only be associated with a single topic.
- If there are sufficient worker threads, multiple worker threads can be associated with a given topic, up to the number of partitions in the topic.
- Once a record is created in the User Defined Topics table, assigned to a configuration, and Enabled, the value in the Size column should not be changed. At this point, the topic is already created in Kafka, so if you need to update the topic size it must be done from Kafka directly. Warning: Changing the size of an already created topic can result in data loss and is not recommended.
Usage
To make use of the Kafka Topic Isolation feature:
- Configure a real-time data sync (which resides in [Cinchy].[Listener Configuration]).
- As a user with the necessary entitlements (typically administrator), create a topic in the [Cinchy].[User Defined Topics] table, and select the appropriate Size, where Small = 1 partition, Medium = 2 partitions, Large = 4 partitions, and Extra-Large = 8 partitions.
- Set the [Assigned Topic] column of the [Cinchy].[Listener Configuration] record to the aforementioned topic.
- Ensure the listener config is set to Enabled.
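As an illustration only (the topic name below is hypothetical), a completed configuration might contain values like the following:
Table | Column | Example value |
---|---|---|
User Defined Topics | Name | Orders High Priority |
User Defined Topics | Size | Large (4 partitions) |
Listener Config | Assigned Topic | Orders High Priority |
Listener Config | Status | Enabled |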
User Defined Topics table
The User Defined Topics table is used to define a designated small, medium, large, or extra-large Kafka topic. It serves as a reference for topics in the platform's Kafka instance. If these topics don't yet exist in Kafka, they will be created automatically once the linked listener configuration is enabled.
The table contains the following fields:
Field Name | Description |
---|---|
Name | Mandatory. Assign a unique name for the topic. |
Size | Mandatory. Select the (partition) size of the topic: Small = 1, Medium = 2, Large = 4, Extra-Large = 8. Once a record is created in the User Defined Topics table, assigned to a configuration, and Enabled, the value in the Size column should not be changed. At that point, the topic already exists in Kafka, so any size change must be made in Kafka directly. Warning: Changing the size of an already created topic can result in data loss and is not recommended. |
Assigned Topic column
The Assigned Topic column of the [Cinchy].[Listener Configuration] table is used to assign a topic to a data sync. Once you've configured your data sync, including creating a new row in the Listener Config table, use this choice column to select a topic (defined previously in the User Defined Topics table).
If the field is left blank (no topic is linked), the listener configuration will be assigned to a default, real-time Kafka topic as specified in the app settings.
Runtime behaviour
From a worker perspective, the subscription flow is as follows.
- On the startup of the worker, worker threads are created.
- Each worker thread is subscribed to a Kafka topic on a round-robin basis, so that each thread is assigned to the topic that has the lowest 'fill percentage', i.e. (RequiredSize - CurrentSize) / RequiredSize, until all user defined topics are 'full'. During this process, the default topic is treated as large, i.e. 4 partitions. Once all of the user defined topics have been assigned, available worker threads are shared among the remaining default topic partitions.
- Every 30 seconds, the worker refreshes the subscriptions in Redis for healthy worker threads. This is done by setting the Expires At value for each healthy thread to the current time plus KafkaAssignedTopicExpirationSeconds seconds (default 600).
- Unhealthy worker threads will not have their subscription refreshed, and once the expiry timestamp of the individual subscription has passed, they will be removed from Redis so that the topic's partition becomes available to another worker thread.
- Completed (including failed or cancelled) worker threads are replaced with new worker threads that are subscribed to available topics.
- The system monitors the [Listener Config] and [User Defined Topics] tables every 30 seconds. If relevant changes are detected, worker threads will be resubscribed. The frequency of this check is determined by the KafkaAssignedTopicSubscriptionRefreshSeconds setting.
Relevant changes include:
- Size value of any user defined topic (in the Cinchy table) has changed.
- Name of any user defined topic (in the Cinchy table) has changed.
- New Assigned Topic has been linked in the Listener Configs table.
- Status of any listener config has changed to/from Enabled/Disabled.
Recovery
The system will automatically recover when any worker pod fails. Every 30 seconds, the Expires At value of every worker thread, across all workers, is checked. If this time has passed for ANY worker thread, resubscription is triggered for ALL workers, even those having only healthy threads. This ensures the ideal distribution of worker threads to partitions. As new workers are provisioned, resubscription will again ensure the appropriate distribution of available resources.
Worker App settings
The following new fields were added to the Worker appsettings.json file to support Kafka Topic Isolation:
Name | Description |
---|---|
KafkaAssignedTopicExpirationSeconds | The length of time, in seconds, that needs to pass before a resubscription process is queued up. |
KafkaAssignedTopicSubscriptionRefreshSeconds | The interval, in seconds, that the WorkerManager checks the subscriptions in Redis and refreshes for existing healthy subscriptions, or cancels expired subscriptions. |
"KafkaAssignedTopicExpirationSeconds": 600,
"KafkaAssignedTopicSubscriptionRefreshSeconds": 30
"ConnectionStrings": {
"Redis": "127.0.0.1:16379,password=redis_Passw0rd"
}
Additional resources
Information about setting up your Topic JSON can be found on the individual real-time sync stream source configuration pages.