Skip to content

Commit 4cd1109

Browse files
srebhanliujiacheng777
authored andcommitted
apacheGH-34332: [Go][FlightRPC] Add driver for database/sql framework (apache#34331)
### Rationale for this change Using Golang's `database/sql` framework is well known, offers goodies like connection pooling and is easy to use. Therefore using FlightSQL trough this framework is a good starting point for users performing simple queries, inserts etc. ### What changes are included in this PR? This PR adds an `database/sql/driver` implementation currently supporting `sqlite` and `InfluxData IOx` (query only). Unit-tests are added using the SQLite server example implementation and the driver and driver settings are documented. ### Are these changes tested? Yes, a test-suite is added for the driver. Futhermore, the IOx backend is additionally tested against a real local instance using [this code](https://github.com/srebhan/go-flightsql-example). ### Are there any user-facing changes? This PR does not contain breaking changes. All modifications to the FlightSQL client code are transparent to the user. * Closes: apache#34332 Authored-by: Sven Rebhan <srebhan@influxdata.com> Signed-off-by: Matt Topol <zotthewizard@gmail.com>
1 parent 7e4d799 commit 4cd1109

7 files changed

Lines changed: 1869 additions & 3 deletions

File tree

go/arrow/flight/client.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,10 @@ func NewFlightClient(addr string, auth ClientAuthHandler, opts ...grpc.DialOptio
271271
// being the inner most wrapper around the actual call. It also passes along the dialoptions passed in such
272272
// as TLS certs and so on.
273273
func NewClientWithMiddleware(addr string, auth ClientAuthHandler, middleware []ClientMiddleware, opts ...grpc.DialOption) (Client, error) {
274+
return NewClientWithMiddlewareCtx(context.Background(), addr, auth, middleware, opts...)
275+
}
276+
277+
func NewClientWithMiddlewareCtx(ctx context.Context, addr string, auth ClientAuthHandler, middleware []ClientMiddleware, opts ...grpc.DialOption) (Client, error) {
274278
unary := make([]grpc.UnaryClientInterceptor, 0, len(middleware))
275279
stream := make([]grpc.StreamClientInterceptor, 0, len(middleware))
276280
if auth != nil {
@@ -288,7 +292,7 @@ func NewClientWithMiddleware(addr string, auth ClientAuthHandler, middleware []C
288292
}
289293
}
290294
opts = append(opts, grpc.WithChainUnaryInterceptor(unary...), grpc.WithChainStreamInterceptor(stream...))
291-
conn, err := grpc.Dial(addr, opts...)
295+
conn, err := grpc.DialContext(ctx, addr, opts...)
292296
if err != nil {
293297
return nil, err
294298
}

go/arrow/flight/flightsql/client.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ import (
3939
// its arguments to flight.NewClientWithMiddleware to create the
4040
// underlying Flight Client.
4141
func NewClient(addr string, auth flight.ClientAuthHandler, middleware []flight.ClientMiddleware, opts ...grpc.DialOption) (*Client, error) {
42-
cl, err := flight.NewClientWithMiddleware(addr, auth, middleware, opts...)
42+
return NewClientCtx(context.Background(), addr, auth, middleware, opts...)
43+
}
44+
45+
func NewClientCtx(ctx context.Context, addr string, auth flight.ClientAuthHandler, middleware []flight.ClientMiddleware, opts ...grpc.DialOption) (*Client, error) {
46+
cl, err := flight.NewClientWithMiddlewareCtx(ctx, addr, auth, middleware, opts...)
4347
if err != nil {
4448
return nil, err
4549
}
@@ -1110,7 +1114,9 @@ func (p *PreparedStatement) clearParameters() {
11101114
func (p *PreparedStatement) SetParameters(binding arrow.Record) {
11111115
p.clearParameters()
11121116
p.paramBinding = binding
1113-
p.paramBinding.Retain()
1117+
if p.paramBinding != nil {
1118+
p.paramBinding.Retain()
1119+
}
11141120
}
11151121

11161122
// SetRecordReader takes a RecordReader to send as the parameter bindings when
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
# FlightSQL driver
20+
21+
A FlightSQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/)
22+
package. This driver is a lightweight wrapper around the FlightSQL client in
23+
pure Go. It provides all advantages of a `database/sql` driver like automatic
24+
connection pooling, transactions combined with ease of use (see (#usage)).
25+
26+
---------------------------------------
27+
28+
* [Prerequisits](#prerequisits)
29+
* [Usage](#usage)
30+
* [Data Source Name (DSN)](#data-source-name-dsn)
31+
* [Driver config usage](#driver-config-usage)
32+
* [TLS setup](#tls-setup)
33+
34+
---------------------------------------
35+
36+
## Prerequisits
37+
38+
* Go 1.19+
39+
* Installation via `go get -u github.com/apache/arrow/go/v12/arrow/flight/flightsql`
40+
* Backend speaking FlightSQL
41+
42+
---------------------------------------
43+
44+
## Usage
45+
46+
_Go FlightQL Driver_ is an implementation of Go's `database/sql/driver`
47+
interface to use the [`database/sql`](https://golang.org/pkg/database/sql/)
48+
framework. The driver is registered as `flightsql` and configured using a
49+
[data-source name (DSN)](#data-source-name-dsn).
50+
51+
A basic example using a SQLite backend looks like this
52+
53+
```go
54+
import (
55+
"database/sql"
56+
"time"
57+
58+
_ "github.com/apache/arrow/go/v12/arrow/flight/flightsql"
59+
)
60+
61+
// Open the connection to an SQLite backend
62+
db, err := sql.Open("flightsql", "flightsql://localhost:12345?timeout=5s")
63+
if err != nil {
64+
panic(err)
65+
}
66+
// Make sure we close the connection to the database
67+
defer db.Close()
68+
69+
// Use the connection e.g. for querying
70+
rows, err := db.Query("SELECT * FROM mytable")
71+
if err != nil {
72+
panic(err)
73+
}
74+
// ...
75+
```
76+
77+
## Data Source Name (DSN)
78+
79+
A Data Source Name has the following format:
80+
81+
```text
82+
flightsql://[user[:password]@]<address>[:port][?param1=value1&...&paramN=valueN]
83+
```
84+
85+
The data-source-name (DSN) requires the `address` of the backend with an
86+
optional port setting. The `user` and `password` parameters are passed to the
87+
backend as GRPC Basic-Auth headers. If your backend requires a token based
88+
authentication, please use a `token` parameter (see
89+
[common parameters](#common-parameters) below).
90+
91+
**Please note**: All parameters are case-sensitive!
92+
93+
Alternatively to specifying the DSN directly you can use the `DriverConfig`
94+
structure to generate the DSN string. See the
95+
[Driver config usage section](#driver-config-usage) for details.
96+
97+
### Common parameters
98+
99+
The following common parameters exist
100+
101+
#### `token`
102+
103+
The `token` parameter can be used to specify the token for token-based
104+
authentication. The value is passed on to the backend as a GRPC Bearer-Auth
105+
header.
106+
107+
#### `timeout`
108+
109+
The `timeout` parameter can be set using a duration string e.g. `timeout=5s`
110+
to limit the maximum time an operation can take. This prevents calls that wait
111+
forever, e.g. if the backend is down or a query is taking very long. When
112+
not set, the driver will use an _infinite_ timeout.
113+
114+
## Driver config usage
115+
116+
Alternatively to specifying the DSN directly you can fill the `DriverConfig`
117+
structure and generate the DSN out of this. Here is some example
118+
119+
```golang
120+
package main
121+
122+
import (
123+
"database/sql"
124+
"log"
125+
"time"
126+
127+
"github.com/apache/arrow/go/v12/arrow/flight/flightsql"
128+
)
129+
130+
func main() {
131+
config := flightsql.DriverConfig{
132+
Address: "localhost:12345",
133+
Token: "your token",
134+
Timeout: 10 * time.Second,
135+
Params: map[string]string{
136+
"my-custom-parameter": "foobar",
137+
},
138+
}
139+
db, err := sql.Open("flightsql", config.DSN())
140+
if err != nil {
141+
log.Fatalf("open failed: %v", err)
142+
}
143+
defer db.Close()
144+
145+
...
146+
}
147+
```
148+
149+
## TLS setup
150+
151+
Currently TLS is not yet supported and will be added later.
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
package driver
17+
18+
import (
19+
"crypto/tls"
20+
"fmt"
21+
"net/url"
22+
"time"
23+
)
24+
25+
type DriverConfig struct {
26+
Address string
27+
Username string
28+
Password string
29+
Token string
30+
Timeout time.Duration
31+
Params map[string]string
32+
33+
TLSEnabled bool
34+
TLSConfig *tls.Config
35+
}
36+
37+
func NewDriverConfigFromDSN(dsn string) (*DriverConfig, error) {
38+
u, err := url.Parse(dsn)
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
// Sanity checks on the given connection string
44+
if u.Scheme != "flightsql" {
45+
return nil, fmt.Errorf("invalid scheme %q", u.Scheme)
46+
}
47+
if u.Path != "" {
48+
return nil, fmt.Errorf("unexpected path %q", u.Path)
49+
}
50+
51+
// Extract the settings
52+
var username, password string
53+
if u.User != nil {
54+
username = u.User.Username()
55+
if v, set := u.User.Password(); set {
56+
password = v
57+
}
58+
}
59+
60+
config := &DriverConfig{
61+
Address: u.Host,
62+
Username: username,
63+
Password: password,
64+
Params: make(map[string]string),
65+
}
66+
67+
// Determine the parameters
68+
for key, values := range u.Query() {
69+
// We only support single instances
70+
if len(values) > 1 {
71+
return nil, fmt.Errorf("too many values for %q", key)
72+
}
73+
var v string
74+
if len(values) > 0 {
75+
v = values[0]
76+
}
77+
78+
switch key {
79+
case "token":
80+
config.Token = v
81+
case "timeout":
82+
config.Timeout, err = time.ParseDuration(v)
83+
if err != nil {
84+
return nil, err
85+
}
86+
default:
87+
config.Params[key] = v
88+
}
89+
}
90+
91+
return config, nil
92+
}
93+
94+
func (config *DriverConfig) DSN() string {
95+
u := url.URL{
96+
Scheme: "flightsql",
97+
Host: config.Address,
98+
}
99+
if config.Username != "" {
100+
if config.Password == "" {
101+
u.User = url.User(config.Username)
102+
} else {
103+
u.User = url.UserPassword(config.Username, config.Password)
104+
}
105+
}
106+
107+
// Set the parameters
108+
values := url.Values{}
109+
if config.Token != "" {
110+
values.Add("token", config.Token)
111+
}
112+
if config.Timeout > 0 {
113+
values.Add("timeout", config.Timeout.String())
114+
}
115+
for k, v := range config.Params {
116+
values.Add(k, v)
117+
}
118+
119+
// Check if we do have parameters at all and set them
120+
if len(values) > 0 {
121+
u.RawQuery = values.Encode()
122+
}
123+
124+
return u.String()
125+
}

0 commit comments

Comments
 (0)