写点什么

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

  • 2019-09-26
  • 本文字数:6791 字

    阅读完需:约 22 分钟

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程

点击流分析工具可以很好地处理数据,有些工具甚至具有令人印象深刻的 BI 界面。但是,孤立地分析点击流数据存在很多限制。例如,客户对您网站上的产品或服务感兴趣,却在您的实体店购买。点击流分析师会问:“他们浏览产品之后发生了什么?”,而商业分析师会问:“他们购买之前发生了什么?”


点击流数据可以增强您的其他数据源,这并不足为奇。如果结合购买数据使用,它有助于您确定要放弃购买的产品或优化营销支出。同样,它还可以帮助您分析线上和线下行为,甚至是客户在注册帐户之前的行为。但是,一旦点击流数据馈送的好处显现,您必须快速适应新的请求。


这篇文章介绍了我们在 Equinox Fitness Clubs 如何将数据从 Amazon Redshift 迁移到 Amazon S3,以便对点击流数据使用后期绑定视图策略。我们将在这篇文章中广泛讨论 Apache Spark、Apache Parquet、数据湖、hive 分区和外部表等有趣的内容!


当我们开始将点击流数据从其自有工具传送到我们的 Amazon Redshift 数据仓库时,速度是主要考虑事项。我们的初始用例是将 Salesforce 数据与 Adobe Analytics 数据合并在一起,以便深入了解我们的潜在客户开发流程。Adobe Analytics 可以告诉我们潜在客户来自哪些渠道和宣传活动、访问期间浏览了哪些网页以及是否在我们的网站上提交了潜在客户表单。Salesforce 可以告诉我们潜在客户是否符合条件、是否咨询过顾问以及最终是否注册会员。将这两个数据集合并在一起有助于我们更好地理解和优化我们的营销策略。


首先,我们来了解将 Salesforce 和 Adobe Analytics 数据集中到 Amazon Redshift 所涉及的步骤。但即使在 Redshift 中合并在一起,也需要一个通用标识符才能进行交互。第一步是在我们的网站上提交潜在客户表单时,生成 GUID 并将相同的 GUID 发送到 Salesforce 和 Adobe Analytics。



接下来,我们要将 Salesforce 数据传送到 Redshift。幸运的是,这些馈送已存在,因此我们可以在馈送中添加新的 GUID 属性并在 Redshift 中对其进行描述。


同样,我们必须生成从 Adobe Analytics 到 Amazon Redshift 的数据馈送。Adobe Analytics 提供了 Amazon S3 作为我们数据的目标选项,因此我们将数据传送给 S3,然后创建作业将其发送到 Redshift。此作业涉及到获取每日 Adobe Analytics 馈送,并附有一个含有数百列和数十万行的数据文件、一系列查找文件(如数据标题)和一个描述已发送文件的清单文件 – 然后以其原始状态全部传送至 Amazon S3。接着,我们使用 Amazon EMR 和 Apache Spark 将数据馈送文件处理为单个 CSV 文件,然后将其保存到 S3,以便我们执行 COPY 命令将数据发送到 Amazon Redshift。


此作业持续了几周时间,在我们开始频繁使用数据之前运行良好。虽然作业效率很高,但新列数据发生了日期回溯的情况(模式演变)。因此我们确定需要更大的灵活性,这是由数据的性质决定的。

数据湖补救

当我们决定重构作业时,我们做了两项准备工作。首先,我们逐渐开始采用更多的数据湖策略。其次,近期发布Redshift Spectrum。它使我们能在数据湖中查询点击流数据的平面文件,而无需运行 COPY 命令并将其存储在 Redshift 中。此外,我们可以更有效地将点击流数据合并到存储在 Redshift 中的其他数据源。


我们想利用自描述数据,该数据将数据模式与数据本身相结合。将数据转换为自描述数据有助于我们管理广泛的点击流数据集,并防止模式演变相关的挑战。我们可以将所需的每一列都放入数据湖文件中,然后只使用查询中重要的列以加快处理。为实现这种灵活性,我们使用的是 Apache Parquet 文件格式,因为其列式存储技术,该格式不仅具有自描述性,而且速度很快。我们在 Amazon EMR 上使用 Apache Spark 将 CSV 转换为 Parquet 格式,并对数据分区以获区扫描性能,如以下代码所示。



