Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #48 from github/multi-schema-query
Browse files Browse the repository at this point in the history
Support for multiple-schemas
  • Loading branch information
Shlomi Noach authored Oct 31, 2017
2 parents 2b02539 + 59799c0 commit 0d38dff
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 21 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Usage of ccql:
MySQL password
-q string
Query/queries to execute
-s string
List of databases to query from; overrides -d, prints schema name to output
-t float
Connect timeout seconds
-u string
Expand All @@ -58,6 +60,16 @@ You may provide a query or a list of queries in the following ways:
Queries are delimited by a semicolon (`;`). The last query may, but does not have to, be terminated by a semicolon.
Quotes are respected, up to a reasonable level. It is valid to include a semicolon in a quoted text, as in `select 'single;query'`. However `ccql` does not employ a full blown parser, so please don't overdo it. For example, the following may not be parsed correctly: `select '\';\''`. You get it.

#### Schemas

You may either provide:

- An implicit, default schema via `-d schema_name`
- Schema name is not visible on output.
- Or explicit list of schemas via `-s "schema_1,schema_2[,schema_3...]"` (overrides `-d`)
- Queries are executed per host, per schema.
- Schema name printed as output column.

#### Credentials input

You may provide credentials in the following ways:
Expand Down Expand Up @@ -144,6 +156,20 @@ Set `sync_binlog=0` on all intermediate masters:
cat /tmp/hosts.txt | ccql -q "show slave status;" | awk -F $'\t' '{print $3 ":" $5}' | sort | uniq | ccql -q "show slave status" | awk '{print $1}' | ccql -q "set global sync_binlog=0"
```

Multiple schemas:

```shell
$ cat /tmp/hosts.txt | ccql -t 0.5 -s "test,meta" -q "select uuid() from dual" | column -t
host3:3306 test d0d95311-b8ad-11e7-81e7-008cfa542442
host2:3306 meta d0d95311-b8ad-11e7-a16c-a0369fb3dc94
host2:3306 test d0d95fd6-b8ad-11e7-9a23-008cfa544064
host1:3306 meta d0d95311-b8ad-11e7-9a15-a0369fb5fdd0
host3:3306 meta d0d95311-b8ad-11e7-bd26-a0369fb5f3d8
host4:3306 meta d0d95311-b8ad-11e7-a16c-a0369fb3dc94
host1:3306 test d0d96924-b8ad-11e7-9bde-008cfa5440e4
host4:3306 test d0d99a9d-b8ad-11e7-a680-008cfa542c9e
```

## LICENSE

