Data Polling
Overview
Version 5.4 of the Cinchy platform introduced data polling, which uses the Cinchy Event Listener to continuously monitor and sync data entries from your SQL Server or DB2 server into your Cinchy table. This capability makes data polling easier, more effective, and more streamlined, and avoids the complex orchestration logic that was previously necessary.
The Listener Configuration
To set up a Stream Source, you must set up a Listener Config. In Cinchy v5.7+ you can do this directly in the Connections UI; however, if you need multiple listeners, you must still add the additional configurations in the Listener Config table.
General Parameters
Parameter | Description | Example |
---|---|---|
Auto Offset Reset | Earliest, Latest or None. If the listener is started and either there is no last message ID or the last message ID is invalid (because it was deleted or the listener is new), the listener uses this value as a fallback to determine where to start reading events from. Earliest starts reading from the beginning of the queue (when CDC was enabled on the table). This might be a suitable configuration if your use case is recoverable or re-runnable and you need to reprocess all events to ensure accuracy. Latest fetches events after whatever was last processed. This is the typical configuration. None won't read any events. You can switch between Auto Offset Reset types after your initial configuration through the process outlined here. | Latest |
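The fallback rule above can be sketched in Python. This is a minimal illustration, not Cinchy's implementation: it assumes the queue is an ordered list of message IDs, the helper `resolve_start_index` is hypothetical, and Latest is modeled here as skipping the existing backlog so that only events arriving afterwards are read.

```python
from typing import Optional, Sequence


def resolve_start_index(
    queue: Sequence[str], last_message_id: Optional[str], auto_offset_reset: str
) -> Optional[int]:
    """Hypothetical helper: return the queue index to start reading from,
    or None when no events should be read."""
    # A valid last message ID always wins: resume right after it.
    if last_message_id is not None and last_message_id in queue:
        return list(queue).index(last_message_id) + 1
    # Otherwise Auto Offset Reset decides where to start.
    if auto_offset_reset == "Earliest":
        return 0  # replay every event still in the queue
    if auto_offset_reset == "Latest":
        return len(queue)  # assumption: skip the backlog, read only later events
    if auto_offset_reset == "None":
        return None  # don't read any events
    raise ValueError(f"Unknown Auto Offset Reset value: {auto_offset_reset!r}")


queue = ["evt-1", "evt-2", "evt-3"]
print(resolve_start_index(queue, "evt-2", "Earliest"))  # valid ID: resume after it
print(resolve_start_index(queue, "evt-9", "Earliest"))  # invalid ID: fall back to Earliest
```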
Topic
Use the table below to help create the Topic JSON needed to set up a real-time sync. All examples are taken from the Examples section below.
Parameter | Description | Example |
---|---|---|
CursorConfiguration | Mandatory. Defines the basic polling query. Use a subquery if "CursorColumn" is not unique. | SELECT o.OrderId, o.OrderDate, o.CustomerId, o.OrderLastUpdated, o.OrderStatus FROM Orders o WHERE o.Region = 'North America' AND o.OrderLastUpdated > @TheLastTimePolled |
FromClause * | Mandatory. This must contain at least the table name but can also contain Joined tables. | "Orders" in CursorConfiguration; "TransactionLog" in ReturnDataConfiguration |
CursorColumn | Mandatory. Used in 'WHERE' conditions and query sorting. | "OrderLastUpdated" in Orders table; "Id" in TransactionLog table |
BatchSize | Mandatory. Minimum data batch size per query. | 100 (as seen in JSON configurations) |
FilterCondition * | Filters used in 'WHERE' conditions. | "Region = 'North America'" in Orders table. |
Columns | Mandatory. List of result columns. | ["OrderId", "OrderDate", "CustomerId", "OrderLastUpdated", "OrderStatus"] in Orders table; ["Id", "Table", "RecordChangedId"] in TransactionLog table |
ReturnDataConfiguration | Used in complex queries. Relevant parameters are outside first "(" and last ")". | SELECT ol.OrderLineId, ol.OrderId, ol.ItemId, ol.Quantity, ol.Price, ol.LineStatus FROM OrderLines ol INNER JOIN Orders o ON o.OrderId = ol.OrderId WHERE o.OrderLastUpdated > @TheLastTimePolled |
CursorAlias | Mandatory. Alias for subquery result table. | "o" in ReturnDataConfiguration for Orders table |
JoinClause | Mandatory. Join condition for result table. | "Orders o ON o.OrderId = tl.RecordChangedId" for TransactionLog; "OrderLines ol ON ol.OrderId = o.OrderId" for OrderLines |
FilterCondition | Filters used in 'WHERE' conditions. | "ol.LineStatus != 'Cancelled'" in ReturnDataConfiguration for OrderLines |
OrderByClause | Mandatory. Sort order of final result. | "tl.Id, o.OrderId" for TransactionLog; "o.OrderId, ol.OrderLineId" for OrderLines |
Columns | Mandatory. List of result columns. | ["o.OrderId", "o.OrderDate", "o.CustomerId", "o.OrderLastUpdated", "o.OrderStatus"] in Orders table; ["ol.OrderLineId", "ol.OrderId", "ol.ItemId", "ol.Quantity", "ol.Price", "ol.LineStatus"] in OrderLines |
Delay | Mandatory. Delay in seconds between data sync cycles. | 10 (as seen in JSON configurations) |
messageKeyExpression | Optional. Mitigates data loss. See Appendix A. | "id" (suggested in Appendix A) |
CursorConfiguration.CursorColumnDataType | Mandatory. This property works in tandem with an update that ensures the database query always moves the offset, whether or not the query returned any records. This helps keep the source database from being weighed down by repeatedly running heavy queries over a wide range of records when those queries return no data. The value of this property must match the column type in the source database system so that parameters are cast correctly. | int |
CursorConfiguration.Distinct | Mandatory. This property is a true/false Boolean type that, when set to true, applies a distinct clause on your query to avoid any duplicate records. | true |
*NEXTOFFSET and MAXOFFSET reserved keyword values
Cinchy has added support for the {NEXTOFFSET} and {MAXOFFSET} reserved keyword values to narrow the search window of the query for both the FromClause and FilterCondition parameters. These values help create a focused range for data polls using unique CursorColumn values, which reduces the execution time of the query. You can read more about these values in Appendix B.
Connection Attributes
Parameter | Description | Example |
---|---|---|
databaseType | Mandatory. TSQL or DB2 | TSQL |
connectionString | Mandatory. This should be the connection string for your data source. | "Server=;Database=;User ID=cinchy;password=example;Trusted_Connection=False;Connection Timeout=30;Min Pool Size=10;" |
Example:
{
"databaseType": "TSQL",
"connectionString": "Server=;Database=;User ID=cinchy;password=example;Trusted_Connection=False;Connection Timeout=30;Min Pool Size=10;"
}
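Rather than hand-editing this JSON, you can generate and sanity-check it with a short script. The sketch below is an illustration, not a Cinchy tool: it serializes the attributes with Python's `json` module (which never emits trailing commas), checks that `databaseType` is one of the two documented values, and uses a hypothetical, deliberately naive `parse_connection_string` helper to inspect the connection string.

```python
import json

ALLOWED_DATABASE_TYPES = {"TSQL", "DB2"}

connection_attributes = {
    "databaseType": "TSQL",
    "connectionString": (
        "Server=;Database=;User ID=cinchy;password=example;"
        "Trusted_Connection=False;Connection Timeout=30;Min Pool Size=10;"
    ),
}


def parse_connection_string(conn: str) -> dict:
    """Hypothetical, naive key=value parser; it doesn't handle quoted values containing ';'."""
    pairs = (part.split("=", 1) for part in conn.split(";") if "=" in part)
    return {key.strip(): value.strip() for key, value in pairs}


assert connection_attributes["databaseType"] in ALLOWED_DATABASE_TYPES
settings = parse_connection_string(connection_attributes["connectionString"])
# json.dumps always produces valid JSON, so no stray trailing commas.
print(json.dumps(connection_attributes, indent=2))
```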
Examples
This section provides detailed examples and configurations for implementing data polling in Cinchy, specifically focusing on scenarios involving MSSQL and Aurora DBs. It shows how to effectively monitor and track changes in database tables through various polling techniques and configurations.
This section has three examples:
- CursorConfiguration Example: Demonstrates how to set up and utilize cursor-based configurations to track and poll data changes in the Orders table.
- ReturnDataConfiguration Example: Shows how to return data from a related table, using an OrderLines table whose changes are detected through the Orders table.
- CursorColumn Types: Discusses different types of cursor columns like date and identity columns, using the TransactionLog table.
CursorConfiguration example
The Orders table is structured with the following columns:
OrderId | OrderDate | CustomerId | OrderLastUpdated | OrderStatus |
---|---|---|---|---|
1 | 2023-01-01 14:00 | 10 | 2023-01-01 14:10 | Order being picked |
2 | 2023-01-01 14:03 | 20 | 2023-01-01 14:03 | Order placed |
- OrderId: Unique identifier for each order.
- OrderDate: Represents the date and time when the order was initially placed.
- CustomerId: Identifies the customer associated with the order.
- OrderLastUpdated: Records the date and time of the most recent update to the order, reflecting changes such as order line item shipments, quantity adjustments, or cancellations.
- OrderStatus: Indicates the current lifecycle stage of the order, which can change over time.
The OrderDate remains constant once set during the order placement. To track changes to the order or its status, the OrderLastUpdated column is utilized. Whenever an order is initially placed or undergoes updates, this column reflects the latest date and time.
The OrderLastUpdated column serves as a reliable forward-moving cursor for detecting new orders and capturing changes to the OrderStatus. This ensures efficient monitoring and tracking of the evolving lifecycle stages of each order in the system.
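To make the forward-moving cursor concrete, here is a small in-memory simulation in Python using the two sample rows. It's a sketch of the idea only: `poll_orders` is a hypothetical helper, the Region filter is left out because the sample rows have no Region column, and the real listener runs the equivalent SQL against the source database.

```python
from datetime import datetime

# Sample rows from the Orders table above.
orders = [
    {"OrderId": 1, "OrderDate": datetime(2023, 1, 1, 14, 0), "CustomerId": 10,
     "OrderLastUpdated": datetime(2023, 1, 1, 14, 10), "OrderStatus": "Order being picked"},
    {"OrderId": 2, "OrderDate": datetime(2023, 1, 1, 14, 3), "CustomerId": 20,
     "OrderLastUpdated": datetime(2023, 1, 1, 14, 3), "OrderStatus": "Order placed"},
]


def poll_orders(rows, last_polled):
    """Return rows changed after last_polled (oldest first) and the advanced cursor."""
    changed = sorted(
        (row for row in rows if row["OrderLastUpdated"] > last_polled),
        key=lambda row: row["OrderLastUpdated"],
    )
    # The cursor only moves forward: keep the old value when nothing changed.
    cursor = changed[-1]["OrderLastUpdated"] if changed else last_polled
    return changed, cursor


changed, cursor = poll_orders(orders, datetime(2023, 1, 1, 13, 0))
print([row["OrderId"] for row in changed], cursor)
changed, cursor = poll_orders(orders, cursor)
print([row["OrderId"] for row in changed])  # nothing changed since the last poll
```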
SQL query
The SQL query to identify Orders inserted or updated since the last polling time:
SELECT
o.OrderId,
o.OrderDate,
o.CustomerId,
o.OrderLastUpdated,
o.OrderStatus
FROM Orders o
WHERE
o.Region = 'North America'
AND o.OrderLastUpdated > @TheLastTimePolled
Listener config topic JSON
The corresponding [Cinchy].[Listener Config].[Topic] configuration:
{
"CursorConfiguration": {
"FromClause": "Orders",
"CursorColumn": "OrderLastUpdated",
"BatchSize": 100,
"FilterCondition": "Region = 'North America'",
"Columns": [
"OrderId",
"OrderDate",
"CustomerId",
"OrderLastUpdated",
"OrderStatus"
]
},
"Delay": 10
}
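The mapping between these topic fields and the SQL query above can be sketched in Python. This is an illustration under stated assumptions, not the query Cinchy actually generates: `build_cursor_query` is hypothetical, the `@TheLastTimePolled` parameter name is borrowed from the example query, the ORDER BY reflects the note that CursorColumn is used for sorting, and BatchSize is ignored because row-limiting syntax differs between TSQL and DB2.

```python
import json

topic = json.loads("""
{
  "CursorConfiguration": {
    "FromClause": "Orders",
    "CursorColumn": "OrderLastUpdated",
    "BatchSize": 100,
    "FilterCondition": "Region = 'North America'",
    "Columns": ["OrderId", "OrderDate", "CustomerId", "OrderLastUpdated", "OrderStatus"]
  },
  "Delay": 10
}
""")


def build_cursor_query(cfg: dict, cursor_param: str = "@TheLastTimePolled") -> str:
    """Hypothetical: assemble a SELECT from CursorConfiguration fields (BatchSize omitted)."""
    # An empty FilterCondition is skipped; the cursor comparison is always applied.
    conditions = [c for c in (cfg.get("FilterCondition"), f"{cfg['CursorColumn']} > {cursor_param}") if c]
    return (
        f"SELECT {', '.join(cfg['Columns'])} "
        f"FROM {cfg['FromClause']} "
        f"WHERE {' AND '.join(conditions)} "
        f"ORDER BY {cfg['CursorColumn']}"
    )


print(build_cursor_query(topic["CursorConfiguration"]))
```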
ReturnDataConfiguration Example
The OrderLines table represents line item records of an order with the following columns:
OrderLineId | OrderId | ItemId | Quantity | Price | LineStatus |
---|---|---|---|---|---|
1 | 1 | 45 | 6 | 8.49 | Cancelled |
2 | 1 | 97 | 1 | 12.68 | Picked |
3 | 1 | 102 | 4 | 9.87 | Backordered |
4 | 2 | 34 | 3 | 14.73 | To be picked |
5 | 2 | 88 | 10 | 1.99 | To be picked |
- OrderLineId: Unique identifier for each order line.
- OrderId: Identifies the order to which the line item belongs.
- ItemId: Identifier for the item in the line.
- Quantity: Represents the quantity of the item in the order.
- Price: Indicates the unit price of the item.
- LineStatus: Reflects the status of the order line, such as Cancelled, Picked, or Backordered.
Unlike the Orders table, the OrderLines table lacks a specific column for detecting changes or updates to individual line items. However, changes to the order, including its line items, trigger an update to the Orders.OrderLastUpdated column. Leveraging the relationship between OrderLines and Orders, we can use the Orders.OrderLastUpdated column to identify inserted or updated OrderLines.
SQL query
The SQL query to identify OrderLines inserted or updated since the last polling time:
SELECT
ol.OrderLineId,
ol.OrderId,
ol.ItemId,
ol.Quantity,
ol.Price,
ol.LineStatus
FROM OrderLines ol
INNER JOIN Orders o ON o.OrderId = ol.OrderId
WHERE
o.OrderLastUpdated > @TheLastTimePolled
Listener config topic JSON
To poll data from the OrderLines table, the corresponding [Cinchy].[Listener Config].[Topic] includes both a CursorConfiguration object, which identifies the table being polled to detect changes, and a ReturnDataConfiguration object, which represents the data you want to collect. The configuration is the following:
{
"CursorConfiguration": {
"FromClause": "Orders",
"CursorColumn": "OrderLastUpdated",
"BatchSize": 100,
"FilterCondition": "Region = 'North America'",
"Columns": [
"OrderId",
"OrderDate",
"CustomerId",
"OrderLastUpdated",
"OrderStatus"
]
},
"ReturnDataConfiguration": {
"CursorAlias": "o",
"JoinClause": "OrderLines ol ON ol.OrderId = o.OrderId",
"FilterCondition": "ol.LineStatus != 'Cancelled'",
"OrderByClause": "o.OrderId, ol.OrderLineId",
"Columns": [
"ol.OrderLineId",
"ol.OrderId",
"ol.ItemId",
"ol.Quantity",
"ol.Price",
"ol.LineStatus"
]
},
"Delay": 10
}
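The two-stage flow of this configuration can be simulated in memory with the sample data. The sketch below is illustrative only: `changed_order_lines` is hypothetical, the Region filter is left out because the sample Orders rows have no Region column, and the real listener performs these steps in SQL.

```python
from datetime import datetime
from typing import NamedTuple


class OrderLine(NamedTuple):
    OrderLineId: int
    OrderId: int
    ItemId: int
    Quantity: int
    Price: float
    LineStatus: str


# OrderId -> OrderLastUpdated, from the Orders sample (the cursor table).
order_last_updated = {1: datetime(2023, 1, 1, 14, 10), 2: datetime(2023, 1, 1, 14, 3)}

order_lines = [
    OrderLine(1, 1, 45, 6, 8.49, "Cancelled"),
    OrderLine(2, 1, 97, 1, 12.68, "Picked"),
    OrderLine(3, 1, 102, 4, 9.87, "Backordered"),
    OrderLine(4, 2, 34, 3, 14.73, "To be picked"),
    OrderLine(5, 2, 88, 10, 1.99, "To be picked"),
]


def changed_order_lines(last_polled: datetime) -> list:
    # CursorConfiguration: which orders changed since the last poll?
    changed_orders = {oid for oid, updated in order_last_updated.items() if updated > last_polled}
    # ReturnDataConfiguration: join on OrderId, apply FilterCondition, then OrderByClause.
    rows = [ol for ol in order_lines if ol.OrderId in changed_orders and ol.LineStatus != "Cancelled"]
    return sorted(rows, key=lambda ol: (ol.OrderId, ol.OrderLineId))


print([ol.OrderLineId for ol in changed_order_lines(datetime(2023, 1, 1, 14, 5))])
```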
CursorColumn Types
A data polling cursor can be a Date column like in the previous example, or it can be an identity column where an insert always represents the latest change to that table. Consider the following TransactionLog table example:
Id | Table | RecordChangedId | ChangeDate | ChangeTime |
---|---|---|---|---|
1 | Orders | 1 | 2023-01-01 | 14:00 |
2 | OrderLines | 1 | 2023-01-01 | 14:00 |
3 | OrderLines | 2 | 2023-01-01 | 14:00 |
4 | OrderLines | 3 | 2023-01-01 | 14:00 |
5 | Orders | 2 | 2023-01-01 | 14:03 |
6 | OrderLines | 4 | 2023-01-01 | 14:03 |
7 | OrderLines | 5 | 2023-01-01 | 14:03 |
8 | Orders | 1 | 2023-01-01 | 14:10 |
In this example, data is only inserted into TransactionLog and updates never occur. The forward-moving (and likely indexed) Id column would be an efficient and reliable data polling cursor to use.
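Because inserts to TransactionLog only ever increase Id, polling reduces to fetching the next rows after the last Id seen. The Python sketch below shows that loop with a batch size of 3 so the batching is visible; `poll_batch` is hypothetical, and the batch size is arbitrary (the examples in this section use 100).

```python
# (Id, Table, RecordChangedId) rows from the TransactionLog sample above.
transaction_log = [
    (1, "Orders", 1), (2, "OrderLines", 1), (3, "OrderLines", 2), (4, "OrderLines", 3),
    (5, "Orders", 2), (6, "OrderLines", 4), (7, "OrderLines", 5), (8, "Orders", 1),
]


def poll_batch(log, last_id, batch_size):
    """Return up to batch_size rows with Id > last_id, in Id order, plus the new cursor."""
    batch = sorted((row for row in log if row[0] > last_id), key=lambda row: row[0])[:batch_size]
    next_id = batch[-1][0] if batch else last_id
    return batch, next_id


cursor = 0
while True:
    batch, cursor = poll_batch(transaction_log, cursor, batch_size=3)
    if not batch:
        break  # caught up; a real listener would wait for the configured Delay
    print([row[0] for row in batch], "cursor ->", cursor)
```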
SQL query
This alternative SQL query incorporates change polling from a TransactionLog table for our Orders example:
SELECT
o.OrderId,
o.OrderDate,
o.CustomerId,
o.OrderLastUpdated,
o.OrderStatus
FROM Orders o
INNER JOIN TransactionLog tl
ON tl.RecordChangedId = o.OrderId
AND tl.Id > @TheLastCursorValuePolled
WHERE
o.Region = 'North America'
AND tl.Table = 'Orders'
Listener config topic JSON
The corresponding [Cinchy].[Listener Config].[Topic] configuration, which uses TransactionLog as the cursor table and introduces a ReturnDataConfiguration that returns rows from the Orders table:
{
"CursorConfiguration": {
"FromClause": "TransactionLog",
"CursorColumn": "Id",
"BatchSize": 100,
"FilterCondition": "",
"Columns": ["Id", "Table", "RecordChangedId"]
},
"ReturnDataConfiguration": {
"CursorAlias": "tl",
"JoinClause": "Orders o ON o.OrderId = tl.RecordChangedId",
"FilterCondition": "o.Region = 'North America' AND tl.Table = 'Orders'",
"OrderByClause": "tl.Id, o.OrderId",
"Columns": [
"o.OrderId",
"o.OrderDate",
"o.CustomerId",
"o.OrderLastUpdated",
"o.OrderStatus"
]
},
"Delay": 10
}
Appendix A
Optional Configurations
There are a few optional configurations that you can enable in your appsettings.json deployment file for the Data Polling source. These are detailed below.
DataPollingConcurrencyIndex: This property allows only a certain number of threads to run queries against the source database, which works to reduce the load against the database.
- The default number of threads is set to 12.
- To configure this property, set "DataPollingConcurrencyIndex": <numberOfThreads> in your appsettings.json deployment file.
QueueWriteConcurrencyIndex: This property allows only a certain number of threads to concurrently send messages to the queue. This provides more consistent batching by the worker and reduces batching errors.
- The default number of threads is set to 12.
- To configure this property, set "QueueWriteConcurrencyIndex": <numberOfThreads> in your appsettings.json deployment file.
- Note that this index is shared across all listener configs. If it's set to 1, only one listener config will push messages to the queue at any given moment.
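Conceptually, both settings cap how many threads may perform an operation at the same time. The Python sketch below models that cap with a counting semaphore; it is an analogy under stated assumptions, not the Event Listener's code, and `DATA_POLLING_CONCURRENCY_INDEX` and `run_source_query` are hypothetical names.

```python
import threading
import time

DATA_POLLING_CONCURRENCY_INDEX = 2  # hypothetical stand-in for the appsettings.json value

query_slots = threading.BoundedSemaphore(DATA_POLLING_CONCURRENCY_INDEX)
counter_lock = threading.Lock()
active_queries = 0
peak_queries = 0


def run_source_query() -> None:
    """Simulate one listener config querying the source database."""
    global active_queries, peak_queries
    with query_slots:  # blocks while DATA_POLLING_CONCURRENCY_INDEX queries are running
        with counter_lock:
            active_queries += 1
            peak_queries = max(peak_queries, active_queries)
        time.sleep(0.05)  # placeholder for the actual query
        with counter_lock:
            active_queries -= 1


threads = [threading.Thread(target=run_source_query) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrent queries:", peak_queries)
```

Six simulated listeners compete for two slots, so at most two "queries" ever run at once; raising the limit trades more load on the source database for throughput.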
AppSettings JSON Examples
// App Settings JSON example
// Example of the configurable properties: DataPollingConcurrencyIndex (set to 1) and QueueWriteConcurrencyIndex (set to 1)
"AppSettings": {
"GetNewListenerConfigsInterval": "",
"StateFileWriteDelaySeconds": "",
"KafkaClientConfig": {
"BootstrapServers": ""
},
"KafkaRealtimeDatasyncTopic": "",
"KafkaJobCancellationTopic": "",
"DataPollingConcurrencyIndex": 1,
"QueueWriteConcurrencyIndex": 1
}
Appendix B
NEXTOFFSET and MAXOFFSET
Cinchy has added support for the {NEXTOFFSET} and {MAXOFFSET} reserved keyword values to narrow the search window of the query for both the FromClause and FilterCondition parameters. These values help create a focused range for data polls using unique CursorColumn values, which reduces the execution time of the query.
These values can be used to filter records from batch queries.
MAXOFFSET: Returns the maximum CursorColumn value from the query.
NEXTOFFSET: Returns the highest CursorColumn value from queries that have already been processed.
If you use {NEXTOFFSET}, it's recommended to wrap it in ISNULL to handle situations where no {NEXTOFFSET} value is provided.
Make sure your ISNULL check uses the syntax of the database language you use.
ISNULL example
//ISNULL TSQL/DB2 example
ISNULL({NEXTOFFSET}, 40000000);
ISNULL might also affect query performance. If you are confident that you don't need an ISNULL check, it's recommended to remove it.
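To see why the ISNULL fallback matters, the Python sketch below renders a filter before and after any batch has been processed. It assumes, for illustration only, that an unavailable {NEXTOFFSET} is rendered as SQL NULL; `next_offset` and `render_next_offset` are hypothetical helpers, not Cinchy APIs.

```python
from typing import Iterable, Optional


def next_offset(processed_cursor_values: Iterable[int]) -> Optional[int]:
    """Highest CursorColumn value already processed, or None before the first batch."""
    values = list(processed_cursor_values)
    return max(values) if values else None


def render_next_offset(sql: str, value: Optional[int]) -> str:
    """Hypothetical substitution: a missing value becomes SQL NULL."""
    return sql.replace("{NEXTOFFSET}", "NULL" if value is None else str(value))


clause = "generic_table.SERIAL_COLUMN >= ISNULL({NEXTOFFSET}, 40000000)"
print(render_next_offset(clause, next_offset([])))  # first run: ISNULL supplies 40000000
print(render_next_offset(clause, next_offset([40000120, 40000375])))
```

Without the ISNULL wrapper, the first-run comparison would be against NULL, which matches no rows in SQL.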
Escaped reserved keywords
If you use {NEXTOFFSET} or {MAXOFFSET} as CursorColumn values, you must escape the CursorColumn value to differentiate it from the actual {NEXTOFFSET} reserved keyword.
Escaped reserved keywords example
The following generic JSON schema shows {NEXTOFFSET} used both as a value and as a reserved keyword.
{
"CursorConfiguration": {
"BatchSize": 1000,
"CursorColumn": "generic_table.SERIAL_COLUMN",
"Distinct": true,
"CursorColumnDataType": "INTEGER",
"FromClause": "SCHEMA_NAME.GENERIC_TABLE generic_table INNER JOIN SCHEMA_NAME.RELATED_TABLE related_table ON related_table.RELATED_SERIAL = generic_table.SERIAL_COLUMN AND generic_table.SERIAL_COLUMN >= ISNULL({NEXTOFFSET}, default_start_value)",
"FilterCondition": "related_table.CRITERIA_COLUMN != 'Exclusion_Criteria' AND related_table.REFERENCE_TABLE = 'TARGET_TABLE_NAME' AND generic_table.SERIAL_COLUMN >= ISNULL({NEXTOFFSET}, default_start_value) AND generic_table.ADDITIONAL_CRITERIA_COLUMN = '\\{NEXTOFFSET\\}'",
"Columns": [
"related_table.TARGET_COLUMN"
]
}
}
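The escaping rule can be illustrated with a small Python sketch. It is not Cinchy's parser; it assumes that an unescaped {NEXTOFFSET} is substituted and that a backslash-escaped \{NEXTOFFSET\} is passed through as the literal text {NEXTOFFSET}. The function `substitute_next_offset` and the column names are hypothetical.

```python
import json
import re

# Matches {NEXTOFFSET} only when the opening brace is not preceded by a backslash.
UNESCAPED_NEXTOFFSET = re.compile(r"(?<!\\)\{NEXTOFFSET\}")


def substitute_next_offset(text: str, value: str) -> str:
    """Hypothetical: replace the keyword, then turn escaped braces into literal braces."""
    substituted = UNESCAPED_NEXTOFFSET.sub(lambda _match: value, text)
    return substituted.replace("\\{", "{").replace("\\}", "}")


# In JSON, "\\{" decodes to the two characters "\{", just as in the schema above.
config = json.loads(
    r'''{"FilterCondition": "t.SERIAL_COLUMN >= ISNULL({NEXTOFFSET}, 0) AND t.NOTE_COLUMN = '\\{NEXTOFFSET\\}'"}'''
)
print(substitute_next_offset(config["FilterCondition"], "1500"))
```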
messageKeyExpression
The messageKeyExpression parameter is optional but recommended. Use it to help prevent unique constraint violations during your data sync; such a violation can occur if an insert and an update happen at nearly the same time. If you choose not to use the messageKeyExpression parameter, you could face data loss in your sync.
v5.6.
Each of your Event Listener messages has a message key. By default, this key is unique for every message in the queue.
When the worker processes your Event Listener messages, it does so in batches. For efficiency and to guarantee order, messages that contain the same key are not processed in the same batch.
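The batching rule described above can be sketched as a greedy split in Python. This illustrates the stated behavior and is not the worker's actual algorithm: `batch_by_key` is hypothetical, it assumes the worker closes the current batch as soon as a key repeats so that messages stay in order, and the sample keys stand in for record IDs.

```python
from typing import Hashable, List, Sequence, Tuple

Message = Tuple[Hashable, str]  # (message key, payload)


def batch_by_key(messages: Sequence[Message], max_batch_size: int) -> List[List[Message]]:
    """Split messages into ordered batches; no batch holds two messages with the same key."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    batches: List[List[Message]] = []
    current: List[Message] = []
    keys_in_current = set()
    for key, payload in messages:
        # Start a new batch when the key repeats or the batch is full, preserving order.
        if key in keys_in_current or len(current) >= max_batch_size:
            batches.append(current)
            current, keys_in_current = [], set()
        current.append((key, payload))
        keys_in_current.add(key)
    if current:
        batches.append(current)
    return batches


# An insert and a quick update of record 7 share a key, so they land in different batches.
messages = [(7, "insert"), (8, "insert"), (7, "update"), (9, "insert")]
for batch in batch_by_key(messages, max_batch_size=100):
    print(batch)
```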
The messageKeyExpression property allows you to change the default message key to something else.
Example:
"MessageKeyExpression": "id"