写点什么

实现标记自动化:将海量元数据引入 Data Catalog

Shirley Cohen & Shekhar Bapat

  • 2020-11-03
  • 本文字数:4899 字

    阅读完需:约 16 分钟

实现标记自动化:将海量元数据引入 Data Catalog

Data Catalog 让您能够通过交互式接口摄取和编辑业务元数据。它包括可用于实现常见任务自动化的编程接口。许多企业必须使用 Data Catalog 定义和采集一组元数据,因此,我们将在这里提供一些关于如何从长期角度声明、创建和维护这类元数据的最佳实践。


在以往的文章中,我们介绍了标签模板能够如何通过描述用于对数据资产进行分类的词汇表来促进数据发现、治理和质量控制。在本文中,我们将探讨如何使用标签模板对数据进行标记。标记指创建一个标签模板的实例并为模板字段分配值,以对特定数据资产进行分类。撰写本文时,Data Catalog 支持三种存储后端:BigQuery、Cloud Storage 和 Pub/Sub。在此,我们将着重介绍如何对在这些后端中存储的资产进行标记,例如,表、列、文件和消息主题。


我们将介绍适合在数据湖和数据仓库环境中标记数据的三种使用模式:配置新数据源、处理派生数据以及更新标签和模板。对于每种应用场景,您将了解到我们推荐的用于大规模标记数据的方法。

1. 配置数据源

配置数据源通常涉及几种活动:根据存储后端创建表或者文件、利用一些初始数据填充它们以及对这些资源设置访问权限。我们在此基础至上还多增加了一种活动:在 Data Catalog 中标记新创建的资源。以下是涉及的具体步骤:


标记数据源需要了解拟使用的标签模板的含义以及数据源中的数据语义的领域专家。基于所具有的知识,领域专家会选择附加哪些模板以及从这些模板创建哪类标签。鉴于许多决策依赖于标签的准确性,人的参与至关重要。


基于我们与客户的合作经验,我们观察到两种类型的标签。一种类型称为 static(静态),因为,字段值是预先知道的,并且预计很少变更。另一种类型称为 dynamic(动态),因为字段值会根据基础数据的内容定期变更。静态标签的一个示例是包括 data_domain(数据域)、data confidentiality(数据保密性)和 data_retention(数据保留)的数据治理字段的集合。这些字段的值由组织的数据使用策略决定。它们通常在数据源创建时便已知晓,而且不会频繁变更。动态标签的一个示例是数据质量字段的集合,例如,number_values(数值)、unique_values(唯一值)、min_value(最小值)和 max_value(最大值)。每当运行新的负载或者对数据源进行修改时,这些字段值预计会频繁变更。


除了这些差异,静态标签还有级联属性,表明其字段应当以何种方式传播 —— 从源到派生数据。(在后续部分,我们将进一步详解这一概念。)与此形成对照的是,动态标签有查询表达式和刷新属性,指示应当用于计算字段值的查询以及计算的频率。在第一个代码段显示了一个静态标签的配置示例,第二个代码段显示的是动态标签的示例。