See [LICENSE](LICENSE). _ccql_ imports and includes 3rd party libraries, which have their own license. These are found under [vendor](vendor).
Expand Down
6 changes: 5 additions & 1 deletion go/cmd/ccql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ func main() {
askPassword := flag.Bool("ask-pass", false, "prompt for MySQL password")
credentialsFile := flag.String("C", "", "Credentials file, expecting [client] scope, with 'user', 'password' fields. Overrides -u and -p")
defaultSchema := flag.String("d", "information_schema", "Default schema to use")
schemasList := flag.String("s", "", "List of databases to query from; overrides -d, prints schema name to output")
hostsList := flag.String("h", "", "Comma or space delimited list of hosts in hostname[:port] format. If not given, hosts read from stdin")
hostsFile := flag.String("H", "", "Hosts file, hostname[:port] comma or space or newline delimited format. If not given, hosts read from stdin")
queriesText := flag.String("q", "", "Query/queries to execute")
queriesFile := flag.String("Q", "", "Query/queries input file")
timeout := flag.Float64("t", 0, "Connect timeout seconds")
maxConcurrency := flag.Uint("m", 32, "Max concurrent connections")

flag.Parse()

if AppVersion == "" {
Expand Down Expand Up @@ -117,7 +119,9 @@ func main() {
*password = string(passwd)
}

if err := logic.QueryHosts(hosts, *user, *password, *defaultSchema, queries, *maxConcurrency, *timeout); err != nil {
schemas := text.SplitNonEmpty(*schemasList, ",")

if err := logic.QueryHosts(hosts, *user, *password, *defaultSchema, schemas, queries, *maxConcurrency, *timeout); err != nil {
os.Exit(1)
}
}
52 changes: 32 additions & 20 deletions go/logic/ccql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,29 @@ import (
"fmt"
"log"
"strings"
"sync"

"github.com/outbrain/golib/sqlutils"
)

// queryHost connects to a given host, issues the given set of queries, and outputs the results
// line per row in tab delimited format
func queryHost(host string, user string, password string, defaultSchema string, queries []string, timeout float64) error {
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, defaultSchema, timeout)
func queryHost(host string, user string, password string, schema string, queries []string, timeout float64, printSchema bool) error {
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, schema, timeout)
db, _, err := sqlutils.GetDB(mysqlURI)
if err != nil {
return err
}

for _, query := range queries {
resultData, err := sqlutils.QueryResultData(db, query)
if err != nil {
return err
}
for _, row := range resultData {
output := []string{host}
if printSchema {
output = append(output, schema)
}
for _, rowCell := range row {
output = append(output, rowCell.String)
}
Expand All @@ -35,25 +38,34 @@ func queryHost(host string, user string, password string, defaultSchema string,
}

// QueryHosts will issue concurrent queries on given list of hosts
func QueryHosts(hosts []string, user string, password string, defaultSchema string, queries []string, maxConcurrency uint, timeout float64) (anyError error) {
concurrentHosts := make(chan bool, maxConcurrency)
completedHosts := make(chan bool)

func QueryHosts(hosts []string, user string, password string,
defaultSchema string, schemas []string, queries []string,
maxConcurrency uint, timeout float64,
) (anyError error) {
concurrentQueries := make(chan bool, maxConcurrency)
printSchema := len(schemas) > 0
if len(schemas) == 0 {
schemas = []string{defaultSchema}
}
var wg sync.WaitGroup
for _, host := range hosts {
go func(host string) {
concurrentHosts <- true
if err := queryHost(host, user, password, defaultSchema, queries, timeout); err != nil {
anyError = err
log.Printf("%s %s", host, err.Error())
}
<-concurrentHosts

completedHosts <- true
}(host)
// For each host, run all queries for the respective schema
for _, schema := range schemas {
wg.Add(1)
go func(host, schema string) {
concurrentQueries <- true
defer func() { <-concurrentQueries }()
defer wg.Done()
if err := queryHost(host, user, password, schema, queries, timeout, printSchema); err != nil {
anyError = err
log.Printf("%s %s", host, err.Error())
}
}(host, schema)
}
}

// Barrier. Wait for all to complete
for range hosts {
<-completedHosts
}
wg.Wait()

return anyError
}
10 changes: 10 additions & 0 deletions go/text/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func ParseHosts(hostsList string, hostsFile string) (hosts []string, err error)

return hosts, err
}

func SplitNonEmpty(s string, sep string) (result []string) {
tokens := strings.Split(s, sep)
for _, token := range tokens {
if token != "" {
result = append(result, strings.TrimSpace(token))
}
}
return result
}
14 changes: 14 additions & 0 deletions go/text/hosts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,17 @@ func TestParseHostsMulti(t *testing.T) {
}
}
}

func TestSplitNonEmpty(t *testing.T) {
s := "the, quick,, brown,fox ,,"
splits := SplitNonEmpty(s, ",")

if len(splits) != 4 {
t.Errorf("expected 4 tokens; got %+v", len(splits))
}
join := strings.Join(splits, ";")
expected := "the;quick;brown;fox"
if join != expected {
t.Errorf("expected tokens: `%+v`. Got: `%+v`", expected, join)
}
}

0 comments on commit 0d38dff

Please sign in to comment.