AICon上海|与字节、阿里、腾讯等企业共同探索Agent 时代的落地应用 了解详情
写点什么

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

  • 2019-09-26
  • 本文字数:6791 字

    阅读完需:约 22 分钟

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

点击流分析工具可以很好地处理数据,有些工具甚至具有令人印象深刻的 BI 界面。但是,孤立地分析点击流数据存在很多限制。例如,客户对您网站上的产品或服务感兴趣,却在您的实体店购买。点击流分析师会问:“他们浏览产品之后发生了什么?”,而商业分析师会问:“他们购买之前发生了什么?”


点击流数据可以增强您的其他数据源,这并不足为奇。如果结合购买数据使用,它有助于您确定要放弃购买的产品或优化营销支出。同样,它还可以帮助您分析线上和线下行为,甚至是客户在注册帐户之前的行为。但是,一旦点击流数据馈送的好处显现,您必须快速适应新的请求。


这篇文章介绍了我们在 Equinox Fitness Clubs 如何将数据从 Amazon Redshift 迁移到 Amazon S3,以便对点击流数据使用后期绑定视图策略。我们将在这篇文章中广泛讨论 Apache Spark、Apache Parquet、数据湖、hive 分区和外部表等有趣的内容!


当我们开始将点击流数据从其自有工具传送到我们的 Amazon Redshift 数据仓库时,速度是主要考虑事项。我们的初始用例是将 Salesforce 数据与 Adobe Analytics 数据合并在一起,以便深入了解我们的潜在客户开发流程。Adobe Analytics 可以告诉我们潜在客户来自哪些渠道和宣传活动、访问期间浏览了哪些网页以及是否在我们的网站上提交了潜在客户表单。Salesforce 可以告诉我们潜在客户是否符合条件、是否咨询过顾问以及最终是否注册会员。将这两个数据集合并在一起有助于我们更好地理解和优化我们的营销策略。


首先,我们来了解将 Salesforce 和 Adobe Analytics 数据集中到 Amazon Redshift 所涉及的步骤。但即使在 Redshift 中合并在一起,也需要一个通用标识符才能进行交互。第一步是在我们的网站上提交潜在客户表单时,生成 GUID 并将相同的 GUID 发送到 Salesforce 和 Adobe Analytics。



接下来,我们要将 Salesforce 数据传送到 Redshift。幸运的是,这些馈送已存在,因此我们可以在馈送中添加新的 GUID 属性并在 Redshift 中对其进行描述。


同样,我们必须生成从 Adobe Analytics 到 Amazon Redshift 的数据馈送。Adobe Analytics 提供了 Amazon S3 作为我们数据的目标选项,因此我们将数据传送给 S3,然后创建作业将其发送到 Redshift。此作业涉及到获取每日 Adobe Analytics 馈送,并附有一个含有数百列和数十万行的数据文件、一系列查找文件(如数据标题)和一个描述已发送文件的清单文件 – 然后以其原始状态全部传送至 Amazon S3。接着,我们使用 Amazon EMR 和 Apache Spark 将数据馈送文件处理为单个 CSV 文件,然后将其保存到 S3,以便我们执行 COPY 命令将数据发送到 Amazon Redshift。


此作业持续了几周时间,在我们开始频繁使用数据之前运行良好。虽然作业效率很高,但新列数据发生了日期回溯的情况(模式演变)。因此我们确定需要更大的灵活性,这是由数据的性质决定的。

数据湖补救

当我们决定重构作业时,我们做了两项准备工作。首先,我们逐渐开始采用更多的数据湖策略。其次,近期发布Redshift Spectrum。它使我们能在数据湖中查询点击流数据的平面文件,而无需运行 COPY 命令并将其存储在 Redshift 中。此外,我们可以更有效地将点击流数据合并到存储在 Redshift 中的其他数据源。


我们想利用自描述数据,该数据将数据模式与数据本身相结合。将数据转换为自描述数据有助于我们管理广泛的点击流数据集,并防止模式演变相关的挑战。我们可以将所需的每一列都放入数据湖文件中,然后只使用查询中重要的列以加快处理。为实现这种灵活性,我们使用的是 Apache Parquet 文件格式,因为其列式存储技术,该格式不仅具有自描述性,而且速度很快。我们在 Amazon EMR 上使用 Apache Spark 将 CSV 转换为 Parquet 格式,并对数据分区以获区扫描性能,如以下代码所示。



