0

试用Azure Data Factory

云计算与大数据可谓倚天屠龙相辅相依。为了大数据处理,企业往往需要先把云上云下各种各样的数据汇集在云存储中。一个典型的用户案例是,企业的业务数据往往保存在企业内部SQL Server中,需要先导入Azure Blob才能够调用HD Insight(Azure中的Hadoop发行版本)进行大规模并行处理。Azure Data Factory(简称ADF)就是微软为了解决这个问题而发布的在线服务。使用ADF,用户可以方便地开发高效率并且可扩展的数据管道来搬运并处理数据。

下图展示了ADF的应用程序模型:

其中:

  • 连接与收集:连接到各种各样的数据源
  • 变形与增强:创建管道以复制用户数据到数据中心方便处理
  • 发布:发布处理好的数据以便进一步分析处理

百闻不如一见,还是让我们把玩一下,创建一个从SQL Server到Azure Storage的数据管道。

首先在Azure门户里面建立一个Data Factory,名字叫做CoolDataFactory。并且创建一个名为CoolResourceGroup的资源组与之相关联。资源组是Azure新引入的概念,可以一起管理多个资源的生命周期。创建完后界面如下:

接下来让我创建Linked Service。所谓Linked Service,是指数据管道可以处理的数据或者计算。在这个预览版本中只有Azure Storage、(企业防火墙内部的)SQL Server和Azure Database可供使用。

首先让我们来创建Azure Storage。创建过程非常直观,根据提示提供帐户名和密码即可。创建SQL Server则需要一点技巧。由于数据库在企业内部,从云端尝试直接访问一定会被企业防火墙所阻拦。ADF提供了Data Management Gateway(第一个版本是我设计的,嘿嘿),通过浏览器便可以轻松安装在本机,以便连接SQL Server并且把数据发送到云端。

至此,我们实际已经创建了三个Linked Service:Azure Storage、SQL Server、以及Gateway。为了方便用户,Azure门户把前两者归类为Data Store与Gateway区分开来,也算比较贴切。

可能是由于还处在预览阶段,从数据源进一步定义数据集并没有提供任何UI,居然要求用户手写JSON定义用PowerShell执行,一夜回到解放前啊……(这个跟我没关系)

以下是从SQL Server定义源数据集的JSON源代码:

{
    "name": "EmpOnPremSqlTable",
    "properties":
    {
        "location":
        {
            "type": "OnPremisesSqlServerTableLocation",
            "tableName": "Emp",
            "linkedServiceName": "CoolSqlServer"
        },
        "availability":
        {
            "frequency": "Hour",
            "interval": 1,
            "waitOnExternal":
            {
                "retryInterval": "00:01:00",
                "retryTimeout": "00:10:00",
                "maximumRetry": 3
            }
        }
    }
}

以下是从Azure Blob定义目标数据集的JSON源代码:

{
    "name": "EmpTableFromBlob",
    "properties":
    {
        "structure":
        [
            { "name": "Id", "type": "Int" },
            { "name": "FirstName", "type": "String" },
            { "name": "LastName", "type": "String" }
        ],
        "location":
        {
            "type": "AzureBlobLocation",
            "folderPath": "adftutorial",
            "fileName": "emp.txt",
            "format":
            {
                "type": "TextFormat",
                "columnDelimiter": ","
            },
            "linkedServiceName": "CoolBlobStorage"
        },
        "availability":
        {
            "frequency": "hour",
            "interval": 1,
        }
    }
}

简而言之,SQL Server里面有个表叫做Emp的表,以一小时为间隔,需要内容它以CSV格式复制到Azure Blob中。由于CSV不像数据库有schema,需要在数据集定义中显示声明。

然后需要用以下命令在CoolDataFactory中创建出来:

New-AzureDataFactoryTable -ResourceGroupName CoolResourceGroup -DataFactoryName CoolDataFactory –File EmpOnPremSqlTable.json

New-AzureDataFactoryTable -ResourceGroupName CoolResourceGroup -DataFactoryName CoolDataFactory –File EmpTableFromBlob.json

现在可以定义数据管道如下:

{
    "name": "SqlToBlobPipeline",
    "properties":
    {
        "description" : "Copy data from on-prem SQL to Azure blob",
        "activities":
        [
            {
                "name": "CopyFromSQLtoBlob",
                "description": "Copy data from on-prem SQL server to blob",
                "type": "CopyActivity",
                "inputs": [ {"name": "EmpOnPremSqlTable"} ],
                "outputs": [ {"name": "EmpTableFromBlob"} ],
                "transformation":
                {
                    "source":
                    {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from emp"
                    },
                    "sink":
                    {
                        "type": "BlobSink"
                    }
                },
                "Policy":
                {
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "style": "StartOfInterval",
                    "retry": 0,
                    "timeout": "01:00:00"
                }
             }
        ]
    }
}

这个管道里面仅仅有一个CopyActivity,把数据从SQL Server数据集复制到Azure Storage数据集。

用以下命令在CoolDataFactory中创建这个数据管道:

New-AzureDataFactoryPipeline -ResourceGroupName CoolResourceGroup -DataFactoryName CoolDataFactory -File SqlToBlobPipeline.json

回到ADF可以观看数据集与数据管道之间的图示:

现在可以激活这个管道了:

Set-AzureDataFactoryPipelineActivePeriod -ResourceGroupName CoolResourceGroup -DataFactoryName CoolDataFactory -StartDateTime 2014-12-02 –EndDateTime 2014-12-03 –Name SqlToBlobPipeline

这样数据便会源源不断地写入Azure Blob中。感兴趣的话可以参照Use Pig and Hive with Data Factory进一步创建HD Insight Activity做一个完整的应用场景。

最后推荐一下Mike Flasko同学在TechEd Europe 2014的精彩讲演Introducing Data Factory: Orchestration on Big Data,许多干货。

 



张 琪

发表评论

电子邮件地址不会被公开。 必填项已用*标注