YPC  0.2.0
union.h
1 #pragma once
2 #include <hpda/processor/processor_base.h>
3 #include <hpda/processor/set/helper/copy_helper.h>
4 #include <hpda/processor/set/helper/upper_stream_helper.h>
5 
6 namespace hpda {
7 namespace processor {
8 namespace internal {
9 
10 template <typename OutputObjType, typename CT>
12  : public ::hpda::internal::processor_with_output<OutputObjType> {
13 
14 public:
15  ordered_union_impl() : m_set_engine(false) {}
16 
17  template <typename KT, typename InputObjType>
18  void add_upper_stream(
20  return upper_stream_helper<OutputObjType, CT, KT,
21  InputObjType>::add_upper_stream(upper_stream,
22  m_upper_streams,
23  m_traits,
24  m_filler,
25  m_set_engine,
26  this);
27  }
28 
29  bool has_union() {
30  bool flag = false;
31  size_t index;
32  CT val;
33  for (size_t i = 0; i < m_upper_streams.size(); i++) {
34  if (m_upper_streams[i]->has_value()) {
35  CT tmp = m_traits[i](m_upper_streams[i]);
36  if (!flag) {
37  val = tmp;
38  index = i;
39  flag = true;
40  }
41  if (val > tmp) {
42  val = tmp;
43  index = i;
44  }
45  }
46  }
47  if (!flag) {
48  return false;
49  }
50  for (size_t i = 0; i < m_upper_streams.size(); i++) {
51  if (m_upper_streams[i]->has_value()) {
52  CT tmp = m_traits[i](m_upper_streams[i]);
53  if (i != index && tmp == val) {
54  m_upper_streams[i]->reset_done_value();
55  }
56  }
57  }
58  m_filler[index](m_upper_streams[index], m_data);
59  m_upper_streams[index]->reset_done_value();
60  return true;
61  }
62 
63  virtual bool process() {
64  if (m_upper_streams.size() < 2) {
65  return false;
66  }
67  return has_union();
68  }
69 
70  virtual OutputObjType output_value() { return m_data; }
71 
72 protected:
73  std::vector<functor *> m_upper_streams;
74  typedef std::function<CT(functor *)> kt_traits_t;
75  std::vector<kt_traits_t> m_traits;
76  bool m_set_engine;
77 
78  typedef std::function<void(functor *, OutputObjType &)> output_t;
79  std::vector<output_t> m_filler;
80  OutputObjType m_data;
81 };
82 
83 } // namespace internal
84 template <typename CT, typename... ARGS>
85 using ordered_union = internal::ordered_union_impl<ntobject<ARGS...>, CT>;
86 } // namespace processor
87 } // namespace hpda
hpda::processor::internal::ordered_union_impl
Definition: union.h:11
hpda::processor::internal::upper_stream_helper
Definition: upper_stream_helper.h:11
hpda::internal::processor_with_output
Definition: processor_with_output.h:9
hpda::functor
Definition: functor.h:7