Beam学习笔记

2023-09-25 18 0

Beam学习笔记

编程指导

https://beam.apache.org/documentation/programming-guide/

  • 创建驱动程序,定义pipeline,包括输入、转换、输出,以及执行参数(主要包括runner,决定pipeline运行的后端)

    1. 创建pipeline,设置参数
    2. 创建初始的PCollection
    3. 对PCollection应用PTransform:可以对PCollection中每个元素进行修改、过滤、分组、分析等操作,输出新的PCollection;PTransform可以很复杂,自由组合
    4. 输出最终的PCollection到外部数据源
    5. 在真实的Runner上运行pipeline
  • 抽象出了

    • Pipeline:囊括了从头到尾的整个数据处理任务,包括读取输入数据,转换该数据以及写入输出数据。其描述了由PCollection为节点,PTransform为边组成的DAG。
    • PCollection:分布式数据集,不可变,有界或无界(流式数据)
    • PTransform:表示数据处理操作或步骤,输入1个或多个PCollection,对每个元素做处理,输出0个或多个PCollection
    • I/O Source和Sink PTransform的IO库,用来将数据读取或写入各种外部存储系统的库

Pipeline

  • PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    解析命令行获取参数 .withValidation验证必填参数是否存在,参数是否合法

  • 扩展参数
    通过getter和setter,用注解提供help说明和默认值

public interface MyOptions extends PipelineOptions {@Description("Input for the pipeline")@Default.String("gs://my-bucket/input")String getInput();void setInput(String input);@Description("Output for the pipeline")@Default.String("gs://my-bucket/output")String getOutput();void setOutput(String output);
}// 注册使用
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

PCollection

  • TextIO.Read
    reads from an external text file and returns a PCollection whose elements are of type String, each String represents one line from the text file

  • Create.of(Java集合)

  • setCoder(StringUtf8Coder.of()) 元素编码器

  • PCollection特性

    • 只属于一个Pipeline,不同Pipeline间不能共享
    • 支持任意类型,但元素的类型必须相同,即一个PCollection只能包含一种类型的元素
    • 支持元素schema:大部分情况下PCollection的元素支持反射,如JSON、ProtoBuffer、Avro和数据库记录。schema可以为元素字段提供命名,操作起来更丰富灵活。
    • 不支持随机访问单独元素。PTransform单独处理每个元素,元素之间无关
    • 无界数据看作由连续的有限窗口组成,聚合转换作用与单个窗口
    • 每个元素都有一个时间戳,一般用元素被读取或添加的时间,也可以用元素自带的时间

PTransform

  • PTransform可以用方法链的形式调用
ParDo
  • ParDo(Parallel Do)类似Map/Shuffle/Reduce的Map,对每个元素执行变换函数,输出N个元素。
    可以逐个元素进行过滤、格式化/类型转换、提取字段、运算等处理。

  • DoFn缓存数据时注意不要依赖调用次数,调用次数是无法保证的
    lifecyclem没看明白???

GroupByKey
  • GroupByKey 类似Map/Shuffle/Reduce的Shuffle,multimap变uni-map
    因为需要全部数据,所以只能用于非全局窗口或聚合触发,
    对多个PCollection使用时,必须要求相同的开窗策略和窗口大小,否则不能对各PCollection同时处理
CoGroupByKey
  • CoGroupByKey 类似join,把两个集合按key聚合,把value串起来
    没看到multimap会怎样,应该先调GroupByKey
Combine
  • Combine 一种聚合操作,将集合中的元素结合,如sum、max、min等
    简单的可以只实现1个函数,复杂的需要实现4个函数:

    • createAccumulator:创建保存累积结果的本地实例
    • addInput:累积输入,增加单个元素到累积结果中
    • mergeAccumulators:合并各累积实例,如多个worker的实例
    • extractOutput:提取要输出的结果
  • Combine.globally 输出的结果集只有一个元素
    .withoutDefaults() 如果输入是空数据集,则输出空的结果集,不会生成缺省值的结果集

  • Combine 对非全局窗口有两个选项,必须指定一种
    .withoutDefaults
    .asSingletonView:provide a default value for each empty window when used as a side input没看懂???

  • Combine.<key类型, val类型, 结果类型>perKey(CombineFn) 按key对val进行combine

Flatten
  • Flatten 合并多个PCollection为一个,类似python list的extend效果
    merge多个PCollection时,要求各PCollection的开窗策略和窗口大小compatible,应该是因为只有这样才能同时处理各集合
