Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (

require (
filippo.io/edwards25519 v1.1.1 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
Expand All @@ -32,3 +33,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/tidb/pkg/parser => github.com/morgo/tidb/pkg/parser v0.0.0-20260409131709-d591801319ba
Comment on lines +36 to +37
Copy link

Copilot AI Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The committed replace pins github.com/pingcap/tidb/pkg/parser to a personal fork. This makes builds depend on a non-upstream module path/commit and can break reproducibility and supply-chain expectations. Consider waiting for the upstream TiDB parser change to be released and then updating the require version (removing the replace), or alternatively vendor/fork under an org-controlled module path with a clear upgrade plan and tracking issue.

Suggested change
replace github.com/pingcap/tidb/pkg/parser => github.com/morgo/tidb/pkg/parser v0.0.0-20260409131709-d591801319ba

Copilot uses AI. Check for mistakes.
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/alecthomas/kong v0.7.1/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqr
github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE=
github.com/alecthomas/repr v0.1.0/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand All @@ -33,15 +35,15 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/morgo/tidb/pkg/parser v0.0.0-20260409131709-d591801319ba h1:gDABs8OfAupna7y6JuxIKpd6AGbxip7Ks5sTobk59Ac=
github.com/morgo/tidb/pkg/parser v0.0.0-20260409131709-d591801319ba/go.mod h1:zDLDsfNBU5+L6T4J9/OgWAHc/WZvMUjbpgHqQ/t3yKo=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee h1:/IDPbpzkzA97t1/Z1+C3KlxbevjMeaI6BQYxvivu4u8=
github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/tidb/pkg/parser v0.0.0-20250811102254-4230cf349b01 h1:n3qowTSsPQ6PwOqEmrmVHvlsuW3Ig2i2yRbjE7vd2Fg=
github.com/pingcap/tidb/pkg/parser v0.0.0-20250811102254-4230cf349b01/go.mod h1:mpCcwRdMnmvNkBxcT4AqiE0yuvfJTdmCJs7cfznJw1w=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
Expand Down
62 changes: 62 additions & 0 deletions pkg/copier/buffered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,65 @@ func (d *delayedCallbackApplier) Stop() error {
func (d *delayedCallbackApplier) GetTargets() []applier.Target {
return d.realApplier.GetTargets()
}

// TestBufferedCopierGeometry tests that the buffered copier correctly handles
// GEOMETRY column data (binary spatial values). This is important because
// geometry data is stored as binary blobs with internal structure, and
// incorrect handling (e.g. charset conversion, escaping) could corrupt it.
func TestBufferedCopierGeometry(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS geomsrc, geomdst")
testutils.RunSQL(t, `CREATE TABLE geomsrc (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
location GEOMETRY NOT NULL SRID 4326,
SPATIAL INDEX idx_location (location)
)`)
testutils.RunSQL(t, `CREATE TABLE geomdst (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
location GEOMETRY NOT NULL SRID 4326,
SPATIAL INDEX idx_location (location)
)`)
testutils.RunSQL(t, `INSERT INTO geomsrc (name, location) VALUES
('Statue of Liberty', ST_GeomFromText('POINT(-74.0445 40.6892)', 4326, 'axis-order=long-lat')),
('Eiffel Tower', ST_GeomFromText('POINT(2.2945 48.8584)', 4326, 'axis-order=long-lat')),
('Big Ben', ST_GeomFromText('POINT(-0.1246 51.5007)', 4326, 'axis-order=long-lat')),
('Colosseum', ST_GeomFromText('POINT(12.4924 41.8902)', 4326, 'axis-order=long-lat')),
('Sydney Opera House', ST_GeomFromText('POINT(151.2153 -33.8568)', 4326, 'axis-order=long-lat')),
('Great Wall of China', ST_GeomFromText('POINT(116.5704 40.4319)', 4326, 'axis-order=long-lat')),
('Machu Picchu', ST_GeomFromText('POINT(-72.5450 -13.1631)', 4326, 'axis-order=long-lat')),
('Taj Mahal', ST_GeomFromText('POINT(78.0421 27.1751)', 4326, 'axis-order=long-lat')),
('Christ the Redeemer', ST_GeomFromText('POINT(-43.2105 -22.9519)', 4326, 'axis-order=long-lat')),
('Golden Gate Bridge', ST_GeomFromText('POINT(-122.4783 37.8199)', 4326, 'axis-order=long-lat'))
`)

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)
defer utils.CloseAndLog(db)

t1 := table.NewTableInfo(db, "test", "geomsrc")
assert.NoError(t, t1.SetInfo(t.Context()))
t2 := table.NewTableInfo(db, "test", "geomdst")
assert.NoError(t, t2.SetInfo(t.Context()))

cfg := NewCopierDefaultConfig()
cfg.Applier, err = applier.NewSingleTargetApplier(applier.Target{DB: db}, applier.NewApplierDefaultConfig())
require.NoError(t, err)
chunker, err := table.NewChunker(t1, t2, cfg.TargetChunkTime, cfg.Logger)
assert.NoError(t, err)
assert.NoError(t, chunker.Open())

copier, err := NewCopier(db, chunker, cfg)
assert.NoError(t, err)
assert.NoError(t, copier.Run(t.Context()))

// Verify geometry data was copied correctly by comparing ST_AsText output.
var checksumSrc, checksumDst string
err = db.QueryRowContext(t.Context(),
"SELECT BIT_XOR(CRC32(CONCAT(id, name, ST_AsText(location)))) FROM geomsrc").Scan(&checksumSrc)
assert.NoError(t, err)
err = db.QueryRowContext(t.Context(),
"SELECT BIT_XOR(CRC32(CONCAT(id, name, ST_AsText(location)))) FROM geomdst").Scan(&checksumDst)
assert.NoError(t, err)
assert.Equal(t, checksumSrc, checksumDst, "geometry data checksum mismatch after buffered copy")
}
51 changes: 51 additions & 0 deletions pkg/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,57 @@ IF(
assert.NoError(t, err)
}

// TestSpatialGeneratedColumnAndIndex tests that the parser accepts a combined ALTER TABLE
// that adds a GEOMETRY generated column with SRID and a SPATIAL INDEX in one statement.
// The actual geometry data lives in a text column, so this only validates parsing + DDL execution.
func TestSpatialGeneratedColumnAndIndex(t *testing.T) {
t.Parallel()
testutils.RunSQL(t, `DROP TABLE IF EXISTS t1spatial, _t1spatial_new`)
testutils.RunSQL(t, `CREATE TABLE t1spatial (
id bigint NOT NULL AUTO_INCREMENT,
name varchar(255) NOT NULL,
geometry_wkt varchar(500) NOT NULL,
PRIMARY KEY (id)
)`)
testutils.RunSQL(t, `INSERT INTO t1spatial (name, geometry_wkt) VALUES
('Statue of Liberty', 'POINT(-74.0445 40.6892)'),
('Eiffel Tower', 'POINT(2.2945 48.8584)'),
('Big Ben', 'POINT(-0.1246 51.5007)'),
('Colosseum', 'POINT(12.4924 41.8902)'),
('Sydney Opera House', 'POINT(151.2153 -33.8568)'),
('Great Wall of China', 'POINT(116.5704 40.4319)'),
('Machu Picchu', 'POINT(-72.5450 -13.1631)'),
('Taj Mahal', 'POINT(78.0421 27.1751)'),
('Christ the Redeemer', 'POINT(-43.2105 -22.9519)'),
('Golden Gate Bridge', 'POINT(-122.4783 37.8199)')
`)

cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)

migration := &Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: &cfg.Passwd,
Database: cfg.DBName,
Threads: 2,
Statement: `ALTER TABLE t1spatial
ADD COLUMN points_of_interest GEOMETRY GENERATED ALWAYS AS (ST_GeomFromText(geometry_wkt, 4326, 'axis-order=long-lat')) STORED NOT NULL SRID 4326,
ADD SPATIAL INDEX idx_points_of_interest (points_of_interest)`,
}
err = migration.Run()
assert.NoError(t, err)

db, err := sql.Open("mysql", testutils.DSN())
assert.NoError(t, err)
defer func() { assert.NoError(t, db.Close()) }()

var count int
err = db.QueryRowContext(t.Context(), `SELECT COUNT(*) FROM t1spatial`).Scan(&count)
assert.NoError(t, err)
assert.Equal(t, 10, count)
}