from datetime import date, timedeltafrom pyspark.sql.types import *from pyspark.sql import SparkSessionimport jsonimport argparse # Usage# spark-submit all_omniture_to_parquet.py 2017-10-31 s3a:// eqxdl-prod-l-omniture eqxios eqxdl-prod eqeqxiosprod omniture_eqxios# python -m tasks.s2w_all_omniture_to_parquet 2017-10-31
parser = argparse.ArgumentParser()parser.add_argument('year_month_day_arg', help='Run date (yyyy-mm-dd)', type=str, default='XXX')parser.add_argument('s3_protocol', help='S3 protocol i.e. s3a://',type=str, default='XXX')parser.add_argument('source_bucket', help='Omniture source data bucket',type=str, default='XXX')parser.add_argument('source_path', help='Omniture source data path',type=str, default='XXX')parser.add_argument('target_bucket', help='Omniture target data bucket',type=str, default='XXX')parser.add_argument('report_suite', help='Omniture report suite ID',type=str, default='XXX')parser.add_argument('application', help='App name for job',type=str, default='XXX')args = parser.parse_args()
spark = SparkSession\ .builder\ .appName(args.application)\ .getOrCreate()
sc = spark.sparkContext
def manifest_toJSON(file, location): text = sc.textFile(file).collect() manifest = {'lookup_files': [], 'data_files': location, 'total_rows': 0} for x in text: if 'Lookup-File:' in x: manifest['lookup_files'].append(location+x.split(': ')[1]) elif 'Data-File: 01' in x: wildcard_path = x.replace('Data-File: 01','*') manifest['data_files'] += wildcard_path elif 'Record-Count:' in x: manifest['total_rows'] += int(x.split(': ')[1]) return manifest
# Create metadata by stitching together the file paths for# the header file and the data file from the manifest file# base_filepath = '/Users/rkelly/projects/sparkyTest/project_remodeling/ios_test_data/'base_filepath = '{}{}/{}/'.format(args.s3_protocol, args.source_bucket, args.source_path)manifest_filepath = base_filepath+'{}_{}.txt'.format(args.report_suite, args.year_month_day_arg)metadata = manifest_toJSON(manifest_filepath, base_filepath)
# Create a list of files and their data# Look specifically for the column_headers.tsv data# Split on \x00 to remove the garbage encoding and return a string of headerslookup_files = sc.textFile(','.join(metadata['lookup_files'])).collect()encoded_header = lookup_files[[idx for idx, s in enumerate(lookup_files) if 'column_headers.tsv' in s][0]].split('\x00')header = encoded_header[[idx for idx, s in enumerate(encoded_header) if '\t' in s][0]]\ .replace('\n', '')\ .replace('(', '')\ .replace(')', '')\ .replace(' ', '-')
# Create a schema for the list from the header file splitting on tabs# Cast everything as a string to avoid data type failuresschema = StructType([ StructField(field, StringType(), True) for field in header.split('\t')])
# Bypass RDD and write data file as a dataframe# then save as parquet to tie headers to their respective valuesdf = spark.read.csv(metadata['data_files'], header=False, schema=schema, sep='\t', nullValue=None)destination_filepath = '{}{}/{}/dt={}/'.format(args.s3_protocol, args.target_bucket, args.application, args.year_month_day_arg)df.write.mode('overwrite').parquet(destination_filepath)
# Gracefully exit out of spark and this filesc.stop()exit()
复制代码


借助 AWS Glue Data Catalog,我们可以在 Amazon Redshift 和其他查询工具(如 Amazon Athena 和 Apache Spark)中查询可用的点击流数据。这是通过将 Parquet 文件映射到关系模式来实现的。AWS Glue 允许在几秒钟内查询其他数据。这是因为模式会实时发生更改。这意味着您可以同时完成列删除/添加、列索引重新排序和列类型更改。然后,就可以在保存模式后立即查询数据。此外,Parquet 格式可防止数据形状发生变化时出现故障,或者放弃和删除数据集中的某些列。


