【宝马娱乐在线】MaxCompute重装上阵 第五弹 – SELECT TRANSFOR

  1. Using
    子句指定的是要执行的命令,而非资源列表,这一点和大多数的MaxCompute
    SQL语法不一样,这么做是为了和hive的语法保持兼容。

  2. 输入从stdin传入,输出从stdout传出;

  3. 可以配置分隔符,默认使用 t 分隔列,用换行分隔行;

  4. 可以自定义reader/writer,但用内置的reader/writer会快很多

  5. 使用自定义的资源(脚本文件,数据文件等),可以使用 set
    odps.sql.session.resources=foo.sh,bar.txt;
    来指定。可以指定多个resource文件,用逗号隔开(因此不允许resource名字中包含逗号和分号)。此外我们还提供了resources子句,可以在using
    子句后面指定 resources ‘foo.sh’, ‘bar.txt’
    来指定资源,两种方式是等价的(参考“用odps跑测试”的例子);

Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

/**
 * Mapper.
 */
public interface Mapper {
  /**
   * Maps a single row into an intermediate rows.
   * 
   * @param record
   *          input record
   * @param output
   *          collect mapped rows.
   * @throws Exception
   *           on error
   */
  void map(String[] record, Output output) throws Exception;
}

可以将一列拆分为多列

【宝马娱乐在线】MaxCompute重装上阵 第五弹 – SELECT TRANSFOR。使用样例:

public class ExecuteMap {

    private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";

    private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Process class must be given");
        }

        new GenericMR().map(System.in, System.out,
                getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
    }

    private static Mapper getMapper(String parserClass, String[] args)
            throws ClassNotFoundException {
        if (mappers.containsKey(parserClass)) {
            return mappers.get(parserClass);
        }

        Class[] classes = new Class[args.length];
        for (int i = 0; i < classes.length; ++i) {
            classes[i] = String.class;
        }
        try {
            Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
            mappers.put(parserClass, mapper);
            return mapper;
        } catch (ClassNotFoundException e) {
            throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
        } catch (Exception e) {
            throw new  ClassNotFoundException("Error Constructing processor", e);
        }

    }
}

MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "

COMMAND="FROM dw_rtb.event_fact_adx_auction "
COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "

使用 UDF(User Defined Function)处理数据

此文中采用MaxCompute Studio作展示,首先,安装MaxCompute
Studio,导入测试MaxCompute项目,创建工程,建立一个新的MaxCompute脚本文件, 如下

UDAF

  • Hive
    udaf开发入门和运行过程详解
  • Hive通用型自定义聚合函数(UDAF)

vid bigint,

Select
transform允许sql用户指定在服务器上执行一句shell命令,将上游数据各字段用tab分隔,每条记录一行,逐行输入shell命令的stdin,并从stdout读取数据作为输出,送到下游。Shell命令的本质是调用Unix的一些utility,因此可以启动其他的脚本解释器。包括python,java,php,awk,ruby等。

Hive中的TRANSFORM:使用脚本完成Map/Reduce

转自:
http://www.coder4.com/archives/4052

首先来看一下数据:

hive> select * from test;
OK
1       3
2       2
3       1

假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

我们看一下Python的代码:

#!/home/tops/bin/python

import sys
import hashlib

for line in sys.stdin:
    line = line.strip()
    arr = line.split()
    md5_arr = []
    for a in arr:
        md5_arr.append(hashlib.md5(a).hexdigest())
    print "t".join(md5_arr)

在Hive中,使用脚本,首先要将他们加入:

add file /xxxx/test.py

然后,在调用时,使用TRANSFORM语法。

SELECT 
    TRANSFORM (col1, col2) 
    USING './test.py' 
    AS (new1, new2) 
FORM 
    test;

这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

这里有一个小坑:有时候,我们结合INSERT
OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是t。不要考虑外面其他的分割符号!

最后,解释一下MAP、REDUCE。

在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!

所以、混用map
reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add
file。

如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

例如:

CREATE TABLE features
(
    id BIGINT,
    norm_features MAP<STRING, FLOAT> 
);

