sqoop 自动化脚本

Sqoop是一款开源的工具,主要用于在HADOOP(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS,NOSQL中,也可以将HDFS的数据导进到关系型数据库中。
Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。

1、sqoop auto hive && snappy

  • (1)-m 参数的说明,代表含义是使用多少个并行,如果参数值是1说明没有开启并功能。

  • (2)-m 参数的值调大,使用并行导入的功能,如果-m 4说明可以并行开启4个进程,同时进行数据导入。

  • (3)如果从Oracle中导入的表没有主键,那么会出现如下的错误提示:
    ERROR tool.ImportTool: Error during import: No primary key could be
    found for table tmp.tbls . Please specify one with --split-by or perform
    a sequential import with '-m 1'.

  • (4)需要手动指定一个oracle字段,SQOOP开启并行导入功能,首先他会根据这个字段
    执行如下查询:

    1
    2
    select min(local_code), max(local_code) from pu_web.bigdata_area_code where
    data_desc='2016-02-26' [where 如果指定了where子句,没有则忽略] ==> 0691,9999

    然后sqoop会根据并行导入-m值,进行拆分,比如上面的结果0691-9999,-m 16那么会被均分为16份,并行执行导入。

    1
    select * from table where 0 <= id < 0691+(9999-0691)/16;  累死这样的算法,均分16个范围,并行导入数据。

注意,拆分的字段需要是整数。

1
2
3
4
5
6
7
8
9
10
11
1、测试连接
sqoop list-tables --connect jdbc:oracle:thin:@oracle_ip:1521:orcl --username TMP --password 123

2、删除已存在表
hive -e 'drop table if exists tmp.barea_code'

3、自动建立hive表,并且导入数据到相应目录
sqoop import -D mapreduce.job.queuename=mapred --connect jdbc:oracle:thin:@oracle_ip:1521:orcl --username TMP --password 123 --table BAREA_CODE --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-import --create-hive-table --hive-overwrite --hive-table tmp.barea_code --null-string '\\N' --null-non-string '\\N' --as-parquetfile --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m 16 --split-by local_code

4、重写已经存在hive表的数据
sqoop import -D mapreduce.job.queuename=mapred --connect jdbc:oracle:thin:@oracle_ip:1521:orcl --username TMP --password 123 --table BAREA_CODE --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-import --hive-overwrite --hive-table tmp.barea_code --null-string '\\N' --null-non-string '\\N' --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m 16 --split-by local_code

上面命令优点:
1、任务放到mapred任务队列
2、增加并行执行度,提升效率
3、文件以snappy格式存放到集群,目前snappy压缩,是最佳选择。

  • (5)、auto rdbms to hive && compress SnappyCodec
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    $ cat rdbms_to_hive_auto.py 
    #!/usr/bin/env python
    # coding: utf-8
    __author__ = 'whoami'
    import commands,logging

    def is_num_by_except(num):
    try:
    int(num)
    return True
    except ValueError:
    # print "%s ValueError" % num
    return False

    def db_settings():
    # db info format --> id,db_username,jdbc_info,db_passswd
    db_info = {1:'WEB,jdbc:oracle:thin:@host_ip:1521:orcl,123',
    2:'TARI,jdbc:oracle:thin:@host_ip:1521:orcl,123',
    3:'TEST,jdbc:oracle:thin:@host_ip:1521:orcl,123',
    4:'mysql,com.mysql.jdbc.Driver,admin123'}

    return db_info

    def database_info():
    lines = db_settings()
    print 'SQOOP:Oracle jdbc connect info...'
    for k,v in lines.items():
    print ' ',k,v

    def get_matche_conn(db):

    while 1:
    number = raw_input("Please input number: ")
    if is_num_by_except(number):
    con = db.get(int(number))
    return con
    break
    else:
    logging.error(' Input types Error,please input number types... ')
    continue

    def is_sqoop_split():
    is_split = raw_input("Do you want support sqoop split [y/n] (n)?")
    if 'y' is is_split:
    return True
    else:
    return False

    def sqoop_split_column():
    while 1:
    sqoop_split_cloumn = raw_input("Please input sqoop split cloumn,cloumn type is integer: ")
    number = raw_input("Please input sqoop split map concurrent execution number: ")

    if is_num_by_except(number):
    return sqoop_split_cloumn,number
    break
    else:
    logging.error(' Input types Error,please input number types... ')
    continue

    def delete_hive_table(table_name):
    command = "hive -e 'drop table if exists %s'" %(table_name)
    logging.warn('starting drop table if exists %s ... ' %table_name)
    status,result = commands.getstatusoutput(command)

    if status == 0:
    logging.warn(result)
    logging.warn('drop hive table success... '),
    else:
    logging.warn('drop hive table fail...')

    def raw_table():
    rdbms_table_name = raw_input("Please input rdbms table name: ")

    hive_table_name = raw_input("Please input hive table name: ")

    return rdbms_table_name,hive_table_name

    def sqoop_task_parse(is_split,rdbms_table_name,hive_table_name,rdbms_username,rdbms_jdbc,rdbms_passwd):
    if is_split:
    split_column,split_m=sqoop_split_column()
    sqoop_import = 'sqoop import -D mapreduce.job.queuename=db --connect %s --username %s --password %s --table %s --fields-terminated-by "\\t" --lines-terminated-by "\\n" --hive-import --create-hive-table --hive-table %s --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m %s --split-by %s' %(rdbms_jdbc,rdbms_username,rdbms_passwd,rdbms_table_name,hive_table_name,split_m,split_column) + " --null-string '\\\N' --null-non-string '\\\N'"
    return sqoop_import
    else:
    sqoop_import = 'sqoop import -D mapreduce.job.queuename=db --connect %s --username %s --password %s --table %s --fields-terminated-by "\\t" --lines-terminated-by "\\n" --hive-import --create-hive-table --hive-table %s --compression-codec "org.apache.hadoop.io.compress.SnappyCodec" -m 1' %(rdbms_jdbc,rdbms_username,rdbms_passwd,rdbms_table_name,hive_table_name) + " --null-string '\\\N' --null-non-string '\\\N'"
    return sqoop_import

    def sqoop_task(sqoop_import):
    logging.warn('starting sqoop import rdbms to hive ...')
    status,result = commands.getstatusoutput(sqoop_import)

    if status == 0:
    logging.warn(result)
    logging.warn('sqoop import rdbms to hive success... '),
    else:
    logging.warn('sqoop import rdbms to hive fail...')



    def task_execute(hive_table_name,sqoop_import):
    delete_hive_table(hive_table_name)
    print sqoop_import
    sqoop_task(sqoop_import)

    def sqoop_main():
    database_info()
    db = db_settings()
    conn = get_matche_conn(db).strip().split(',')
    rdbms_table_name,hive_table_name = raw_table()
    is_split = is_sqoop_split()
    sqoop_import = sqoop_task_parse(is_split,rdbms_table_name,hive_table_name,conn[0],conn[1],conn[2])
    task_execute(hive_table_name,sqoop_import)

    sqoop_main()

2、auto hdfs to oracle

3、问题整理?

  • Oracle导数据需要注意,表明和库明要大写,否则会找不到表。

  • 导parquet格式数据:

    • ERROR:Dataset name tmp.barea_code is not alphanumeric (plus ‘_’)

    • impala 生成的parquet表无法导回oracle,mr生成的parquet能导回oracle

    • Import null内容处理,select * from d_area_code where area_name is null;

      • –null-string ‘\N’
      • –null-non-string ‘\N’
      • rdbms导入数据到Hadoop集群,加入上面两个参数,执行上面SQL可以过滤出结果
      • 不加上面两个参数,导入hdfs之后结果变成’null‘,执行上面SQL无法过滤出结果
      • 如果加入参数,可以过滤出结果,显示’NULL’内容。
    • Export null内容处理和Import道理一致

      • –input-null-string ‘\N’
      • –input-null-non-string ‘\N’

原创文章,转载请注明: 转载自Itweet的博客
本博客的文章集合: http://www.itweet.cn/blog/archive/