抽取oracle数据到mysql数据库的实现
这篇文章给大家介绍抽取oracle数据到MySQL数据库的实现,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
创新互联公司是一家集网站建设,繁峙企业网站建设,繁峙品牌网站建设,网站定制,繁峙网站建设报价,网络营销,网络优化,繁峙网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql
2、建立一个目录ETL_DIR
3、运行oracle数据库程序P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql
4、导入mysql数据,文件内容如下
load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n"; load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n"; load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n"; load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";
附:数据库脚本P_ETL_ORA_DATA
CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA ( P_ORA_DIR VARCHAR2, P_DATA_PATH VARCHAR2 ) IS TYPE T_REC IS RECORD( TBN VARCHAR2(40), WHR VARCHAR2(4000)); TYPE T_TABS IS TABLE OF T_REC; V_TABS T_TABS := T_TABS(); V_ETL_DIR VARCHAR2(40) := P_ORA_DIR; V_LOAD_FILE UTL_FILE.FILE_TYPE; PROCEDURE ETL_DATA ( P_SQL_STMT VARCHAR2, P_DATA_PATH VARCHAR2, P_TB_NAME VARCHAR2 ) IS BEGIN DECLARE V_VAR_COL VARCHAR2(32767); V_NUM_COL NUMBER; V_DATE_COL DATE; V_TMZ TIMESTAMP; V_COLS NUMBER; V_COLS_DESC DBMS_SQL.DESC_TAB; V_ROW_STR VARCHAR2(32767); V_COL_STR VARCHAR2(32767); V_SQL_ID NUMBER; V_SQL_REF SYS_REFCURSOR; V_EXP_FILE UTL_FILE.FILE_TYPE; V_DATA_PATH VARCHAR2(200); BEGIN V_DATA_PATH := P_DATA_PATH; IF REGEXP_SUBSTR(V_DATA_PATH, '\\$') IS NULL THEN V_DATA_PATH := V_DATA_PATH || '\'; END IF; V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\'); OPEN V_SQL_REF FOR P_SQL_STMT; V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF); DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC); FOR I IN V_COLS_DESC.FIRST .. V_COLS_DESC.LAST LOOP CASE WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767); WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL); WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL); WHEN V_COLS_DESC(I).COL_TYPE = 180 THEN DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ); END CASE; END LOOP; DECLARE V_FLUSH_OVER PLS_INTEGER := 1; V_FILE_OVER PLS_INTEGER := 1; V_FILE_NO PLS_INTEGER := 1; V_FILE_NAME VARCHAR2(200); V_LINE VARCHAR2(400); BEGIN WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0 LOOP IF V_FILE_OVER = 1 THEN V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv'; V_EXP_FILE := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => 'w', MAX_LINESIZE => 32767); END IF; V_ROW_STR := ''; FOR I IN 1 .. V_COLS LOOP V_COL_STR := '\N'; BEGIN CASE WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL); IF V_VAR_COL IS NOT NULL THEN V_COL_STR := '^' || V_VAR_COL || '^'; END IF; WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL); IF V_NUM_COL IS NOT NULL THEN V_COL_STR := V_NUM_COL; END IF; WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL); IF V_DATE_COL IS NOT NULL THEN V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh34:mi:ss') || '^'; END IF; WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ); IF V_TMZ IS NOT NULL THEN V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh34:mi:ss.ff6') || '^'; END IF; END CASE; IF I = 1 THEN V_ROW_STR := V_COL_STR; ELSE V_ROW_STR := V_ROW_STR || ',' || V_COL_STR; END IF; END; END LOOP; UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8')); IF V_FILE_OVER > 200000 /*每200000条记录就产生一个新的文件*/ THEN V_FILE_OVER := 1; V_FLUSH_OVER := 1; V_FILE_NO := V_FILE_NO + 1; UTL_FILE.FCLOSE(V_EXP_FILE); V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME; V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";'; UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE); UTL_FILE.FFLUSH(V_LOAD_FILE); CONTINUE; END IF; V_FILE_OVER := V_FILE_OVER + 1; IF V_FLUSH_OVER > 2000 /*每2000条记录就刷新缓存,写到文件中 */ THEN UTL_FILE.FFLUSH(V_EXP_FILE); V_FLUSH_OVER := 1; ELSE V_FLUSH_OVER := V_FLUSH_OVER + 1; END IF; END LOOP; DBMS_SQL.CLOSE_CURSOR(V_SQL_ID); IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN UTL_FILE.FCLOSE(V_EXP_FILE); V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME; V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";'; UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE); UTL_FILE.FFLUSH(V_LOAD_FILE); END IF; END; EXCEPTION WHEN OTHERS THEN IF DBMS_SQL.IS_OPEN(V_SQL_ID) THEN DBMS_SQL.CLOSE_CURSOR(V_SQL_ID); END IF; IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN UTL_FILE.FCLOSE(V_EXP_FILE); END IF; DBMS_OUTPUT.PUT_LINE(SQLERRM); DBMS_OUTPUT.PUT_LINE(P_SQL_STMT); END; END; BEGIN BEGIN EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) '; EXCEPTION WHEN OTHERS THEN NULL; END; EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs'; DECLARE V_CI PLS_INTEGER; V_CN VARCHAR2(40); V_ETL_COLS VARCHAR2(32767); V_TBN VARCHAR2(30); V_ETL_CFG VARCHAR2(32767); V_CNF_FILE UTL_FILE.FILE_TYPE; V_FROM_POS PLS_INTEGER; BEGIN V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'ETL_TABS.CNF', 'r', 32767); LOOP UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767); V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, 'from', 1, 1, 0, 'i'); V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1); V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2); V_TBN := REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2); V_TBN := UPPER(V_TBN); V_TABS.EXTEND(); V_TABS(V_TABS.LAST).TBN := V_TBN; V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where .+', 1, 1, 'i'); V_CI := 1; LOOP V_CN := REGEXP_SUBSTR(V_ETL_COLS, '\S+', 1, V_CI); EXIT WHEN V_CN IS NULL; V_CN := UPPER(V_CN); EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)' USING V_TBN, V_CN, V_CI; COMMIT; V_CI := V_CI + 1; END LOOP; END LOOP; EXCEPTION WHEN UTL_FILE.INVALID_PATH THEN DBMS_OUTPUT.PUT_LINE('指定的目录:ETL_DIR"' || '"无效!'); RETURN; WHEN UTL_FILE.INVALID_FILENAME THEN DBMS_OUTPUT.PUT_LINE('指定的文件:" ETL_TABS.CNF' || '"无效!'); RETURN; WHEN NO_DATA_FOUND THEN UTL_FILE.FCLOSE(V_CNF_FILE); WHEN OTHERS THEN DBMS_OUTPUT.PUT_LINE(SQLERRM); RETURN; END; DECLARE V_CUR_MATCH SYS_REFCURSOR; V_SQL_SMT VARCHAR2(32767); V_TN VARCHAR2(40); V_CN VARCHAR2(40); V_CI PLS_INTEGER; V_COLUMN_NAME VARCHAR2(40); V_ETL_COLS VARCHAR2(32767); V_LINE VARCHAR2(4000); V_TBN VARCHAR2(40); BEGIN V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'load_data.sql', OPEN_MODE => 'w', MAX_LINESIZE => 32767); FOR T_IX IN V_TABS.FIRST .. V_TABS.LAST LOOP V_SQL_SMT := 'select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn='':tbn:'' ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci'; V_TBN := V_TABS(T_IX).TBN; V_SQL_SMT := REPLACE(V_SQL_SMT, ':tbn:', V_TBN); V_ETL_COLS := NULL; OPEN V_CUR_MATCH FOR V_SQL_SMT; LOOP FETCH V_CUR_MATCH INTO V_TN, V_CN, V_COLUMN_NAME, V_CI; EXIT WHEN V_CUR_MATCH%NOTFOUND; IF V_CI > 1 THEN V_ETL_COLS := V_ETL_COLS || ' , '; END IF; IF V_COLUMN_NAME IS NULL THEN V_ETL_COLS := V_ETL_COLS || ' cast(null as number) ' || V_CN; ELSE V_ETL_COLS := V_ETL_COLS || V_CN; END IF; END LOOP; CLOSE V_CUR_MATCH; V_TBN := LOWER(V_TBN); V_SQL_SMT := 'select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR; ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN); END LOOP; IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN UTL_FILE.FCLOSE(V_LOAD_FILE); END IF; END; END P_ETL_ORA_DATA;
关于抽取oracle数据到mysql数据库的实现就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
标题名称:抽取oracle数据到mysql数据库的实现
当前链接:http://scyanting.com/article/jgsjcg.html