diff --git a/lib/timescaledb.rb b/lib/timescaledb.rb
index 4dc31db..60cde6c 100644
--- a/lib/timescaledb.rb
+++ b/lib/timescaledb.rb
@@ -3,6 +3,7 @@ require_relative 'timescaledb/application_record'
 
 require_relative 'timescaledb/acts_as_hypertable'
 require_relative 'timescaledb/acts_as_hypertable/core'
+require_relative 'timescaledb/continuous_aggregates_helper'
 require_relative 'timescaledb/connection'
 require_relative 'timescaledb/toolkit'
 require_relative 'timescaledb/chunk'
diff --git a/lib/timescaledb/acts_as_hypertable/core.rb b/lib/timescaledb/acts_as_hypertable/core.rb
index 1c6eaf3..6290f5a 100644
--- a/lib/timescaledb/acts_as_hypertable/core.rb
+++ b/lib/timescaledb/acts_as_hypertable/core.rb
@@ -35,7 +35,7 @@ def define_association_scopes
           CompressionSettings.where(hypertable_name: table_name)
         end
 
-        scope :continuous_aggregates, -> do
+        scope :caggs, -> do
           ContinuousAggregates.where(hypertable_name: table_name)
         end
       end
diff --git a/lib/timescaledb/continuous_aggregates.rb b/lib/timescaledb/continuous_aggregates.rb
index 6f00d22..5c4e61f 100644
--- a/lib/timescaledb/continuous_aggregates.rb
+++ b/lib/timescaledb/continuous_aggregates.rb
@@ -1,5 +1,5 @@
 module Timescaledb
-  class ContinuousAggregate < ::Timescaledb::ApplicationRecord
+  class ContinuousAggregates < ::Timescaledb::ApplicationRecord
     self.table_name = "timescaledb_information.continuous_aggregates"
     self.primary_key = 'materialization_hypertable_name'
 
@@ -39,5 +39,6 @@ class ContinuousAggregate < ::Timescaledb::ApplicationRecord
       end
     end
   end
-  ContinuousAggregates = ContinuousAggregate
+  # Keep the previous singular name working for existing callers.
+  ContinuousAggregate = ContinuousAggregates
 end
