diff --git a/README.md b/README.md index 5a18c05..dd7b5c9 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,8 @@ Usage of ccql: MySQL password -q string Query/queries to execute + -s string + List of databases to query from; overrides -d, prints schema name to output -t float Connect timeout seconds -u string @@ -58,6 +60,16 @@ You may provide a query or a list of queries in the following ways: Queries are delimited by a semicolon (`;`). The last query may, but does not have to, be terminated by a semicolon. Quotes are respected, up to a reasonable level. It is valid to include a semicolon in a quoted text, as in `select 'single;query'`. However `ccql` does not employ a full blown parser, so please don't overdo it. For example, the following may not be parsed correctly: `select '\';\''`. You get it. +#### Schemas + +You may either provide: + +- An implicit, default schema via `-d schema_name` + - Schema name is not visible on output. +- Or explicit list of schemas via `-s "schema_1,schema_2[,schema_3...]"` (overrides `-d`) + - Queries are executed per host, per schema. + - Schema name printed as output column. + #### Credentials input You may provide credentials in the following ways: @@ -144,6 +156,20 @@ Set `sync_binlog=0` on all intermediate masters: cat /tmp/hosts.txt | ccql -q "show slave status;" | awk -F $'\t' '{print $3 ":" $5}' | sort | uniq | ccql -q "show slave status" | awk '{print $1}' | ccql -q "set global sync_binlog=0" ``` +Multiple schemas: + +```shell +$ cat /tmp/hosts.txt | ccql -t 0.5 -s "test,meta" -q "select uuid() from dual" | column -t +host3:3306 test d0d95311-b8ad-11e7-81e7-008cfa542442 +host2:3306 meta d0d95311-b8ad-11e7-a16c-a0369fb3dc94 +host2:3306 test d0d95fd6-b8ad-11e7-9a23-008cfa544064 +host1:3306 meta d0d95311-b8ad-11e7-9a15-a0369fb5fdd0 +host3:3306 meta d0d95311-b8ad-11e7-bd26-a0369fb5f3d8 +host4:3306 meta d0d95311-b8ad-11e7-a16c-a0369fb3dc94 +host1:3306 test d0d96924-b8ad-11e7-9bde-008cfa5440e4 +host4:3306 test d0d99a9d-b8ad-11e7-a680-008cfa542c9e +``` + ## LICENSE See [LICENSE](LICENSE). _ccql_ imports and includes 3rd party libraries, which have their own license. These are found under [vendor](vendor). diff --git a/go/cmd/ccql/main.go b/go/cmd/ccql/main.go index ec2c02a..124e3af 100644 --- a/go/cmd/ccql/main.go +++ b/go/cmd/ccql/main.go @@ -40,12 +40,14 @@ func main() { askPassword := flag.Bool("ask-pass", false, "prompt for MySQL password") credentialsFile := flag.String("C", "", "Credentials file, expecting [client] scope, with 'user', 'password' fields. Overrides -u and -p") defaultSchema := flag.String("d", "information_schema", "Default schema to use") + schemasList := flag.String("s", "", "List of databases to query from; overrides -d, prints schema name to output") hostsList := flag.String("h", "", "Comma or space delimited list of hosts in hostname[:port] format. If not given, hosts read from stdin") hostsFile := flag.String("H", "", "Hosts file, hostname[:port] comma or space or newline delimited format. If not given, hosts read from stdin") queriesText := flag.String("q", "", "Query/queries to execute") queriesFile := flag.String("Q", "", "Query/queries input file") timeout := flag.Float64("t", 0, "Connect timeout seconds") maxConcurrency := flag.Uint("m", 32, "Max concurrent connections") + flag.Parse() if AppVersion == "" { @@ -117,7 +119,9 @@ func main() { *password = string(passwd) } - if err := logic.QueryHosts(hosts, *user, *password, *defaultSchema, queries, *maxConcurrency, *timeout); err != nil { + schemas := text.SplitNonEmpty(*schemasList, ",") + + if err := logic.QueryHosts(hosts, *user, *password, *defaultSchema, schemas, queries, *maxConcurrency, *timeout); err != nil { os.Exit(1) } } diff --git a/go/logic/ccql.go b/go/logic/ccql.go index 9e0a39a..1fd3303 100644 --- a/go/logic/ccql.go +++ b/go/logic/ccql.go @@ -4,19 +4,19 @@ import ( "fmt" "log" "strings" + "sync" "github.com/outbrain/golib/sqlutils" ) // queryHost connects to a given host, issues the given set of queries, and outputs the results // line per row in tab delimited format -func queryHost(host string, user string, password string, defaultSchema string, queries []string, timeout float64) error { - mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, defaultSchema, timeout) +func queryHost(host string, user string, password string, schema string, queries []string, timeout float64, printSchema bool) error { + mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, schema, timeout) db, _, err := sqlutils.GetDB(mysqlURI) if err != nil { return err } - for _, query := range queries { resultData, err := sqlutils.QueryResultData(db, query) if err != nil { @@ -24,6 +24,9 @@ func queryHost(host string, user string, password string, defaultSchema string, } for _, row := range resultData { output := []string{host} + if printSchema { + output = append(output, schema) + } for _, rowCell := range row { output = append(output, rowCell.String) } @@ -35,25 +38,34 @@ func queryHost(host string, user string, password string, defaultSchema string, } // QueryHosts will issue concurrent queries on given list of hosts -func QueryHosts(hosts []string, user string, password string, defaultSchema string, queries []string, maxConcurrency uint, timeout float64) (anyError error) { - concurrentHosts := make(chan bool, maxConcurrency) - completedHosts := make(chan bool) - +func QueryHosts(hosts []string, user string, password string, + defaultSchema string, schemas []string, queries []string, + maxConcurrency uint, timeout float64, +) (anyError error) { + concurrentQueries := make(chan bool, maxConcurrency) + printSchema := len(schemas) > 0 + if len(schemas) == 0 { + schemas = []string{defaultSchema} + } + var wg sync.WaitGroup for _, host := range hosts { - go func(host string) { - concurrentHosts <- true - if err := queryHost(host, user, password, defaultSchema, queries, timeout); err != nil { - anyError = err - log.Printf("%s %s", host, err.Error()) - } - <-concurrentHosts - - completedHosts <- true - }(host) + // For each host, run all queries for the respective schema + for _, schema := range schemas { + wg.Add(1) + go func(host, schema string) { + concurrentQueries <- true + defer func() { <-concurrentQueries }() + defer wg.Done() + if err := queryHost(host, user, password, schema, queries, timeout, printSchema); err != nil { + anyError = err + log.Printf("%s %s", host, err.Error()) + } + }(host, schema) + } } + // Barrier. Wait for all to complete - for range hosts { - <-completedHosts - } + wg.Wait() + return anyError } diff --git a/go/text/hosts.go b/go/text/hosts.go index da4299b..d6948d6 100644 --- a/go/text/hosts.go +++ b/go/text/hosts.go @@ -43,3 +43,13 @@ func ParseHosts(hostsList string, hostsFile string) (hosts []string, err error) return hosts, err } + +func SplitNonEmpty(s string, sep string) (result []string) { + tokens := strings.Split(s, sep) + for _, token := range tokens { + if token != "" { + result = append(result, strings.TrimSpace(token)) + } + } + return result +} diff --git a/go/text/hosts_test.go b/go/text/hosts_test.go index 7dafdbb..c292d3b 100644 --- a/go/text/hosts_test.go +++ b/go/text/hosts_test.go @@ -48,3 +48,17 @@ func TestParseHostsMulti(t *testing.T) { } } } + +func TestSplitNonEmpty(t *testing.T) { + s := "the, quick,, brown,fox ,," + splits := SplitNonEmpty(s, ",") + + if len(splits) != 4 { + t.Errorf("expected 4 tokens; got %+v", len(splits)) + } + join := strings.Join(splits, ";") + expected := "the;quick;brown;fox" + if join != expected { + t.Errorf("expected tokens: `%+v`. Got: `%+v`", expected, join) + } +}