Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions singer-connectors/tap-mysql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,19 @@ The tap support two ways of consuming log events: using binlog coordinates or GT
binlog coordinates, when turning on the `use_gtid` flag, you have to specify the engine flavor (mariadb/mysql) because
the GTID implementations of these two engines differ.

When enabling the `use_gtid` flag and the engine is MariaDB, the tap will dynamically infer the GTID pos from
existing binlog coordinate in the state, if the engine is mysql, it will fail.
**MySQL (`engine: mysql`):**
The tap tracks the full GTID set from `@@GLOBAL.gtid_executed`. All streams share a
single accumulated GTID set value in state. The set contains one entry per source UUID
seen during replication — a single UUID for standard single-source setups, or multiple
entries in multi-source or post-migration topologies. GTID mode requires at least one
prior full-table sync so that an initial GTID position exists in state — switching
directly from binlog coordinates to GTID is not supported for MySQL.

**MariaDB (`engine: mariadb`):**
The tap tracks a single-domain GTID per stream (format: `domain-serverid-sequence`).
MariaDB supports migration from existing binlog coordinates: if no GTID exists in state,
the tap will infer the starting GTID position from the stored binlog file and position
using `BINLOG_GTID_POS()`.

#### State when using binlog coordinates
```json
Expand All @@ -317,7 +328,30 @@ existing binlog coordinate in the state, if the engine is mysql, it will fail.
}
```

#### State when using GTID
#### State when using GTID (MySQL)

All streams hold the same GTID set value. The set contains one entry per source UUID
seen during replication and grows incrementally as events are consumed. A single-source
setup will have one UUID entry; a multi-source or post-migration setup will have more.

```json
{
"bookmarks": {
"example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244,
"gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-599,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81"},
"example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42,
"gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-599,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81"},
"example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100,
"gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-599,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81"}
}
}
```

#### State when using GTID (MariaDB)

Each stream holds an independent single-domain GTID. Streams may hold different values
as they advance at their own pace.