答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

例如,以上面的表结构为例,每行输出应为:

1^Ifeature1^C1.0^Bfeature2^C2.0

其中I是tab键,这是TRANSFORM要求的分割符号。B和^C是Hive存储时MAP类型的KV分割符。

另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 MAP<STRING, FLOAT>)

4.在bin/odpscmd 下输入环境变量,显式开启 ODPS 2.0
的非结构化功能( 仅在 ODPS 2.0 计算框架完全上线为必须),单独执行
xx.sql 文件时也需要将下属设置写在 SQL 文件的开头处。

性能上,SELECT TRANSFORM 与UDTF
各有千秋。经过多种场景对比测试,数据量较小时,大多数场景下select
transform有优势,而数据量大时UDTF有优势。由于transform的开发更加简便,所以select
transform非常适合做adhoc的数据分析。

UDTF

  • Hive中UDTF编写和使用

 

第四弹 – CTE,VALUES,SEMIJOIN

Hive Python Streaming的原理及写法

http://www.tuicool.com/articles/vmumUjA

end_point=

理论上select transform能实现的功能udtf都能实现,但是select
transform比udtf要灵活得多。且select
transform不仅支持java和python,还支持shell,perl等其它脚本和工具。
且编写的过程要简单,特别适合adhoc功能的实现。举几个例子:

  • 实例名称:vehicle-test
  • 数据表名称:vehicle_track
  • 主键信息:vid(int); gt (int)
  • 访问域名:https://vehicle-test.cn-shanghai.ots-internal.aliyuncs.com

MaxCompute基于ODPS2.0新一代的SQL引擎,显著提升了SQL语言编译过程的易用性与语言的表达能力。我们在此推出MaxCompute(ODPS2.0)重装上阵系列文章

MaxCompute 与 TableStore
是两个独立的大数据计算以及大数据存储服务,所以两者之间的网络必须保证连通性。
对于 MaxCompute 公共云服务访问 TableStore 存储,推荐使用 TableStore
私网
地址,例如

  1. 支持其他脚本语言

select vid,count(*),avg(speed),avg(oil_consumption) from
ots_vehicle_track where vid
<4 and gt<1469171387  group by
vid;

上面的语句造出一份有50行的数据表,值是从1到50;
测试时候的数据就可以方便造出来了。功能看似简单,但以前是odps的一个痛点,没有方便的办法造数据,就不方便测试以及初学者的学习和探索。当然这也可以通过udtf来实现,但是需要复杂的流程:进入ide->写udtf->打包->add
jar/python->create function->执行->drop function->drop
resource。

//
我们选出来1行数据,并将name/name传入UDF,返回两个string的累加

这个例子是为了说明,很多java的utility可以直接拿来运行。java和python虽然有现成的udtf框架,但是用select
transform编写更简单,并且不需要额外依赖,也没有格式要求,甚至可以实现离线脚本拿来直接就用。

1.下载并安装大数据计算服务客户端

宝马娱乐在线 1

 

本文为云栖社区原创内容,未经允许不得转载。返回搜狐,查看更多

# this url is for odpscmd update

宝马娱乐在线 2

latitude double,

宝马娱乐在线 3

data_size_confirm=100.0

select transform (key, value) using “perl -e ‘while($input =
<STDIN>){print $input;}'” from src;

 

6.
资源文件会被下载到执行指定命令的工作目录,可以使用文件接口打开./bar.txt文件。

什么是大数据计算服务 MaxCompute?

小结

set odps.sql.type.system.odps2=true; –是支持表格存储的binary类型

  1. 无中生有造数据

比如下面就是一个简单的 UDF
定义,只是简单的将两个字符串连接。 MaxCompute
支持更复杂的UDF,包括自定义窗口执行逻辑等,详细请参考MaxCompute Studio-开发
UDF。

宝马娱乐在线 4