from datetime import date, timedeltafrom pyspark.sql.types import *from pyspark.sql import SparkSessionimport jsonimport argparse # Usage# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31
parser = argparse.ArgumentParser()parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')parser.add_argument('application', help='App name for job',type=str, default='XXX')args = parser.parse_args()
spark = SparkSession\ .builder\ .appName(args.application)\ .getOrCreate()
sc = spark.sparkContext
def manifest_toJSON(file, location): text = sc.textFile(file).collect() manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0} for x in text: if 'Lookup-File:' in x: manifest['lookup_files'].append(location+x.split(': ')[1]) elif 'Data-File: 01' in x: wildcard_path = x.replace('Data-File: 01','*') manifest['data_files'] += wildcard_path elif 'Record-Count:' in x: manifest['total_rows'] += int(x.split(': ')[1]) return manifest
# Create metadata by stitching together the file paths for# the header file and the data file from the manifest file# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)metadata = manifest_toJSON(manifest_filepath, base_filepath)
# Create a list of files and their data# Look specifically for the column_headers.tsv data# Split on \x00 to remove the garbage encoding and return a string of headerslookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\ .replace('\n', '')\ .replace('(', '')\ .replace(')', '')\ .replace(' ', '-')
# Create a schema for the list from the header file splitting on tabs# Cast everything as a string to avoid data type failuresschema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])
# Bypass RDD and write data file as a dataframe# then save as parquet to tie headers to their respective valuesdf = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)df.write.mode('overwrite').parquet(destination_filepath)
# Gracefully exit out of spark and this filesc.stop()exit()
复制代码


借助 AWS Glue Data Catalog,我们可以在 Amazon Redshift 和其他查询工具(如 Amazon Athena 和 Apache Spark)中查询可用的点击流数据。这是通过将 Parquet 文件映射到关系模式来实现的。AWS Glue 允许在几秒钟内查询其他数据。这是因为模式会实时发生更改。这意味着您可以同时完成列删除/添加、列索引重新排序和列类型更改。然后,就可以在保存模式后立即查询数据。此外,Parquet 格式可防止数据形状发生变化时出现故障,或者放弃和删除数据集中的某些列。


我们使用以下查询为 Adobe Analytics 网站数据创建了第一个 AWS Glue 表。我们在 SQL Workbench 的 Amazon Redshift 中运行了此查询。


--First create your schemacreate external schema omniture_prodfrom data catalog database 'omniture' iam_role 'arn:aws:iam:::role
--Then create your “table” CREATE EXTERNAL TABLE omniture_prod.eqx_web ( date_time VARCHAR, va_closer_id VARCHAR, va_closer_detail VARCHAR, va_finder_detail VARCHAR, va_finder_id VARCHAR, ip VARCHAR, domain VARCHAR, post_evar1 VARCHAR)STORED AS PARQUETLOCATION 's3://eqxdl-prod/omniture/eqx_web/'table properties ('parquet.compress'='SNAPPY');
--Check your databases, schemas, and tablesselect * from pg_catalog.svv_external_databases;select * from pg_catalog.svv_external_schemas;select * from pg_catalog.svv_external_tables;
复制代码


运行此查询后,我们通过 AWS Glue 界面根据请求对 schema 添加了其他列。我们还使用分区来加快查询和降低成本。



此时,我们的数据库中有个新的 schema 文件夹。它包含可以查询的外部表,但我们希望更进一步。我们需要增加一些数据转换,例如:


  • 将 ID 重命名为字符串

  • 连接值

  • 操作字符串,不包括我们从 AWS 发送的用来测试网站的 bot 流量

  • 更改列名称以方便使用。

  • 为此,我们创建了如下所示的外部表视图:


