2 #include <hpda/processor/processor_base.h>
3 #include <hpda/processor/set/helper/copy_helper.h>
4 #include <hpda/processor/set/helper/upper_stream_helper.h>
// Template parameter list for the declaration that follows (the declaration
// line itself is not visible in this excerpt).
// NOTE(review): presumably OutputObjType is the type returned by
// output_value() and CT is the comparison-value type produced by the
// kt_traits_t functors used further down in this file -- confirm.
10 template <
typename OutputObjType,
typename CT>
// Member template that registers an upper stream. The visible part forwards
// to a static add_upper_stream(...) of a class template whose last template
// argument is InputObjType, passing `upper_stream` as the first argument.
// NOTE(review): the parameter list, the name of the helper class and the
// remaining call arguments are not visible in this excerpt; presumably the
// helper comes from upper_stream_helper.h -- confirm.
17 template <
typename KT,
typename InputObjType>
18 void add_upper_stream(
21 InputObjType>::add_upper_stream(upper_stream,
// Compares the current comparison value of stream 0 with those of the other
// upper streams and, in the visible final step, fills m_data from stream 0.
// NOTE(review): several original lines are missing from this excerpt (the
// declaration of val_min, the use of flag_min, closing braces and the return
// statements), so the comments below describe only the visible statements.
29 bool has_difference() {
// Comparison value of stream 0, extracted by its traits functor.
30 CT val_0 = m_traits[0](m_upper_streams[0]);
// First pass over streams 1..N-1: for each stream that currently has a value,
// fold its comparison value into val_min with std::min.
// NOTE(review): val_min's initial value is not visible; presumably val_0.
32 for (
size_t i = 1; i < m_upper_streams.size(); i++) {
33 if (m_upper_streams[i]->has_value()) {
34 CT val_i = m_traits[i](m_upper_streams[i]);
35 val_min = std::min(val_min, val_i);
// NOTE(review): where flag_min is set or read is not visible in this excerpt.
38 bool flag_min =
false;
// Second pass over streams 1..N-1: a stream that has a value and whose
// comparison value equals val_min gets reset_done_value() called on it.
// NOTE(review): presumably this lets that stream move on to its next value;
// the semantics of reset_done_value() are defined outside this excerpt.
39 for (
size_t i = 1; i < m_upper_streams.size(); i++) {
40 if (m_upper_streams[i]->has_value()) {
41 CT val_i = m_traits[i](m_upper_streams[i]);
42 if (val_i == val_min) {
44 m_upper_streams[i]->reset_done_value();
// When stream 0's comparison value equals val_min, stream 0 is reset too.
// NOTE(review): the rest of this branch is not visible in this excerpt.
49 if (val_0 == val_min) {
50 m_upper_streams[0]->reset_done_value();
// Fill the output object m_data from stream 0 through the first filler
// functor, then reset stream 0.
// NOTE(review): the condition guarding these two statements is not visible.
55 m_filler[0](m_upper_streams[0], m_data);
56 m_upper_streams[0]->reset_done_value();
// Processing step. The visible part checks two guard conditions (fewer than
// two upper streams; stream 0 currently has no value) and otherwise returns
// the result of has_difference().
// NOTE(review): the bodies of both guards are not visible in this excerpt;
// presumably each returns false -- confirm.
60 virtual bool process() {
// Guard evaluated first, so m_upper_streams[0] below is only reached when at
// least two streams are registered (assuming the guard body returns).
61 if (m_upper_streams.size() < 2) {
64 if (!m_upper_streams[0]->has_value()) {
67 return has_difference();
// Returns the current output object m_data by value (m_data is written by
// m_filler[0] in has_difference()).
70 virtual OutputObjType output_value() {
return m_data; }
// Upper streams held as raw functor pointers; index 0 is the stream that
// has_difference() reads the output from.
// NOTE(review): ownership of these pointers is not visible in this excerpt.
73 std::vector<functor *> m_upper_streams;
// Callable that extracts a comparison value of type CT from a stream.
74 typedef std::function<CT(
functor *)> kt_traits_t;
// Traits functors, indexed in parallel with m_upper_streams (m_traits[i] is
// applied to m_upper_streams[i]).
75 std::vector<kt_traits_t> m_traits;
// Callable that writes a stream's current value into an OutputObjType.
78 typedef std::function<void(
functor *, OutputObjType &)> output_t;
// Filler functors; only m_filler[0] is used in the visible code.
79 std::vector<output_t> m_filler;
84 template <
typename CT,
typename... ARGS>