1

在Azure上把玩Spark

Spark是开源的大数据分析框架。要在如日中天的Hadoop面前站稳脚跟,Spark肯定得有两把刷子。不错,Spark最引以为傲的是分布式可容错的内存计算,可选择性地把计算结果缓冲在内存中,为交互式查询以及迭代任务提供了高性能的保障。相比之下,Hadoop把每步MapReduce的结果都老老实实存储在HDFS中,这网络和磁盘开销,足够在Hive返回结果前泡杯咖啡啦。

Spark在业界是非常火爆的,下面的Google Trend就是一个很好的证明:红色的Apache Spark正在快速超越Apache Hadoop!当然,Hadoop社区也没有闲着,包括Tez等技术也在大力推进中,此是后话。

azure-spark-01

与Hadoop一样,Azure也很自然地把Spark做成了托管服务,用户只要在Azure门户网站创建实例便可以使用,然后为使用的计算埋单,省却了设备与运维的烦恼。此外,Azure还集成了Zeppelin,这样用户便可以在网页中用Scala或者Spark SQL查询分析以及可视化数据了。

下面让我们看看如何在Azure中使用Spark。

首先是最经典的数单词例子,用到的bible.txt是钦定版圣经,可以在这里下载:

val text = sc.textFile("wasb://.../bible.txt")
val regex = "[.,:;?!]".r
val count = text.flatMap(line => line.split(" "))
                .map(word => regex.replaceAllIn(word, ""))
                .map(word => (word, 1))
                .reduceByKey((a, b) => a + b)
                .sortBy(_._2, false)
count.collect()

其中,sc是Zeppelin提供的Spark Context,text是一个Spark的可容错分布式数据集(Resilient Redistributed Dataset,RDD),表示一行行的文字集合。之后,regex定义了不想要的标点符号的正则表达式。最后,就可以分割每行单词、过滤标点符号、计算单词总数,然后排序输出。在Zeppelin输入以上代码并运行,很快就会得到结果:除了介词、助词、冠词等之外,Lord、God、Israel是圣经中用得最多的单词。Load和God还没有机会见到,不过Israel借着公差去过好几回啦,assalam alaikum、shalom、hello什么都会说,谢谢!

很明显,与Hadoop面向过程写Java代码的编程方式不同,Spark是基于Scala的函数式开发,在数据集上的每个操作都是一次变形,串联起来便形成一个数据流。这种声明式的编程范式远比过程式的更适合刻画数据流,这点也被Cascading、Google Dataflow、Storm Trident、Linq、Java Streaming API等证实。

接着让我们看看Spark为了支持表格型数据而提供的Spark SQL。我们将使用Azure为每个Spark实例内置的样本文件tweets.txt,里面实际上是许多树形JSON文档的集合,表示一个个的tweet。

下面的代码展示了用Spark SQL读取这个文件并选择追随者大于10000人然后倒序排序:

var df = sqlContext.load("wasb://.../tweets.txt", "json")
df.filter(df("user.followers_count") > 10000)
  .select("user.name", "user.followers_count")
  .orderBy(desc("user.followers_count"))
  .show()

其中,sqlContext是Zeppelin提供的Spark SQL Context,它的load方法返回了Data Frame风格的接口,提供了到关系数据库、文件、Hive等的数据的存取,以及select、filter、group by、order by等关系代数算子,方便处理数据流。此外,Spark SQL还内置了Catalyst优化器,会对表达式进行分析并找出最优执行计划,比如谓词下推等等,来提高执行效率。

结果如下,云计算果然还是很火的啊:

name               followers_count
Ericsson Research  41291          
The Cloud Network  30118          
WeNurses           25489          
Metafisica         15802          
CIO Insight        15183          
Cloud Blogs        14452          
Brony Retweet      10019

对于带有聚合的复杂查询,Data Frame也有力不从心的时候,这时Zeppelin内置的SQL语句支持就非常好用了。比如,可以通过以下Scala语句定义临时表:

var df = sqlContext.load("wasb://.../tweets.txt", "json")
df.registerTempTable("tweets")

之后,便可以新开一个Zeppelin窗口,输入如下SQL语句并查看结果:

%sql
SELECT DISTINCT user.name, user.followers_count as followers
FROM tweets
ORDER BY followers
DESC LIMIT 10

azure-spark-02

当然,你还可以切换其他可视化,比如柱状图,如下:

azure-spark-03

至此,希望你对Spark有了初步的了解。虽然只是一个年轻的产品,Spark却有着不小的雄心:一方面提供统一的语言来刻画数据导入、变形、分析以及导出,另一方面又以分布式可容错的内存计算作为高性能的保障,外加对机器学习、图计算、流式计算的支持,绝对有潜力成为炙手可热的明星产品!

 



张 琪

One Comment

发表评论

电子邮件地址不会被公开。 必填项已用*标注