platform-db/changesets/registry/procedures/f_check_permissions_dcm.sql (65 lines of code) (raw):

--liquibase formatted sql /* * Copyright 2021 EPAM Systems. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ --changeset platform:f_check_permissions_dcm splitStatements:false stripComments:false runOnChange:true CREATE OR REPLACE FUNCTION public.f_check_permissions_dcm(p_table_name text, p_key_name text, p_uuid uuid, p_columns_arr text[], p_roles_arr text[], OUT r_is_check_passed boolean, OUT r_columns4rbac_arr text[]) RETURNS record LANGUAGE plpgsql AS $function$ declare c_unit_name text := 'f_check_permissions_dcm'; l_sql text; l_column_name text; l_dcm_access_role type_access_role[]; i record; begin call p_raise_notice(format('%s: p_table_name [%s]', c_unit_name, p_table_name)); call p_raise_notice(format('%s: p_key_name [%s]', c_unit_name, p_key_name)); call p_raise_notice(format('%s: p_uuid [%s]', c_unit_name, p_uuid)); call p_raise_notice(format('%s: p_columns_arr (list of updated columns) [%s]', c_unit_name, p_columns_arr)); call p_raise_notice(format('%s: p_roles_arr (list of user roles) [%s]', c_unit_name, p_roles_arr)); r_is_check_passed := true; r_columns4rbac_arr := p_columns_arr; -- check if dcm_access_role column exists in p_table_name l_sql := 'select column_name from information_schema.columns where table_schema = ''registry'' and table_name = ''' || p_table_name || ''' and column_name = ''dcm_access_role'' '; execute l_sql into l_column_name; if l_column_name is null then call p_raise_notice(format('%s: column [dcm_access_role] not found in table [%s] => dcm check is skipped', c_unit_name, p_table_name)); return; end if; -- get dcm_access_role value l_sql := 'select dcm_access_role from ' || p_table_name || ' where ' || p_key_name || ' = ''' || p_uuid || ''''; execute l_sql into l_dcm_access_role; -- check if dcm_access_role is empty call p_raise_notice(format('%s: l_dcm_access_role [%s]', c_unit_name, l_dcm_access_role)); call p_raise_notice(format('%s: cardinality(l_dcm_access_role) [%s]', c_unit_name, cardinality(l_dcm_access_role))); if l_dcm_access_role is null or cardinality(l_dcm_access_role) = 0 then call p_raise_notice(format('%s: dcm_access_role is empty => dcm check is skipped', c_unit_name)); return; end if; -- check permissions for columns specified in data_column_name foreach i in array l_dcm_access_role loop call p_raise_notice(format('%s: i.data_column_name [%s], i.access_role[%s]', c_unit_name, i.data_column_name, i.access_role)); if trim(coalesce(i.data_column_name, '')) = '' then call p_raise_notice(format('%s: data_column_name is empty => skip', c_unit_name)); continue; end if; if not (i.data_column_name = any(p_columns_arr)) then call p_raise_notice(format('%s: column [%s] not found in the list of updated columns => skip', c_unit_name, i.data_column_name)); continue; end if; -- NB. i.access_role is not checked for null or empty if p_roles_arr is null or cardinality(p_roles_arr) = 0 or not (p_roles_arr && i.access_role) then call p_raise_notice(format('%s: column [%s], ' || case when p_roles_arr is null or cardinality(p_roles_arr) = 0 then 'list of user roles is empty' else 'none of user roles found in access_role' end || ' => access denied', c_unit_name, i.data_column_name)); r_is_check_passed := false; return; end if; -- access permitted => exclude i.data_column_name from the list of updated columns for further checks call p_raise_notice(format('%s: column [%s], one of user roles found in access_role => access permitted', c_unit_name, i.data_column_name)); r_columns4rbac_arr := array_remove(r_columns4rbac_arr, i.data_column_name); end loop; call p_raise_notice(format('%s: r_columns4rbac_arr [%s]', c_unit_name, r_columns4rbac_arr)); return; end; $function$ SECURITY DEFINER SET search_path = registry, public, pg_temp;