我们使用以下查询为 Adobe Analytics 网站数据创建了第一个 AWS Glue 表。我们在 SQL Workbench 的 Amazon Redshift 中运行了此查询。


--First create your schemacreate external schema omniture_prodfrom data catalog database 'omniture' iam_role 'arn:aws:iam:::role
--Then create your “table” CREATE EXTERNAL TABLE omniture_prod.eqx_web ( date_time VARCHAR, va_closer_id VARCHAR, va_closer_detail VARCHAR, va_finder_detail VARCHAR, va_finder_id VARCHAR, ip VARCHAR, domain VARCHAR, post_evar1 VARCHAR)STORED AS PARQUETLOCATION 's3://eqxdl-prod/omniture/eqx_web/'table properties ('parquet.compress'='SNAPPY');
--Check your databases, schemas, and tablesselect * from pg_catalog.svv_external_databases;select * from pg_catalog.svv_external_schemas;select * from pg_catalog.svv_external_tables;
复制代码


运行此查询后,我们通过 AWS Glue 界面根据请求对 schema 添加了其他列。我们还使用分区来加快查询和降低成本。



此时,我们的数据库中有个新的 schema 文件夹。它包含可以查询的外部表,但我们希望更进一步。我们需要增加一些数据转换,例如:


  • 将 ID 重命名为字符串

  • 连接值

  • 操作字符串,不包括我们从 AWS 发送的用来测试网站的 bot 流量

  • 更改列名称以方便使用。

  • 为此,我们创建了如下所示的外部表视图:


create view edw_t.f_omniture_web as select    REPLACE(dt, '-', '') as hive_date_key,    va_closer_id,    va_closer_detail as last_touch_campaign,    CASE        WHEN (va_closer_id) = '1' THEN 'Paid Search'        WHEN (va_closer_id) = '2' THEN 'Natural Search'        WHEN (va_closer_id) = '3' THEN 'Display'        WHEN (va_closer_id) = '4' THEN 'Email Acq'        WHEN (va_closer_id) = '5' THEN 'Direct'        WHEN (va_closer_id) = '6' THEN 'Session Refresh'        WHEN (va_closer_id) = '7' THEN 'Social Media'        WHEN (va_closer_id) = '8' THEN 'Referring Domains'        WHEN (va_closer_id) = '9' THEN 'Email Memb'        WHEN (va_closer_id) = '10' THEN 'Social Placement'        WHEN (va_closer_id) = '11' THEN 'Other Placement'        WHEN (va_closer_id) = '12' THEN 'Partnership'        WHEN (va_closer_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS last_touch_channel,    va_finder_detail as first_touch_campaign,    va_finder_id as va_finder_id,    CASE        WHEN (va_finder_id) = '1' THEN 'Paid Search'        WHEN (va_finder_id) = '2' THEN 'Natural Search'        WHEN (va_finder_id) = '3' THEN 'Display'        WHEN (va_finder_id) = '4' THEN 'Email Acq'        WHEN (va_finder_id) = '5' THEN 'Direct'        WHEN (va_finder_id) = '6' THEN 'Session Refresh'        WHEN (va_finder_id) = '7' THEN 'Social Media'        WHEN (va_finder_id) = '8' THEN 'Referring Domains'        WHEN (va_finder_id) = '9' THEN 'Email Memb'        WHEN (va_finder_id) = '10' THEN 'Social Placement'        WHEN (va_finder_id) = '11' THEN 'Other Placement'        WHEN (va_finder_id) = '12' THEN 'Partnership'        WHEN (va_finder_id) = '13' THEN 'Other Eqx Sites'        WHEN (va_closer_id) = '14' THEN 'Influencers'        ELSE NULL    END AS first_touch_channel,    ip as ip_address,    domain as domain,    post_evar1 AS internal_compaign,    post_evar10 as site_subsection_nm,    post_evar11 as IOS_app_view_txt,    post_evar12 AS site_section_nm,    post_evar15 AS transaction_id,    post_evar23 as join_barcode_id,    post_evar3 AS page_nm,    post_evar32 as host_nm,    post_evar41 as class_category_id,    post_evar42 as class_id,    post_evar43 as class_instance_id,    post_evar60 AS referral_source_txt,    post_evar69 as adwords_gclid,    post_evar7 as usersec_tracking_id,    post_evar8 as facility_id,    post_event_list as post_event_list,     post_visid_low||post_visid_high as unique_adobe_id,    post_visid_type as post_visid_type,    post_page_event as hit_type,    visit_num as visit_number,    visit_start_time_gmt,    post_evar25 as login_status,    exclude_hit as exclude_hit,    hit_source as hit_source,    geo_zip,    geo_city,    geo_region,    geo_country,    post_evar64 as api_error_msg,    post_evar70 as page_load_time,    post_evar78 as join_transaction_id,    post_evar9 as page_url,    visit_start_pagename as entry_pg,    post_tnt as abtest_campaign,    post_tnt_action as abtest_experience,    user_agent as user_agent,    mobile_id as mobile_id,    cast(date_time as timestamp) as date_time,    CONVERT_TIMEZONE(        'America/New_York', -- timezone of origin        (cast(            case             when post_t_time_info like '%undefined%' then '0'            when post_t_time_info is null then '0'            when post_t_time_info = '' then '0'            when cast(split_part(post_t_time_info,' ',4) as int) < 0              then left(split_part(post_t_time_info,' ',4),4)            else left(split_part(post_t_time_info,' ',4),3) end as int        )/60),        cast(date_time as timestamp)    ) as date_time_local,    post_t_time_info as local_timezonefrom omniture_prod.eqx_webwhere exclude_hit = '0'and hit_source not in ('5','7','8','9')
and domain <> 'amazonaws.com'and domain <> 'amazon.com'
WITH NO SCHEMA BINDING;

复制代码


现在,我们可以从 Amazon Redshift 执行查询,将我们的结构化 Salesforce 数据与半结构化动态 Adobe Analytics 数据相结合。通过这些更改,我们的数据变得极其灵活、对存储大小非常友好、查询特别高效。从那时起,我们开始将 Redshift Spectrum 用于很多用例,包括数据质量检查、机器数据、历史数据存档,使我们的数据分析师和科学家能够更轻松地混合和加载数据。


with web_leads as (  select transaction_id,last_touch_channel  from edw_t.f_omniture_web  where hive_date_key = '20170301'      and post_event_list like '%201%'      and transaction_id != '807f0cdc-80cf-42d3-8d75-e55e277a8718'),opp_lifecycle as (  SELECT lifecycle,weblead_transactionid  FROM edw_t.f_opportunity  where weblead_transactionid is not null  and created_date_key between '20170220'and '20170310')select  web_leads.transaction_id,  coalesce(opp_lifecycle.lifecycle, 'N/A') as Lifecyclefrom web_leadsleft join opp_lifecycle on web_leads.transaction_id = opp_lifecycle.weblead_transactionid

复制代码

小结

通过将 Amazon S3 数据湖与 Amazon Redshift 合并,我们能为点击流数据构建高效灵活的分析平台。从而无需始终将点击流数据加载到数据仓库中,并且使平台适应传入数据中 schema 更改。请阅读 Redshift Spectrum 入门文档,还可以在下面观看我们在 AWS Chicago Summit 2018 上的演讲。


相关文章:


如果觉得这篇文章有用,请务必阅读从数据湖到数据仓库:利用 Amazon Redshift Spectrum 增强 Customer 360 和 Narrativ 通过 Amazon Redshift 帮助创建者将其数字内容货币化。


作者介绍:


Ryan Kelly 是 Equinox 的数据架构师,帮助草拟和实施数据计划框架。他还负责点击流跟踪,帮助团队深入了解他们的数字计划。Ryan 热衷于让人们更方便地访问和提取他们的数据,以获取商业智能、执行分析和丰富产品/服务。他还喜欢探索和审查新技术,了解是如何改善他们在 Equinox 的工作。


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/closing-the-customer-journey-loop-with-amazon-redshift-at-equinox-fitness-clubs/


2019-09-26 10:42751
用户头像

发布了 1947 篇内容, 共 164.4 次阅读, 收获喜欢 81 次。

关注

评论

发布
暂无评论
发现更多内容

技术详解 阿里云AIoT物模型支撑设备规模已超亿级——设备管理运维类

阿里云AIoT

运维 安全 监控 物联网 芯片

阿里云AIoT物联网平台如何实现设备全球就近接入——设备接入类

阿里云AIoT

运维 监控 物联网 中间件 数据采集

专场直播预约 | KaiwuDB 离散制造业场景解决方案

KaiwuDB

数据库 KaiwuDB 离线制造业 行业解决发展

什么是智慧公厕?智慧公厕存在的意义!

光明源智慧厕所

智慧城市

AGI时代即将降临,现代化产业建设的出路又在何处?

加入高科技仿生人

低代码 AGI 现代化产业

阿里120W年薪架构师力荐750页微服务架构深度解析笔记

程序知音

Java 微服务 编程语言 后端技术

有效载荷标识与内容类型--MQTT 5.0新特性

EMQ映云科技

物联网 IoT mqtt 企业号 3 月 PK 榜 有效载荷标识

全能代码编辑器:CodeRunner 最新激活版

真大的脸盆

Mac 代码编辑器 Mac 软件 代码编辑 编辑代码

保姆级教程!基于声网 Web SDK实现音视频通话及屏幕共享

声网

实战|网站监控如何做好监测点管理与内网数据采集

云智慧AIOps社区

安全 监控 监控宝 云智慧 网站监控

火山引擎DataLeap:数据秒级生产,揭秘电商实时数仓最佳实践!

字节跳动数据平台

数据治理 电商 数据研发 企业号 3 月 PK 榜

【3.10-3.17】写作社区优秀技术博文一览

InfoQ写作社区官方

热门活动 优质创作周报

Matlab常用图像处理命令108例(八)

timerring

图像处理

从“可用”到“好用” 京东云构建融合开放适配国产化应用的全栈产品矩阵

京东科技开发者

国产化 京东云 国产化替代 京东云峰会

焱融为国家电网打造存算一体的融合基础架构 助推能源行业新基建

焱融科技

文件存储 容器存储 分布式文件存储 高性能存储 国家电网

企业支出如何一眼看全局,用友BIP很在行

用友BIP

商旅费控

物联网数据应用开发最佳实践——数据价值类

阿里云AIoT

数据挖掘 物联网 存储 数据管理 调度

大资管行业数字化转型解决方案 | 行业方案

袋鼠云数栈

大数据 数字化转型 解决方案

Macbook技巧,Type-c接口失灵怎么办

互联网搬砖工作者

基于声网 Flutter SDK 实现互动直播

声网

flutter

PS 2023版本 24.2有哪些新功能?增加了哪些相机配置?

Rose

ps ps 2023 Photoshop 2023下载

解决运行VMWare虚拟机报错“打不开 /dev/vmmon:断裂管道”

互联网搬砖工作者

共享订阅--MQTT 5.0新特性

EMQ映云科技

物联网 IoT mqtt 企业号 3 月 PK 榜 共享订阅

GuavaCache与物模型大对象引起的内存暴涨分析——设备管理运维类

阿里云AIoT

缓存 算法 监控 物联网 数据格式

Tapdata Connector 实用指南:云原生数仓场景之数据实时同步到 Databend

tapdata

数据库 大数据

浪潮inBuilder低代码平台社区版来了!

inBuilder低代码平台

开源 低代码 企业级低代码平台

NFTScan 与 UniPass 达成合作伙伴,双方在多链 NFT 数据方面展开合作!

NFT Research

NFT

浅谈DWS函数出参方式

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 3 月 PK 榜

8年Java架构师面试官教你正确的面试姿势,10W字面试题搞定春招!

小小怪下士

Java 程序员 后端 java面试

什么是安全文件传输

镭速

未来智安入选FreeBuf《CCSIP 2022中国网络安全行业全景册(第五版)》

未来智安XDR SEC

Amazon Redshift 助力 Equinox Fitness Clubs 完成客户旅程_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章