create view edw_t.f_omniture_web as select    REPLACE(dt, '-', '') as hive_date_key,    va_closer_id,    va_closer_detail as last_touch_campaign,    CASE        WHEN (va_closer_id) = '1' THEN 'Paid Search'        WHEN (va_closer_id) = '2' THEN 'Natural Search'        WHEN (va_closer_id) = '3' THEN 'Display'        WHEN (va_closer_id) = '4' THEN 'Email Acq'        WHEN (va_closer_id) = '5' THEN 'Direct'        WHEN (va_closer_id) = '6' THEN 'Session Refresh'        WHEN (va_closer_id) = '7' THEN 'Social Media'        WHEN (va_closer_id) = '8' THEN 'Referring Domains'        WHEN (va_closer_id) = '9' THEN 'Email Memb'        WHEN (va_closer_id) = '10' THEN 'Social Placement'        WHEN (va_closer_id) = '11' THEN 'Other Placement'        WHEN (va_closer_id) = '12' THEN 'Partnership'        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS last_touch_channel,    va_finder_detail as first_touch_campaign,    va_finder_id as va_finder_id,    CASE        WHEN (va_finder_id) = '1' THEN 'Paid Search'        WHEN (va_finder_id) = '2' THEN 'Natural Search'        WHEN (va_finder_id) = '3' THEN 'Display'        WHEN (va_finder_id) = '4' THEN 'Email Acq'        WHEN (va_finder_id) = '5' THEN 'Direct'        WHEN (va_finder_id) = '6' THEN 'Session Refresh'        WHEN (va_finder_id) = '7' THEN 'Social Media'        WHEN (va_finder_id) = '8' THEN 'Referring Domains'        WHEN (va_finder_id) = '9' THEN 'Email Memb'        WHEN (va_finder_id) = '10' THEN 'Social Placement'        WHEN (va_finder_id) = '11' THEN 'Other Placement'        WHEN (va_finder_id) = '12' THEN 'Partnership'        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS first_touch_channel,    ip as ip_address,    domain as domain,    post_evar1 AS internal_compaign,    post_evar10 as site_subsection_nm,    post_evar11 as IOS_app_view_txt,    post_evar12 AS site_section_nm,    post_evar15 AS transaction_id,    post_evar23 as join_barcode_id,    post_evar3 AS page_nm,    post_evar32 as host_nm,    post_evar41 as class_category_id,    post_evar42 as class_id,    post_evar43 as class_instance_id,    post_evar60 AS referral_source_txt,    post_evar69 as adwords_gclid,    post_evar7 as usersec_tracking_id,    post_evar8 as facility_id,    post_event_list as post_event_list,     post_visid_low||post_visid_high as unique_adobe_id,    post_visid_type as post_visid_type,    post_page_event as hit_type,    visit_num as visit_number,    visit_start_time_gmt,    post_evar25 as login_status,    exclude_hit as exclude_hit,    hit_source as hit_source,    geo_zip,    geo_city,    geo_region,    geo_country,    post_evar64 as api_error_msg,    post_evar70 as page_load_time,    post_evar78 as join_transaction_id,    post_evar9 as page_url,    visit_start_pagename as entry_pg,    post_tnt as abtest_campaign,    post_tnt_action as abtest_experience,    user_agent as user_agent,    mobile_id as mobile_id,    cast(date_time as timestamp) as date_time,    CONVERT_TIMEZONE(        'America/New_York', -- timezone of origin        (cast(            case             when post_t_time_info like '%undefined%' then '0'            when post_t_time_info is null then '0'            when post_t_time_info = '' then '0'            when cast(split_part(post_t_time_info,' ',4) as int) < 0              then left(split_part(post_t_time_info,' ',4),4)            else left(split_part(post_t_time_info,' ',4),3) end as int        )/60),        cast(date_time as timestamp)    ) as date_time_local,    post_t_time_info as local_timezonefrom omniture_prod.eqx_webwhere exclude_hit = '0'and hit_source not in ('5','7','8','9')
and domain <> 'amazonaws.com'and domain <> 'amazon.com'
WITH NO SCHEMA BINDING;

复制代码


现在,我们可以从 Amazon Redshift 执行查询,将我们的结构化 Salesforce 数据与半结构化动态 Adobe Analytics 数据相结合。通过这些更改,我们的数据变得极其灵活、对存储大小非常友好、查询特别高效。从那时起,我们开始将 Redshift Spectrum 用于很多用例,包括数据质量检查、机器数据、历史数据存档,使我们的数据分析师和科学家能够更轻松地混合和加载数据。


