platform-db/changesets/registry/procedures/p_load_table_from_csv.sql (91 lines of code) (raw):

--liquibase formatted sql /* * Copyright 2021 EPAM Systems. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ --changeset platform:p_load_table_from_csv splitStatements:false stripComments:false runOnChange:true CREATE OR REPLACE PROCEDURE p_load_table_from_csv(p_table_name text, p_file_name text, p_table_columns text[], p_target_table_columns text[] DEFAULT NULL::text[]) LANGUAGE plpgsql AS $procedure$ DECLARE l_sql TEXT; l_sys_cols text := 'curr_user=>"admin",source_system=>"Initial load",source_application=>"Initial load",source_process=>"Initial load process",process_id=>"0000"'; l_cols text := ''; l_row record; l_target_table_columns text[] := coalesce(p_target_table_columns,p_table_columns); l_col_name TEXT; l_col_value TEXT; l_is_uuid BOOLEAN := FALSE; l_uuid TEXT := NULL; j INT := 0; l_ref refs; l_col_names TEXT[]; l_col_vals TEXT[]; l_curr_idx int; l_system_roles_arr text := 'array[null, null, null, null]::text[]'; -- system role marker to skip RBAC check during initial data load BEGIN -- FOR i IN array_lower(p_table_columns, 1)..array_upper(p_table_columns, 1) LOOP l_cols := l_cols || p_table_columns[i] || ' TEXT,'; IF p_table_columns[i] = 'uuid' THEN l_is_uuid := TRUE; END IF; END LOOP; l_cols := TRIM(TRAILING ',' FROM l_cols); -- l_sql := format($$DROP FOREIGN TABLE IF EXISTS %I_csv$$, p_table_name); execute l_sql; -- l_sql := format($$CREATE FOREIGN TABLE %I_csv(%s) SERVER srv_file_fdw OPTIONS (FILENAME '%s', FORMAT 'csv', HEADER 'true', DELIMITER ',', ENCODING 'UTF8' )$$, p_table_name, l_cols, p_file_name); CALL p_raise_notice(l_sql); execute l_sql; -- l_cols := ''; FOR i IN array_lower(l_target_table_columns, 1)..array_upper(l_target_table_columns, 1) LOOP -- CASE WHEN split_part(l_target_table_columns[i],'::',2) = '' THEN l_col_name := split_part(l_target_table_columns[i],'::',1); l_col_value := l_col_name; WHEN split_part(l_target_table_columns[i],'::',2) LIKE 'ref(%' THEN l_col_name := split_part(l_target_table_columns[i],'::',1); l_ref := f_get_ref_record(l_target_table_columns[i]); l_col_value := format('(f_get_id_from_ref_table(''%s'',''%s'',''%s'',%s))', l_ref.ref_table, l_ref.ref_col, l_ref.ref_id, l_ref.lookup_col); WHEN split_part(l_target_table_columns[i],'::',2) LIKE 'ref_array(%' then NULL; ELSE l_col_name := split_part(l_target_table_columns[i],'::',1); l_col_value := split_part(l_target_table_columns[i],'::',2) ; END CASE; l_cols := l_cols ||''||l_col_name||'=>''||coalesce(''"''||REGEXP_REPLACE(trim('||l_col_value||', chr(160)),''"'',''\"'',''g'')||''"'',''NULL'')||'',' ; -- END LOOP; -- FOR i IN array_lower(l_target_table_columns, 1)..array_upper(l_target_table_columns, 1) LOOP CASE WHEN split_part(l_target_table_columns[i],'::',2) LIKE 'ref_array(%' then -- merge duplicated columns l_curr_idx := array_position(l_col_names, split_part(l_target_table_columns[i],'::',1)); IF l_curr_idx IS NULL THEN l_col_names := array_append(l_col_names, split_part(l_target_table_columns[i],'::',1)); l_col_vals := array_append(l_col_vals, NULL); l_curr_idx := array_position(l_col_names, split_part(l_target_table_columns[i],'::',1)); END IF; l_ref := f_get_ref_record(l_target_table_columns[i]); l_col_vals[l_curr_idx] := concat_ws(',', l_col_vals[l_curr_idx], format('(f_get_id_from_ref_array_table(''%s'',''%s'',''%s'',%s,''%s''))', l_ref.ref_table, l_ref.ref_col, l_ref.ref_id, l_ref.lookup_col, l_ref.list_delim)); ELSE NULL; END CASE; END LOOP; -- IF array_length(l_col_names, 1) > 0 then FOR i IN array_lower(l_col_names, 1)..array_upper(l_col_names, 1) LOOP l_cols := l_cols ||''||l_col_names[i]||'=>''||coalesce(''"{''||REGEXP_REPLACE(trim(concat_ws('','','||l_col_vals[i]||'), chr(160)),''"'',''\"'',''g'')||''}"'',''NULL'')||'',' ; END LOOP; END IF; -- l_cols := TRIM(TRAILING ',' FROM l_cols); CALL p_raise_notice(l_cols); -- IF l_is_uuid THEN -- l_sql := format('SELECT ''($$%s$$)::hstore);'' f FROM %I_csv', l_cols, p_table_name); l_sql := format('SELECT ''SELECT f_row_insert(''''%I'''', (''''%s'''')::hstore,($$%s$$)::hstore, ' || l_system_roles_arr || ', ''''''||uuid||''''''::uuid);'' f FROM %I_csv' , p_table_name, l_sys_cols, l_cols, p_table_name); -- , array_to_string(l_lookups,', '), p_table_name, l_sys_cols, l_cols, p_table_name); ELSE l_sql := format('SELECT ''SELECT f_row_insert(''''%I'''', (''''%s'''')::hstore,($$%s$$)::hstore, ' || l_system_roles_arr || ');'' f FROM %I_csv' , p_table_name, l_sys_cols, l_cols, p_table_name); END IF; CALL p_raise_notice(l_sql); -- FOR l_row IN EXECUTE l_sql LOOP CALL p_raise_notice(l_row.f); EXECUTE l_row.f; END LOOP; -- l_sql := format($$DROP FOREIGN TABLE IF EXISTS %I_csv$$, p_table_name); execute l_sql; -- END; $procedure$ SECURITY DEFINER SET search_path = registry, public, pg_temp;