Data Polling

Overview

Version 5.4 of the Cinchy platform introduced data polling, which uses the Cinchy Event Listener to continuously monitor and sync data entries from your SQL Server or DB2 server into your Cinchy table. Data polling makes this sync simpler and more efficient, and it avoids the complex orchestration logic that was previously necessary.

The Listener Configuration

To set up a Stream Source, you must set up a Listener Config. In Cinchy v5.7+ you can do this directly in the Connections UI; however, if you need multiple listeners, you must still add the additional configurations in the Listener Config table.

General Parameters

Parameter | Description | Example
Auto Offset Reset | Earliest, Latest, or None. If the listener starts and there is no last message ID, or the last message ID is invalid (because it was deleted or the listener is new), the listener uses this column as a fallback to determine where to start reading events from. | Latest

  • Earliest starts reading from the beginning of the queue (when CDC was enabled on the table). Consider this setting if your use case is recoverable or re-runnable and you need to reprocess all events to ensure accuracy.
  • Latest fetches the last value after whatever was last processed. This is the typical configuration.
  • None won't read or start reading any events.

You are able to switch between Auto Offset Reset types after your initial configuration through the process outlined here.
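The fallback behaviour described above can be sketched in a few lines of Python. This is only an illustration of the decision, not Cinchy's implementation: the function name, its arguments, and the returned labels are all hypothetical.

```python
from typing import Optional


def resolve_start_position(last_message_id: Optional[str],
                           last_id_is_valid: bool,
                           auto_offset_reset: str) -> str:
    """Return a label describing where a listener would start reading.

    Hypothetical helper: the labels are illustrative, not Cinchy values.
    """
    # A valid stored message ID wins; the fallback is not consulted.
    if last_message_id is not None and last_id_is_valid:
        return f"resume-after:{last_message_id}"

    # Otherwise fall back to the Auto Offset Reset setting.
    setting = auto_offset_reset.strip().lower()
    if setting == "earliest":
        return "beginning-of-queue"
    if setting == "latest":
        return "latest-position"
    if setting == "none":
        return "do-not-read"
    raise ValueError(f"Unknown Auto Offset Reset value: {auto_offset_reset!r}")


# A new listener with no stored message ID uses the fallback.
print(resolve_start_position(None, False, "Latest"))
# A listener with a valid stored message ID ignores the fallback.
print(resolve_start_position("msg-17", True, "Earliest"))
```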

Topic

The table below can help you create the Topic JSON needed to set up a real-time sync. All examples are sourced from the Examples section of this page.

Parameter | Description | Example
CursorConfiguration | Mandatory parameter for a basic query. Use a sub-query if "CursorColumn" is not unique. | SELECT o.OrderId, o.OrderDate, o.CustomerId, o.OrderLastUpdated, o.OrderStatus FROM Orders o WHERE o.Region = 'North America' AND o.OrderLastUpdated > @TheLastTimePolled
FromClause* | Mandatory. This must contain at least the table name but can also contain joined tables. | "Orders" in the CursorConfiguration example; "TransactionLog" in the CursorColumn Types example
CursorColumn | Mandatory. Used in 'WHERE' conditions and query sorting. | "OrderLastUpdated" in Orders table; "Id" in TransactionLog table
BatchSize | Mandatory. Minimum data batch size per query. | 100 (as seen in JSON configurations)
FilterCondition* | Filters used in 'WHERE' conditions. | "Region = 'North America'" in Orders table
Columns | Mandatory. List of result columns. | ["OrderId", "OrderDate", "CustomerId", "OrderLastUpdated", "OrderStatus"] in Orders table; ["Id", "Table", "RecordChangedId"] in TransactionLog table
ReturnDataConfiguration | Used in complex queries. Relevant parameters are outside first "(" and last ")". | SELECT ol.OrderLineId, ol.OrderId, ol.ItemId, ol.Quantity, ol.Price, ol.LineStatus FROM OrderLines ol INNER JOIN Orders o ON o.OrderId = ol.OrderId WHERE o.OrderLastUpdated > @TheLastTimePolled
CursorAlias | Mandatory. Alias for subquery result table. | "o" in ReturnDataConfiguration for Orders table
JoinClause | Mandatory. Join condition for result table. | "Orders o ON o.OrderId = tl.RecordChangedId" for TransactionLog; "OrderLines ol ON ol.OrderId = o.OrderId" for OrderLines
FilterCondition | Filters used in 'WHERE' conditions. | "ol.LineStatus != 'Cancelled'" in ReturnDataConfiguration for OrderLines
OrderByClause | Mandatory. Sort order of final result. | "tl.Id, o.OrderId" for TransactionLog; "o.OrderId, ol.OrderLineId" for OrderLines
Columns | Mandatory. List of result columns. | ["o.OrderId", "o.OrderDate", "o.CustomerId", "o.OrderLastUpdated", "o.OrderStatus"] in Orders table; ["ol.OrderLineId", "ol.OrderId", "ol.ItemId", "ol.Quantity", "ol.Price", "ol.LineStatus"] in OrderLines
Delay | Mandatory. Delay in seconds between data sync cycles. | 10 (as seen in JSON configurations)
messageKeyExpression | Optional. Mitigates data loss. See the messageKeyExpression section. | "id" (suggested in the messageKeyExpression section)
CursorConfiguration.CursorColumnDataType | Mandatory. This property works in tandem with an update that ensures the database query always moves the offset, whether or not the query returned records. This keeps the source database from being weighed down by repeatedly running heavy queries over a wide range of records when those queries return no data. The value of this property must match the column type in the source database system so that parameters are cast properly. | int
CursorConfiguration.Distinct | Mandatory. A true/false Boolean that, when set to true, applies a distinct clause to your query to avoid duplicate records. | true

*NEXTOFFSET and MAXOFFSET reserved keyword values

Cinchy has added support for {'NEXTOFFSET'} and {'MAXOFFSET'} reserved keyword values to narrow the search window of the query for both the FromClause and FilterCondition parameters. These values help to create a focused range for data polls using unique CursorColumn values to reduce the execution time of the query. You can read more about these values in Appendix B.

Connection Attributes

Parameter | Description | Example
databaseType | Mandatory. TSQL or DB2 | TSQL
connectionString | Mandatory. This should be the connection string for your data source. | "Server=;Database=;User ID=cinchy;password=example;Trusted_Connection=False;Connection Timeout=30;Min Pool Size=10;"

Example:

{
  "databaseType": "TSQL",
  "connectionString": "Server=;Database=;User ID=cinchy;password=example;Trusted_Connection=False;Connection Timeout=30;Min Pool Size=10;"
}
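If you generate the connection attributes programmatically, a small check before serializing can catch typos early. The sketch below is a hypothetical helper, not part of Cinchy; it only enforces the two rules stated in the table above (databaseType is TSQL or DB2, and connectionString is present) and produces strictly valid JSON.

```python
import json

# Allowed values taken from the Connection Attributes table.
ALLOWED_DATABASE_TYPES = {"TSQL", "DB2"}


def build_connection_attributes(database_type: str, connection_string: str) -> str:
    """Validate the two mandatory attributes and return them as a JSON string."""
    if database_type not in ALLOWED_DATABASE_TYPES:
        raise ValueError(f"databaseType must be one of {sorted(ALLOWED_DATABASE_TYPES)}")
    if not connection_string.strip():
        raise ValueError("connectionString is mandatory")
    return json.dumps(
        {"databaseType": database_type, "connectionString": connection_string},
        indent=2,
    )


attributes = build_connection_attributes(
    "TSQL",
    "Server=;Database=;User ID=cinchy;password=example;Trusted_Connection=False;",
)
print(attributes)
```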

Examples

This section provides detailed examples and configurations for implementing data polling in Cinchy, specifically focusing on scenarios involving MSSQL and Aurora DBs. It shows how to effectively monitor and track changes in database tables through various polling techniques and configurations.

This section has three examples:

  1. CursorConfiguration Example: Demonstrates how to set up and utilize cursor-based configurations to track and poll data changes in the Orders table.
  2. ReturnDataConfiguration Example: Provides insights on configuring data return mechanisms. Using an OrderLines table example, it shows how to gather relevant data efficiently.
  3. CursorColumn Types: Discusses different types of cursor columns like date and identity columns, using the TransactionLog table.

CursorConfiguration example

The Orders table is structured with the following columns:

OrderId | OrderDate | CustomerId | OrderLastUpdated | OrderStatus
1 | 2023-01-01 14:00 | 10 | 2023-01-01 14:10 | Order being picked
2 | 2023-01-01 14:03 | 20 | 2023-01-01 14:03 | Order placed

  • OrderId: Unique identifier for each order.
  • OrderDate: Represents the date and time when the order was initially placed.
  • CustomerId: Identifies the customer associated with the order.
  • OrderLastUpdated: Records the date and time of the most recent update to the order, reflecting changes such as order line item shipments, quantity adjustments, or cancellations.
  • OrderStatus: Indicates the current lifecycle stage of the order, which can change over time.

The OrderDate remains constant once set during the order placement. To track changes to the order or its status, the OrderLastUpdated column is utilized. Whenever an order is initially placed or undergoes updates, this column reflects the latest date and time.

The OrderLastUpdated column serves as a reliable forward-moving cursor for detecting new orders and capturing changes to the OrderStatus. This ensures efficient monitoring and tracking of the evolving lifecycle stages of each order in the system.

SQL query

The SQL query to identify Orders inserted or updated since the last polling time:

SELECT
  o.OrderId,
  o.OrderDate,
  o.CustomerId,
  o.OrderLastUpdated,
  o.OrderStatus
FROM Orders o
WHERE
  o.Region = 'North America'
  AND o.OrderLastUpdated > @TheLastTimePolled
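To see how a forward-moving cursor behaves across polling cycles, the following sketch runs an equivalent query against an in-memory SQLite database loaded with the two sample orders. SQLite stands in for SQL Server or DB2 purely for illustration, the Region column is an assumption (the sample table doesn't list it), and the poll function is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Orders (OrderId INTEGER PRIMARY KEY, OrderDate TEXT, "
    "CustomerId INTEGER, OrderLastUpdated TEXT, OrderStatus TEXT, Region TEXT)"
)
conn.executemany(
    "INSERT INTO Orders VALUES (?, ?, ?, ?, ?, ?)",
    [
        (1, "2023-01-01 14:00", 10, "2023-01-01 14:10", "Order being picked", "North America"),
        (2, "2023-01-01 14:03", 20, "2023-01-01 14:03", "Order placed", "North America"),
    ],
)

POLL_SQL = """
SELECT o.OrderId, o.OrderLastUpdated, o.OrderStatus
FROM Orders o
WHERE o.Region = 'North America'
  AND o.OrderLastUpdated > ?
ORDER BY o.OrderLastUpdated
"""


def poll(last_time_polled: str):
    """Return rows changed since the cursor, plus the advanced cursor value."""
    rows = conn.execute(POLL_SQL, (last_time_polled,)).fetchall()
    # The cursor only moves forward: take the newest timestamp that was seen.
    new_cursor = rows[-1][1] if rows else last_time_polled
    return rows, new_cursor


# First cycle: only order 1 changed after 14:05.
first_rows, cursor = poll("2023-01-01 14:05")
# Second cycle: nothing has changed since the cursor advanced.
second_rows, cursor = poll(cursor)
print(first_rows, second_rows, cursor)
```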

Listener config topic JSON

The corresponding [Cinchy].[Listener Config].[Topic] configuration:

{
  "CursorConfiguration": {
    "FromClause": "Orders",
    "CursorColumn": "OrderLastUpdated",
    "BatchSize": 100,
    "FilterCondition": "Region = 'North America'",
    "Columns": [
      "OrderId",
      "OrderDate",
      "CustomerId",
      "OrderLastUpdated",
      "OrderStatus"
    ]
  },
  "Delay": 10
}
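One way to read this configuration is to map each field back onto the SQL query above. The sketch below does that mapping in Python. It is not Cinchy's query builder: build_cursor_query is a hypothetical function, the ORDER BY on the cursor column follows the parameter table's note that CursorColumn is used for sorting, and BatchSize and Delay are deliberately not modelled.

```python
import json

TOPIC_JSON = """
{
  "CursorConfiguration": {
    "FromClause": "Orders",
    "CursorColumn": "OrderLastUpdated",
    "BatchSize": 100,
    "FilterCondition": "Region = 'North America'",
    "Columns": ["OrderId", "OrderDate", "CustomerId", "OrderLastUpdated", "OrderStatus"]
  },
  "Delay": 10
}
"""


def build_cursor_query(topic: dict, parameter: str = "@TheLastTimePolled") -> str:
    """Assemble an illustrative SELECT from the CursorConfiguration fields."""
    cursor = topic["CursorConfiguration"]
    columns = ", ".join(cursor["Columns"])
    conditions = []
    # FilterCondition is optional and may be an empty string.
    if cursor.get("FilterCondition"):
        conditions.append(cursor["FilterCondition"])
    # The cursor column drives both the WHERE condition and the sort order.
    conditions.append(f"{cursor['CursorColumn']} > {parameter}")
    where = " AND ".join(conditions)
    return (
        f"SELECT {columns} FROM {cursor['FromClause']} "
        f"WHERE {where} ORDER BY {cursor['CursorColumn']}"
    )


query = build_cursor_query(json.loads(TOPIC_JSON))
print(query)
```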

ReturnDataConfiguration Example

The OrderLines table represents line item records of an order with the following columns:

OrderLineId | OrderId | ItemId | Quantity | Price | LineStatus
1 | 1 | 45 | 6 | 8.49 | Cancelled
2 | 1 | 97 | 1 | 12.68 | Picked
3 | 1 | 10 | 2 | 49.87 | Backordered
4 | 2 | 34 | 3 | 14.73 | To be picked
5 | 2 | 88 | 10 | 1.99 | To be picked

  • OrderLineId: Unique identifier for each order line.
  • OrderId: Identifies the order to which the line item belongs.
  • ItemId: Identifier for the item in the line.
  • Quantity: Represents the quantity of the item in the order.
  • Price: Indicates the unit price of the item.
  • LineStatus: Reflects the status of the order line, such as Cancelled, Picked, or Backordered.

Unlike the Orders table, the OrderLines table lacks a specific column for detecting changes or updates to individual line items. However, changes to the order, including its line items, trigger an update to the Orders.OrderLastUpdated column. Leveraging the relationship between OrderLines and Orders, we can use the Orders.OrderLastUpdated column to identify inserted or updated OrderLines.

SQL query

The SQL query to identify OrderLines inserted or updated since the last polling time:

SELECT
  ol.OrderLineId,
  ol.OrderId,
  ol.ItemId,
  ol.Quantity,
  ol.Price,
  ol.LineStatus
FROM OrderLines ol
INNER JOIN Orders o ON o.OrderId = ol.OrderId
WHERE
  o.OrderLastUpdated > @TheLastTimePolled

Listener config topic JSON

To poll data from the OrderLines table, the corresponding [Cinchy].[Listener Config].[Topic] includes both a CursorConfiguration object, which identifies the table being polled to detect changes, and a ReturnDataConfiguration object, which represents the data you want to collect. The configuration is the following:

{
  "CursorConfiguration": {
    "FromClause": "Orders",
    "CursorColumn": "OrderLastUpdated",
    "BatchSize": 100,
    "FilterCondition": "Region = 'North America'",
    "Columns": [
      "OrderId",
      "OrderDate",
      "CustomerId",
      "OrderLastUpdated",
      "OrderStatus"
    ]
  },
  "ReturnDataConfiguration": {
    "CursorAlias": "o",
    "JoinClause": "OrderLines ol ON ol.OrderId = o.OrderId",
    "FilterCondition": "ol.LineStatus != 'Cancelled'",
    "OrderByClause": "o.OrderId, ol.OrderLineId",
    "Columns": [
      "ol.OrderLineId",
      "ol.OrderId",
      "ol.ItemId",
      "ol.Quantity",
      "ol.Price",
      "ol.LineStatus"
    ]
  },
  "Delay": 10
}
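The relationship between the two objects can be tried out locally. In the sketch below, the cursor query becomes a subquery aliased as "o" (the CursorAlias), and the JoinClause, FilterCondition, and OrderByClause are applied around it. This is an assumed query shape run on SQLite for illustration, not the SQL that Cinchy generates, and only a subset of the sample columns is loaded.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Orders (OrderId INTEGER PRIMARY KEY, OrderLastUpdated TEXT);
    CREATE TABLE OrderLines (OrderLineId INTEGER PRIMARY KEY, OrderId INTEGER, LineStatus TEXT);
    INSERT INTO Orders VALUES (1, '2023-01-01 14:10'), (2, '2023-01-01 14:03');
    INSERT INTO OrderLines VALUES
      (1, 1, 'Cancelled'), (2, 1, 'Picked'), (3, 1, 'Backordered'),
      (4, 2, 'To be picked'), (5, 2, 'To be picked');
    """
)

# Inner SELECT: the cursor query on Orders, aliased as "o" (CursorAlias).
# Outer parts: JoinClause, FilterCondition and OrderByClause from
# ReturnDataConfiguration.
QUERY = """
SELECT ol.OrderLineId, ol.OrderId, ol.LineStatus
FROM (
  SELECT OrderId, OrderLastUpdated
  FROM Orders
  WHERE OrderLastUpdated > ?
) o
INNER JOIN OrderLines ol ON ol.OrderId = o.OrderId
WHERE ol.LineStatus != 'Cancelled'
ORDER BY o.OrderId, ol.OrderLineId
"""

# Only order 1 was updated after 14:05, so only its non-cancelled lines return.
changed_lines = conn.execute(QUERY, ("2023-01-01 14:05",)).fetchall()
print(changed_lines)
```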

CursorColumn Types

A data polling cursor can be a Date column like in the previous example, or it can be an identity column where an insert always represents the latest change to that table. Consider the following TransactionLog table example:

Id | Table | RecordChangedId | ChangeDate | ChangeTime
1 | Orders | 1 | 2023-01-01 | 14:00
2 | OrderLines | 1 | 2023-01-01 | 14:00
3 | OrderLines | 2 | 2023-01-01 | 14:00
4 | OrderLines | 3 | 2023-01-01 | 14:00
5 | Orders | 2 | 2023-01-01 | 14:03
6 | OrderLines | 4 | 2023-01-01 | 14:03
7 | OrderLines | 5 | 2023-01-01 | 14:03
8 | Orders | 1 | 2023-01-01 | 14:10

In this example, data is only inserted into TransactionLog and updates never occur. The forward-moving (and likely indexed) Id column would be an efficient and reliable data polling cursor to use.
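As a rough illustration of an identity cursor, the sketch below walks the sample TransactionLog rows in batches, always asking for rows whose Id is greater than the last Id seen. The poll_batch function and the batch size of 3 are hypothetical and chosen only to keep the output short.

```python
from typing import List, Tuple

# (Id, Table, RecordChangedId) rows from the sample TransactionLog table.
TRANSACTION_LOG: List[Tuple[int, str, int]] = [
    (1, "Orders", 1), (2, "OrderLines", 1), (3, "OrderLines", 2),
    (4, "OrderLines", 3), (5, "Orders", 2), (6, "OrderLines", 4),
    (7, "OrderLines", 5), (8, "Orders", 1),
]


def poll_batch(last_id: int, batch_size: int):
    """Return up to batch_size rows with Id > last_id and the new cursor."""
    batch = [row for row in TRANSACTION_LOG if row[0] > last_id][:batch_size]
    # Insert-only table: the highest Id in the batch is the new cursor.
    next_cursor = batch[-1][0] if batch else last_id
    return batch, next_cursor


cursor = 0
seen_ids: List[int] = []
while True:
    batch, cursor = poll_batch(cursor, batch_size=3)
    if not batch:
        break
    seen_ids.extend(row[0] for row in batch)

print(seen_ids, cursor)
```

Because rows are only ever inserted, no change can appear behind the cursor, which is what makes the Id column a reliable choice here.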

SQL query

This alternative SQL query incorporates change polling from a TransactionLog table for our Orders example:

SELECT
  o.OrderId,
  o.OrderDate,
  o.CustomerId,
  o.OrderLastUpdated,
  o.OrderStatus
FROM Orders o
INNER JOIN TransactionLog tl
  ON tl.RecordChangedId = o.OrderId
  AND tl.Id > @TheLastCursorValuePolled
WHERE
  o.Region = 'North America'
  AND tl.Table = 'Orders'

Listener config topic JSON

The corresponding [Cinchy].[Listener Config].[Topic] configuration, which introduces a ReturnDataConfiguration to the Orders table:

{
  "CursorConfiguration": {
    "FromClause": "TransactionLog",
    "CursorColumn": "Id",
    "BatchSize": 100,
    "FilterCondition": "",
    "Columns": ["Id", "Table", "RecordChangedId"]
  },
  "ReturnDataConfiguration": {
    "CursorAlias": "tl",
    "JoinClause": "Orders o ON o.OrderId = tl.RecordChangedId",
    "FilterCondition": "o.Region = 'North America' AND tl.Table = 'Orders'",
    "OrderByClause": "tl.Id, o.OrderId",
    "Columns": [
      "o.OrderId",
      "o.OrderDate",
      "o.CustomerId",
      "o.OrderLastUpdated",
      "o.OrderStatus"
    ]
  },
  "Delay": 10
}

Appendix A

Optional Configurations

There are a few optional configurations that you can enable in your appsettings.json deployment file for the Data Polling source. These are detailed below.

DataPollingConcurrencyIndex: This property allows only a certain number of threads to run queries against the source database, which works to reduce the load against the database.

  • The default number of threads is set to 12.
  • To configure this property, navigate to your appSettings.json deployment file > "DataPollingConcurrencyIndex": <numberOfThreads>

QueueWriteConcurrencyIndex: This property allows only a certain number of threads to send messages to the queue concurrently. This gives the worker more consistent batches and reduces batching errors.

  • The default number of threads is set to 12.
  • To configure this property, navigate to your appSettings.json deployment file > "QueueWriteConcurrencyIndex": <numberOfThreads>.
  • Note that this index is shared across all listener configs, meaning that if it's set to 1 only one listener config will be pushing the messages to the queue at a single moment in time.
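The effect of a concurrency index can be pictured as a semaphore that caps how many threads do the guarded work at once. The sketch below is a generic Python illustration of that idea, not Cinchy's implementation; the function name and the dummy task are hypothetical.

```python
import threading
import time


def run_with_concurrency_index(task_count: int, concurrency_index: int) -> int:
    """Run dummy tasks behind a semaphore and return the peak concurrency seen."""
    gate = threading.Semaphore(concurrency_index)
    lock = threading.Lock()
    active = 0
    peak = 0

    def task() -> None:
        nonlocal active, peak
        with gate:  # at most concurrency_index threads pass this point
            with lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)  # stands in for a query or a queue write
            with lock:
                active -= 1

    threads = [threading.Thread(target=task) for _ in range(task_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return peak


# With an index of 1, the six tasks run strictly one at a time.
print(run_with_concurrency_index(task_count=6, concurrency_index=1))
```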

AppSettings JSON Examples

// App Settings JSON Example
// Example of the configurable properties: DataPollingConcurrencyIndex (set to 1) and QueueWriteConcurrencyIndex (set to 1)
"AppSettings": {
  "GetNewListenerConfigsInterval": "",
  "StateFileWriteDelaySeconds": "",
  "KafkaClientConfig": {
    "BootstrapServers": ""
  },
  "KafkaRealtimeDatasyncTopic": "",
  "KafkaJobCancellationTopic": "",
  "DataPollingConcurrencyIndex": 1,
  "QueueWriteConcurrencyIndex": 1
}

Appendix B

NEXTOFFSET and MAXOFFSET

Cinchy has added support for {'NEXTOFFSET'} and {'MAXOFFSET'} reserved keyword values to narrow the search window of the query for both the FromClause and FilterCondition parameters. These values help to create a focused range for data polls using unique CursorColumn values to reduce the execution time of the query.

These values can be used to filter records from batch queries.

MAXOFFSET: Returns the maximum value of the CursorColumn value from the query.

NEXTOFFSET: Returns the highest CursorColumn value from queries that have already been processed.

If using {'NEXTOFFSET'}, it's recommended to use it with ISNULL to avoid situations where the {'NEXTOFFSET'} value isn't provided.

Tip: Make sure your ISNULL check matches the database language you use.

ISNULL example

//ISNULL TSQL/DB2 example
ISNULL({NEXTOFFSET}, 40000000);

Caution: ISNULL might also affect query performance. If you are confident that you don't require an ISNULL check, it's recommended to remove it.
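To make the role of the ISNULL fallback concrete, the sketch below renders a filter template for two cases: an offset is available, and no offset is available yet. How Cinchy actually substitutes the keyword is not described here, so the render_filter function and the choice to substitute SQL NULL for a missing offset are assumptions made purely for illustration.

```python
from typing import Optional

# Template using the reserved keyword with the ISNULL fallback shown above.
TEMPLATE = "Id >= ISNULL({NEXTOFFSET}, 40000000)"


def render_filter(template: str, next_offset: Optional[int]) -> str:
    """Replace {NEXTOFFSET} with a value, or with NULL when none exists (assumed)."""
    value = "NULL" if next_offset is None else str(next_offset)
    return template.replace("{NEXTOFFSET}", value)


# With an offset, the query window starts at that offset.
print(render_filter(TEMPLATE, 40000123))
# Without an offset, ISNULL falls back to the default start value.
print(render_filter(TEMPLATE, None))
```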

Escaped reserved keywords

If the literal text {NEXTOFFSET} or {MAXOFFSET} appears as a value in your query, you must escape it to differentiate it from the actual {NEXTOFFSET} or {MAXOFFSET} reserved keyword.

Escaped reserved keywords example

The following generic JSON schema shows the use of {NEXTOFFSET} as both a value and a reserved keyword.

{
  "CursorConfiguration": {
    "BatchSize": 1000,
    "CursorColumn": "generic_table.SERIAL_COLUMN",
    "Distinct": true,
    "CursorColumnDataType": "INTEGER",
    "FromClause": "SCHEMA_NAME.GENERIC_TABLE generic_table INNER JOIN SCHEMA_NAME.RELATED_TABLE related_table ON related_table.RELATED_SERIAL = generic_table.SERIAL_COLUMN AND generic_table.SERIAL_COLUMN >= ISNULL({NEXTOFFSET}, default_start_value)",
    "FilterCondition": "related_table.CRITERIA_COLUMN != 'Exclusion_Criteria' AND related_table.REFERENCE_TABLE = 'TARGET_TABLE_NAME' AND generic_table.SERIAL_COLUMN >= ISNULL({NEXTOFFSET}, default_start_value) AND generic_table.ADDITIONAL_CRITERIA_COLUMN = '\\{NEXTOFFSET\\}'",
    "Columns": [
      "related_table.TARGET_COLUMN"
    ]
  }
}
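The distinction between the keyword and its escaped form can be sketched with a regular expression that substitutes only unescaped occurrences. This is an assumed model of the behaviour, not Cinchy's parser: the substitute function is hypothetical, and it assumes the escaped form \{NEXTOFFSET\} (as it looks after JSON decoding) ends up as the literal text {NEXTOFFSET}.

```python
import re

# Match {NEXTOFFSET} or {MAXOFFSET} only when the brace is not backslash-escaped.
KEYWORD = re.compile(r"(?<!\\)\{(NEXTOFFSET|MAXOFFSET)\}")


def substitute(sql: str, values: dict) -> str:
    """Replace unescaped keywords with values, then unescape literal braces."""
    replaced = KEYWORD.sub(lambda match: str(values[match.group(1)]), sql)
    return replaced.replace("\\{", "{").replace("\\}", "}")


condition = r"SERIAL_COLUMN >= {NEXTOFFSET} AND CRITERIA_COLUMN = '\{NEXTOFFSET\}'"
rendered = substitute(condition, {"NEXTOFFSET": 500, "MAXOFFSET": 900})
print(rendered)
```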

messageKeyExpression

The messageKeyExpression parameter is an optional, but recommended, parameter that can be used to ensure that you aren't faced with a unique constraint violation during your data sync. This violation could occur if both an insert and an update statement happened at nearly the same time. If you choose not to use the messageKeyExpression parameter, you could face data loss in your sync.

This parameter was added to the Data Polling event stream in Cinchy v5.6.

Each of your Event Listener messages has a message key. By default, this key is unique for every message in the queue.

When the worker processes your Event Listener messages it does so in batches and, for efficiency and to guarantee order, messages that contain the same key will not be processed in the same batch.

The messageKeyExpression property allows you to change the default message key to something else.
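The batching rule can be illustrated with a small Python sketch: messages that share a key are spread across successive batches, so an insert and an update for the same record are never processed together. This is a simplified model rather than the worker's actual algorithm, and batch_by_key and the message fields are hypothetical.

```python
from typing import Dict, List

Message = Dict[str, str]


def batch_by_key(messages: List[Message]) -> List[List[Message]]:
    """Group messages so that no batch holds two messages with the same key.

    Each message goes into the first batch after the one that holds the
    previous message with the same key, which also preserves per-key order.
    """
    batches: List[List[Message]] = []
    next_batch_for_key: Dict[str, int] = {}
    for message in messages:
        index = next_batch_for_key.get(message["key"], 0)
        if index == len(batches):
            batches.append([])
        batches[index].append(message)
        next_batch_for_key[message["key"]] = index + 1
    return batches


# Keys derived from the record id: the insert and update for record 42 share
# a key, so they land in different batches.
messages = [
    {"key": "42", "op": "insert"},
    {"key": "42", "op": "update"},
    {"key": "7", "op": "insert"},
]
batches = batch_by_key(messages)
print(batches)
```

With the default unique keys, nothing stops the insert and the update for record 42 from sharing a batch, which is the situation that can lead to the unique constraint violation described above.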

Example:

  "MessageKeyExpression": "id"