with web_leads as (  select transaction_id,last_touch_channel  from edw_t.f_omniture_web  where hive_date_key = '20170301'      and post_event_list like '%201%'      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'),opp_lifecycle as (  SELECT lifecycle,weblead_transactionid  FROM edw_t.f_opportunity  where weblead_transactionid is not null  and created_date_key between '20170220'and '20170310')select  web_leads.transaction_id,  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecyclefrom web_leadsleft join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

复制代码

小结

通过将 Amazon S3 数据湖与 Amazon Redshift 合并,我们能为点击流数据构建高效灵活的分析平台。从而无需始终将点击流数据加载到数据仓库中,并且使平台适应传入数据中 schema 更改。请阅读 Redshift Spectrum 入门文档,还可以在下面观看我们在 AWS Chicago Summit 2018 上的演讲。


相关文章:


如果觉得这篇文章有用,请务必阅读从数据湖到数据仓库:利用 Amazon Redshift Spectrum 增强 Customer 360 和 Narrativ 通过 Amazon Redshift 帮助创建者将其数字内容货币化。


作者介绍:


Ryan Kelly 是 Equinox 的数据架构师,帮助草拟和实施数据计划框架。他还负责点击流跟踪,帮助团队深入了解他们的数字计划。Ryan 热衷于让人们更方便地访问和提取他们的数据,以获取商业智能、执行分析和丰富产品/服务。他还喜欢探索和审查新技术,了解是如何改善他们在 Equinox 的工作。


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/closing-the-customer-journey-loop-with-amazon-redshift-at-equinox-fitness-clubs/


2019-09-26 10:42673
用户头像

发布了 1856 篇内容, 共 131.6 次阅读, 收获喜欢 81 次。

关注

评论

发布
暂无评论
发现更多内容

iOS开发:Xcode自带的模拟器常用快捷键的使用

三掌柜

8月日更 8月

基于AOP和HashMap原理学习,开发Mysql分库分表路由组件!

小傅哥

小傅哥 hashmap 分库分表 aop 数据散列

老用户运营从哪里切入?

boshi

运营 私域运营

Apache HBase MTTR 优化实践:减少恢复时长

华为云开发者联盟

Apache hadoop zookeeper HBase MapReduce服务

Compose 中的文字

Changing Lin

8月日更

【LeetCode】合并两个排序的链表Java题解

Albert

算法 LeetCode 8月日更

模块五作业

老实人Honey

架构训练营

分片上传Minio存储服务的问题集锦[推荐收藏]

liuzhen007

8月日更

在线文字图标logo文章封面图生成工具

入门小站

工具

AI+云原生,把卫星遥感虐的死去活来

华为云开发者联盟

AI 容器 云原生 k8s 遥感影像

netty系列之:自定义编码和解码器要注意的问题

程序那些事

Java Netty 程序那些事

【设计模式】状态模式

Andy阿辉

C# 编程 后端 设计模式 8月日更

手撸二叉树之单值二叉树

HelloWorld杰少

数据结构与算法 8月日更

低代码:时代的选择

华为云开发者联盟

云计算 软件开发 低代码 硬件 IT系统

敏捷实践 | 分不清Kanban和看板的只剩你了……

LigaAI

Scrum Kanban 敏捷开发 看板

三分钟快速了解 Cglib 动态代理

4ye

Java 后端 cglib 代理模式 8月日更

【Flutter 专题】64 图解基本 TextField 文本输入框 (一)

阿策小和尚

Flutter 小菜 0 基础学习 Flutter Android 小菜鸟 8月日更

Java筑基 - JNI到底是个啥

码农参上

Java jni 8月日更

面试官:你说说一条更新SQL的执行过程?

艾小仙

有效管理数据安全性—— Pulsar Schema 管理

Apache Pulsar

Apache Pulsar StreamNative schema

spring 大事务

Rubble

8月日更

架构实战营第一期 -- 模块五作业

clay

架构实战营

接口测试--apipost如何自定义变量

与风逐梦

软件测试 接口测试

2分钟玩转中文接口测试工具-ApiPost

CodeNongXiaoW

项目管理 大前端 测试 后端

JS对象拷贝:深拷贝和浅拷贝

华为云开发者联盟

js 对象 对象拷贝 深拷贝 浅拷贝

如何请求一个需要登陆才能访问的接口(基于cookie)——apipost

Proud lion

大前端 后端 Postman Cookie 接口工

聊一聊这些年看过的动漫

箭上有毒

8月日更

JVM集合之类加载子系统

阿Q说代码

JVM 加载 类加载器 双亲委派 8月日更

Python代码阅读(第12篇):初始化二维数组

Felix

Python 编程 Code Programing 阅读代码

vue入门:http客户端axios

小鲍侃java

8月日更

LT浏览器——响应式网站测试利器

FunTester

性能测试 自动化测试 web测试 兼容性测试 测试报告

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章