YPC  0.2.0
csv_reader.h
Go to the documentation of this file.
1 
8 #pragma once
9 #include "csv.h"
10 #include "csv_read_assign_helper.h"
11 #include "ypc/corecommon/package.h"
12 #include <boost/property_tree/json_parser.hpp>
13 #include <boost/property_tree/ptree.hpp>
14 #include <ff/util/ntobject.h>
15 #include <fstream>
16 #include <iostream>
17 #include <memory>
18 #include <stdint.h>
19 
// C-linkage plugin entry points. Their definitions are generated by the
// impl_csv_reader(type) macro defined at the end of this header.
20 #ifdef __cplusplus
21 extern "C" {
22 #endif
23 
// Creates a reader from the first `len` bytes of `extra_param` and returns it
// as an opaque handle. The macro implementation returns nullptr when
// construction throws a std::exception.
24 void *create_item_reader(const char *extra_param, int len);
25 
// Forwards to csv_reader::reset_for_read() on the reader behind `handle`.
26 int reset_for_read(void *handle);
// Forwards to csv_reader::read_item_data(buf, len) on the reader behind
// `handle`.
27 int read_item_data(void *handle, char *buf, int *len);
// Calls csv_reader::close_item_reader() and then deletes the reader; the
// macro implementation returns 0. `handle` must not be used afterwards.
28 int close_item_reader(void *handle);
// Forwards to csv_reader::get_item_number(); the int result is converted to
// uint64_t.
29 uint64_t get_item_number(void *handle);
30 
31 #ifdef __cplusplus
32 }
33 #endif
34 
35 namespace ypc {
36 namespace plugins {
/// Abstract reader interface behind the C plugin entry points
/// (create_item_reader, reset_for_read, read_item_data, close_item_reader,
/// get_item_number). Concrete readers are handed out as `csv_reader *`.
class csv_reader {
public:
  /// Virtual because close_item_reader() in the impl_csv_reader macro deletes
  /// the concrete reader through a `csv_reader *`; without it that delete is
  /// undefined behaviour and the derived members are never destroyed.
  virtual ~csv_reader() = default;

  /// Restarts reading. typed_csv_reader returns 0 on success and -1 when the
  /// file cannot be reopened.
  virtual int reset_for_read() = 0;

  /// Reads the next item. typed_csv_reader returns 0 when an item was read
  /// and 1 when no row could be read.
  /// @param buf destination for the serialized item, may be null.
  /// @param len receives the serialized length, may be null.
  virtual int read_item_data(char *buf, int *len) = 0;

  /// Releases whatever the reader holds open.
  virtual int close_item_reader() = 0;

  /// Returns the number of items the reader can provide.
  virtual int get_item_number() = 0;
};
44 
45 
46 template <typename NT> struct ntobject_size { const static size_t size = 0; };
47 template <typename T1> struct ntobject_size<::ff::util::ntobject<T1>> {
48  const static size_t size = 1;
49 };
50 template <typename T1, typename... ARGS>
51 struct ntobject_size<::ff::util::ntobject<T1, ARGS...>> {
52  const static size_t size =
53  1 + ntobject_size<::ff::util::ntobject<ARGS...>>::size;
54 };
55 
56 template <typename T>
57 class typed_csv_reader : public csv_reader {
58 public:
59  typedef T item_t;
60 
61  // TODO we should add more csv options here
63  // {file_path: "xxx"}
64  typed_csv_reader(const std::string &extra_param)
65  : m_extra_param(extra_param), m_file_path(extra_param) {
66  /*
67  boost::property_tree::ptree pt;
68  std::stringstream ss;
69  ss << extra_param;
70  boost::property_tree::json_parser::read_json(ss, pt);
71  m_file_path = pt.get_child("file_path").get_value<std::string>();
72 */
73  m_stream.reset(new std::ifstream(m_file_path));
74  if (!m_stream->is_open()) {
75  throw std::runtime_error("file not exist");
76  }
77  m_reader.reset(
78  new io::CSVReader<ntobject_size<item_t>::size>(m_file_path, *m_stream));
79  }
80  virtual int reset_for_read() {
81  m_stream.reset(new std::ifstream(m_file_path));
82  if (!m_stream->is_open()) {
83  return -1;
84  }
85  m_reader.reset(
86  new io::CSVReader<ntobject_size<item_t>::size>(m_file_path, *m_stream));
87  return 0;
88  }
89  item_t read_typed_item() {
90  typedef typename cast_obj_to_package<item_t>::type package_t;
91  package_t v;
92  internal::assign_helper<item_t>::read_row(m_reader.get(), v);
93  item_t ret = v;
94  return ret;
95  }
96 
97  virtual int read_item_data(char *buf, int *len) {
98  typedef typename cast_obj_to_package<item_t>::type package_t;
99  package_t v;
100  bool rv = internal::assign_helper<item_t>::read_row(m_reader.get(), v);
101  if (!rv) {
102  return 1;
103  }
104  if (len) {
105  ff::net::marshaler lm(ff::net::marshaler::length_retriver);
106  v.arch(lm);
107  *len = static_cast<int>(lm.get_length());
108  }
109  if (buf) {
110  ff::net::marshaler sm(buf, *len, ff::net::marshaler::serializer);
111  v.arch(sm);
112  }
113  return 0;
114  }
115  virtual int close_item_reader() {
116  m_reader.reset();
117  m_stream.reset();
118  return 0;
119  }
120  virtual int get_item_number() {
121  std::ifstream s(m_file_path);
123 
124  while (r.next_line()) {
125  }
126  auto l = r.get_file_line();
127  return l;
128  }
129 
130 protected:
131  const std::string m_extra_param;
132  std::string m_file_path;
133  std::unique_ptr<std::ifstream> m_stream;
134  std::unique_ptr<io::CSVReader<ntobject_size<item_t>::size>> m_reader;
135 };
136 } // namespace plugins
137 } // namespace ypc
138 
// impl_csv_reader(type) defines the C-linkage plugin entry points declared at
// the top of this header on top of `type`, which must derive from
// ypc::plugins::csv_reader and be constructible from a std::string.
//
// create_item_reader returns nullptr when `extra_param` is null, `len` is
// negative, or construction throws; no exception leaves the factory.
// reset_for_read, read_item_data and close_item_reader return -1 for a null
// handle, and get_item_number returns 0 for one.
// close_item_reader deletes the reader, so the handle is dead afterwards.
#define impl_csv_reader(type)                                                  \
  void *create_item_reader(const char *extra_param, int len) {                 \
    if (extra_param == nullptr || len < 0) {                                   \
      return nullptr;                                                          \
    }                                                                          \
    try {                                                                      \
      ypc::plugins::csv_reader *reader = new type(std::string(                 \
          extra_param, static_cast<std::string::size_type>(len)));             \
      return reader;                                                           \
    } catch (const std::exception &e) {                                        \
      std::cout << "create_item_reader got exception: " << e.what()            \
                << std::endl;                                                  \
      return nullptr;                                                          \
    } catch (...) {                                                            \
      std::cout << "create_item_reader got exception: unknown" << std::endl;   \
      return nullptr;                                                          \
    }                                                                          \
  }                                                                            \
  int reset_for_read(void *handle) {                                           \
    if (handle == nullptr) {                                                   \
      return -1;                                                               \
    }                                                                          \
    return static_cast<ypc::plugins::csv_reader *>(handle)->reset_for_read();  \
  }                                                                            \
  int read_item_data(void *handle, char *buf, int *len) {                      \
    if (handle == nullptr) {                                                   \
      return -1;                                                               \
    }                                                                          \
    return static_cast<ypc::plugins::csv_reader *>(handle)->read_item_data(    \
        buf, len);                                                             \
  }                                                                            \
  int close_item_reader(void *handle) {                                        \
    if (handle == nullptr) {                                                   \
      return -1;                                                               \
    }                                                                          \
    ypc::plugins::csv_reader *reader =                                         \
        static_cast<ypc::plugins::csv_reader *>(handle);                       \
    reader->close_item_reader();                                               \
    delete reader;                                                             \
    return 0;                                                                  \
  }                                                                            \
                                                                               \
  uint64_t get_item_number(void *handle) {                                     \
    if (handle == nullptr) {                                                   \
      return 0;                                                                \
    }                                                                          \
    return static_cast<ypc::plugins::csv_reader *>(handle)->get_item_number(); \
  }
170 
ypc::plugins::ntobject_size
Definition: csv_reader.h:46
ypc::plugins::typed_csv_reader
Definition: csv_reader.h:57
io::CSVReader
Definition: csv.h:1115
ypc::plugins::csv_reader
Definition: csv_reader.h:37
ypc::plugins::typed_csv_reader::typed_csv_reader
typed_csv_reader(const std::string &extra_param)
@extra_param should be a json string, like this
Definition: csv_reader.h:64