access_key= ODPS-AccessKey

  • 注一,USING
    后面的字符串,在后台是直接起的子进程来调起命令,没有起shell,所以shell的某些语法,如输入输出重定向,管道等是不支持的。如果用户需要可以以
    shell 作为命令,真正的命令作为数据输入,参考“无中生有造数据”的例子;
  • 注二,JAVA 和 PYTHON 的实际路径,可以从JAVA_HOME 和 PYTHON_HOME
    环境变量中得到作业;
  1. FAILED: ODPS-0010000:System internal error – fuxi
    job failed, WorkerPackageNotExist:需要设置set
    odps.task.major.version=unstructured_data
  2. FAILED: ODPS-0010000:System internal error –
    std::exception:Message: a timeout was
    reached:一般情况下是OTS的endpoint填写错误,导致ODPS没法访问,可以咨询ots_support旺旺账号
  3. logview invalid
    end_point:在执行过程中,会返回一个logview
    URL地址,如果使用浏览器访问该地址返回错误,可能是配置不对,请检查
    MaxCompute 配置,并咨询 odps_support 旺旺账号

宝马娱乐在线 5

WITH SERDEPROPERTIES ( — (2)

  1. awk 用户会很喜欢这个功能

set
odps.sql.planner.mode=lot;

摘要:
MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

有时候用户在表格存储里面的数据有独特的结构,希望自己开发逻辑来处理每一行数据,比如解析特定的json字符串,这一块的开发也已经很方便了。

上面的语句仅仅是把value原样输出,但是熟悉awk的用户,从此过上了写awk脚本不写sql的日子

关系数据库已经存在半个世纪,有非常广泛的使用场景,但是在快速迭代的互联网领域其扩展性和
schema 灵活性被诟病颇多,因此类似 TableStore/BigTable/HBase
等强调扩展性和灵活性的NoSQL数据库逐步流行起来,这些 NoSQL 数据库只提供
API 接口,不提供 SQL 访问,这就导致很多熟悉 SQL
但是不喜欢写代码的用户没法很舒服的使用此类NoSQL数据库。基于此,表格存储开发团队联合
MaxCompute(下文中 ODPS 与 MaxCompute 同义)团队打通了 ODPS-SQL
访问表格存储的路径,这样一个只懂 SQL
的用户也可以愉快的访问表格存储里面的大量数据了。

MaxCompute基于ODPS2.0的SQL引擎,提供了SELECT
TRANSFORM功能,可以明显简化对脚本代码的引用,与此同时,也提高了性能!我们推荐您尽量使用SELECT
TRANSFORM。

CREATE EXTERNAL TABLE IF NOT
EXISTS
ots_vehicle_track

责任编辑:

STORED BY ‘com.aliyun.odps.TableStoreStorageHandler’
— (1)

  1. 可以串联着用,使用 distribute by和 sort by对输入数据做预处理

3.行bin/odpscmd,输入show
tables,正常执行则表示上面配置正确。

宝马娱乐在线 6

update_url=

第一弹 – 善用MaxCompute编译器的错误和警告

网络连通性

或者使用python

小提示:由于 MaxCompute 在 2.0
版本的计算框架才能支持直接访问 TableStore
数据,该版本还在灰度上线中,目前还需要 申请MaxCompute
2.0试用
,具体开通使用方法请参见 如何申请试用MaxCompute
2.0。

  1. 用odps跑测试

(

UDTF的优势:

set odps.sql.ddl.odps2=true;

MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

摘要: 大数据计算服务 MaxCompute
能够提供强大的分析能力,而分布式 NoSQL
数据库表格存储在行级别上的实时更新和可覆盖性写入等特性,相对于
MaxCompute 内置表 append-only 批量操作,提供了一个很好的补充。

SELECT TRANSFORM 介绍

  • com.aliyun.odps.TableStoreStorageHandler 是 MaxCompute
    内置的处理 TableStore 数据的 StorageHandler, 定义了 MaxCompute 和
    TableStore 的交互,相关逻辑由 MaxCompute 实现。
  • SERDEPROPERITES
    可以理解成提供参数选项的接口,在使用 TableStoreStorageHandler
    时,有两个必须指定的选项,分别是下面介绍的
    tablestore.columns.mapping 和 tablestore.table.name。
    更多的可选选项将在后面其他例子中提及。
  • tablestore.columns.mapping
    选项:必需选项,用来描述对需要 MaxCompute 将访问的 TableStore
    表的列,包括主键和属性列。 这其中以 : 打头的用来表示 TableStore
    主键,例如这个例子中的 :vid:gt。 其他的均为属性列。
    TableStore支持最少1个,最多4个主键,主键类型为 bigint 或
    string,其中第一个主键为分区键。 在指定映射的时候,用户必须提供指定
    TableStore 表的
    所有主键,对于属性列则没有必要全部提供,可以只提供需要通过
    MaxCompute 来访问的属性列。
  • tablestore.table.name:需要访问的 TableStore 表名。
    如果指定的 TableStore 表名错误(不存在),则会报错,MaxCompute
    不会主动去创建 TableStore 表。
  • LOCATION 用来指定访问的 TableStore 的实例信息,包括
    instance 名字,endpoint 等。 
  • 数据格式对应,MaxCompute 与 TableStore
    的数据格式对应如下:

上面用的是perl。这其实不仅仅是语言支持的扩展,一些简单的功能,awk,
python, perl, shell
都支持直接在命令里面写脚本,不需要写脚本文件,上传资源等过程,开发过程更简单。另外,由于目前我们计算集群上没有php和ruby,所以这两种脚本不支持。

‘tablestore.columns.mapping’=’:vid, :gt, longitude, latitude, distance, speed,
oil_consumption’, —
(3)

  • SELECT TRANSFORM。

  • 场景1

  • 我的系统要迁移到MaxCompute平台上,系统中原来有很多功能是使用脚本来完成的,包括python,shell,ruby等脚本。
    要迁移到MaxCompute上,我需要把这些脚本全部都改造成UDF/UDAF/UDTF。改造过程不仅需要耗费时间人力,还需要做一遍又一遍的测试,从而保证改造成的udf和原来的脚本在逻辑上是等价的。我希望能有更简单的迁移方式。
  • 场景2
  • SQL比较擅长的是集合操作,而我需要做的事情要对一条数据做更多的精细的计算,现有的内置函数不能方便的实现我想要的功能,而UDF的框架不够灵活,并且Java/Python我都不太熟悉。相比之下我更擅长写脚本。我就希望能够写一个脚本,数据全都输入到我的脚本里来,我自己来做各种计算,然后把结果输出。而MaxCompute平台就负责帮我把数据做好切分,让我的脚本能够分布式执行,负责数据的输入表和输出表的管理,负责JOIN,UNION等关系操作就好了。

然后,准备好一个表格存储的实例以及一张数据表,表格存储实例管理,准备好实例名、EndPoint,为了区别其他产品的AccessId和AccessKey,后面我们称之为TableStore-InstanceName,TableStore-EndPoint。

宝马娱乐在线 7

select
cloud_metric_extract_md5(name,
name) as udf_test from test_table
limit 1;

理论上OpenMR的模型都可以映射到上面的计算过程。注意,使用map,reduce,select
transform这几个语法其实语义是一样的,用哪个关键字,哪种写法,不影响直接过程和结果。

关联的数据表信息如下:

标注

access_id=ODPS-AccessId

SELECT TRANSFORM 的优势:

)

第三弹 – 复杂类型

TableStore数据类型

MaxCompute数据类型

string

string

binary

blob

int

bigint

double

double

或者用map,reduce的关键字会让逻辑显得清楚一些

下面首先我们将介绍环境准备,这是所有后面的操作的基础。然后会介绍使用
OdpsCmd
访问表格存储。在第三节我们介绍使用 OdpsStudio
访问表格存储。最后介绍如何写 UDF、部署 UDF 以及在查询中使用 UDF。

上述功能可以使用SELECT TRANSFORM来实现

 

第二弹 – 新的基本数据类型与内建函数

distance double
,

该命令兼容Hive的Transform功能,可以参考Hive的文档。一些需要注意的点如下:

DROP TABLE IF EXISTS
ots_vehicle_track;

  1. 子进程和父进程是两个进程,而UDTF是单线程的,如果计算占比比较高,数据吞吐量比较小,可以利用服务器的多核特性
  2. 数据的传输通过更底层的系统调用来读写,效率比java高
  3. SELECT
    TRANSFORM支持的某些工具,如awk,是natvie代码实现的,和java相比理论上可能会有性能优势。

 

宝马娱乐在线 8

# confirm threshold for query input size(unit:
GB)