```json
{
"bookmarks": {
Expand Down
124 changes: 89 additions & 35 deletions singer-connectors/tap-mysql/tap_mysql/sync_strategies/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.constants import FIELD_TYPE
from pymysqlreplication.event import RotateEvent, MariadbGtidEvent, GtidEvent
from pymysqlreplication.gtid import Gtid, GtidSet
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
Expand Down Expand Up @@ -132,16 +133,16 @@ def fetch_current_gtid_pos(
engine: str
) -> str:
"""
Find the given server's current GTID position.
Find the current GTID position for the given server.

The sever we're connected to can have a comma separated list of gtids (e.g from past server migrations),
the right gtid is the one with the same server ID as the given server ID.
For MySQL, returns the full @@GLOBAL.gtid_executed set covering all source UUIDs (whitespace-stripped).
For MariaDB, returns the single-domain GTID matching the connected server's domain-server ID.

Args:
mysql_conn: Mysql connection instance
engine: DB engine (mariadb/mysql)

Returns: Gtid position if found, otherwise raises exception
Returns: Full GTID set string (MySQL) or single-domain GTID (MariaDB), raises exception if not found
"""

if engine == connection.MARIADB_ENGINE:
Expand All @@ -165,6 +166,17 @@ def fetch_current_gtid_pos(
gtids = result[0]
LOGGER.debug('Found GTID(s): %s in server %s', gtids, server)

if engine != connection.MARIADB_ENGINE:
# Return the full GTID set (all sources), whitespace-stripped
full_set = re.sub(r'\s+', '', gtids)
if not full_set:
raise Exception(f'No suitable GTID was found for server {server}.')
LOGGER.info('Using full GTID set for state bookmark: %d source(s), length=%d chars',
len(full_set.split(',')), len(full_set))
LOGGER.debug('Full GTID set: %s', full_set)
return full_set
Comment thread
DrashtiChhatralia marked this conversation as resolved.

# MariaDB: find the entry matching this server's domain-serverid
gtid_to_use = None

for gtid in gtids.split(','):
Expand All @@ -173,22 +185,13 @@ def fetch_current_gtid_pos(
if not gtid:
continue

if engine != connection.MARIADB_ENGINE:
gtid_parts = gtid.split(':')

if len(gtid_parts) != 2:
continue
gtid_parts = gtid.split('-')

if gtid_parts[0] == server:
gtid_to_use = gtid
else:
gtid_parts = gtid.split('-')

if len(gtid_parts) != 3:
continue
if len(gtid_parts) != 3:
continue

if gtid_parts[1] == server:
gtid_to_use = gtid
if gtid_parts[1] == server:
gtid_to_use = gtid

if gtid_to_use:
LOGGER.info('Using GTID %s for state bookmark', gtid_to_use)
Expand All @@ -197,6 +200,38 @@ def fetch_current_gtid_pos(
raise Exception(f'No suitable GTID was found for server {server}.')


def normalize_gtid_state(gtid_value: str, engine: str) -> str:
    """
    Re-serialise a bookmarked MySQL GTID value through GtidSet.

    Embedded whitespace (e.g. newlines from @@GLOBAL.gtid_executed) is stripped before
    parsing, and the parsed set is rendered back with str(). MariaDB values and empty
    values are returned untouched.

    NOTE(review): this function was previously documented as upgrading a legacy single
    GTID (e.g. uuid:100) to range form (e.g. uuid:1-100). Whether GtidSet really widens a
    single transaction id that way is not visible from this file - confirm against
    pymysqlreplication.gtid before relying on it.

    Args:
        gtid_value: GTID string read from the state bookmark, may be None or empty
        engine: DB engine (mariadb/mysql)

    Returns: the normalised GTID set string; gtid_value unchanged when the engine is
        MariaDB, the value is empty, or the value cannot be parsed
    """
    if engine == connection.MARIADB_ENGINE or not gtid_value:
        return gtid_value

    try:
        return str(GtidSet(re.sub(r'\s+', '', gtid_value)))
    except Exception as exc:  # pylint: disable=broad-except
        # Deliberate best effort: fall back to the stored value, but make the failure
        # visible instead of swallowing it silently.
        LOGGER.warning('Unable to normalize GTID state value "%s" (%s), keeping it unchanged',
                       gtid_value, exc)
        return gtid_value


def merge_gtid_into_set(current_gtid_set: str, new_gtid: str, engine: str) -> str:
    """
    Merge a single GTID received from a GtidEvent into the accumulated GTID set
    stored in state.

    For MariaDB nothing is merged: the new GTID is returned as is (existing behaviour).
    For any other engine the current set is parsed with GtidSet, the new GTID is merged
    into it with GtidSet.merge_gtid and the result is rendered back to a string.

    Args:
        current_gtid_set: GTID set accumulated so far, may be None or empty
        new_gtid: GTID carried by the event that was just consumed
        engine: DB engine (mariadb/mysql)

    Returns: the merged GTID set string; new_gtid for MariaDB; current_gtid_set unchanged
        when parsing or merging fails, so that the last known GTID set is preserved
    """
    if engine == connection.MARIADB_ENGINE:
        return new_gtid

    try:
        # A missing/empty current value starts from an empty set
        gtid_set = GtidSet(current_gtid_set or '')
        gtid_set.merge_gtid(Gtid(new_gtid))
        return str(gtid_set)
    except Exception as exc:  # pylint: disable=broad-except
        # Deliberate best effort: keep the last known set, but log the failure because the
        # bookmark does not advance for this event.
        LOGGER.warning('Unable to merge GTID "%s" into GTID set "%s" (%s), keeping the current set',
                       new_gtid, current_gtid_set, exc)
        return current_gtid_set


def json_bytes_to_string(data):
if isinstance(data, bytes):
return data.decode()
Expand Down Expand Up @@ -292,14 +327,16 @@ def calculate_gtid_bookmark(
engine: str
) -> str:
"""
Finds the earliest bookmarked gtid in the state
Finds the bookmarked GTID in the state to resume replication from.
For MySQL, returns the first GTID set found (all streams share the same value).
For MariaDB, returns the GTID with the lowest sequence number across streams.
Args:
mysql_conn: instance of MySqlConnection
binlog_streams_map: dictionary of selected streams
state: state dict with bookmarks
engine: the DB flavor mysql/mariadb

Returns: Min Gtid
Returns: GTID string to use as auto_position for the binlog reader
"""
min_gtid = None
min_seq_no = None
Expand All @@ -315,17 +352,14 @@ def calculate_gtid_bookmark(
if gtid:
if engine == connection.MARIADB_ENGINE:
gtid_seq_no = int(gtid.split('-')[2])
else:
gtid_interval = gtid.split(':')[1]

if '-' in gtid_interval:
gtid_seq_no = int(gtid_interval.split('-')[1])
else:
gtid_seq_no = int(gtid_interval)

if min_seq_no is None or gtid_seq_no < min_seq_no:
min_seq_no = gtid_seq_no
if min_seq_no is None or gtid_seq_no < min_seq_no:
min_seq_no = gtid_seq_no
min_gtid = gtid
else:
# All MySQL streams share the same GTID set — use the first one found
min_gtid = gtid
break
Comment thread
DrashtiChhatralia marked this conversation as resolved.

if not min_gtid:

Expand All @@ -351,7 +385,7 @@ def calculate_gtid_bookmark(
LOGGER.info('The inferred GTID is "%s", it will be used to resume replication',
min_gtid)
else:
LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication',
LOGGER.info('The bookmarked GTID found in the state is "%s", and will be used to resume replication',
min_gtid)

return min_gtid
Expand Down Expand Up @@ -667,12 +701,10 @@ def _run_binlog_sync(
gtid_pos
)

elif isinstance(binlog_event, (MariadbGtidEvent, GtidEvent)):
gtid_pos = binlog_event.gtid
elif isinstance(binlog_event, GtidEvent):
gtid_pos = merge_gtid_into_set(gtid_pos, binlog_event.gtid, config['engine'])

LOGGER.debug('%s: gtid=%s',
binlog_event.__class__.__name__,
gtid_pos)
LOGGER.debug('GtidEvent: gtid_set=%s', gtid_pos)

state = update_bookmarks(state,
binlog_streams_map,
Expand All @@ -687,6 +719,20 @@ def _run_binlog_sync(
# consuming binlog from old GTID pos when connection to server is lost.
reader.auto_position = gtid_pos

elif isinstance(binlog_event, MariadbGtidEvent):
gtid_pos = binlog_event.gtid

LOGGER.debug('MariadbGtidEvent: gtid=%s', gtid_pos)

state = update_bookmarks(state,
binlog_streams_map,
log_file,
log_pos,
gtid_pos
)

reader.auto_position = gtid_pos

else:
time_extracted = utils.now()

Expand Down Expand Up @@ -895,6 +941,14 @@ def sync_binlog_stream(
log_file = log_pos = gtid = None

if config['use_gtid']:
# Upgrade any legacy single-GTID or whitespace-contaminated state values
for tap_stream_id in binlog_streams_map:
existing_gtid = state.get('bookmarks', {}).get(tap_stream_id, {}).get('gtid')
if existing_gtid:
normalized = normalize_gtid_state(existing_gtid, config['engine'])
if normalized != existing_gtid:
state = singer.write_bookmark(state, tap_stream_id, 'gtid', normalized)

gtid = calculate_gtid_bookmark(mysql_conn, binlog_streams_map, state, config['engine'])
else:
log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state)
Expand Down
20 changes: 17 additions & 3 deletions singer-connectors/tap-mysql/tests/integration/test_tap_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,10 @@ def test_binlog_stream_with_gtid(self):
config['use_gtid'] = True
config['engine'] = engine

# For MySQL, fetch_current_gtid_pos now returns the full GTID set
if engine == MYSQL_ENGINE:
self.assertIn(':', gtid)

self.state = singer.write_bookmark(self.state,
'tap_mysql_test-binlog_1',
'gtid',
Expand Down Expand Up @@ -1064,11 +1068,21 @@ def test_binlog_stream_with_gtid(self):

self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_file'))
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_pos'))
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid'))

self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_file'))
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos'))
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid'))

if engine == MYSQL_ENGINE:
gtid_1 = singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid')
gtid_2 = singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid')

# All MySQL streams share the same accumulated GTID set after sync
self.assertEqual(gtid_1, gtid_2)

# Value must be a valid GTID set format (uuid:interval)
self.assertIn(':', gtid_1)
else:
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid'))
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid'))

def test_binlog_stream_switching_from_binlog_to_gtid_with_mysql_fails(self):
global SINGER_MESSAGES
Expand Down
Loading