@@ -50,22 +50,124 @@ def backfill(logger, db, table, batch_size: 10_000, iterations: -1)
50
50
51
51
logger . info ( "starting bigint backfill on table '#{ table } ' (batch_size: #{ batch_size } , iterations: #{ iterations } )" )
52
52
loop do
53
- updated_rows = db .
54
- from ( table , :batch ) .
55
- with ( :batch , db [ table ] . select ( :id ) . where ( id_bigint : nil ) . order ( :id ) . limit ( batch_size ) . for_update . skip_locked ) .
56
- where ( Sequel . qualify ( table , :id ) => :batch__id ) .
57
- update ( id_bigint : :batch__id )
53
+ updated_rows = backfill_batch ( db , table , :id , :id_bigint , batch_size )
58
54
logger . info ( "updated #{ updated_rows } rows" )
59
55
iterations -= 1 if iterations > 0
60
56
break if updated_rows < batch_size || iterations == 0
61
57
end
62
58
logger . info ( "finished bigint backfill on table '#{ table } '" )
63
59
end
64
60
61
+ def migration_completed? ( db , table )
62
+ column_type ( db , table , :id ) == 'bigint'
63
+ end
64
+
65
+ def migration_skipped? ( db , table )
66
+ !column_exists? ( db , table , :id_bigint )
67
+ end
68
+
69
+ def add_check_constraint ( db , table )
70
+ return if has_check_constraint? ( db , table )
71
+
72
+ constraint_name = check_constraint_name ( table )
73
+ db . alter_table ( table ) do
74
+ add_constraint ( constraint_name ) do
75
+ Sequel . lit ( 'id_bigint IS NOT NULL AND id_bigint = id' )
76
+ end
77
+ end
78
+ end
79
+
80
+ def drop_check_constraint ( db , table )
81
+ return unless has_check_constraint? ( db , table )
82
+
83
+ constraint_name = check_constraint_name ( table )
84
+ db . alter_table ( table ) do
85
+ drop_constraint ( constraint_name )
86
+ end
87
+ end
88
+
89
+ def has_check_constraint? ( db , table )
90
+ check_constraint_exists? ( db , table , check_constraint_name ( table ) )
91
+ end
92
+
93
+ def drop_pk_column ( db , table )
94
+ db . drop_column ( table , :id , if_exists : true )
95
+ end
96
+
97
+ def add_id_column ( db , table )
98
+ db . add_column ( table , :id , :integer , if_not_exists : true )
99
+ end
100
+
101
+ def rename_bigint_column ( db , table )
102
+ db . rename_column ( table , :id_bigint , :id ) if column_exists? ( db , table , :id_bigint ) && !column_exists? ( db , table , :id )
103
+ end
104
+
105
+ def revert_bigint_column_name ( db , table )
106
+ db . rename_column ( table , :id , :id_bigint ) if column_exists? ( db , table , :id ) && column_type ( db , table , :id ) == 'bigint' && !column_exists? ( db , table , :id_bigint )
107
+ end
108
+
109
+ def add_pk_constraint ( db , table )
110
+ return if db . primary_key ( table ) == 'id'
111
+
112
+ db . alter_table ( table ) do
113
+ add_primary_key ( [ :id ] )
114
+ end
115
+ end
116
+
117
+ def drop_pk_constraint ( db , table )
118
+ return unless db . primary_key ( table ) == 'id'
119
+
120
+ constraint_name = pk_constraint_name ( table )
121
+ db . alter_table ( table ) do
122
+ drop_constraint ( constraint_name )
123
+ set_column_allow_null ( :id , true )
124
+ end
125
+ end
126
+
127
+ def add_timestamp_pk_index ( db , table )
128
+ db . add_index ( table , %i[ timestamp id ] , name : timestamp_id_index_name ( table ) , unique : false , if_not_exists : true )
129
+ end
130
+
131
+ def drop_timestamp_pk_index ( db , table )
132
+ db . drop_index ( table , %i[ timestamp id ] , name : timestamp_id_index_name ( table ) , if_exists : true )
133
+ end
134
+
135
+ def set_pk_as_identity_with_correct_start_value ( db , table )
136
+ return if column_attribute ( db , table , :id , :auto_increment ) == true
137
+
138
+ block = <<~BLOCK
139
+ DO $$
140
+ DECLARE
141
+ max_id BIGINT;
142
+ BEGIN
143
+ SELECT COALESCE(MAX(id), 0) + 1 INTO max_id FROM #{ table } ;
144
+
145
+ EXECUTE format('ALTER TABLE #{ table } ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (START WITH %s)', max_id);
146
+ END $$;
147
+ BLOCK
148
+ db . run ( block )
149
+ end
150
+
151
+ def drop_identity ( db , table )
152
+ db . run ( "ALTER TABLE #{ table } ALTER COLUMN id DROP IDENTITY IF EXISTS" )
153
+ end
154
+
155
+ def backfill_id ( db , table )
156
+ batch_size = 10_000
157
+ loop do
158
+ updated_rows = backfill_batch ( db , table , :id_bigint , :id , batch_size )
159
+ break if updated_rows < batch_size
160
+ end
161
+ end
162
+
65
163
private
66
164
165
+ def column_attribute ( db , table , column , attribute )
166
+ db . schema ( table ) . find { |col , _ | col == column } &.dig ( 1 , attribute )
167
+ end
168
+
67
169
def column_type ( db , table , column )
68
- db . schema ( table ) . find { | col , _ | col == column } &. dig ( 1 , :db_type )
170
+ column_attribute ( db , table , column , :db_type )
69
171
end
70
172
71
173
def function_name ( table )
@@ -79,5 +181,29 @@ def trigger_name(table)
79
181
def column_exists? ( db , table , column )
80
182
db [ table ] . columns . include? ( column )
81
183
end
184
+
185
+ def check_constraint_name ( table )
186
+ :"#{ table } _check_id_bigint_matches_id"
187
+ end
188
+
189
+ def check_constraint_exists? ( db , table , constraint_name )
190
+ db . check_constraints ( table ) . include? ( constraint_name )
191
+ end
192
+
193
+ def pk_constraint_name ( table )
194
+ :"#{ table } _pkey"
195
+ end
196
+
197
+ def timestamp_id_index_name ( table )
198
+ :"#{ table } _timestamp_id_index"
199
+ end
200
+
201
+ def backfill_batch ( db , table , from_column , to_column , batch_size )
202
+ db .
203
+ from ( table , :batch ) .
204
+ with ( :batch , db [ table ] . select ( from_column ) . where ( to_column => nil ) . order ( from_column ) . limit ( batch_size ) . for_update . skip_locked ) .
205
+ where ( Sequel . qualify ( table , from_column ) => :"batch__#{ from_column } " ) .
206
+ update ( to_column => :"batch__#{ from_column } " )
207
+ end
82
208
end
83
209
end
0 commit comments