tag_config:    template:     - template_id: dg_template     - project_id: sandbox     - region: us-central1   fields:      - {name: data_domain, value: HR, cascade: true}     - {name: data_confidentiality, value: SENSITIVE, cascade: true}     - {name: data_retention, value: 30_DAYS, cascade: false}   lineage:      - template_id: derived_template     - parents_field: parents   rules:     - included_uri_patterns: bigquery/project/sandbox/dataset/covid/*     - excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_*     - included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO

复制代码


基于 YAML 的静态标签配置


tag_config:    template:     - template_id: dg_template     - project_id: sandbox     - region: us-central1   refresh: 1-hour   fields:      - {name: count, query_expression: select count(rto) from $$}     - {name: unique_values, query_expression: select distinct rto from $$}     - {name: null_values, query_expression: select count(*) from $$ where rto is null}   rules:     - included_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_RTO.rto
复制代码


基于 YAML 的动态标签配置


所前所述,当他们为数据源设置标记时,领域专家将为这些配置提供输入。更具体而言,他们首先选择要为数据源附加的模板。其次,他们会选择拟使用的标签类型,即,静态还是动态。接下来,他们会输入各字段的值,如果类型是静态的,还要输入级联设置,如果类型是动态的,要输入查询表达式和刷新设置。这些输入通过 UI 提供,因此,领域专家无需编写原始 YAML 文件。


一旦生成 YAML 文件,工具将基于规范在 Data Catalog 中解析配置并创建实际标签。工具还会根据刷新设置安排动态标签的重新计算。尽管需要领域专家进行初始输入,但实际的标记任务可完全自动化。我们推荐采用以下这种方法,这样,不仅能在发布时对新创建的数据源进行标记,而且无需人工操作即可对标签进行长期维护。

2. 处理派生数据

除了标记数据源,能够对派生数据进行大规模标记同样至关重要。我们将派生数据宽泛地定义为以从一个或多个数据源转换的方式创建的任何数据段。这种类型的数据与数据湖和数据仓库场景尤其具有相关性,在这类环境中,数据产品通常从各种数据源派生。


派生数据的标签应当由原数据源和应用于数据的转换类型组成。原数据源的 URI 被保持在标签中,并且在标签中还存储一个或多个转换类型 —— 例如,聚合、匿名化、归一化等。我们建议将标记创建逻辑融入生成派生数据的管道中。利用 Airflow DAGs 和 Beam,这是可行的。例如,如果一个数据管道连接两个数据源,聚合结果并将其存储到表中,您可以参考两个原数据源以及 aggregation:true,基于结果表创建标签。以下,您可以看到创建此标签的 Beam 管道的代码段:


  with beam.Pipeline(options=pipeline_options) as p:     sql = 'select covid_county, covid_state, sum_new_cases from            views.v_covid_new_cases'      bq_source = beam.io.BigQuerySource(query=sql, use_standard_sql=True)      covid_query_results = p | 'Read from BigQuery' >> beam.io.Read(bq_source)     subscription_name = 'projects/scohen-sandbox/subscriptions/employee-RTO'                           message = p | 'Read message' >> ReadFromPubSub(topic=None,                   subscription=subscription_name, timestamp_attribute=None)     emp_pcoll = message | 'Get Age' >> beam.ParDo(GetAge())                                                                                joined_emp_pcoll = emp_pcoll | 'Join Data' >> beam.ParDo(Join(),                         beam.pvalue.AsList(covid_query_results))     batch_joined_pcoll = joined_emp_pcoll | 'Batch Join' >>                          BatchElements(min_batch_size=10, max_batch_size=20)                                                                                                                                masked_dob_pcoll = batch_joined_pcoll | 'Mask DOB' >> beam.ParDo(MaskDOB())            batch_masked_pcoll = masked_dob_pcoll| 'Batch Mask' >>                           BatchElements(min_batch_size=10, max_batch_size=20)                                                                                                       bucket_age_pcoll = batch_masked_pcoll | 'Bucket Age' >>                         beam.ParDo(BucketAge())       batch_age_pcoll = bucket_age_pcoll | 'Batch Bucket Age' >>                        BatchElements(min_batch_size=4, max_batch_size=5)                                                                                                                     hash_id_pcoll = batch_age_pcoll | 'Hash Id' >> beam.ParDo(HashId())               hash_id_pcoll | 'Write Table' >> WriteToBigQuery(table, schema)     # Tag Employee_RTO table with Derived Data template      template = 'derived_template'       dc_client = datacatalog_v1.DataCatalogClient()     tag.template = dc_client.tag_template_path(project_id, region, template)      tag = datacatalog_v1.types.Tag()          table_resource = '//bigquery.googleapis.com/projects/' + project_id +                       '/datasets/' + dataset + '/tables/' + short_table_name      table_entry = dc_client.lookup_entry(linked_resource=table_resource)     tag.fields['parents'].string_value = 'pubsub/project/sandbox/subscriptions/employee-RTO,bigquery/project/sandbox/dataset/views/v_covid_new_cases'      tag.fields['aggregated_data'].bool_value = False     tag.fields['pseudo_anonymized_data'].bool_value = True     tag.fields['anonymized_data'].bool_value = False     tag.fields['origin_product'].enum_value.display_name = 'DATAFLOW'            long_ts = datetime.now(tz.gettz("America/Chicago")).isoformat()     ts = timestamp_value[0:19] + timestamp_value[26:32]      tag.fields['date_data_processed'].timestamp_value.FromJsonString(ts)       response = dc_client.create_tag(parent=table_entry.name, tag=tag)
复制代码


带标记逻辑的 Beam 管道


一旦利用其原数据源对派生数据进行标记,您可以使用此信息传播附加到原数据源的静态标签。这是 cascade 属性发挥作用的地方,指示哪些字段应当被传播到其派生数据。在以上所示的第一个代码段中显示了 cascade 属性的示例,其中 data_domain 和 data_confidentiality 字段被传播,而 data_retention 字段未被传播。这意味着,BigQuery 中派生的任何表将使用 dg_template 利用 data_domain:HR 和 data_confidentiality:CONFIDENTIAL 进行标记。

3. 处理更新

有几种场景需要针对标签和模板的更新能力。例如,如果业务分析师发现标签中的一个错误,需要对一个或多个值进行更正。如果要采用新的数据使用策略,可能需要为模板添加新的字段并对现有字段重命名或者删除。


我们为标记和模板更新提供配置,如下图所示。标记更新配置指定将变更的每个字段的当前值和新值。工具处理配置并基于规范更新标签中的字段的值。如果更新的标签是静态的,工具还会将变更传播至派生数据的相同标签。


模板更新配置制定变更的字段名、字段类型以及任何枚举值。工具通过首先确定变更的性质来处理更新。撰写本文时,Data Catalog 支持对模板添加和删除字段以及添加枚举值,但尚不支持字段重命名或者类型变更。因此,如果需要简单添加或者删除,工具会对现有模板进行修改。否则,必须重新创建整个模板以及所有从属标签。


  tag_config:  template:   - template_id: dg_template   - project_id: sandbox   - region: us-central1   fields:      - {name: data_confidentiality, current: SENSITIVE, new: SHARED_INTERNALLY, cascade: true}   - {name: data_retention, current: 30_DAYS, new: 60_DAYS, cascade: false}   rules:     - included_uri_patterns: bigquery/project/sandbox/dataset/covid/*   - excluded_uri_patterns: bigquery/project/sandbox/dataset/covid/Employee_input_*   - included_uri_patterns: pubsub/project/sandbox/subscriptions/employee-RTO

复制代码


基于 YAML 的标签更新配置


  template_config:  - template_id: dg_template   - project_id: sandbox   - region: us-central1   fields:     - {name: data_confidentiality, type: enum, values: {SENSITIVE, SHARED_INTERNALLY, SHARED_EXTERNALLY, PUBLIC, UNKNOWN}    - {name: data_retention, type: enum, values: {30_DAYS, 60_DAYS, 90_DAYS, 120_DAYS, 1_YEAR, 2_YEARS, 5_YEARS, UNKNOWN}

复制代码


基于 YAML 的模板更新配置


我们已经开始对这些方法进行原型创建,以发布一个开源工具,实现按照我们建议的使用模式在 Data Catalog 中创建和维护标签所涉及的许多任务的自动化。


2020-11-03 13:151358

评论 4 条评论

发布
用户头像
2021-02-27 11:21
回复
赞赞赞
2021-02-27 11:21
回复
赞赞赞赞赞赞
2021-02-27 11:22
回复
赞赞赞赞赞赞赞赞赞
2021-02-27 11:22
回复
没有更多了
发现更多内容
实现标记自动化:将海量元数据引入 Data Catalog_文化 & 方法_InfoQ精选文章