require "postgres_to_redshift/version"
require 'pg'
require 'uri'
require 'aws-sdk'
require 'zlib'
require 'tempfile'
require "postgres_to_redshift/table"
require "postgres_to_redshift/column"

# Copies tables from a source Postgres database into Redshift by dumping them
# to S3 as gzipped, pipe-delimited files and loading them with Redshift's COPY.
class PostgresToRedshift
  class << self
    attr_accessor :source_uri, :target_uri
  end

  attr_reader :source_connection, :target_connection, :s3

  KILOBYTE = 1024
  MEGABYTE = KILOBYTE * 1024
  GIGABYTE = MEGABYTE * 1024

  # Entry point: for every source table, make sure the target table exists,
  # dump the data to S3, then load it into Redshift.
  def self.update_tables
    update_tables = PostgresToRedshift.new

    update_tables.tables.each do |table|
      target_connection.exec("CREATE TABLE IF NOT EXISTS #{schema}.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

      update_tables.copy_table(table)

      update_tables.import_table(table)
    end
  end

  def self.source_uri
    @source_uri ||= URI.parse(ENV['POSTGRES_TO_REDSHIFT_SOURCE_URI'])
  end

  def self.target_uri
    @target_uri ||= URI.parse(ENV['POSTGRES_TO_REDSHIFT_TARGET_URI'])
  end

  # Lazily opens a read-only connection to the source Postgres database.
  def self.source_connection
    unless instance_variable_defined?(:"@source_connection")
      @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1])
      @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
    end

    @source_connection
  end

  # Lazily opens a connection to the target Redshift cluster.
  def self.target_connection
    unless instance_variable_defined?(:"@target_connection")
      @target_connection = PG::Connection.new(host: target_uri.host, port: target_uri.port, user: target_uri.user || ENV['USER'], password: target_uri.password, dbname: target_uri.path[1..-1])
    end

    @target_connection
  end

  def self.schema
    ENV.fetch('POSTGRES_TO_REDSHIFT_TARGET_SCHEMA')
  end

  def source_connection
    self.class.source_connection
  end

  def target_connection
    self.class.target_connection
  end

  # All base tables and views in the source public schema, skipping Postgres
  # system tables.
  def tables
    source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')").map do |table_attributes|
      table = Table.new(attributes: table_attributes)
      next if table.name =~ /^pg_/
      table.columns = column_definitions(table)
      table
    end.compact
  end

  # Column metadata for a single table, in ordinal order.
  def column_definitions(table)
    source_connection.exec("SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name='#{table.name}' order by ordinal_position")
  end

  # S3 client and export bucket used to stage the dumps (aws-sdk v1 API).
  def s3
    @s3 ||= AWS::S3.new(access_key_id: ENV['S3_DATABASE_EXPORT_ID'], secret_access_key: ENV['S3_DATABASE_EXPORT_KEY'])
  end

  def bucket
    @bucket ||= s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]
  end

  # Streams a pipe-delimited dump of the table through gzip into S3, starting
  # a new chunk whenever roughly 5 GB of uncompressed data has been written.
  def copy_table(table)
    tmpfile = Tempfile.new("psql2rs")
    zip = Zlib::GzipWriter.new(tmpfile)
    chunksize = 5 * GIGABYTE # uncompressed
    chunk = 1
    bucket.objects.with_prefix("export/#{table.target_table_name}.psv.gz").delete_all
    begin
      puts "Downloading #{table}"
      copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

      source_connection.copy_data(copy_command) do
        while row = source_connection.get_copy_data
          zip.write(row)
          if (zip.pos > chunksize)
            zip.finish
            tmpfile.rewind
            upload_table(table, tmpfile, chunk)
            chunk += 1
            zip.close unless zip.closed?
            tmpfile.unlink
            tmpfile = Tempfile.new("psql2rs")
            zip = Zlib::GzipWriter.new(tmpfile)
          end
        end
      end
      zip.finish
      tmpfile.rewind
      upload_table(table, tmpfile, chunk)
      source_connection.reset
    ensure
      zip.close unless zip.closed?
      tmpfile.unlink
    end
  end

  # Uploads one gzipped chunk to the export/ prefix in S3.
  def upload_table(table, buffer, chunk)
    puts "Uploading #{table.target_table_name}.#{chunk}"
    bucket.objects["export/#{table.target_table_name}.psv.gz.#{chunk}"].write(buffer, acl: :authenticated_read)
  end

  # Swaps in a freshly loaded copy of the table: the existing table is renamed
  # aside, an empty table is recreated, and the gzipped chunks are loaded from
  # S3 with Redshift's COPY command.
  def import_table(table)
    puts "Importing #{table.target_table_name}"
    schema = self.class.schema

    target_connection.exec("DROP TABLE IF EXISTS #{schema}.#{table.target_table_name}_updating")

    target_connection.exec("BEGIN;")

    target_connection.exec("ALTER TABLE #{schema}.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")

    target_connection.exec("CREATE TABLE #{schema}.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

    target_connection.exec("COPY #{schema}.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

    target_connection.exec("COMMIT;")
  end
end
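
# A minimal invocation sketch (an illustration under assumptions, not part of
# the gem itself): set the environment variables this file reads, then call the
# class-level entry point. The URI and bucket values below are placeholders.
#
#   ENV['POSTGRES_TO_REDSHIFT_SOURCE_URI']    = 'postgres://user:password@source-host:5432/source_db'
#   ENV['POSTGRES_TO_REDSHIFT_TARGET_URI']    = 'postgres://user:password@redshift-host:5439/target_db'
#   ENV['POSTGRES_TO_REDSHIFT_TARGET_SCHEMA'] = 'public'
#   ENV['S3_DATABASE_EXPORT_ID']              = 'aws-access-key-id'
#   ENV['S3_DATABASE_EXPORT_KEY']             = 'aws-secret-access-key'
#   ENV['S3_DATABASE_EXPORT_BUCKET']          = 'my-export-bucket'
#
#   require 'postgres_to_redshift'
#   PostgresToRedshift.update_tables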