diff --git a/singer-connectors/tap-mysql/README.md b/singer-connectors/tap-mysql/README.md index b5f94e527..3e4e9f9a5 100644 --- a/singer-connectors/tap-mysql/README.md +++ b/singer-connectors/tap-mysql/README.md @@ -303,8 +303,19 @@ The tap support two ways of consuming log events: using binlog coordinates or GT binlog coordinates, when turning the `use_gtid` flag, you have to specify the engine flavor (mariadb/mysql) due to how different are the GTID implementations in these two engines. -When enabling the `use_gtid` flag and the engine is MariaDB, the tap will dynamically infer the GTID pos from -existing binlog coordinate in the state, if the engine is mysql, it will fail. +**MySQL (`engine: mysql`):** +The tap tracks the full GTID set from `@@GLOBAL.gtid_executed`. All streams share a +single accumulated GTID set value in state. The set contains one entry per source UUID +seen during replication — a single UUID for standard single-source setups, or multiple +entries in multi-source or post-migration topologies. GTID mode requires at least one +prior full-table sync so that an initial GTID position exists in state — switching +directly from binlog coordinates to GTID is not supported for MySQL. + +**MariaDB (`engine: mariadb`):** +The tap tracks a single-domain GTID per stream (format: `domain-serverid-sequence`). +MariaDB supports migration from existing binlog coordinates: if no GTID exists in state, +the tap will infer the starting GTID position from the stored binlog file and position +using `BINLOG_GTID_POS()`. #### State when using binlog coordinates ```json @@ -317,7 +328,30 @@ existing binlog coordinate in the state, if the engine is mysql, it will fail. } ``` -#### State when using GTID +#### State when using GTID (MySQL) + +All streams hold the same GTID set value. The set contains one entry per source UUID +seen during replication and grows incrementally as events are consumed. 
A single-source +setup will have one UUID entry; a multi-source or post-migration setup will have more. + +```json +{ + "bookmarks": { + "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244, + "gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-599,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81"}, + "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42, + "gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-599,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81"}, + "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100, + "gtid": "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-599,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81"} + } +} +``` + +#### State when using GTID (MariaDB) + +Each stream holds an independent single-domain GTID. Streams may hold different values +as they advance at their own pace. + ```json { "bookmarks": { diff --git a/singer-connectors/tap-mysql/tap_mysql/sync_strategies/binlog.py b/singer-connectors/tap-mysql/tap_mysql/sync_strategies/binlog.py index 6a75f334f..1b7221c51 100644 --- a/singer-connectors/tap-mysql/tap_mysql/sync_strategies/binlog.py +++ b/singer-connectors/tap-mysql/tap_mysql/sync_strategies/binlog.py @@ -17,6 +17,7 @@ from pymysqlreplication import BinLogStreamReader from pymysqlreplication.constants import FIELD_TYPE from pymysqlreplication.event import RotateEvent, MariadbGtidEvent, GtidEvent +from pymysqlreplication.gtid import Gtid, GtidSet from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, @@ -132,16 +133,16 @@ def fetch_current_gtid_pos( engine: str ) -> str: """ - Find the given server's current GTID position. + Find the current GTID position for the given server. - The sever we're connected to can have a comma separated list of gtids (e.g from past server migrations), - the right gtid is the one with the same server ID as the given server ID. + For MySQL, returns the full @@GLOBAL.gtid_executed set covering all source UUIDs (whitespace-stripped). 
+ For MariaDB, returns the single-domain GTID matching the connected server's domain-server ID. Args: mysql_conn: Mysql connection instance engine: DB engine (mariadb/mysql) - Returns: Gtid position if found, otherwise raises exception + Returns: Full GTID set string (MySQL) or single-domain GTID (MariaDB), raises exception if not found """ if engine == connection.MARIADB_ENGINE: @@ -165,6 +166,17 @@ def fetch_current_gtid_pos( gtids = result[0] LOGGER.debug('Found GTID(s): %s in server %s', gtids, server) + if engine != connection.MARIADB_ENGINE: + # Return the full GTID set (all sources), whitespace-stripped + full_set = re.sub(r'\s+', '', gtids) + if not full_set: + raise Exception(f'No suitable GTID was found for server {server}.') + LOGGER.info('Using full GTID set for state bookmark: %d source(s), length=%d chars', + len(full_set.split(',')), len(full_set)) + LOGGER.debug('Full GTID set: %s', full_set) + return full_set + + # MariaDB: find the entry matching this server's domain-serverid gtid_to_use = None for gtid in gtids.split(','): @@ -173,22 +185,13 @@ def fetch_current_gtid_pos( if not gtid: continue - if engine != connection.MARIADB_ENGINE: - gtid_parts = gtid.split(':') - - if len(gtid_parts) != 2: - continue + gtid_parts = gtid.split('-') - if gtid_parts[0] == server: - gtid_to_use = gtid - else: - gtid_parts = gtid.split('-') - - if len(gtid_parts) != 3: - continue + if len(gtid_parts) != 3: + continue - if gtid_parts[1] == server: - gtid_to_use = gtid + if gtid_parts[1] == server: + gtid_to_use = gtid if gtid_to_use: LOGGER.info('Using GTID %s for state bookmark', gtid_to_use) @@ -197,6 +200,38 @@ def fetch_current_gtid_pos( raise Exception(f'No suitable GTID was found for server {server}.') +def normalize_gtid_state(gtid_value: str, engine: str) -> str: + """ + Upgrades a legacy single-GTID state value (e.g. uuid:100) to canonical GtidSet + range format (e.g. uuid:1-100). Also strips embedded whitespace (e.g. 
newlines + from @@GLOBAL.gtid_executed). No-op for MariaDB or empty values. + Falls back to the original string if parsing fails. + """ + if engine == connection.MARIADB_ENGINE or not gtid_value: + return gtid_value + try: + cleaned = re.sub(r'\s+', '', gtid_value) + return str(GtidSet(cleaned)) + except Exception: # pylint: disable=broad-except + return gtid_value + + +def merge_gtid_into_set(current_gtid_set: str, new_gtid: str, engine: str) -> str: + """ + Merges a single GTID received from a GtidEvent into the accumulated GTID set + stored in state. For MariaDB, returns the raw new GTID (existing behaviour). + Falls back to current_gtid_set on merge failure to preserve the last known GTID set. + """ + if engine == connection.MARIADB_ENGINE: + return new_gtid + try: + gtid_set = GtidSet(current_gtid_set) if current_gtid_set else GtidSet('') + gtid_set.merge_gtid(Gtid(new_gtid)) + return str(gtid_set) + except Exception: # pylint: disable=broad-except + return current_gtid_set + + +def json_bytes_to_string(data): if isinstance(data, bytes): return data.decode() @@ -292,14 +327,16 @@ def calculate_gtid_bookmark( engine: str ) -> str: """ - Finds the earliest bookmarked gtid in the state + Finds the bookmarked GTID in the state to resume replication from. + For MySQL, returns the first GTID set found (all streams share the same value). + For MariaDB, returns the GTID with the lowest sequence number across streams. 
Args: mysql_conn: instance of MySqlConnection binlog_streams_map: dictionary of selected streams state: state dict with bookmarks engine: the DB flavor mysql/mariadb - Returns: Min Gtid + Returns: GTID string to use as auto_position for the binlog reader """ min_gtid = None min_seq_no = None @@ -315,17 +352,14 @@ def calculate_gtid_bookmark( if gtid: if engine == connection.MARIADB_ENGINE: gtid_seq_no = int(gtid.split('-')[2]) - else: - gtid_interval = gtid.split(':')[1] - if '-' in gtid_interval: - gtid_seq_no = int(gtid_interval.split('-')[1]) - else: - gtid_seq_no = int(gtid_interval) - - if min_seq_no is None or gtid_seq_no < min_seq_no: - min_seq_no = gtid_seq_no + if min_seq_no is None or gtid_seq_no < min_seq_no: + min_seq_no = gtid_seq_no + min_gtid = gtid + else: + # All MySQL streams share the same GTID set — use the first one found min_gtid = gtid + break if not min_gtid: @@ -351,7 +385,7 @@ def calculate_gtid_bookmark( LOGGER.info('The inferred GTID is "%s", it will be used to resume replication', min_gtid) else: - LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication', + LOGGER.info('The bookmarked GTID found in the state is "%s", and will be used to resume replication', min_gtid) return min_gtid @@ -667,12 +701,10 @@ def _run_binlog_sync( gtid_pos ) - elif isinstance(binlog_event, (MariadbGtidEvent, GtidEvent)): - gtid_pos = binlog_event.gtid + elif isinstance(binlog_event, GtidEvent): + gtid_pos = merge_gtid_into_set(gtid_pos, binlog_event.gtid, config['engine']) - LOGGER.debug('%s: gtid=%s', - binlog_event.__class__.__name__, - gtid_pos) + LOGGER.debug('GtidEvent: gtid_set=%s', gtid_pos) state = update_bookmarks(state, binlog_streams_map, @@ -687,6 +719,20 @@ def _run_binlog_sync( # consuming binlog from old GTID pos when connection to server is lost. 
reader.auto_position = gtid_pos + elif isinstance(binlog_event, MariadbGtidEvent): + gtid_pos = binlog_event.gtid + + LOGGER.debug('MariadbGtidEvent: gtid=%s', gtid_pos) + + state = update_bookmarks(state, + binlog_streams_map, + log_file, + log_pos, + gtid_pos + ) + + reader.auto_position = gtid_pos + else: time_extracted = utils.now() @@ -895,6 +941,14 @@ def sync_binlog_stream( log_file = log_pos = gtid = None if config['use_gtid']: + # Upgrade any legacy single-GTID or whitespace-contaminated state values + for tap_stream_id in binlog_streams_map: + existing_gtid = state.get('bookmarks', {}).get(tap_stream_id, {}).get('gtid') + if existing_gtid: + normalized = normalize_gtid_state(existing_gtid, config['engine']) + if normalized != existing_gtid: + state = singer.write_bookmark(state, tap_stream_id, 'gtid', normalized) + gtid = calculate_gtid_bookmark(mysql_conn, binlog_streams_map, state, config['engine']) else: log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state) diff --git a/singer-connectors/tap-mysql/tests/integration/test_tap_mysql.py b/singer-connectors/tap-mysql/tests/integration/test_tap_mysql.py index ca7410302..9095c21a2 100644 --- a/singer-connectors/tap-mysql/tests/integration/test_tap_mysql.py +++ b/singer-connectors/tap-mysql/tests/integration/test_tap_mysql.py @@ -1018,6 +1018,10 @@ def test_binlog_stream_with_gtid(self): config['use_gtid'] = True config['engine'] = engine + # For MySQL, fetch_current_gtid_pos now returns the full GTID set + if engine == MYSQL_ENGINE: + self.assertIn(':', gtid) + self.state = singer.write_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid', @@ -1064,11 +1068,21 @@ def test_binlog_stream_with_gtid(self): self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_file')) self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_pos')) - self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid')) - 
self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_file')) self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos')) - self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid')) + + if engine == MYSQL_ENGINE: + gtid_1 = singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid') + gtid_2 = singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid') + + # All MySQL streams share the same accumulated GTID set after sync + self.assertEqual(gtid_1, gtid_2) + + # Value must be a valid GTID set format (uuid:interval) + self.assertIn(':', gtid_1) + else: + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid')) def test_binlog_stream_switching_from_binlog_to_gtid_with_mysql_fails(self): global SINGER_MESSAGES diff --git a/singer-connectors/tap-mysql/tests/unit/sync_strategies/test_binlog.py b/singer-connectors/tap-mysql/tests/unit/sync_strategies/test_binlog.py index c43757e02..f394307c7 100644 --- a/singer-connectors/tap-mysql/tests/unit/sync_strategies/test_binlog.py +++ b/singer-connectors/tap-mysql/tests/unit/sync_strategies/test_binlog.py @@ -1856,13 +1856,48 @@ def test_fetch_current_log_file_and_pos_fail_if_no_result(self, connect_with_bac @patch('tap_mysql.sync_strategies.binlog.connection.fetch_server_uuid') @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff') - def test_fetch_current_gtid_pos_for_mysql_not_found_expect_exception( + def test_fetch_current_gtid_pos_for_mysql_returns_full_set_regardless_of_server_uuid( self, connect_with_backoff, fetch_server_uuid): mysql_con = MagicMock(spec_set=MySQLConnection).return_value cur_mock = MagicMock(spec_set=Cursor).return_value + # MySQL returns @@GLOBAL.gtid_executed with \n between entries — mimics real server output cur_mock.__enter__.return_value.fetchone.side_effect = [ 
- ['3E11FA47-71CA-11E1-9E21-C80AA9429562:1,3E11FA47-71BB-11E1-9E33-C80AA9429562:2:143,0-3-1123,,' - '3E11FA47-71CA-11E1-9E33-C80AA9429562:2:332'], + ['3E11FA47-71CA-11E1-9E21-C80AA9429562:1,\n3E11FA47-71BB-11E1-9E33-C80AA9429562:2:143,\n' + '0-3-1123,,3E11FA47-71CA-11E1-9E33-C80AA9429562:2:332'], + ] + + mysql_con.__enter__.return_value.cursor.return_value = cur_mock + + connect_with_backoff.return_value = mysql_con + fetch_server_uuid.return_value = '3E11FA47-71CA-11E1-9E33-C80AA9429562' + + result = binlog.fetch_current_gtid_pos(mysql_con, connection.MYSQL_ENGINE) + + # Embedded \n must be stripped from the result + self.assertNotIn('\n', result) + + # All UUIDs must be present — no filtering by server UUID + self.assertIn('3E11FA47-71CA-11E1-9E21-C80AA9429562', result) + self.assertIn('3E11FA47-71BB-11E1-9E33-C80AA9429562', result) + self.assertIn('3E11FA47-71CA-11E1-9E33-C80AA9429562', result) + + connect_with_backoff.assert_called_with(mysql_con) + fetch_server_uuid.assert_called_with(mysql_con) + cur_mock.__enter__.return_value.execute.assert_has_calls( + [ + call('select @@GLOBAL.gtid_executed;'), + ] + ) + + @patch('tap_mysql.sync_strategies.binlog.connection.fetch_server_uuid') + @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff') + def test_fetch_current_gtid_pos_for_mysql_empty_gtid_executed_expect_exception( + self, connect_with_backoff, fetch_server_uuid): + mysql_con = MagicMock(spec_set=MySQLConnection).return_value + cur_mock = MagicMock(spec_set=Cursor).return_value + # @@GLOBAL.gtid_executed returns empty string — fresh server with no committed transactions + cur_mock.__enter__.return_value.fetchone.side_effect = [ + [''] ] mysql_con.__enter__.return_value.cursor.return_value = cur_mock @@ -1870,9 +1905,11 @@ def test_fetch_current_gtid_pos_for_mysql_not_found_expect_exception( connect_with_backoff.return_value = mysql_con fetch_server_uuid.return_value = '3E11FA47-71CA-11E1-9E33-C80AA9429562' - with self.assertRaises(Exception): + 
with self.assertRaises(Exception) as context: binlog.fetch_current_gtid_pos(mysql_con, connection.MYSQL_ENGINE) + self.assertIn('No suitable GTID was found for server', str(context.exception)) + connect_with_backoff.assert_called_with(mysql_con) fetch_server_uuid.assert_called_with(mysql_con) cur_mock.__enter__.return_value.execute.assert_has_calls( @@ -1900,7 +1937,11 @@ def test_fetch_current_gtid_pos_for_mysql_succeeds( result = binlog.fetch_current_gtid_pos(mysql_con, connection.MYSQL_ENGINE) - self.assertEqual('3E11FA47-71CA-11E1-9E33-C80AA9429562:1', result) + self.assertEqual( + '3E11FA47-71CA-11E1-9E33-C80AA9429562:1,3E11FA47-71BB-11E1-9E33-C80AA9429562:2:143,0-3-1123,,' + '3E11FA47-71CA-11E1-9E33-C80AA9429562:2:332', + result + ) connect_with_backoff.assert_called_with(mysql_con) fetch_server_uuid.assert_called_with(mysql_con) @@ -1970,6 +2011,80 @@ def test_fetch_current_gtid_pos_no_gtid_found_for_given_server_expect_exception( ] ) + def test_normalize_gtid_state(self): + # MariaDB — no-op, returned as-is regardless of content + self.assertEqual('0-123-556', binlog.normalize_gtid_state('0-123-556', connection.MARIADB_ENGINE)) + + # empty value — no-op + self.assertEqual('', binlog.normalize_gtid_state('', connection.MYSQL_ENGINE)) + + # strips embedded newlines left by @@GLOBAL.gtid_executed + contaminated = '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5291,\n3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81' + result = binlog.normalize_gtid_state(contaminated, connection.MYSQL_ENGINE) + self.assertNotIn('\n', result) + self.assertIn('3E11FA47-71CA-11E1-9E33-C80AA9429562', result) + self.assertIn('3E11FA47-71BB-11E1-9E33-C80AA9429562', result) + + # unparseable value — original string returned unchanged (fallback guard) + self.assertEqual('not-a-valid-gtid', binlog.normalize_gtid_state('not-a-valid-gtid', connection.MYSQL_ENGINE)) + + def test_merge_gtid_into_set(self): + # MariaDB — accumulated set is ignored, raw incoming GTID is returned + 
self.assertEqual('0-123-556', binlog.merge_gtid_into_set('0-123-550', '0-123-556', connection.MARIADB_ENGINE)) + + # MySQL — second source UUID is added to the set rather than dropped + result = binlog.merge_gtid_into_set( + '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-100', + '3E11FA47-71BB-11E1-9E33-C80AA9429562:1', + connection.MYSQL_ENGINE + ) + self.assertIn('3E11FA47-71CA-11E1-9E33-C80AA9429562', result) + self.assertIn('3E11FA47-71BB-11E1-9E33-C80AA9429562', result) + + # MySQL — parse failure preserves the accumulated set, NOT new_gtid + current = '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5291,3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81' + self.assertEqual(current, binlog.merge_gtid_into_set(current, 'not-valid', connection.MYSQL_ENGINE)) + + @patch('tap_mysql.sync_strategies.binlog.fetch_current_log_file_and_pos', + return_value=('binlog0001', 100)) + @patch('tap_mysql.sync_strategies.binlog.BinLogStreamReader', autospec=True) + @patch('tap_mysql.sync_strategies.binlog.make_connection_wrapper') + @patch('tap_mysql.sync_strategies.binlog.calculate_gtid_bookmark', + return_value='3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5291,' + '3E11FA47-71BB-11E1-9E33-C80AA9429562:1-81') + def test_sync_binlog_stream_normalizes_gtid_state_before_bookmark_lookup( + self, calculate_gtid_bookmark_mock, make_connection_wrapper_mock, reader_mock, *args): + + uuid_a = '3E11FA47-71CA-11E1-9E33-C80AA9429562' + uuid_b = '3E11FA47-71BB-11E1-9E33-C80AA9429562' + + config = {'server_id': '123', 'use_gtid': True, 'engine': 'mysql'} + mysql_con = Mock(spec_set=MySQLConnection) + binlog_streams_map = {'my_db-stream1': {}} + + reader_mock.return_value.__iter__ = lambda _: iter([]) + reader_mock.return_value.auto_position = f'{uuid_a}:1-5291,{uuid_b}:1-81' + + # Scenario A: contaminated state — \n must be stripped before calculate_gtid_bookmark reads it + state = {'bookmarks': {'my_db-stream1': {'gtid': f'{uuid_a}:1-5291,\n{uuid_b}:1-81'}}} + + binlog.sync_binlog_stream(mysql_con, config, 
binlog_streams_map, state) + + state_passed = calculate_gtid_bookmark_mock.call_args[0][2] + self.assertNotIn('\n', state_passed['bookmarks']['my_db-stream1']['gtid']) + self.assertIn(uuid_a, state_passed['bookmarks']['my_db-stream1']['gtid']) + self.assertIn(uuid_b, state_passed['bookmarks']['my_db-stream1']['gtid']) + + # Scenario B: clean state — must reach calculate_gtid_bookmark unchanged + calculate_gtid_bookmark_mock.reset_mock() + clean_gtid = f'{uuid_a}:1-5291,{uuid_b}:1-81' + state = {'bookmarks': {'my_db-stream1': {'gtid': clean_gtid}}} + + binlog.sync_binlog_stream(mysql_con, config, binlog_streams_map, state) + + state_passed = calculate_gtid_bookmark_mock.call_args[0][2] + self.assertEqual(clean_gtid, state_passed['bookmarks']['my_db-stream1']['gtid']) + def test_calculate_gtid_bookmark_for_mariadb_returns_earliest(self): binlog_streams = { @@ -2117,7 +2232,7 @@ def test_calculate_gtid_bookmark_for_mariadb_no_gtid_nor_binlog_found_expect_exc self.assertEqual("No binlog coordinates in state to infer gtid position! Cannot resume logical replication", str(context.exception)) - def test_calculate_gtid_bookmark_for_mysql_returns_earliest(self): + def test_calculate_gtid_bookmark_for_mysql_returns_first_found(self): binlog_streams = { 'stream1': {'schema': {}}, @@ -2139,7 +2254,8 @@ def test_calculate_gtid_bookmark_for_mysql_returns_earliest(self): mysql_conn = Mock(spec_set=MySQLConnection) result = binlog.calculate_gtid_bookmark(mysql_conn, binlog_streams, state, connection.MYSQL_ENGINE) - self.assertEqual(result, '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-2') + # MySQL streams all share the same GTID set — first bookmark found is returned immediately + self.assertEqual(result, '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-165') def test_calculate_gtid_bookmark_for_mysql_no_gtid_found_expect_exception(self):