in kong/kong/db/init.lua [478:665]
--- Runs the "up" and/or "teardown" steps of the given migrations.
--
-- Each migration module is loaded as `namespace .. "." .. name` and the
-- entry for the current strategy (`mod[self.strategy]`) is used. That entry
-- may define:
--   * `up`       - passed to `connector:run_up_migration` (skipped when
--                  absent or an empty string)
--   * `up_f`     - function called with the connector
--   * `teardown` - function called with the connector
-- `up_f` and `teardown` are treated as failed when they raise, or when they
-- return a truthy second value (the error).
--
-- After the up step a migration is recorded as "pending" when it defines a
-- teardown and as "executed" otherwise; after a teardown it is recorded as
-- "teardown".
--
-- The migrations connection is opened at the start and closed before
-- returning, on success as well as on every handled failure.
--
-- @param migrations array of subsystem descriptors, each with the fields
--   `subsystem`, `namespace` and `migrations` (array of `{ name = ... }`)
-- @param options table; at least one of `run_up` / `run_teardown` must be
--   truthy. `skip_teardown_migrations` (optional, same shape as
--   `migrations`) lists migrations whose teardown must not run; it is only
--   consulted when `run_teardown` is set.
-- @return `true` on success, or `nil` plus an error message
-- Raises an error (reported at the caller's position) on invalid arguments.
function DB:run_migrations(migrations, options)
  if type(migrations) ~= "table" then
    error("migrations must be a table", 2)
  end

  if type(options) ~= "table" then
    error("options must be a table", 2)
  end

  local run_up = options.run_up
  local run_teardown = options.run_teardown

  if not run_up and not run_teardown then
    error("options.run_up or options.run_teardown must be given", 2)
  end

  -- subsystem -> { [migration name] = true } for teardowns to leave alone.
  -- Only migrations that load and define a teardown for this strategy are
  -- registered; modules that fail to load are silently ignored here.
  local skip_teardown_migrations = {}
  if run_teardown and options.skip_teardown_migrations then
    for _, t in ipairs(options.skip_teardown_migrations) do
      for _, mig in ipairs(t.migrations) do
        local loaded, mod = utils.load_module_if_exists(t.namespace .. "." ..
                                                        mig.name)
        if loaded then
          local strategy_migration = mod[self.strategy]
          if strategy_migration and strategy_migration.teardown then
            local skipped = skip_teardown_migrations[t.subsystem]
            if not skipped then
              skipped = {}
              skip_teardown_migrations[t.subsystem] = skipped
            end

            skipped[mig.name] = true
          end
        end
      end
    end
  end

  local ok, err = self.connector:connect_migrations()
  if not ok then
    return nil, prefix_err(self, err)
  end

  -- From here on every failure path has to release the connection first.
  local function fail(fmt, ...)
    self.connector:close()
    return nil, fmt_err(self, fmt, ...)
  end

  local function fail_prefixed(e)
    self.connector:close()
    return nil, prefix_err(self, e)
  end

  local n_migrations = 0
  local n_pending = 0

  for i, t in ipairs(migrations) do
    log("migrating %s on %s '%s'...", t.subsystem, self.infos.db_desc,
        self.infos.db_name)

    local skipped = skip_teardown_migrations[t.subsystem]

    for _, mig in ipairs(t.migrations) do
      local loaded, mod = utils.load_module_if_exists(t.namespace .. "." ..
                                                      mig.name)
      if not loaded then
        -- on failure the second return value carries the load error
        return fail("failed to load migration '%s': %s", mig.name, mod)
      end

      local strategy_migration = mod[self.strategy]
      if not strategy_migration then
        return fail("missing %s strategy for migration '%s'",
                    self.strategy, mig.name)
      end

      log.debug("running migration: %s", mig.name)

      local teardown = strategy_migration.teardown

      if run_up then
        local up = strategy_migration.up
        if up and up ~= "" then
          ok, err = self.connector:run_up_migration(mig.name, up)
          if not ok then
            return fail("failed to run migration '%s' up: %s", mig.name, err)
          end
        end

        if strategy_migration.up_f then
          local pok, perr, ferr = xpcall(strategy_migration.up_f,
                                         debug.traceback, self.connector)
          if not pok or ferr then
            -- perr is the traceback only when the call raised; otherwise it
            -- is the step's first return value and ferr is the real error
            return fail("failed to run migration '%s' up_f: %s",
                        mig.name, (not pok and perr) or ferr)
          end
        end

        -- a migration with a teardown step is not complete until that
        -- teardown has run
        ok, err = self.connector:record_migration(t.subsystem, mig.name,
                                                  teardown and "pending"
                                                            or "executed")
        if not ok then
          return fail("failed to record migration '%s': %s", mig.name, err)
        end
      end

      local skip_teardown = skipped and skipped[mig.name]
      local teardown_ran = false

      if teardown and run_teardown and not skip_teardown then
        if run_up then
          -- up and teardown run back to back: wait for schema consensus
          -- in between
          ok, err = self.connector:wait_for_schema_consensus()
          if not ok then
            return fail_prefixed(err)
          end
        end

        local pok, perr, ferr = xpcall(teardown, debug.traceback,
                                       self.connector)
        if not pok or ferr then
          return fail("failed to run migration '%s' teardown: %s",
                      mig.name, (not pok and perr) or ferr)
        end

        ok, err = self.connector:record_migration(t.subsystem, mig.name,
                                                  "teardown")
        if not ok then
          return fail("failed to record migration '%s': %s", mig.name, err)
        end

        teardown_ran = true

        if not run_up then
          -- teardown-only run: wait for schema consensus after each teardown
          ok, err = self.connector:wait_for_schema_consensus()
          if not ok then
            return fail_prefixed(err)
          end
        end
      end

      -- A migration stays pending whenever it has a teardown that did not
      -- run in this call, including teardowns that were explicitly skipped.
      local pending = teardown and not teardown_ran
      if pending then
        n_pending = n_pending + 1
      end

      log("%s migrated up to: %s %s", t.subsystem, mig.name,
          pending and "(pending)" or "(executed)")

      n_migrations = n_migrations + 1
    end

    if run_up and i == #migrations then
      -- wait for schema consensus once more after the last subsystem's up
      ok, err = self.connector:wait_for_schema_consensus()
      if not ok then
        return fail_prefixed(err)
      end
    end
  end

  log("%d migration%s processed", n_migrations,
      n_migrations == 1 and "" or "s")

  local n_executed = n_migrations - n_pending
  if n_executed > 0 then
    log("%d executed", n_executed)
  end

  if n_pending > 0 then
    log("%d pending", n_pending)
  end

  self.connector:close()

  return true
end