type testcase struct {
OldType string
NewType string
Expand Down
120 changes: 120 additions & 0 deletions pkg/repl/subscription_buffered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,123 @@ func TestBufferedMapFlushWithoutLockRespectsWatermark(t *testing.T) {
err = db.QueryRowContext(t.Context(), "SELECT name FROM _subscription_test_new WHERE id = 5").Scan(&name)
assert.Error(t, err, "Row with id=5 (at watermark) should NOT exist yet")
}

// TestBufferedMapGeometry tests that GEOMETRY column data (binary spatial values)
// is correctly handled by the buffered map subscription. The buffered map stores
// full row images from binlog events, so this verifies the binary geometry
// representation survives the replication pipeline without corruption due to
// partial values replicated, strange encoding etc.
// This test requires full binlog_row_image (non-minimal RBR).
func TestBufferedMapGeometry(t *testing.T) {
if testutils.IsMinimalRBRTestRunner(t) {
t.Skip("Skipping test for minimal RBR test runner")
}
testutils.RunSQL(t, "DROP TABLE IF EXISTS subscription_test, _subscription_test_new")
testutils.RunSQL(t, `CREATE TABLE subscription_test (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
location GEOMETRY NOT NULL SRID 4326,
PRIMARY KEY (id)
)`)
testutils.RunSQL(t, `CREATE TABLE _subscription_test_new (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
location GEOMETRY NOT NULL SRID 4326,
PRIMARY KEY (id)
)`)

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)
defer utils.CloseAndLog(db)

srcTable := table.NewTableInfo(db, "test", "subscription_test")
assert.NoError(t, srcTable.SetInfo(t.Context()))
dstTable := table.NewTableInfo(db, "test", "_subscription_test_new")
assert.NoError(t, dstTable.SetInfo(t.Context()))

cfg, err := mysql2.ParseDSN(testutils.DSN())
assert.NoError(t, err)
target := applier.Target{
DB: db,
KeyRange: "0",
Config: cfg,
}
appl, err := applier.NewSingleTargetApplier(target, applier.NewApplierDefaultConfig())
assert.NoError(t, err)
client := NewClient(db, cfg.Addr, cfg.User, cfg.Passwd, &ClientConfig{
Logger: slog.Default(),
Concurrency: 4,
TargetBatchTime: time.Second,
ServerID: NewServerID(),
Applier: appl,
})
assert.NoError(t, client.AddSubscription(srcTable, dstTable, nil))
assert.NoError(t, client.Run(t.Context()))
defer client.Close()

// INSERT geometry data.
testutils.RunSQL(t, `INSERT INTO subscription_test (id, name, location) VALUES
(1, 'Statue of Liberty', ST_GeomFromText('POINT(-74.0445 40.6892)', 4326, 'axis-order=long-lat')),
(2, 'Eiffel Tower', ST_GeomFromText('POINT(2.2945 48.8584)', 4326, 'axis-order=long-lat')),
(3, 'Big Ben', ST_GeomFromText('POINT(-0.1246 51.5007)', 4326, 'axis-order=long-lat'))
`)
assert.NoError(t, client.BlockWait(t.Context()))
assert.Equal(t, 3, client.GetDeltaLen())

// Inspect the buffered subscription directly to verify row images contain geometry data.
sub, ok := client.subscriptions["test.subscription_test"].(*bufferedMap)
assert.True(t, ok)
assert.False(t, sub.changes["1"].logicalRow.IsDeleted)
// The row image should have 3 columns: id, name, location (as binary geometry).
assert.Len(t, sub.changes["1"].logicalRow.RowImage, 3)

// Flush and verify the geometry data was applied correctly.
allFlushed, err := sub.Flush(t.Context(), false, nil)
assert.NoError(t, err)
assert.True(t, allFlushed)

var count int
err = db.QueryRowContext(t.Context(), "SELECT COUNT(*) FROM _subscription_test_new").Scan(&count)
assert.NoError(t, err)
assert.Equal(t, 3, count)

// Verify the geometry data round-trips correctly via ST_AsText.
var wkt string
err = db.QueryRowContext(t.Context(),
"SELECT ST_AsText(location) FROM _subscription_test_new WHERE id = 1").Scan(&wkt)
assert.NoError(t, err)
assert.Contains(t, wkt, "POINT")

// UPDATE geometry data — move the Eiffel Tower.
testutils.RunSQL(t, `UPDATE subscription_test SET location = ST_GeomFromText('POINT(2.3 48.9)', 4326, 'axis-order=long-lat') WHERE id = 2`)
assert.NoError(t, client.BlockWait(t.Context()))
allFlushed, err = sub.Flush(t.Context(), false, nil)
assert.NoError(t, err)
assert.True(t, allFlushed)

err = db.QueryRowContext(t.Context(),
"SELECT ST_AsText(location) FROM _subscription_test_new WHERE id = 2").Scan(&wkt)
assert.NoError(t, err)
assert.Contains(t, wkt, "2.3")

// DELETE a row.
testutils.RunSQL(t, `DELETE FROM subscription_test WHERE id = 3`)
assert.NoError(t, client.BlockWait(t.Context()))
allFlushed, err = sub.Flush(t.Context(), false, nil)
assert.NoError(t, err)
assert.True(t, allFlushed)

err = db.QueryRowContext(t.Context(), "SELECT COUNT(*) FROM _subscription_test_new").Scan(&count)
assert.NoError(t, err)
assert.Equal(t, 2, count)

// Final checksum: verify all geometry data matches.
var checksumSrc, checksumDst string
err = db.QueryRowContext(t.Context(),
"SELECT BIT_XOR(CRC32(CONCAT(id, name, ST_AsText(location)))) FROM subscription_test").Scan(&checksumSrc)
assert.NoError(t, err)
err = db.QueryRowContext(t.Context(),
"SELECT BIT_XOR(CRC32(CONCAT(id, name, ST_AsText(location)))) FROM _subscription_test_new").Scan(&checksumDst)
assert.NoError(t, err)
assert.Equal(t, checksumSrc, checksumDst)
}
8 changes: 8 additions & 0 deletions pkg/statement/statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ func TestExtractFromStatement(t *testing.T) {
assert.Equal(t, "ADD INDEX `idx` (`name`(10), `description`(20))", abstractStmt[0].Alter)
assert.Equal(t, "/* rewritten from CREATE INDEX */ ALTER TABLE `t1` ADD INDEX `idx` (`name`(10), `description`(20))", abstractStmt[0].Statement)

// Test create spatial index is rewritten.
abstractStmt, err = New("CREATE SPATIAL INDEX idx_geom ON t1 (geom)")
assert.NoError(t, err)
assert.Equal(t, "t1", abstractStmt[0].Table)
assert.Equal(t, "ADD SPATIAL INDEX `idx_geom` (`geom`)", abstractStmt[0].Alter)
assert.Equal(t, "/* rewritten from CREATE INDEX */ ALTER TABLE `t1` ADD SPATIAL INDEX `idx_geom` (`geom`)", abstractStmt[0].Statement)
assert.True(t, abstractStmt[0].IsAlterTable()) // StmtNode should be an AlterTableStmt

// Test drop index is rewritten.
abstractStmt, err = New("DROP INDEX idx ON t1")
assert.NoError(t, err)
Expand Down
Loading