YPC  0.2.0
mysql_reader.h
Go to the documentation of this file.
1 
9 #pragma once
10 #include "ypc/corecommon/package.h"
11 #include <boost/property_tree/json_parser.hpp>
12 #include <boost/property_tree/ptree.hpp>
13 #include <ff/net/middleware/ntpackage.h>
14 #include <ff/sql/mysql.hpp>
15 #include <ff/util/ntobject.h>
16 #include <fstream>
17 #include <iostream>
18 #include <memory>
19 #include <stdint.h>
20 
21 #ifdef __cplusplus
22 extern "C" {
23 #endif
24 
25 void *create_item_reader(const char *extra_param, int len);
26 
27 int reset_for_read(void *handle);
28 int read_item_data(void *handle, char *buf, int *len);
29 int close_item_reader(void *handle);
30 uint64_t get_item_number(void *handle);
31 
32 #ifdef __cplusplus
33 }
34 #endif
35 
36 namespace ypc {
37 namespace plugins {
38 class mysql_reader {
39 public:
40  virtual int reset_for_read() = 0;
41  virtual int read_item_data(char *buf, int *len) = 0;
42  virtual int close_item_reader() = 0;
43  virtual int get_item_number() = 0;
44 };
45 
46 template <typename T, typename Table>
48 public:
49  typedef T item_t;
50 
51  // TODO we should add more mysql options here
53  // {url: "xxx", username:"xxx", password:"xxx", dbname="xxx", table{name:"xx",
54  // schema:[]}}
55  typed_mysql_reader(const std::string &extra_param)
56  : m_extra_param(extra_param) {
57  boost::property_tree::ptree pt;
58  boost::property_tree::json_parser::read_json(m_extra_param, pt);
59 
60  auto url = pt.get_child("url").get_value<std::string>();
61  auto username = pt.get_child("username").get_value<std::string>();
62  auto password = pt.get_child("password").get_value<std::string>();
63  auto dbname = pt.get_child("dbname").get_value<std::string>();
64  m_engine.reset(
65  new ff::sql::mysql<ff::sql::cppconn>(url, username, password, dbname));
66  }
67 
68  virtual int reset_for_read() {
69  m_to_read_index = 0;
70  return 0;
71  }
72  template <typename NTObjType> struct read_helper {};
73  template <typename... ARGS>
74  struct read_helper<::ff::util::ntobject<ARGS...>> {
75  template <typename TableT, typename EngineT, typename RetT>
76  static void read_item_data(EngineT *engine, RetT &ret) {
77  ret = TableT::template select<ARGS...>(engine).eval();
78  }
79  };
80 
81  template <typename... ARGS> struct assign_helper {};
82  template <typename ARGT> struct assign_helper<ARGT> {
83  template <typename RT, typename ST>
84  static void assign(RT &to, const ST &from) {
85  to.template set<ARGT>(from.template get<ARGT>());
86  }
87  };
88 
89  template <typename ARGT1, typename ARGT2, typename... ARGS>
90  struct assign_helper<ARGT1, ARGT2, ARGS...> {
91  template <typename RT, typename ST>
92  static void assign(RT &to, const ST &from) {
93  to.template set<ARGT1>(from.template get<ARGT1>());
95  }
96  };
97  template <typename RT> struct assign_package {};
98  template <typename... ARGS>
99  struct assign_package<::ff::util::ntobject<ARGS...>> {
100  template <typename PT>
101  static void to(PT &_to, const ::ff::util::ntobject<ARGS...> &from) {
103  }
104  };
105 
106  virtual int read_item_data(char *buf, int *len) {
107  typedef typename ypc::cast_obj_to_package<item_t>::type package_t;
108  if (m_all_items.empty()) {
109  read_all_items();
110  }
111 
112  if (m_to_read_index >= m_all_items.size()) {
113  return -1;
114  }
115  package_t v;
116  assign_package<item_t>::to(v, m_all_items[m_to_read_index]);
117  m_to_read_index++;
118 
119  if (len) {
120  ff::net::marshaler lm(ff::net::marshaler::length_retriver);
121  v.arch(lm);
122  *len = static_cast<int>(lm.get_length());
123  }
124  if (buf) {
125  ff::net::marshaler sm(buf, *len, ff::net::marshaler::serializer);
126  v.arch(sm);
127  }
128  return 0;
129  }
130 
131  virtual int close_item_reader() {
132  m_all_items.clear();
133  return 0;
134  }
135  virtual int get_item_number() {
136  if (m_all_items.empty()) {
137  read_all_items();
138  }
139  return static_cast<int>(m_all_items.size());
140  }
141  const item_t &item_at(size_t index) const { return m_all_items[index]; }
142 
143 protected:
144  void read_all_items() {
145  read_helper<item_t>::template read_item_data<Table>(m_engine.get(),
146  m_all_items);
147  m_to_read_index = 0;
148  }
149 
150  const std::string m_extra_param;
151  std::unique_ptr<::ff::sql::mysql<ff::sql::cppconn>> m_engine;
152  typename Table::row_collection_type m_all_items;
153  size_t m_to_read_index;
154 };
155 } // namespace plugins
156 } // namespace ypc
157 
158 #define impl_mysql_reader(type) \
159  void *create_item_reader(const char *extra_param, int len) { \
160  try { \
161  ypc::plugins::mysql_reader *reader = \
162  new type(std::string(extra_param, len)); \
163  return reader; \
164  } catch (const std::exception &e) { \
165  std::cout << "create_item_reader got: " << e.what() << std::endl; \
166  return nullptr; \
167  } \
168  } \
169  int reset_for_read(void *handle) { \
170  ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
171  return reader->reset_for_read(); \
172  } \
173  int read_item_data(void *handle, char *buf, int *len) { \
174  ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
175  return reader->read_item_data(buf, len); \
176  } \
177  int close_item_reader(void *handle) { \
178  ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
179  reader->close_item_reader(); \
180  delete reader; \
181  return 0; \
182  } \
183  \
184  uint64_t get_item_number(void *handle) { \
185  ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
186  return reader->get_item_number(); \
187  }
188 
ypc::plugins::typed_mysql_reader::typed_mysql_reader
typed_mysql_reader(const std::string &extra_param)
@extra_param should be a json string, like this
Definition: mysql_reader.h:55
ypc::plugins::typed_mysql_reader
Definition: mysql_reader.h:47
ypc::plugins::typed_mysql_reader::assign_helper
Definition: mysql_reader.h:81
ypc::plugins::mysql_reader
Definition: mysql_reader.h:38
ypc::cast_obj_to_package
Definition: package.h:67
ypc::plugins::typed_mysql_reader::assign_package
Definition: mysql_reader.h:97
ypc::plugins::typed_mysql_reader::read_helper
Definition: mysql_reader.h:72