Partition
  • Partition 将一个PCollection分成固定个数的数据集
  • 个数必须要在图构建时确定下来:可以在构建前传参调整,运行时不能修改
  • PartitionFn 返回的是PCollectionList的下标,确定元素归属结果中的哪个PCollection

transforms编写须知

  • 转换函数会存在多份,同时运行在不同机器上,相互之间独立,没有通信或共享状态

  • 函数对象必须是可序列化的,这样才能发往其他机器

    • 函数对象中的Transient fields不会被传输,因为不会被自动序列化
      • 可以重新计算出来,或者不想序列化的字段可以用transient关键字标记
    • 避免在序列化之前向字段加载大量数据
    • 函数对象apply后再改动是无效的(理解为传值,不是传址)
    • Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class’ state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class
      这条太复杂,不想踩坑就不要在匿名内部类里声明函数对象???
  • 函数对象必须是thread-compatible,因为Beam SDK不是线程安全的
    线程安全五个等级:

    • immutable 不可变对象
    • thread-safe 线程安全的,可以放心使用,如java.util.Timer
    • conditionally thread-safe 条件线程安全的,如Vector和Hashtable,一般是安全的,除非存在几个方法调用之间的顺序不能被打断,这时可以用额外的锁来完成
    • thread-compatible 可以使用synchronized (objectReference)来协助完成对线程的调用
    • thread-hostile 不安全的

    一个函数对象实例只会被一个worker的单个线程使用,除非自己建线程,这时就要自己负责同步

  • 函数对象最好是幂等的,不知道会调用多少次

Side inputs

  • 是ParDo的另一种形式的输入,每次处理PCollection的元素时,DoFn都可以访问SideInputs

  • 当ParDo处理时需要额外数据,而且该数据不能写死,需要从输入或其他pipeline分支生成时,用SideInputs

  • 对于开窗的PCollection,每个windows有一个PCollectionView

  • 如果主输入和side inputs窗口一致,两者的窗口一一对应,就可以直接找到相应的side input窗口

  • 如果不一致,会根据主输入元素的窗口去找side input的合适的窗口。比如主窗口是1分钟的固定窗口,side input是1小时的固定窗口,则主输入元素对找相应小时的side input窗口

  • 如果主输入的元素在多个窗口中,每个窗口中都会调用一次processElement,每次找到side input窗口可能不一样

  • 如果side input有多个trigger,beam选择最新的一个

Additional outputs

  • ParDo可以有一个主输出和多个额外输出,在DoFn里根据条件选择不同的集合输出
	// Specify the tag for the main output..withOutputTags(wordsBelowCutOffTag,// Specify the tags for the two additional outputs as a TupleTagList.TupleTagList.of(wordLengthsAboveCutOffTag).and(markedWordsTag)));
  • DoFn还有其他参数:

    • @Timestamp Instant timestamp
    • Window: 要与PCollection的窗口匹配,如果元素落入多个窗口,每个窗口会被调用处理一次
    • PaneInfo:使用trigger时可以通过PaneInfo获取当前触发的信息。Using PaneInfo you can determine whether this is an early or a late firing, and how many times this window has already fired for this key.
    • PipelineOptions
  • @OnTimer 具体怎么用还没看???

