YPC  0.2.0
concat.h
1 #pragma once
2 #include <hpda/processor/processor_base.h>
3 
4 namespace hpda {
5 namespace processor {
6 namespace internal {
7 template <typename InputObjType>
8 class concat_impl : public processor_base<InputObjType, InputObjType> {
9 public:
14  m_upper_streams.push_back(upper_stream);
15  m_index = 0;
16  }
17 
18  void add_upper_stream(
20  m_upper_streams.push_back(upper_stream);
21  base::add_predecessor(upper_stream);
22  }
23  virtual bool process() {
24  while (m_index < m_upper_streams.size() &&
25  !m_upper_streams[m_index]->has_value()) {
26  m_index++;
27  }
28  if (m_index >= m_upper_streams.size()) {
29  return false;
30  }
31  auto b = m_upper_streams[m_index]->has_value();
32  if (b) {
33  m_data = m_upper_streams[m_index]->output_value().make_copy();
34  }
35  m_upper_streams[m_index]->reset_done_value();
36  return b;
37  }
38 
39  virtual InputObjType output_value() { return m_data; }
40 
41 protected:
42  typedef ::hpda::internal::processor_with_output<InputObjType> upper_stream_t;
43  size_t m_index;
44  std::vector<upper_stream_t *> m_upper_streams;
45  InputObjType m_data;
46 };
47 } // namespace internal
48 template <typename... ARGS>
49 using concat = internal::concat_impl<ntobject<ARGS...>>;
50 } // namespace processor
51 } // namespace hpda
hpda::processor::internal::processor_base
Definition: processor_base.h:9
hpda::processor::internal::concat_impl
Definition: concat.h:8
hpda::internal::processor_with_output< InputObjType >