原标题:MaxCompute重装上阵 第五弹 – SELECT TRANSFOR

常见错误处理:

性能

 

宝马娱乐在线 9

FAQ

目前odps select transform完全兼容了hive的语法、功能和行为,包括
input/output row format 以及
reader/writer。Hive上的脚本,大部分可以直接拿来运行,部分脚本只需要经过少许改动即可运行。另外我们很多功能都用比hive更高执行效率的语言
(C++) 重构,用以优化性能。

3.打开bin/odpscmd,输入

或者

)

作者:隐林

speed double,

应用场景举例

5.创建一张 MaxCompute 的数据表关联到 TableStore
的某一张表。

提交作业可以看到执行计划(全部展开后的视图):

本篇文章就以一个小白用户的身份体验如何使用
MaxCompute-SQL 查询表格存储里面的数据,以及如何开发自定义逻辑(User
Defined Function, UDF)来处理用户特定的数据格式。

  1. UDTF是有类型,而Transform的子进程基于stdin/stdout传输数据,所有数据都当做string处理,因此transform多了一步类型转换;
  2. Transform数据传输依赖于操作系统的管道,而目前管道的buffer仅有4KB,且不能设置,
    transform读/写 空/满 的pipe会导致进程被挂起;
  3. UDTF的常量参数可以不用传输,而Transform没办法利用这个优化。