Pipeline I/O

  • 支持通配符
    如:TextIO.read().from(“protocol://my_bucket/path/to/input-*.csv”));
    获取的PCollection的集合(或者元素是List)?再使用Flatten转换为一个PCollection

  • 文件输出默认输出多个文件,可以添加前缀、后缀,中间自动添加数字(应该有格式化的方法)

Schemas

Schemas provide us a type-system for Beam records that is independent of any specific programming-language type.

如果不同类的字段相同,通过Schema,Beam可以无缝转换这些类的对象。

  • 使用注解
    @DefaultSchema(JavaBeanSchema.class)
    @SchemaCreate

  • 支持原始类型、集合(ARRAY、ITERABLE、MAP)和嵌套

  • 通过继承LogicalType扩展schema类型,可以做为field的类型
    枚举、OneOf(联合union)

  • 使用方便,可以直接用字段名访问,支持内嵌字段、通配

  • 可以增删、重命名schema字段

windowing

  • 默认只有一个全局窗口

  • 对于无界数据,至少采用一种措施

    • 设置一个非全局窗口
    • 设置一个触发器
  • 设置窗口后,对后面的非窗口类的Transform无效,直到一个需要窗口的Transform才有效

  • 一个元素可能属于多个窗口,比如滑动窗口会创建重叠的窗口

  • Fixed time窗口是开闭区间,即[开始时间,开始时间+时长)

  • Sliding time窗口,参数开窗间隔和窗口时长,对周期性统计比较有用
    是不是开窗周期和时长相同就和Fixed time一样了?

  • Session窗口,通过元素间的时间间隔划分窗口

  • global窗口,一般用于有限数据源
    无界不做聚合类操作应该也可以用

  • .withAllowedLateness可以延长窗口关闭的时间

  • .outputWithTimestamp增加时间戳

Triggers

  • 默认在windows时关闭时触发

  • 默认几种触发器

    • 基于事件时间,默认触发器
    • 基于处理时间
    • 数据驱动:数据满足一定条件触发
    • 以上组合
  • 触发器还提供两个额外的能力

    • 允许提交早期结果:在所有数据到达前,如一定时间或一定数据
    • 允许处理延迟的数据
  • AfterWatermark基于事件时间触发,watermark超过窗口结束时触发,然后每次延迟数据到来时再触发
    watermark的行为怎么理解???

    • withEarlyFirings 用来提前供预估
    • withLateFirings 用来后续修正
  • AfterProcessingTime基于处理时间触发
    pastFirstElementInPane 数据到达后的一段时间触发

  • AfterPane基于数据驱动触发
    elementCountAtLeast:元素个数接收到一定个数后触发,不够数量不会触发

  • pane:每次触发器提交的数据

  • 设置触发器时,必须同时设置窗口的累积模式

    • accumulatingFiredPanes 保留历史数据
    • discardingFiredPanes 不保存历史数据
  • withAllowedLateness影响后续转换生成的PCollection,需要显示调用Window.configure().withAllowedLateness()修改

  • 组合触发器

    • AfterWatermark.pastEndOfWindow与.withEarlyFirings/.withLateFirings
    • Repeatedly.forever 一直触发,可以搭配.orFinally做退出
    • AfterEach.inOrder 顺序执行各触发器
    • AfterFirst 各触发器有一个满足就执行,就是多个触发器的逻辑“或”
    • AfterAll 所有触发器全满足才执行,就是多个触发器的逻辑“与”
    • orFinally 触发一次后不再触发
  .apply(Window.configure().triggering(AfterWatermark // 基于时间时间.pastEndOfWindow()   // BEAM估计数据已经全到了(watermark超出windows).withLateFirings(AfterProcessingTime // 基于处理时间修正延迟的数据.pastFirstElementInPane() // 接收到数据后,延迟10分钟触发.plusDelayOf(Duration.standardMinutes(10)))).withAllowedLateness(Duration.standardDays(2))); // 留2天时间处理延迟数据,2天后彻底关闭窗口

Metrics

  • 用于提供一些后台信息

    • 检查错误数量
    • 监测RPC调用次数
    • 获取当前处理的元素数量
  • 指标的名称由命名空间和名字组成,命名空间可以避免重名,也可以查询整个命名空间的指标

  • 每个指标都有它的作用范围,表明在执行pipeline的哪个步骤,哪段代码在运行

  • 指标不需要提前声明,可以在运行时创建

  • 如果后端不支持某个上报指标,可以忽略,不会导致pipeline失败,如果不支持某个查询指标,可以只返回支持的部分

  • 目前有三种指标类型

    • Counter:只能表示一个计数,long型,可增减
    • Distribution:值的分布情况(直方图)
    • Gauge:获取正在处理的最新值之一(因为有多个worker同时在执行)
  • 度量指标可以导出到外部,用MetricsOptions配置,默认5秒输出一次

State and Timers

  • 为开发人员提供手工管理每个key状态,可以在聚合方面提供更细粒度的控制

  • state API按key存储状态,数据集需要时PCollection<KV<K, V>>类型

  • ParDo可以声明状态变量,并赋值及更新,状态只对当前处理的key可见

  • 开窗的情况下, 第一个key读到的状态是空的,当窗口关闭时会进行gc

  • 如果状态处理用于在DoFn内实现状态机,需要注意元素的顺序是不能保证的

  • 状态的类型

    • ValueState 标量状态值,可以被读写
    • CombiningState 也是保存一个值,写的时候通过Combiner(如sum、max等)生成合适的值更新
    • BagState 可以用于保存处理过的元素
  • state.read()会导致runner阻塞,多个state顺序读取时可能增大延迟
    通过@AlwaysFetched预取状态
    如果有代码分支不需要state时,@AlwaysFetched会增加不必要的预取,可以通过readLater异步读取,让runnrer在后面一起批量读取

Timers

  • Beam支持per-key定时回调API

  • 一个定时器只能设置一个时间戳,后面设置的覆盖前面设置的

  • Event-time定时器可以用于基于事件时间的聚合

  • Processing-time定时器一般用于创建大批数据,也可用于定时触发事件,可以设置绝对时间和相对时间

  • 动态定时器,通过TimerMap可以设置多个不同的定时器,可以根据定时器标签动态选择

  • Timer output timestamps 没看懂???

  • state的GC

    • 窗口关闭state即被回收
    • 用定时器回收

执行模型

https://beam.apache.org/documentation/runtime/model

  • 元素的序列化和传输是分布式执行中代价最高的操之一
    避免的方法:失败后在本地重新处理,限制输出分发到其他机器

  • 传输元素的原因

    • 分组操作时需要把元素路由到相应的worker上
    • 重新分发元素调整并发
    • 把元素广播到所有worker
    • 同一worker的转换之间可能可以避免序列化,之间传内存中的元素
  • 元素持久化的原因

    • 有状态的DoFn,需要保存一些状态
    • 输出处理结果时,需要保存为checkpoint
  • 并行的尴尬:不能顺序执行(如给PCollection里每个元素顺序编号),不能全量操作(如把所有元素输出或保存检查点状态)

  • 分批处理:由runner划分,流处理选小批,批处理选大批

  • 一批由一个worker执行,多个worker可以并行

  • 如果单个转换失败,可能由其他worker重新执行

  • 如果多个转换失败,一般由当前worker重新执行失败的操作,可以避免转换间持久化的代价

WordCount示例

https://beam.apache.org/get-started/wordcount-example/

  • Pipeline
    Pipeline用来定义处理流程,可以通过PipelineOptions定义runner等
    其描述了由PCollection为节点,PTransform为边组成的DAG。

  • PipelineOptions
    可以指定runner等
    runner有多种:Direct(本地),Spark等

  • PCollection
    PCollection.apply(PTransform),设置处理实例

  • TextIO.read().from(文件) 读文件生成PCollection,每个元素为一行数据

  • TextIO.write().to(?)

  • FlatMapElements.into(结果类型).via(指定函数) 一种PTransform,每个元素执行指定的函数,结果不保持分组,所有元素重新组成集合[kv,kv,kv,kv]

  • MapElements.into(结果类型).via(指定函数),结果保持输入的分组,[ [kv,kv], [kv, kv]]
    结果类型用TypeDescriptors创建,如

TypeDescriptors.strings()
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
  • Count.<key类型>perElement() 一种PTransform,将PCollection按key计数,结果集为key/value(整型,计数)
  • p.run().waitUntilFinish(): run()是异步,用waitUntilFinish()阻塞
  • 自定义DoFn<KeyType, ValType>,由Transform调用
static class ExtractWordsFn extends DoFn<String, String> {...@ProcessElementpublic void processElement(ProcessContext c) {...}
}
  • 自定义PTransform,组合多个转换,更好的模块化和复用
    PTransform<输入集合类型,输出集合类型>
public static class CountWords extends PTransform<PCollection<String>,PCollection<KV<String, Long>>> {@Overridepublic PCollection<KV<String, Long>> expand(PCollection<String> lines) {// Convert lines of text into individual words.PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));// Count the number of times each word occurs.PCollection<KV<String, Long>> wordCounts =words.apply(Count.<String>perElement());return wordCounts;}
}
  • ParDo.of(new DoFn()) 通过DoFn快速创建一个PTransform

  • 调试两张方法

    • 日志: 需要Slf4j
    • PAssert:小规模测试数据集
  • window: 无界数据没有结束,需要定义处理的范围,即将流数据转换为批数据,流变成N个窗口
    Window.<数据类型>into(FixedWindows.of(窗口时间))

    • FixedWindows:固定窗口
    • 滑动窗口
    • 会话 窗口
  • 时间戳:PCollection中每个元素都有时间戳,由创建PCollection的源赋值,可以使用数据自带或处理时间等

Mobile Gaming示例

https://beam.apache.org/get-started/mobile-gaming-example

  • skew 时间差,事件产生到处理的时间差

  • KV.of(gInfo.getKey(field), gInfo.getScore())) 创建KV实例

  • Sum.integersPerKey() 按key对value(整型)求和

  • WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp()))) 为元素增加时间戳

  • Filter.by(过滤函数) 过滤元素

  • GlobalWindows 全局窗口能处理从开始到当前的所有数据,其他窗口如FixedWindows只能处理一段时间的数据

  • Window.triggering(Trigger)

