YPC  0.2.0
json_to_data_batch.h
1 #pragma once
2 #include <boost/foreach.hpp>
3 #include <boost/property_tree/ptree.hpp>
4 #include <glog/logging.h>
5 #include <hpda/processor/processor_base.h>
6 
7 namespace hpda {
8 namespace processor {
9 namespace internal {
10 
11 template <typename OutputObjType>
13  : public processor_base<std::string, OutputObjType> {
14 public:
15  typedef std::function<const boost::property_tree::ptree &(
16  const boost::property_tree::ptree &)>
17  data_item_picker_t;
18  typedef std::function<OutputObjType(const std::string &item)>
19  item_transformer_t;
20 
23  const data_item_picker_t &data_item_picker,
24  const item_transformer_t &item_transformer)
26  m_func(data_item_picker), m_item_transformer(item_transformer),
27  m_next_data_index(0) {}
28  virtual ~json_to_data_batch_impl() {}
29 
31 
32  virtual bool process() {
33  m_next_data_index++;
34  if (m_next_data_index >= m_all_data.size()) {
35  if (!parse_data()) {
36  base::consume_input_value();
37  return false;
38  }
39  }
40  return true;
41  }
42 
43  virtual OutputObjType output_value() { return m_all_data[m_next_data_index]; }
44 
45 protected:
46  bool parse_data() {
47  m_next_data_index = 0;
48  m_all_data.clear();
49  if (!base::has_input_value()) {
50  return false;
51  }
52  try {
53  std::string v = base::input_value();
54  std::stringstream ss;
55  ss << v;
56  boost::property_tree::ptree tree;
57  boost::property_tree::read_json(ss, tree);
58  auto child = m_func(tree);
59  BOOST_FOREACH (boost::property_tree::ptree::value_type &v, child) {
60  assert(v.first.empty()); // array elements have no names
61  try {
62  m_all_data.push_back(m_item_transformer(v.second.data()));
63  } catch (std::exception &e) {
64  // ignore this data
65  continue;
66  }
67  }
68 
69  return true;
70  } catch (std::exception &e) {
71  LOG(INFO) << "parse_data got exception: " << e.what();
72  return false;
73  }
74  }
75 
76 protected:
77  std::vector<OutputObjType> m_all_data;
78  int m_next_data_index;
79  data_item_picker_t m_func;
80  item_transformer_t m_item_transformer;
81 };
82 } // namespace internal
83 template <typename... ARGS>
84 using json_to_data_batch = internal::json_to_data_batch_impl<ntobject<ARGS...>>;
85 } // namespace processor
86 } // namespace hpda
hpda::processor::internal::processor_base
Definition: processor_base.h:9
hpda::processor::internal::json_to_data_batch_impl
Definition: json_to_data_batch.h:12
hpda::internal::processor_with_output< std::string >