Dataflow: Look up a previous event in an event stream
In short, what I'm looking for with Apache Beam in Google Dataflow is something like LAG in Azure Stream Analytics.
I'm receiving data in a window of X minutes:
```
|----| |----| |----| |----| |----| |----|
| 1  | | 2  | | 3  | | 4  | | 5  | | 6  |
|id=x| |id=x| |id=x| |id=x| |id=x| |id=x|
|----|,|----|,|----|,|----|,|----|,|----|, ...
```
I need to compare data(n) with data(n-1). Following the previous example, it would be something like this:

```
if data(6) inside and data(5) outside then ...
if data(5) inside and data(4) outside then ...
if data(4) inside and data(3) outside then ...
if data(3) inside and data(2) outside then ...
if data(2) inside and data(1) outside then ...
```
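For reference, the pairing being described (each event matched with the one before it for the same id, which is what LAG(value, 1) does) can be illustrated over an ordered, in-memory list in plain Python. This is only a sketch of the intended semantics, not a Beam solution; the helper name `with_previous` is hypothetical, and the "inside/outside" checks are left out because their meaning isn't specified.

```python
from typing import Iterable, List, Optional, Tuple, TypeVar

T = TypeVar("T")


def with_previous(events: Iterable[T]) -> List[Tuple[T, Optional[T]]]:
    """Pair each event with its predecessor, like LAG(value, 1).

    The first event has no predecessor, so it is paired with None.
    """
    pairs: List[Tuple[T, Optional[T]]] = []
    previous: Optional[T] = None
    for event in events:
        pairs.append((event, previous))
        previous = event
    return pairs


# data(1)..data(6) for a single id, assumed to be already in arrival order
for current, previous in with_previous([1, 2, 3, 4, 5, 6]):
    print(f"data({current}) vs data({previous})")
```

The hard part in a streaming pipeline is that consecutive events may fall into different windows, which is exactly what a plain loop like this glosses over.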
Is there any practical way to do this?
python google-cloud-platform google-cloud-dataflow apache-beam
edited 12 hours ago · asked 12 hours ago by IoT user
1 Answer
With Beam, as explained in the docs, state is maintained per key and window. Therefore, you can't access values from previous windows.
To do what you want, you might need a more complex pipeline design. My idea, developed here as an example, is to duplicate your messages in a ParDo:
- emit them unmodified to the main output;
- at the same time, emit them to a side output with a one-window lag.

For the second bullet point, we can add the duration of a window (WINDOW_SECONDS) to the element timestamp:
```python
import apache_beam as beam

WINDOW_SECONDS = 10  # window size in seconds (10 s in my test)

class DuplicateWithLagDoFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # Main output gets the unmodified element (original timestamp)
        yield element
        # The same element is emitted to the side output with a 1-window lag added to its timestamp
        yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))
```
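Why does adding WINDOW_SECONDS move the copy into the next window? As I understand Beam's Python `FixedWindows`, a timestamp `t` is assigned to the window starting at `t - (t - offset) % size`, with `offset` defaulting to 0. The plain-Python sketch below reproduces only that arithmetic (the helper names are hypothetical, and `WINDOW_SECONDS = 10` matches the test run described in this answer) to show that the lagged copy always lands exactly one window after the original.

```python
WINDOW_SECONDS = 10  # same window size as the test run in this answer


def window_bounds(ts, size=WINDOW_SECONDS, offset=0):
    """Return [start, end) of the fixed window containing timestamp ts."""
    start = ts - (ts - offset) % size
    return (start, start + size)


def lagged_window_bounds(ts, size=WINDOW_SECONDS):
    """Window that the lagged copy (re-stamped at ts + size) falls into."""
    return window_bounds(ts + size, size)


for ts in (0, 3, 9.5, 12):
    # original window vs. window of the lagged copy
    print(ts, window_bounds(ts), lagged_window_bounds(ts))
```

Because the shift equals the window size, the copy of an element from window k always meets the fresh elements of window k + 1 in the later CoGroupByKey.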
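We apply the DoFn with a ParDo, specifying the output tags:

```python
# `keyed` stands for the input PCollection of (key, message) pairs (name is illustrative)
results = keyed | 'Duplicate with lag' >> beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')
```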
Then apply the same windowing scheme to both outputs and co-group them by key:
```python
from apache_beam import window

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
# Emits (key, ([values from main], [values from lag])) per key and window
merged = (windowed_main, windowed_lag) | 'Join PCollections' >> beam.CoGroupByKey()
```
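Conceptually, after re-windowing, CoGroupByKey emits one `(key, ([main values], [lag values]))` result for every key and window in which either side has data. The in-memory simulation below is plain Python, not Beam, and all names are hypothetical. It assumes the two test messages from this answer arrive at timestamps 1 and 13 (12 seconds apart; the exact timestamps are made up) and reproduces the three groupings that the job output reports.

```python
from collections import defaultdict

WINDOW_SECONDS = 10


def window_start(ts, size=WINDOW_SECONDS):
    # Start of the fixed window containing ts (zero offset)
    return ts - ts % size


def simulate_cogroup(events, size=WINDOW_SECONDS):
    """events: iterable of (key, message, timestamp) tuples.

    Returns {(key, window_start): ([main values], [lag values])},
    mimicking what CoGroupByKey would produce for (main, lag).
    """
    grouped = defaultdict(lambda: ([], []))
    for key, message, ts in events:
        # Main output keeps the original timestamp
        grouped[(key, window_start(ts, size))][0].append(message)
        # Lag output is the same element re-stamped one window later
        grouped[(key, window_start(ts + size, size))][1].append(message)
    return dict(sorted(grouped.items()))


events = [("test", "test,120", 1), ("test", "test,40", 13)]
for (key, start), value in simulate_cogroup(events).items():
    print(start, (key, value))
```

The last group, which holds only a lagged value, is why the pipeline emits one extra comparison after the final message.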
Finally, we can have both values (old and new) inside the same ParDo:
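```python
import logging

class CompareDoFn(beam.DoFn):
    def process(self, element):
        logging.info("Combined with previous value: {}".format(element))
        try:
            old_value = int(element[1][1][0].split(',')[1])
        except (IndexError, ValueError):  # no (valid) value from the previous window
            old_value = 0
        try:
            new_value = int(element[1][0][0].split(',')[1])
        except (IndexError, ValueError):  # no (valid) value in the current window
            new_value = 0
        logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
        # yield (rather than return) so the (key, difference) tuple is emitted as a single element
        yield (element[0], new_value - old_value)
```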
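To unit test the comparison without running a pipeline, the parsing and subtraction can be pulled out into plain functions. The sketch below uses hypothetical helper names and assumes the same `"key,value"` message format as the test run in this answer. Its results match the three differences in the job output (120, -80, -40).

```python
def parse_value(messages):
    """Parse the integer after the comma in the first "key,value" message.

    Returns 0 when the list is empty or the message is malformed, which is
    how CompareDoFn treats a window with no new (or no previous) value.
    """
    try:
        return int(messages[0].split(",")[1])
    except (IndexError, ValueError):
        return 0


def difference(grouped):
    """grouped: (key, ([new messages], [previous-window messages]))."""
    key, (new_messages, old_messages) = grouped
    return key, parse_value(new_messages) - parse_value(old_messages)


print(difference(("test", (["test,40"], ["test,120"]))))
```

Keeping the logic in plain functions also makes it easy to call from `CompareDoFn.process` if you prefer.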
To test this, I ran the pipeline with the direct runner and, in a separate shell, published two messages more than 10 seconds apart (in my case WINDOW_SECONDS was 10 s):
```shell
gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"
```
And the job output shows the expected difference:
```
INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous value: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous value: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous value: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40
```
Full code for my example here. Keep performance in mind, since every element is duplicated, but the approach makes sense if you need values to be available across two windows.
answered 3 hours ago by Guillem Xercavins