Skip to content

Commit

Permalink
published 0.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
PNixx committed Sep 18, 2020
1 parent cede97b commit b8be22c
Show file tree
Hide file tree
Showing 14 changed files with 363 additions and 93 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### Version 0.4.0 (Sep 18, 2020)

* Full support migration and rollback database
* Support cluster and replica. Auto inject to SQL queries.
* Fix schema dump/load

### Version 0.3.10 (Dec 20, 2019)

* Support structure dump/load [@StoneGod](https://github.com/StoneGod)
Expand Down
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Clickhouse::Activerecord

A Ruby database ActiveRecord driver for ClickHouse. Support Rails >= 5.2.
Tested on ClickHouse version 18.14.
Support ClickHouse version from 19.14 LTS.

## Installation

Expand Down Expand Up @@ -31,8 +31,8 @@ default: &default
ssl: true # optional for using ssl connection
debug: true # use for showing in to log technical information
migrations_paths: db/clickhouse # optional, default: db/migrate_clickhouse
cluster: 'cluster_name' # optional for creating tables in cluster
replica: '{shard}' # optional for creating system tables for shards
cluster_name: 'cluster_name' # optional for creating tables in cluster
replica_name: '{replica}' # replica macros name, optional for creating replicated tables
```
## Usage in Rails 5
Expand Down Expand Up @@ -111,8 +111,7 @@ Migration:

$ rails g clickhouse_migration MIGRATION_NAME COLUMNS
$ rake clickhouse:migrate

Rollback migration not supported!
$ rake clickhouse:rollback

### Dump / Load for multiple using databases

Expand Down Expand Up @@ -159,6 +158,19 @@ ActionView.maximum(:date)
#=> 'Wed, 29 Nov 2017'
```

### Using replica and cluster params in connection parameters

```yml
default: &default
***
cluster_name: 'cluster_name'
replica_name: '{replica}'
```

`ON CLUSTER cluster_name` will be attach to all queries create / drop.

Engines `MergeTree` and all support replication engines will be replaced to `Replicated***('/clickhouse/tables/cluster_name/database.table', '{replica}')`

## Donations

Donations to this project are going directly to [PNixx](https://github.com/PNixx), the original author of this project:
Expand Down
2 changes: 1 addition & 1 deletion clickhouse-activerecord.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency 'activerecord', '>= 5.2'

spec.add_development_dependency 'bundler', '~> 1.15'
spec.add_development_dependency 'rake', '~> 10.0'
spec.add_development_dependency 'rake', '~> 13.0'
spec.add_development_dependency 'rspec', '~> 3.4'
spec.add_development_dependency 'pry', '~> 0.12'
end
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ def add_table_options!(create_sql, options)

create_sql
end

def visit_TableDefinition(o)
create_sql = +"CREATE#{table_modifier_in_create(o)} #{o.view ? "VIEW" : "TABLE"} "
create_sql << "IF NOT EXISTS " if o.if_not_exists
create_sql << "#{quote_table_name(o.name)} "

statements = o.columns.map { |c| accept c }
statements << accept(o.primary_keys) if o.primary_keys

create_sql << "(#{statements.join(', ')})" if statements.present?
add_table_options!(create_sql, table_options(o))
create_sql << " AS #{to_sql(o.as)}" if o.as
create_sql
end

# Returns any SQL string to go between CREATE and TABLE. May be nil.
def table_modifier_in_create(o)
" TEMPORARY" if o.temporary
" MATERIALIZED" if o.materialized
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,35 @@ module ConnectionAdapters
module Clickhouse
class TableDefinition < ActiveRecord::ConnectionAdapters::TableDefinition

attr_reader :view, :materialized, :if_not_exists

def initialize(
conn,
name,
temporary: false,
if_not_exists: false,
options: nil,
as: nil,
comment: nil,
view: false,
materialized: false,
**
)
@conn = conn
@columns_hash = {}
@indexes = []
@foreign_keys = []
@primary_keys = nil
@temporary = temporary
@if_not_exists = if_not_exists
@options = options
@as = as
@name = @conn.apply_cluster(name)
@comment = comment
@view = view || materialized
@materialized = materialized
end

def integer(*args, **options)
if options[:limit] == 8
args.each { |name| column(name, :big_integer, options.except(:limit)) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ def do_execute(sql, name = nil, format: 'JSONCompact', settings: {})
end
end

def assume_migrated_upto_version(version, migrations_paths = nil)
version = version.to_i
sm_table = quote_table_name(schema_migration.table_name)

migrated = migration_context.get_all_versions
versions = migration_context.migrations.map(&:version)

unless migrated.include?(version)
exec_insert "INSERT INTO #{sm_table} (version) VALUES (#{quote(version.to_s)})", nil, nil
end

inserting = (versions - migrated).select { |v| v < version }
if inserting.any?
if (duplicate = inserting.detect { |v| inserting.count(v) > 1 })
raise "Duplicate migration #{duplicate}. Please renumber your migrations to resolve the conflict."
end
execute insert_versions_sql(inserting)
end
end

private

def apply_format(sql, format)
Expand Down Expand Up @@ -95,11 +115,7 @@ def schema_creation
end

def create_table_definition(*args)
if ActiveRecord::version >= Gem::Version.new('6')
Clickhouse::TableDefinition.new(self, *args)
else
Clickhouse::TableDefinition.new(*args)
end
Clickhouse::TableDefinition.new(self, *args)
end

def new_column_from_field(table_name, field)
Expand Down
70 changes: 53 additions & 17 deletions lib/active_record/connection_adapters/clickhouse_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'clickhouse-activerecord/arel/visitors/to_sql'
require 'clickhouse-activerecord/arel/table'
require 'clickhouse-activerecord/migration'
require 'active_record/connection_adapters/clickhouse/oid/date'
require 'active_record/connection_adapters/clickhouse/oid/date_time'
require 'active_record/connection_adapters/clickhouse/oid/big_integer'
Expand Down Expand Up @@ -110,17 +111,17 @@ def initialize(logger, connection_parameters, config, full_config)

# Support SchemaMigration from v5.2.2 to v6+
def schema_migration # :nodoc:
if ActiveRecord::version >= Gem::Version.new('6')
super
else
ActiveRecord::SchemaMigration
end
ClickhouseActiverecord::SchemaMigration
end

def migrations_paths
@full_config[:migrations_paths] || 'db/migrate_clickhouse'
end

def migration_context # :nodoc:
ClickhouseActiverecord::MigrationContext.new(migrations_paths, schema_migration)
end

def arel_visitor # :nodoc:
ClickhouseActiverecord::Arel::Visitors::ToSql.new(self)
end
Expand Down Expand Up @@ -209,12 +210,29 @@ def create_database(name)
end
end

def create_table(table_name, comment: nil, **options)
super(
apply_cluster(table_name),
comment: comment,
**options
)
def create_view(table_name, **options)
options.merge!(view: true)
options = apply_replica(table_name, options)
td = create_table_definition(table_name, options)
yield td if block_given?

if options[:force]
drop_table(table_name, options.merge(if_exists: true))
end

execute schema_creation.accept td
end

def create_table(table_name, **options)
options = apply_replica(table_name, options)
td = create_table_definition(table_name, options)
yield td if block_given?

if options[:force]
drop_table(table_name, options.merge(if_exists: true))
end

execute schema_creation.accept td
end

# Drops a ClickHouse database.
Expand All @@ -230,6 +248,22 @@ def drop_table(table_name, options = {}) # :nodoc:
do_execute apply_cluster "DROP TABLE#{' IF EXISTS' if options[:if_exists]} #{quote_table_name(table_name)}"
end

def cluster
@full_config[:cluster_name]
end

def replica
@full_config[:replica_name]
end

def replica_path(table)
"/clickhouse/tables/#{cluster}/#{@config[:database]}.#{table}"
end

def apply_cluster(sql)
cluster ? "#{sql} ON CLUSTER #{cluster}" : sql
end

protected

def last_inserted_id(result)
Expand All @@ -242,12 +276,14 @@ def connect
@connection = Net::HTTP.start(@connection_parameters[0], @connection_parameters[1], use_ssl: @connection_parameters[2], verify_mode: OpenSSL::SSL::VERIFY_NONE)
end

def cluster
@full_config[:cluster]
end

def apply_cluster(sql)
cluster ? "#{sql} ON CLUSTER #{cluster}" : sql
def apply_replica(table, options)
if replica && cluster
match = options[:options].match(/^(.*?MergeTree)\(([^\)]*)\)(.*?)$/)
if match
options[:options] = "Replicated#{match[1]}(#{([replica_path(table), replica].map{|v| "'#{v}'"} + [match[2].presence]).compact.join(', ')})#{match[3]}"
end
end
options
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/clickhouse-activerecord.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

if defined?(Rails::Railtie)
require 'clickhouse-activerecord/railtie'
require 'clickhouse-activerecord/schema'
require 'clickhouse-activerecord/schema_dumper'
require 'clickhouse-activerecord/tasks'
ActiveRecord::Tasks::DatabaseTasks.register_task(/clickhouse/, "ClickhouseActiverecord::Tasks")
Expand Down
92 changes: 92 additions & 0 deletions lib/clickhouse-activerecord/migration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
module ClickhouseActiverecord

class SchemaMigration < ::ActiveRecord::SchemaMigration
class << self

def create_table
unless table_exists?
version_options = connection.internal_string_options_for_primary_key

connection.create_table(table_name, id: false, options: 'ReplacingMergeTree(ver) PARTITION BY version ORDER BY (version)') do |t|
t.string :version, version_options
t.column :active, 'Int8', null: false, default: '1'
t.datetime :ver, null: false, default: -> { 'now()' }
end
end
end

def all_versions
from("#{table_name} FINAL").where(active: 1).order(:version).pluck(:version)
end
end
end

class InternalMetadata < ::ActiveRecord::InternalMetadata
class << self
def create_table
unless table_exists?
key_options = connection.internal_string_options_for_primary_key

connection.create_table(table_name, id: false, options: 'MergeTree() PARTITION BY toDate(created_at) ORDER BY (created_at)') do |t|
t.string :key, key_options
t.string :value
t.timestamps
end
end
end
end
end

class MigrationContext < ::ActiveRecord::MigrationContext #:nodoc:
attr_reader :migrations_paths, :schema_migration

def initialize(migrations_paths, schema_migration)
@migrations_paths = migrations_paths
@schema_migration = schema_migration
end

def down(target_version = nil)
selected_migrations = if block_given?
migrations.select { |m| yield m }
else
migrations
end

ClickhouseActiverecord::Migrator.new(:down, selected_migrations, schema_migration, target_version).migrate
end

def get_all_versions
if schema_migration.table_exists?
schema_migration.all_versions.map(&:to_i)
else
[]
end
end

end

class Migrator < ::ActiveRecord::Migrator

def initialize(direction, migrations, schema_migration, target_version = nil)
@direction = direction
@target_version = target_version
@migrated_versions = nil
@migrations = migrations
@schema_migration = schema_migration

validate(@migrations)

@schema_migration.create_table
ClickhouseActiverecord::InternalMetadata.create_table
end

def record_version_state_after_migrating(version)
if down?
migrated.delete(version)
@schema_migration.create!(version: version.to_s, active: 0)
else
super
end
end
end
end
Loading

0 comments on commit b8be22c

Please sign in to comment.