Window.<GameActionInfo>into(new GlobalWindows())// Get periodic results every ten minutes..triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES))).accumulatingFiredPanes().withAllowedLateness(allowedLateness))

通过触发器调用accumulatingFiredPanes

Repeatedly.forever 一直执行
Create a composite trigger that repeatedly executes the trigger repeated, firing each time it fires and ignoring any indications to finish.

AfterProcessingTime
A Trigger trigger that fires at a specified point in processing time, relative to when input first arrives.

AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)
第一个元素到达后的十分钟,用ProcessingTime计算

  • accumulatingFiredPanes()
    Returns a new Window PTransform that uses the registered WindowFn and Triggering behavior, and that accumulates elements in a pane after they are triggered.
    累积触发的窗格,即通过触发器将数据细分为窗格,可以保存之前窗格的数据,与后来的数据累积计算

  • withAllowedLateness()
    默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。
    为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。
    简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。

对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据会buffer起来,直到watermark超过end-of-window + allowedLateness的时间,窗口的数据及元数据信息才会被删除

问题:全局窗口没有watermakr,allowedLateness有什么用???

  • 启发式水印
Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))// We will get early (speculative) results as well as cumulative// processing of late data..triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)).withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES))).withAllowedLateness(allowedLateness).accumulatingFiredPanes()

