问题描述
使用 Go SDK for Apache Beam,我正在尝试使用侧输入创建 PCollection 的视图.
Using the Go SDK for Apache Beam, I'm trying to create a view of a PCollection using a side input.
但我收到了这个奇怪的错误:
But I'm getting this weird error:
Failed to execute job: on ctx= making side input 0:
singleton side input Singleton for int ill-defined
exit status 1
这里是我使用的代码:
// A PCollection of key/value pairs
pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
return r.DoRecommend, 1
}, col)
// A PCollection of ints (demo)
pcollInts := beam.CreateList(s, [3]int{
1, 2, 3,
})
// A PCollection of key/values pairs
summed := stats.SumPerKey(s, pairedWithOne)
// Here is where I'd like to use my side input.
mapped := beam.ParDo(s, func(k string, v int, side int, emit func(ratio
models.RecommendRatio)) {
var ratio = models.RecommendRatio{
DoRecommend: k,
NumVotes: v,
}
emit(ratio)
}, summed, beam.SideInput{Input: pcollInts})
我在 git:
I found this example on git:
// Side Inputs
//
// While a ParDo processes elements from a single "main input" PCollection, it
// can take additional "side input" PCollections. These SideInput along with
// the DoFn parameter form express styles of accessing PCollection computed by
// earlier pipeline operations, passed in to the ParDo transform using SideInput
// options, and their contents accessible to each of the DoFn operations. For
// example:
//
// words := ...
// cufoff := ... // Singleton PCollection<int>
// smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
// if len(word) < cutoff {
// emit(word)
// }
// }, words, beam.SideInput{Input: cutoff})
更新: 似乎 Impulse(scope)
函数在这里有作用,但我不知道是什么.来自 GoDoc:
update: It seems like the Impulse(scope)
function has a role here but I cannot figure what. From GoDoc :
Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.
The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.
如果这有帮助,这里是我的结构:
If this can help, here my structs:
type Review struct {
Date time.Time `csv:"date" json:"date"`
DoRecommend string `csv:"doRecommend" json:"doRecommend"`
NumHelpful int `csv:"numHelpful" json:"numHelpful"`
Rating int `csv:"rating" json:"rating"`
Text string `csv:"text" json:"text"`
Title string `csv:"title" json:"title"`
Username string `csv:"username" json:"username"`
}
type RecommendRatio struct {
DoRecommend string `json:"doRecommend"`
NumVotes int `json:"numVotes"`
}
有什么解决办法吗?
谢谢
推荐答案
更新:
这可以通过删除beam.Impulse()
函数来简化(我认为错误的类型导致了这里的麻烦):
This can be simplified by removing the beam.Impulse()
function (I think the wrong type caused the trouble here):
mapped := beam.ParDo(s,
func(k string, v int,
sideCounted int,
emit func(ratio models.RecommendRatio)) {
p := percent.PercentOf(v, sideCounted)
emit(models.RecommendRatio{
DoRecommend: k,
NumVotes: v,
Percent: p,
})
}, summed,
beam.SideInput{Input: counted})
旧:我似乎找到了一个解决方案,也许只是一种解决方法,正在寻找快速审核并留出改进空间.(我相信该函数不是幂等的,因为如果它可能在多个节点工作器上执行多次,则 append() 函数将重复条目...)
Old:Seems like I've found a solution, maybe just a workaround, looking for a quick review and open to room for improvements. (I believe that function isnt idempotent because if it may executed more than once on multiple node workers, the append() function will duplicate entries...)
但这里的全局想法是使用 beam.Impulse(scope)
函数制作 []uint8 字节
的单例 PCollection 并传递所有真实"数据作为辅助输入.
But the global idea here is to make a singleton PCollection of a []uint8 byte
using beam.Impulse(scope)
function and pass all the "real" data as a side inputs.
// Pair each recommendation value with one -> PColl<KV<string, int>>
pairedWithOne := beam.ParDo(s, func(r models.Review) (string, int) {
return r.DoRecommend, 1
}, col)
// Sum num occurrences of a recommendation k/v pair
summed := stats.SumPerKey(s, pairedWithOne)
// Drop keys for latter global count
droppedKey := beam.DropKey(s, pairedWithOne)
// Count globally the number of recommendation values -> PColl<int>
counted := stats.Sum(s, droppedKey)
// Map to a struct with percentage per ratio
mapped := beam.ParDo(s,
func(_ []uint8,
sideSummed func(k *string, v *int) bool,
sideCounted int,
emit func(ratio []models.RecommendRatio)) {
var k string
var v int
var ratios []models.RecommendRatio
for sideSummed(&k, &v) {
p := percent.PercentOf(v, sideCounted)
ratio := models.RecommendRatio{
DoRecommend: k,
NumVotes: v,
Percent: p,
}
ratios = append(ratios, ratio)
}
emit(ratios)
}, beam.Impulse(s),
beam.SideInput{Input: summed},
beam.SideInput{Input: counted})
这篇关于Go SDK Apache Beam:单例侧输入 Singleton for int 定义不明确的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!