首先,准备好一个 MaxCompute 的工程,工程创建指导文档,准备好AccessId和AccessKey备用,为了区别其他产品的AccessId和AccessKey,后面我们称之为ODPS-AccessId,ODPS-AccessKey。并在RAM中授权
MaxCompute 访问 TableStore 的权限,授权方式请参考MaxCompute访问TableStore数据——授权

上次向您介绍了CTE,VALUES,SEMIJOIN,本篇向您介绍MaxCompute对其他脚本语言的支持

// 统计编号 4 以下的车辆在时间戳 1469171387
以前的平均速度和平均油耗

https_check=true

longitude double,

6.执行ODPS-SQL

一种快速、完全托管的TB/PB级数据仓库解决方案,提供多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题。

set odps.task.major.version=2dot0_demo_flighting;

环境准备

LOCATION ‘tablestore://vehicle-test.cn-shanghai.ots-internal.aliyuncs.com’; — (5)

如果网络不通,可以使用公网地址,TableStore原生支持 VPC
网络控制,也需要将网络类型设置为 “允许任意网络访问”

project_name=上面申请的ODPS工程名

目前ODPS-SQL访问 TaleStore
还在对执行逻辑进行深度的优化,如果有需求请联系ots_support,我们将针对业务场景来进行优化。

使用客户端 ODPS-CMD

oil_consumption double

 

1.按照MaxCompute
Studio文档的说明在IntelliJ里面安装MaxCompute-Java/MaxCompute-Studio插件,一旦插件安装完毕,就可以直接开发。

什么是表格存储 TableStore?

分布式NoSQL数据存储服务,无缝支持单表PB级数据及百万级访问并发,弹性资源,按量计费,对数据高频的增、删、改支持的很好,保证单行数据读写的强一致性。

写在最后

2.下载解压,将conf/odps_config.ini
的内容修改为:

2.打包之后可以上传到
MaxCompute,其中打包这里有需要注意的地方,File->Project
Structure->Artifacts, 填写好 Name 和 Output Directory 后,要点击
+ 选择输出模块,打包后通过 ODPS Project Explorer
来上传资源、创建函数,然后就可以在SQL中调用。

‘tablestore.table.name’=’vehicle_track’ —
(4)

set
odps.sql.preparse.odps2=lot;

gt bigint,

相关文章