diff --git a/lib/timescaledb/continuous_aggregates_helper.rb b/lib/timescaledb/continuous_aggregates_helper.rb
new file mode 100644
index 0000000..700feae
--- /dev/null
+++ b/lib/timescaledb/continuous_aggregates_helper.rb
@@ -0,0 +1,157 @@
+module Timescaledb
+  # Class-level DSL for declaring a hierarchy of TimescaleDB continuous
+  # aggregates on an ActiveRecord model.
+  #
+  # For every aggregate and timeframe, a read-only model class named
+  # "<Aggregate>Per<Timeframe>" is defined under the including model and
+  # mapped to a view called "<aggregate>_per_<timeframe>".
+  module ContinuousAggregatesHelper
+    extend ActiveSupport::Concern
+
+    class_methods do
+      # Declares the continuous aggregates of this model and defines one model
+      # class per aggregate and timeframe.
+      #
+      # @param options [Hash]
+      # @option options [String] :time_column column to bucket by (default 'ts')
+      # @option options [Array<Symbol>] :timeframes bucket sizes, finest first
+      # @option options [Array<Symbol>] :scopes model scopes turned into aggregates
+      # @option options [Hash] :refresh_policy policy settings keyed by timeframe
+      # @option options [Hash] :aggregates explicit definitions that add to or
+      #   override the scope-based ones
+      def continuous_aggregates(options = {})
+        @time_column = options[:time_column] || 'ts'
+        @timeframes = options[:timeframes] || [:minute, :hour, :day, :week, :month, :year]
+
+        scopes = options[:scopes] || []
+        @aggregates = {}
+
+        scopes.each do |scope_name|
+          @aggregates[scope_name] = {
+            scope_name: scope_name,
+            select: nil,
+            group_by: nil,
+            refresh_policy: options[:refresh_policy] || {}
+          }
+        end
+
+        # Allow for custom aggregate definitions to override or add to scope-based ones
+        @aggregates.merge!(options[:aggregates] || {})
+
+        define_continuous_aggregate_classes
+      end
+
+      # Refreshes every aggregate view for the given timeframes (all declared
+      # timeframes when none are given).
+      #
+      # @param timeframes [Array<Symbol>, nil]
+      def refresh_aggregates(timeframes = nil)
+        timeframes ||= @timeframes
+        @aggregates.each_key do |aggregate_name|
+          timeframes.each do |timeframe|
+            continuous_aggregate_class(aggregate_name, timeframe).refresh!
+          end
+        end
+      end
+
+      # Creates the materialized view of every aggregate and timeframe, plus
+      # its refresh policy when one is configured for that timeframe.
+      #
+      # The first timeframe is computed from this model; every following one
+      # is rolled up from the view of the timeframe before it.
+      #
+      # @param with_data [Boolean] populate the views while creating them
+      def create_continuous_aggregates(with_data: false)
+        @aggregates.each do |aggregate_name, config|
+          previous_timeframe = nil
+          @timeframes.each do |timeframe|
+            klass = continuous_aggregate_class(aggregate_name, timeframe)
+            bucket = "time_bucket('1 #{timeframe}', #{@time_column}) as #{@time_column}"
+            base_query =
+              if previous_timeframe
+                continuous_aggregate_class(aggregate_name, previous_timeframe)
+                  .select("#{bucket}, #{config[:select]}")
+                  .group(1, *config[:group_by])
+              else
+                scope = public_send(config[:scope_name])
+                select_values = scope.select_values.join(', ')
+                group_values = scope.group_values
+
+                # Rollups read already-counted rows, so they sum the stored
+                # count instead of counting again (any case, any alias).
+                config[:select] = select_values.gsub(/count\(\*\)\s+as\s+(\w+)/i, 'sum(\1) as \1')
+                # Grouped columns are referenced by position (1 is the time
+                # bucket); kept as a list so an aggregate without grouping
+                # adds no empty GROUP BY term.
+                config[:group_by] = (2...(2 + group_values.size)).map(&:to_s)
+
+                self.select("#{bucket}, #{select_values}").group(1, *group_values)
+              end
+
+            connection.execute <<~SQL
+              CREATE MATERIALIZED VIEW IF NOT EXISTS #{klass.table_name}
+              WITH (timescaledb.continuous) AS
+              #{base_query.to_sql}
+              #{with_data ? 'WITH DATA' : 'WITH NO DATA'};
+            SQL
+
+            if (policy = klass.refresh_policy)
+              # if_not_exists keeps a repeated call from failing on a policy
+              # that is already there, matching the CREATE ... IF NOT EXISTS.
+              connection.execute <<~SQL
+                SELECT add_continuous_aggregate_policy('#{klass.table_name}',
+                  start_offset => INTERVAL '#{policy[:start_offset]}',
+                  end_offset => INTERVAL '#{policy[:end_offset]}',
+                  schedule_interval => INTERVAL '#{policy[:schedule_interval]}',
+                  if_not_exists => true);
+              SQL
+            end
+
+            previous_timeframe = timeframe
+          end
+        end
+      end
+
+      private
+
+      # Looks up the model class generated for an aggregate/timeframe pair.
+      def continuous_aggregate_class(aggregate_name, timeframe)
+        const_get("#{aggregate_name}_per_#{timeframe}".classify)
+      end
+
+      # Defines a read-only ActiveRecord class for every aggregate and
+      # timeframe, nested under the model that declared them.
+      def define_continuous_aggregate_classes
+        @aggregates.each do |aggregate_name, config|
+          @timeframes.each do |timeframe|
+            _table_name = "#{aggregate_name}_per_#{timeframe}"
+            const_set(_table_name.classify, Class.new(ActiveRecord::Base) do
+              class << self
+                attr_accessor :config, :timeframe
+              end
+
+              self.table_name = _table_name
+              self.config = config
+              self.timeframe = timeframe
+
+              # Refreshes the view, passing null for both window bounds.
+              def self.refresh!
+                connection.execute("CALL refresh_continuous_aggregate('#{table_name}', null, null);")
+              end
+
+              # Aggregate rows are never written through the model.
+              def readonly?
+                true
+              end
+
+              # Policy settings configured for this class's timeframe, if any.
+              def self.refresh_policy
+                config[:refresh_policy]&.dig(timeframe)
+              end
+            end)
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/spec/timescaledb/continuous_aggregates_helper_spec.rb b/spec/timescaledb/continuous_aggregates_helper_spec.rb
new file mode 100644
index 0000000..a4d5c21
--- /dev/null
+++ b/spec/timescaledb/continuous_aggregates_helper_spec.rb
@@ -0,0 +1,148 @@
+require 'spec_helper'
+
+class Download < ActiveRecord::Base
+  include Timescaledb::ContinuousAggregatesHelper
+
+  acts_as_hypertable time_column: 'ts'
+
+  scope :total_downloads, -> { select("count(*) as total") }
+  scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) }
+  scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) }
+
+  continuous_aggregates(
+    time_column: 'ts',
+    timeframes: [:minute, :hour, :day, :month],
+    scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
+    refresh_policy: {
+      minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
+      hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
+      day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" },
+      month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
+    }
+  )
+end
+
+RSpec.describe Timescaledb::ContinuousAggregatesHelper do
+  let(:test_class) do
+    Download
+  end
+
+  before(:all) do
+    ActiveRecord::Base.connection.instance_exec do
+      hypertable_options = {
+        time_column: 'ts',
+        chunk_time_interval: '1 day',
+        compress_segmentby: 'gem_name, gem_version',
+        compress_orderby: 'ts DESC',
+      }
+      create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
+        t.timestamptz :ts, null: false
+        t.text :gem_name, :gem_version, null: false
+        t.jsonb :payload
+      end
+    end
+  end
+
+  after(:all) do
+    ActiveRecord::Base.connection.drop_table :downloads, if_exists: true, force: :cascade
+  end
+
+  describe '.continuous_aggregates' do
+    it 'defines aggregate classes' do
+      expect(test_class.const_defined?(:TotalDownloadsPerMinute)).to be true
+      expect(test_class.const_defined?(:TotalDownloadsPerHour)).to be true
+      expect(test_class.const_defined?(:TotalDownloadsPerDay)).to be true
+      expect(test_class.const_defined?(:TotalDownloadsPerMonth)).to be true
+
+      expect(test_class.const_defined?(:DownloadsByVersionPerMinute)).to be true
+      expect(test_class.const_defined?(:DownloadsByVersionPerHour)).to be true
+      expect(test_class.const_defined?(:DownloadsByVersionPerDay)).to be true
+      expect(test_class.const_defined?(:DownloadsByVersionPerMonth)).to be true
+
+      expect(test_class.const_defined?(:DownloadsByGemPerMinute)).to be true
+      expect(test_class.const_defined?(:DownloadsByGemPerHour)).to be true
+      expect(test_class.const_defined?(:DownloadsByGemPerDay)).to be true
+      expect(test_class.const_defined?(:DownloadsByGemPerMonth)).to be true
+    end
+
+    it 'sets up correct table names for aggregates' do
+      expect(test_class::TotalDownloadsPerMinute.table_name).to eq('total_downloads_per_minute')
+      expect(test_class::TotalDownloadsPerHour.table_name).to eq('total_downloads_per_hour')
+      expect(test_class::TotalDownloadsPerDay.table_name).to eq('total_downloads_per_day')
+      expect(test_class::TotalDownloadsPerMonth.table_name).to eq('total_downloads_per_month')
+
+      expect(test_class::DownloadsByVersionPerMinute.table_name).to eq('downloads_by_version_per_minute')
+      expect(test_class::DownloadsByVersionPerHour.table_name).to eq('downloads_by_version_per_hour')
+      expect(test_class::DownloadsByVersionPerDay.table_name).to eq('downloads_by_version_per_day')
+      expect(test_class::DownloadsByVersionPerMonth.table_name).to eq('downloads_by_version_per_month')
+
+      expect(test_class::DownloadsByGemPerMinute.table_name).to eq('downloads_by_gem_per_minute')
+      expect(test_class::DownloadsByGemPerHour.table_name).to eq('downloads_by_gem_per_hour')
+      expect(test_class::DownloadsByGemPerDay.table_name).to eq('downloads_by_gem_per_day')
+      expect(test_class::DownloadsByGemPerMonth.table_name).to eq('downloads_by_gem_per_month')
+    end
+
+    it 'defines rollup scope for aggregates' do
+      pending 'aggregate classes do not define a rollup scope yet'
+      test_class.create_continuous_aggregates
+      aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth]
+      aggregate_classes.each do |agg_class|
+        expect(agg_class).to respond_to(:rollup)
+        expect(agg_class.rollup.to_sql).to include('time_bucket')
+        expect(agg_class.rollup.to_sql).to include('count(*) as total')
+      end
+    end
+
+    it 'defines time-based scopes for aggregates' do
+      pending 'aggregate classes do not expose the model scopes yet'
+      aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth]
+      aggregate_scopes = [:total_downloads, :downloads_by_gem, :downloads_by_version]
+
+      aggregate_scopes.each do |scope|
+        aggregate_classes.each do |agg_class|
+          expect(agg_class).to respond_to(scope)
+        end
+      end
+    end
+  end
+
+  describe '.create_continuous_aggregates' do
+    before do
+      allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original
+    end
+
+    it 'creates materialized views for each aggregate' do
+      test_class.create_continuous_aggregates
+
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*total_downloads_per_minute/i)
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*total_downloads_per_hour/i)
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*total_downloads_per_day/i)
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*total_downloads_per_month/i)
+    end
+
+    it 'sets up refresh policies for each aggregate' do
+      test_class.create_continuous_aggregates
+
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*total_downloads_per_minute/i)
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*total_downloads_per_hour/i)
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*total_downloads_per_day/i)
+      expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*total_downloads_per_month/i)
+    end
+  end
+
+  describe 'refresh policies' do
+    it 'defines appropriate refresh policies for each timeframe' do
+      policies = {
+        minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
+        hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
+        day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" },
+        month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
+      }
+
+      policies.each do |timeframe, expected_policy|
+        actual_policy = test_class.const_get("TotalDownloadsPer#{timeframe.to_s.capitalize}").refresh_policy
+        expect(actual_policy).to eq(expected_policy)
+      end
+    end
+  end
+end