AfterWatermark.pastEndOfWindow()
Creates a trigger that fires when the watermark passes the end of the window.

.withEarlyFirings() watermark到达窗口结束前的某个点触发
Creates a new Trigger like the this, except that it fires repeatedly whenever the given Trigger fires before the watermark has passed the end of the window.

.withLateFirings() watermark达到窗口结束后的某个点触发
Creates a new Trigger like the this, except that it fires repeatedly whenever the given Trigger fires after the watermark has passed the end of the window.

withAllowedLateness和accumulatingFiredPanes调用顺序有什么影响??

  • Values.create() 从KV中提取value

  • Mean.globally().asSingletonView() 对全部值计算平均值
    Returns a PTransform that produces a PCollectionView whose elements are the result of combining elements per-window in the input PCollection.

  • Mean.globally().withoutDefaults()
    Returns a PTransform identical to this, but that does not attempt to provide a default value in the case of empty input.

  • Metrics.counter(namespace, name) Create a metric that can be incremented and decremented, and is aggregated by taking the sum

  • withSideInputs(PCollectionView) 会将值(PCollectionView类型)广播给其他需要的worker
    c.sideInput(globalMeanScore) 使用sideInput的值

问题:两个数据是异步的,平均值是变化的,多次运行结果可能不一致???

  • Sessions.withGapDuration(间隔时长)
    在间隔时长内新数据则认为会话结束,新开窗口,否则数据继续在原窗口里处理

  • withTimestampCombiner(TimestampCombiner.END_OF_WINDOW) 用窗口结束时间作为输出的时间戳

  • Combine.perKey(x -> 0) 不关心其他数据,只要key,value直接填0,(可能是因为只有KV,没有List/Array)

  • DoFn还有带窗口信息的成员函数void processElement(ProcessContext c, BoundedWindow window)

  /** Calculate and output an element's session duration. */private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer> {@ProcessElementpublic void processElement(ProcessContext c, BoundedWindow window) {IntervalWindow w = (IntervalWindow) window;int duration = new Duration(w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();c.output(duration);}}
代码编程
赞赏

相关文章

数商云食品行业解决方案:新技术加持食品行业,为企业快速发展提供有力支撑
星期零斩获2021新消费领域“投资界50强企业”和“中国食品工业创新品牌”奖项
2022-2028年中国食品产业园区行业市场运行格局及发展策略分析报告
工业互联网业务知识
劲牌公司荣获2020年度“中国食品工业协会科学技术奖”特等奖
面试3_:不修改数组找出重复的数字