Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c62022b
chore: add context.Context everywhere
joschi Aug 8, 2024
047c5f8
Add OpenTelemetry instrumentation
joschi Apr 18, 2026
f80bee8
otel: add transparent driver-level span instrumentation
joschi Apr 18, 2026
0a60410
otel: instrument database/sql drivers with otelsql
joschi Apr 18, 2026
95fd12e
database: instrument SQL drivers with otelsql for SQL-level tracing
joschi Apr 19, 2026
acc5d64
database/sqlcipher: instrument with otelsql via ExecerContext adapter
joschi Apr 19, 2026
0fa1334
database/cassandra: instrument with otelgocql for query/batch/connect…
joschi Apr 19, 2026
a3cda31
database: use semconv/v1.40.0 constants for db.system.name attributes
joschi Apr 19, 2026
63fa00a
database: fix TestOTelDriver_DbSystemAttribute after semconv v1.40.0 …
joschi Apr 19, 2026
97bc320
source/aws_s3: migrate to AWS SDK v2 and instrument with otelaws
joschi Apr 19, 2026
3251f21
source: instrument HTTP-based source drivers with otelhttp
joschi Apr 19, 2026
680229d
database/mongodb: instrument with otelmongo
joschi Apr 19, 2026
3e068d4
feat(otel): instrument Spanner and GCS with otelgrpc
joschi Apr 19, 2026
b5157e0
fix(otel): fix semconv key, dead field, and histogram over-recording
joschi Apr 19, 2026
828eef5
fix: update s3/manager and gosnowflake to fix AWS SDK v2 type incompa…
joschi Apr 19, 2026
c70c0ff
fix: address code review comments from PR #1394
joschi Apr 19, 2026
3affbde
chore: downgrade to Go 1.24 and compatible dependency versions
joschi Apr 19, 2026
d1323c7
chore: go fmt
joschi Apr 19, 2026
c220835
fix: address golangci-lint issues
joschi Apr 19, 2026
20729f7
fix: address second round of Copilot review comments
joschi Apr 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ $ docker run -v {{ migration dir }}:/migrations --network host migrate/migrate
* Bring your own logger.
* Uses `io.Reader` streams internally for low memory overhead.
* Thread-safe and no goroutine leaks.
* Emits [OpenTelemetry](https://opentelemetry.io) traces and metrics automatically when a global provider is configured by the host application (see [OpenTelemetry](#opentelemetry) below).

__[Go Documentation](https://pkg.go.dev/github.com/golang-migrate/migrate/v4)__

Expand Down Expand Up @@ -153,6 +154,76 @@ func main() {

Go to [getting started](GETTING_STARTED.md)

## OpenTelemetry

migrate emits [OpenTelemetry](https://opentelemetry.io) traces and metrics through the OTel **API** only — it does **not** initialize an SDK, configure exporters, or set up resources. This means:

- **Zero overhead** when no OTel SDK is configured (global no-op providers are used).
- **Automatic telemetry** when the host application configures OTel global providers (e.g. via `go.opentelemetry.io/otel/sdk`).

### Signals

**Traces** — one parent span per public operation (`migrate.up`, `migrate.down`, `migrate.migrate`, etc.), one child span per migration execution (`migrate.run_migration`), and driver-level spans for every database and source operation.

#### Operation spans (SpanKind: INTERNAL)

| Span name | Emitted by |
|-----------|-----------|
| `migrate.up` | `Up` / `Steps` (positive) |
| `migrate.down` | `Down` / `Steps` (negative) |
| `migrate.migrate` | `Migrate` |
| `migrate.force` | `Force` |
| `migrate.drop` | `Drop` |
| `migrate.run_migration` | Per-migration child of the above |

#### Database driver spans (SpanKind: CLIENT)

| Span name | Method | Key attributes |
|-----------|--------|----------------|
| `db.lock` | Lock | `db.system` |
| `db.unlock` | Unlock | `db.system` |
| `db.run` | Run | `db.system` |
| `db.set_version` | SetVersion | `db.system`, `migrate.version`, `migrate.dirty` |
| `db.version` | Version | `db.system` |
| `db.drop` | Drop | `db.system` |
Comment thread
joschi marked this conversation as resolved.
Outdated

#### Source driver spans (SpanKind: INTERNAL)

| Span name | Method | Key attributes |
|-----------|--------|----------------|
| `source.read_up` | ReadUp | `migrate.source`, `migrate.version` |
| `source.read_down` | ReadDown | `migrate.source`, `migrate.version` |

**Metrics:**

| Metric | Type | Unit | Description |
|--------|------|------|-------------|
| `migrate.migrations.applied` | Counter | `{migration}` | Number of migrations successfully applied |
| `migrate.migrations.failed` | Counter | `{migration}` | Number of migrations that failed to apply |
| `migrate.migration.run.duration` | Histogram | `s` | Execution duration of a single migration |

**Common attributes:** `db.system`, `migrate.source`, `migrate.direction`, `migrate.version`, `migrate.target_version`, `migrate.identifier`.

Comment thread
joschi marked this conversation as resolved.
### Example

```go
import (
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
// ... your exporter of choice
)

// In your application setup:
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(yourExporter),
)
otel.SetTracerProvider(tp)

// migrate will now automatically emit spans and metrics.
m, _ := migrate.New("file:///migrations", "postgres://...")
Comment thread
joschi marked this conversation as resolved.
Outdated
m.Up(ctx) // emits traces and metrics
```

## Tutorials

* [CockroachDB](database/cockroachdb/TUTORIAL.md)
Expand Down
34 changes: 18 additions & 16 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cassandra

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/gocql/gocql"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
"go.opentelemetry.io/contrib/instrumentation/github.com/gocql/gocql/otelgocql"
)

func init() {
Expand Down Expand Up @@ -50,7 +52,7 @@ type Cassandra struct {
config *Config
}

func WithInstance(session *gocql.Session, config *Config) (database.Driver, error) {
func WithInstance(ctx context.Context, session *gocql.Session, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
} else if len(config.KeyspaceName) == 0 {
Expand All @@ -74,14 +76,14 @@ func WithInstance(session *gocql.Session, config *Config) (database.Driver, erro
config: config,
}

if err := c.ensureVersionTable(); err != nil {
if err := c.ensureVersionTable(ctx); err != nil {
return nil, err
}

return c, nil
}

func (c *Cassandra) Open(url string) (database.Driver, error) {
func (c *Cassandra) Open(ctx context.Context, url string) (database.Driver, error) {
u, err := nurl.Parse(url)
if err != nil {
return nil, err
Expand Down Expand Up @@ -170,7 +172,7 @@ func (c *Cassandra) Open(url string) (database.Driver, error) {
}
}

session, err := cluster.CreateSession()
session, err := otelgocql.NewSessionWithTracing(ctx, cluster)
if err != nil {
return nil, err
}
Expand All @@ -183,34 +185,34 @@ func (c *Cassandra) Open(url string) (database.Driver, error) {
}
}

return WithInstance(session, &Config{
return WithInstance(ctx, session, &Config{
KeyspaceName: strings.TrimPrefix(u.Path, "/"),
MigrationsTable: u.Query().Get("x-migrations-table"),
MultiStatementEnabled: u.Query().Get("x-multi-statement") == "true",
MultiStatementMaxSize: multiStatementMaxSize,
})
}

func (c *Cassandra) Close() error {
func (c *Cassandra) Close(ctx context.Context) error {
c.session.Close()
return nil
}

func (c *Cassandra) Lock() error {
func (c *Cassandra) Lock(ctx context.Context) error {
if !c.isLocked.CompareAndSwap(false, true) {
return database.ErrLocked
}
return nil
}

func (c *Cassandra) Unlock() error {
func (c *Cassandra) Unlock(ctx context.Context) error {
if !c.isLocked.CompareAndSwap(true, false) {
return database.ErrNotLocked
}
return nil
}

func (c *Cassandra) Run(migration io.Reader) error {
func (c *Cassandra) Run(ctx context.Context, migration io.Reader) error {
if c.config.MultiStatementEnabled {
var err error
if e := multistmt.Parse(migration, multiStmtDelimiter, c.config.MultiStatementMaxSize, func(m []byte) bool {
Expand Down Expand Up @@ -241,7 +243,7 @@ func (c *Cassandra) Run(migration io.Reader) error {
return nil
}

func (c *Cassandra) SetVersion(version int, dirty bool) error {
func (c *Cassandra) SetVersion(ctx context.Context, version int, dirty bool) error {
// DELETE instead of TRUNCATE because AWS Keyspaces does not support it
// see: https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html
squery := `SELECT version FROM "` + c.config.MigrationsTable + `"`
Expand Down Expand Up @@ -271,7 +273,7 @@ func (c *Cassandra) SetVersion(version int, dirty bool) error {
}

// Return current keyspace version
func (c *Cassandra) Version() (version int, dirty bool, err error) {
func (c *Cassandra) Version(ctx context.Context) (version int, dirty bool, err error) {
query := `SELECT version, dirty FROM "` + c.config.MigrationsTable + `" LIMIT 1`
err = c.session.Query(query).Scan(&version, &dirty)
switch {
Expand All @@ -289,7 +291,7 @@ func (c *Cassandra) Version() (version int, dirty bool, err error) {
}
}

func (c *Cassandra) Drop() error {
func (c *Cassandra) Drop(ctx context.Context) error {
// select all tables in current schema
query := fmt.Sprintf(`SELECT table_name from system_schema.tables WHERE keyspace_name='%s'`, c.config.KeyspaceName)
iter := c.session.Query(query).Iter()
Expand All @@ -307,13 +309,13 @@ func (c *Cassandra) Drop() error {
// ensureVersionTable checks if versions table exists and, if not, creates it.
// Note that this function locks the database, which deviates from the usual
// convention of "caller locks" in the Cassandra type.
func (c *Cassandra) ensureVersionTable() (err error) {
if err = c.Lock(); err != nil {
func (c *Cassandra) ensureVersionTable(ctx context.Context) (err error) {
if err = c.Lock(ctx); err != nil {
return err
}

defer func() {
if e := c.Unlock(); e != nil {
if e := c.Unlock(ctx); e != nil {
err = errors.Join(err, e)
}
}()
Expand All @@ -322,7 +324,7 @@ func (c *Cassandra) ensureVersionTable() (err error) {
if err != nil {
return err
}
if _, _, err = c.Version(); err != nil {
if _, _, err = c.Version(ctx); err != nil {
return err
}
return nil
Expand Down
12 changes: 7 additions & 5 deletions database/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,19 @@ func Test(t *testing.T) {

func test(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ctx := context.Background()
ip, port, err := c.Port(9042)
if err != nil {
t.Fatal("Unable to get mapped port:", err)
}
addr := fmt.Sprintf("cassandra://%v:%v/testks", ip, port)
p := &Cassandra{}
d, err := p.Open(addr)
d, err := p.Open(ctx, addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
if err := d.Close(ctx); err != nil {
t.Error(err)
}
}()
Expand All @@ -97,23 +98,24 @@ func test(t *testing.T) {

func testMigrate(t *testing.T) {
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
ctx := context.Background()
ip, port, err := c.Port(9042)
if err != nil {
t.Fatal("Unable to get mapped port:", err)
}
addr := fmt.Sprintf("cassandra://%v:%v/testks", ip, port)
p := &Cassandra{}
d, err := p.Open(addr)
d, err := p.Open(ctx, addr)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := d.Close(); err != nil {
if err := d.Close(ctx); err != nil {
t.Error(err)
}
}()

m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "testks", d)
m, err := migrate.NewWithDatabaseInstance(ctx, "file://./examples/migrations", "testks", d)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading
Loading