YAPOG  0.0.1
Yet Another Pokemon Online Game
pgstream.cpp
Go to the documentation of this file.
1 /*
2 pgstream.cpp: main file for the pgstream library.
3 Copyright (c) 2004-2012, Daniel Verite.
4 See COPYING file for license terms.
5 http://www.manitou-mail.org/pgstream
6 */
7 
8 
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <ctype.h>
13 #include <malloc.h>
14 
15 #ifdef _MSC_VER
16 #define sprintf sprintf_s
17 #define snprintf _snprintf
18 #pragma warning( push)
19 #pragma warning( disable : 4996 )
20 #endif
21 
22 #include "pgstream/pgstream.hpp"
23 
24 namespace pgs
25 {
26  //static
27  const char* sql_bind_param::m_type_names[] = {
28  "bigint",
29  "bool",
30  "bytea",
31  "int",
32  "numeric",
33  "smallint",
34  "text",
35  "varchar"
36  };
37 
38  //static
39  int sql_bind_param::m_oids[] = {
40  oid_int8,
41  oid_bool,
42  oid_bytea,
43  oid_int4,
44  oid_numeric,
45  oid_int2,
46  oid_text,
47  oid_varchar
48  };
49 
50  void
51  sql_bind_param::set_type(const char* t)
52  {
53  const int arrsz=sizeof(m_type_names)/sizeof(m_type_names[0]);
54  for (int i=0; i<arrsz; i++) {
55  if (!strcmp(t, m_type_names[i])) {
56  m_type_oid=m_oids[i];
57  m_type_name=t;
58  return;
59  }
60  }
61  throw pg_excpt("pgstream", "Unknown variable type");
62  }
63 
64  void
66  {
67  // the type the user has explictly set has precedence
68  // FIXME: express this differently
69  if (!m_type_name.empty())
70  return;
71 
72  m_type_oid=o;
73  switch(o) {
74  case oid_int2:
75  m_type_name="smallint";
76  break;
77  case oid_int4:
78  m_type_name="int";
79  break;
80  case oid_int8:
81  m_type_name="bigint";
82  break;
83  case oid_text:
84  m_type_name="text";
85  break;
86  case oid_varchar:
87  m_type_name="varchar";
88  break;
89  case oid_numeric:
90  m_type_name="numeric";
91  break;
92  case oid_bool:
93  m_type_name="bool";
94  break;
95  case oid_bytea:
96  m_type_name="bytea";
97  break;
98  default:
99  throw pg_excpt("pgstream", "Unknown oid type");
100  }
101  }
102 
103  pg_stream::pg_stream(const std::string query, pg_cnx& db, int prepare_mode) :
104  m_db(db)
105  {
106  init(query.c_str(), prepare_mode, 0);
107  }
108 
109  pg_stream::pg_stream(const char *query, pg_cnx& db, int prepare_mode) :
110  m_db(db)
111  {
112  init(query, prepare_mode, 0);
113  }
114 
115  void
116  pg_stream::init(const char *query, int prepare_mode, unsigned int cursor_step)
117  {
119  if (prepare_mode==2)
121  else
122  m_prepare_wanted = (prepare_mode==1)?true:false;
123 
124  m_argpos = 0;
126  m_query_bufsize = sizeof(m_localQueryBuf)-1;
127  m_query_fmt = query;
128  m_chunk_size = 1024;
129  m_executed = false;
130  m_pg_res = NULL;
131  m_cursor_step = cursor_step;
132 
133  int len = strlen(query);
134  if (len > m_query_bufsize)
135  query_make_space(len);
136 
138  strcpy(m_query_buf, query);
139 
140  const char* q=query;
141  int in_quotespl=0;
142  while (*q) {
143  switch(*q) {
144  case '\'':
145  in_quotespl = !in_quotespl;
146  q++;
147  break;
148  case '\\':
149  q++;
150  if (in_quotespl && *q=='\'') {
151  // skip the quoted simple quote
152  q++;
153  }
154  break;
155  case ':':
156  q++;
157  if (!in_quotespl) {
158  if (*q==':') { // :: is reserved for casting
159  q++;
160  break;
161  }
162  const char* start_var=q;
163  while (isalnum(*q)) q++;
164  std::string vtype;
165  if (*q=='<') {
166  // explicit type
167  const char* start_type=++q;
168  while (isalnum(*q)) q++;
169  if (*q=='>') {
170  vtype.assign(start_type, q-start_type);
171  q++;
172  }
173  else {
174  throw pg_excpt("pg_stream", "syntax error in bind parameter");
175  }
176  }
177  sql_bind_param p(std::string(start_var,q-start_var), (start_var-1)-query);
178  if (!vtype.empty()) {
179  p.set_type(vtype.c_str());
180  }
181  m_vars.push_back(p);
182  }
183  break;
184  default:
185  q++;
186  break;
187  }
188  }
189 
190  if (m_vars.empty())
191  execute();
192  }
193 
195  {
196  if (!m_prepare_name.empty()) {
197  std::string q = std::string("DEALLOCATE ")+m_prepare_name;
198  PGresult* res = PQexec(m_db.conn(), q.c_str());
199  if (res) PQclear(res);
200  }
201  if (!m_cursor_name.empty()) {
202  std::string q = std::string("CLOSE ")+m_cursor_name;
203  PGresult* res = PQexec(m_db.conn(), q.c_str());
204  if (res) PQclear(res);
205  }
206  if (m_pg_res)
207  PQclear(m_pg_res);
209  free(m_query_buf);
210  }
211 
212  int
214 
215  void
217  {
218  char buf[9];
219  sprintf(buf, "%x", ++m_uniq_cnt);
220  m_prepare_name = std::string("pgst_prep_") + buf;
221  std::string args;
222  std::vector<sql_bind_param>::iterator it = m_vars.begin();
223  for (; it != m_vars.end(); ++it) {
224  if (!args.empty())
225  args.append(1,',');
226 #if 0
227  if (it->null()) {
228  args += "int4"; // hack (FIXME)
229  }
230  else
231 #endif
232  args += it->type_name();
233  }
234  if (!args.empty())
235  args = std::string("(") + args + ")";
236  std::string q = std::string("PREPARE ") +
237  m_prepare_name + args + std::string(" AS ") + m_query_buf;
238  PGresult* r = PQexec(m_db.conn(), q.c_str());
239  if (!r || PQresultStatus(r) != PGRES_COMMAND_OK) {
240  m_prepare_name.clear();
241  throw pg_excpt::mk_excpt(r, q.c_str());
242  }
243  }
244 
245  void
247  {
248  char buf[9];
249  sprintf(buf, "%08x", m_uniq_cnt++);
250  m_cursor_name = std::string("pgst_cur_") + buf;
251  std::string q = std::string("DECLARE ") +
252  m_cursor_name + std::string(" CURSOR FOR ") + m_query_buf;
253  PGresult* r = PQexec(m_db.conn(), q.c_str());
254  if (!r || PQresultStatus(r) != PGRES_COMMAND_OK) {
255  m_cursor_name.clear();
256  throw pg_excpt::mk_excpt(r, q.c_str());
257  }
258  }
259 
260  void
262  {
263  m_executed=false;
264  m_argpos=0;
265  if (m_pg_res) {
266  PQclear(m_pg_res);
267  m_pg_res=NULL;
268  }
269  strcpy(m_query_buf, m_query_fmt.c_str());
270  unsigned int i, s=m_vars.size();
271  for (i=0; i<s; i++) {
272  m_vars[i].reset_offset();
273  }
275  }
276 
277  void
279  {
280  if (m_query_len+len < m_query_bufsize)
281  return; // m_query_buf is big enough
283  char* p=(char*)malloc(1+m_query_bufsize+len+m_chunk_size);
284  if (p) {
285  strcpy (p, m_query_buf);
286  m_query_buf = p;
287  }
288  }
289  else {
290  m_query_buf=(char*)realloc(m_query_buf, 1+m_query_bufsize+len+m_chunk_size);
291  }
292  if (!m_query_buf)
293  throw pg_excpt("pg_stream", "not enough memory");
295  m_chunk_size = m_chunk_size*2; // grow exponentially
296  }
297 
298  /*
299  Replace a placeholder inside the query buffer, either by a value if
300  m_inline_params is true, or by a $N PQExecParams-compatible
301  placeholder.
302  - argpos is the number of the placeholder (start at 0)
303  - buf points to the ascii value
304  - size is the value's length or -1 to indicate the sql NULL value
305  - binary is true if buf contains binary data (for a bytea column)
306  */
307  void
308  pg_stream::replace_placeholder(unsigned int argpos, const char* buf, int size, bool binary)
309  {
310  sql_bind_param& p=m_vars[argpos];
311  if (size>=0) {
312  if (!binary)
313  p.set_value(buf);
314  else
315  p.set_binary_data(buf,size);
316  }
317  else {
318  p.set_null();
319  }
320  char bind_num_buf[4]; // a $ followed by up to 3 digits
321 
322  if (!m_inline_params) {
323  if (argpos>=999)
324  throw pg_excpt("pg_stream", "too many bind variables (max:999)");
325  sprintf(bind_num_buf, "$%u", argpos+1);
326  buf = bind_num_buf;
327  size = strlen(bind_num_buf);
328  }
329 
330  if (size>0)
331  query_make_space(size);
332 
333  // Replace the placeholder with the value or the $N variable
334  int placeholder_len=p.name().size()+1; // +1 for the leading ':'
335  // shift the substring at the right of the placeholder
336  memmove(m_query_buf+p.pos()+size,
337  m_query_buf+p.pos()+placeholder_len,
338  m_query_len-(p.pos()+placeholder_len));
339  // insert the value where the placeholder was
340  memcpy(m_query_buf+p.pos(), buf, size);
341  m_query_len+=(size-placeholder_len);
342  m_query_buf[m_query_len]='\0';
343  // change the offsets of the remaining placeholders
344  for (unsigned int i=argpos+1; i<m_vars.size(); i++) {
345  m_vars[i].offset(size-placeholder_len);
346  }
347  }
348 
349  void
351  {
352  if (++m_argpos>=m_vars.size()) {
353  execute();
354  }
355  }
356 
357  pg_stream&
358  pg_stream::operator<<(const char* p)
359  {
360  check_binds();
362 
363  if (!m_inline_params) {
364  if (p)
365  replace_placeholder(m_argpos, p, strlen(p));
366  else
367  replace_placeholder(m_argpos, "null", -1);
368  }
369  else {
370  if (p) {
371  size_t len = strlen(p);
372  /* We use a buffer on stack to store the quoted version of the
373  string parameter if it fits, else we'll allocate the space on the
374  heap. Hopefully it will fit most of times and we'll avoid the malloc
375  overhead */
376  char local_buf[1024+2+1]; // space for 1024 chars + 2 quotes + 1 '\0'
377  char* buf;
378  if (len < (sizeof(local_buf)-1)/2)
379  buf=local_buf;
380  else
381  buf=(char*)malloc(2+2*len+1);
382  int escaped_size=PQescapeString(buf+1, p, len);
383  buf[0]='\'';
384  buf[escaped_size+1]='\'';
385  buf[escaped_size+2]='\0';
386 
387  replace_placeholder(m_argpos, buf, escaped_size+2);
388 
389  if (buf!=local_buf)
390  free(buf);
391  }
392  else {
393  // null pointer => null sql value
394  replace_placeholder(m_argpos, "null", 4);
395  }
396  }
397  next_bind();
398  return *this;
399  }
400 
401  pg_stream&
402  pg_stream::operator<<(const std::string& s)
403  {
404  return operator<<(s.c_str());
405  }
406 
407  pg_stream&
409  {
410  check_binds();
411  char buf[15];
412  if (i>=0) {
413  sprintf(buf,"%d", i);
414  }
415  else {
416  // parenthesis are put around the negative value to avoid a double
417  // dash (a comment) in case the character just before is a dash
418  sprintf(buf,"(%d)", i);
419  }
421  replace_placeholder(m_argpos, buf, strlen(buf));
422  next_bind();
423  return *this;
424  }
425 
426  pg_stream&
427  pg_stream::operator<<(unsigned int i)
428  {
429  check_binds();
430  char buf[15];
431  sprintf(buf,"%u", i);
433  replace_placeholder(m_argpos, buf, strlen(buf));
434  next_bind();
435  return *this;
436  }
437 
438  pg_stream&
440  {
441  check_binds();
442  char buf[15];
443  if (l>=0) {
444  sprintf(buf,"%ld", l);
445  }
446  else {
447  // parenthesis are put around the negative value to avoid a double
448  // dash (a comment) in case the character just before is a dash
449  sprintf(buf,"(%ld)", l);
450  }
452  replace_placeholder(m_argpos, buf, strlen(buf));
453  next_bind();
454  return *this;
455  }
456 
457  pg_stream&
458  pg_stream::operator<<(unsigned long l)
459  {
460  check_binds();
461  char buf[15];
462  sprintf(buf,"%lu", l);
464  replace_placeholder(m_argpos, buf, strlen(buf));
465  next_bind();
466  return *this;
467  }
468 
469  pg_stream&
471  {
472  check_binds();
473  char buf[30];
474  if (l>=0) {
475  sprintf(buf,"%lld", l);
476  }
477  else {
478  // parenthesis are put around the negative value to avoid a double
479  // dash (a comment) in case the character just before is a dash
480  sprintf(buf,"(%lld)", l);
481  }
483  replace_placeholder(m_argpos, buf, strlen(buf));
484  next_bind();
485  return *this;
486  }
487 
488  pg_stream&
489  pg_stream::operator<<(unsigned long long l)
490  {
491  check_binds();
492  char buf[30];
493  sprintf(buf,"%llu", l);
495  replace_placeholder(m_argpos, buf, strlen(buf));
496  next_bind();
497  return *this;
498  }
499 
500  pg_stream&
502  {
503  check_binds();
504  char buf[15];
505  sprintf(buf,"%hd", s);
507  replace_placeholder(m_argpos, buf, strlen(buf));
508  next_bind();
509  return *this;
510  }
511 
512  pg_stream&
513  pg_stream::operator<<(unsigned short s)
514  {
515  check_binds();
516  char buf[15];
517  sprintf(buf,"%hu", s);
519  replace_placeholder(m_argpos, buf, strlen(buf));
520  next_bind();
521  return *this;
522  }
523 
524  pg_stream&
526  {
527  check_binds();
528  char buf[100];
529  snprintf(buf, sizeof(buf), "%g", d);
531  replace_placeholder(m_argpos, buf, strlen(buf));
532  next_bind();
533  return *this;
534  }
535 
536  pg_stream&
538  {
539  check_binds();
540  const char* buf;
541  if (b)
542  buf="true";
543  else
544  buf="false";
546  replace_placeholder(m_argpos, buf, strlen(buf));
547  next_bind();
548  return *this;
549  }
550 
552  m_data_ptr(NULL), m_data_len(0), m_allocated(false)
553  {
554  }
555 
556  pg_bytea::pg_bytea(void* ptr, unsigned int len) :
557  m_data_ptr(NULL), m_data_len(0), m_allocated(false)
558  {
559  set_src_data(ptr, len);
560  }
561 
563  {
564  if (m_allocated && m_data_ptr)
565  PQfreemem(m_data_ptr);
566  }
567 
568  pg_stream&
570  {
571  check_binds();
573  if (!m_inline_params) {
574  if (b.m_data_ptr)
575  replace_placeholder(m_argpos, (const char*)b.m_data_ptr, b.m_data_len, 1);
576  else
577  replace_placeholder(m_argpos, "null", -1);
578  }
579  else {
580  if (b.m_data_ptr) {
581  size_t len = (size_t)b.m_data_len;
582  size_t escaped_size; // includes terminating zero
583  unsigned char* buf = PQescapeBytea((const unsigned char*)b.m_data_ptr,
584  len, &escaped_size);
585 
586  /* TODO: find a way to avoid doubling the memory allocation
587  (either hack something in replace_placeholder for it to
588  add the single quotes itself, or do our own version
589  of PQescapeBytea) */
590  unsigned char* buf1 = (unsigned char*)malloc(escaped_size+2);
591  if (!buf1)
592  throw pg_excpt("pg_stream", "not enough memory");
593 
594  buf1[0]='\'';
595  memcpy(buf1+1, buf, escaped_size);
596  buf1[escaped_size]='\'';
597  buf1[escaped_size+1]='\0';
598  PQfreemem(buf);
599  replace_placeholder(m_argpos, (const char*)buf1, escaped_size+1);
600  free(buf1);
601  }
602  else {
603  // null pointer => null sql value
604  replace_placeholder(m_argpos, "null", 4);
605  }
606  }
607 
608  next_bind();
609  return *this;
610  }
611 
612  pg_stream&
614  {
615  check_binds();
616  replace_placeholder(m_argpos, "null", -1);
617  next_bind();
618  return *this;
619  }
620 
621  void
623  {
624  if (m_executed && m_argpos==m_vars.size()) {
625  // reset the query for another execution
626  reset_results();
627  }
628  if (m_argpos >= m_vars.size())
629  throw pg_excpt("pg_stream", "Mismatch between bound variables and query");
630  }
631 
632 #if 0
633  void
634  pg_stream::print() // DEBUG
635  {
636  std::cout << "buf=" << m_query_buf << std::endl;
637  std::cout << "len=" << m_query_len << ", bufsize=" << m_query_bufsize << std::endl;
638  std::cout << "params\n";
639  for (int i=0; i<(int)m_vars.size(); i++) {
640  const sql_bind_param& v=m_vars[i];
641  std::cout << v.name() << " => pos=" << v.pos() << " value=" << v.value() << "\n";
642  }
643  }
644 #endif
645 
646  void
647  pg_stmt::execute(const char* stmt, pg_cnx& db)
648  {
649  m_res=PQexec(db.conn(), stmt);
650  if (!m_res)
651  throw pg_excpt(stmt, PQerrorMessage(db.conn()));
652  if (PQresultStatus(m_res)!=PGRES_COMMAND_OK) {
653  const char *errp = PQresultErrorField(m_res, PG_DIAG_SQLSTATE);
654  if (errp)
655  throw pg_excpt(stmt, PQresultErrorMessage(m_res), errp);
656  else
657  throw pg_excpt(stmt, PQresultErrorMessage(m_res));
658  }
659  }
660 
661  pg_stmt::pg_stmt(const char* stmt, pg_cnx& db)
662  {
663  execute(stmt, db);
664  }
665 
666  pg_stmt::pg_stmt(const std::string stmt, pg_cnx& db)
667  {
668  execute(stmt.c_str(), db);
669  }
670 
672  {
673  if (m_res)
674  PQclear(m_res);
675  }
676 
677  void
679  {
680  if (m_argpos < m_vars.size()) {
681  const char fmt[] = "%u variable(s) not bound\n";
682  char errbuf[4+sizeof(fmt)];
683  sprintf(errbuf, fmt, m_vars.size()-m_argpos);
684  throw pg_excpt(m_query_buf, errbuf);
685  }
686 
687  if (m_cursor_step > 0) {
688  create_cursor();
689  }
690 
691  if (m_prepare_wanted && m_prepare_name.empty()) {
692  do_prepare();
693  }
694 
695  if (!m_inline_params) {
696  unsigned int sz=m_vars.size();
697  Oid* param_types = (Oid*)alloca(sz*sizeof(Oid));
698  const char** param_values = (const char**)alloca(sz*sizeof(const char*));
699  int* param_lengths = (int*)alloca(sz*sizeof(int));
700  int* param_formats = (int*)alloca(sz*sizeof(int));
701  std::vector<sql_bind_param>::iterator it = m_vars.begin();
702  unsigned int i=0;
703  for (;it != m_vars.end(); ++it, ++i) {
704  param_types[i] = it->pg_type();
705  if (param_types[i]==(int)sql_bind_param::oid_bytea) { // binary
706  param_formats[i] = 1;
707  if (!it->null()) {
708  param_values[i] = (const char*)it->data_ptr();
709  param_lengths[i] = it->data_size();
710  }
711  else {
712  param_values[i] = NULL;
713  param_lengths[i] = 0;
714  }
715  }
716  else { // text
717  param_formats[i] = 0;
718  if (!it->null()) {
719  param_lengths[i] = it->value().length();
720  param_values[i] = it->value().c_str();
721  }
722  else {
723  param_values[i] = NULL;
724  param_lengths[i] = 0;
725  }
726  }
727  }
728  if (m_prepare_name.empty()) {
729  m_pg_res = PQexecParams(m_db.conn(), m_query_buf, sz, param_types,
730  param_values, param_lengths, param_formats,
731  0); // output in text format
732  }
733  else {
734  m_pg_res = PQexecPrepared(m_db.conn(), m_prepare_name.c_str(),
735  sz, param_values, param_lengths,
736  param_formats,
737  0); // output in text format
738  }
739  }
740  else {
741  if (m_cursor_step > 0)
742  cursor_fetch();
743  else
744  m_pg_res=PQexec(m_db.conn(), m_query_buf);
745  }
746  if (!m_pg_res)
747  throw pg_excpt(m_query_buf, PQerrorMessage(m_db.conn()));
748  if (PQresultStatus(m_pg_res)!=PGRES_TUPLES_OK &&
749  PQresultStatus(m_pg_res)!=PGRES_COMMAND_OK) {
750  const char *errp = PQresultErrorField(m_pg_res, PG_DIAG_SQLSTATE);
751  if (errp)
752  throw pg_excpt(m_query_buf, PQresultErrorMessage(m_pg_res), errp);
753  else
754  throw pg_excpt(m_query_buf, PQresultErrorMessage(m_pg_res));
755  }
756  const char* t=PQcmdTuples(m_pg_res);
757  if (t && *t) {
758  m_affected_rows=atoi(t);
759  }
760  else
761  m_affected_rows=0;
762 
763  m_row_number=0;
764  m_col_number=0;
765  m_executed=true;
766  }
767 
768  void
770  {
771  char nb[10];
772  sprintf(nb, "%u", m_cursor_step);
773  std::string fetch = std::string("FETCH ") + nb + " FROM " + m_cursor_name;
774  if (m_pg_res){
775  PQclear(m_pg_res);
776  m_pg_res=NULL;
777  }
778  m_pg_res = PQexec(m_db.conn(), fetch.c_str());
779  if (!m_pg_res || PQresultStatus(m_pg_res)!=PGRES_TUPLES_OK) {
780  // mk_excpt will free the contents of m_pg_res so we need
781  // to zero it at this point.
782  PGresult* res = m_pg_res;
783  m_pg_res=NULL;
784  throw pg_excpt::mk_excpt(res, fetch.c_str());
785  }
786  m_row_number=0;
787  }
788 
789  int
791  {
792  if (!m_executed)
793  execute();
794  if (m_row_number < PQntuples(m_pg_res))
795  return false;
796  else {
797  if (m_cursor_step > 0) {
798  cursor_fetch();
799  return m_row_number >= PQntuples(m_pg_res);
800  }
801  else {
802  return true;
803  }
804  }
805  }
806 
807  void
809  {
810  m_col_number=((m_col_number+1)%PQnfields(m_pg_res));
811  if (m_col_number==0)
812  m_row_number++;
813  }
814 
815  bool
817  {
818  m_val_null = PQgetisnull(m_pg_res, m_row_number, m_col_number)?true:false;
819  return m_val_null;
820  }
821 
822  void
824  {
825  if (eof())
826  throw pg_excpt(m_query_buf, "End of stream reached");
827  }
828 
829  pg_stream&
831  {
832  check_eof();
833  if (PQftype(m_pg_res, m_col_number) != 17)
834  throw pg_excpt("pgstream", "Type mismatch: bytea type expected");
835  m_val_null = PQgetisnull(m_pg_res, m_row_number, m_col_number)?true:false;
836  if (!m_val_null) {
837  b.m_data_len=PQgetlength(m_pg_res, m_row_number, m_col_number);
838  size_t to_len;
839  unsigned char* buf = PQunescapeBytea((const unsigned char*)PQgetvalue(m_pg_res, m_row_number, m_col_number), &to_len);
840  b.m_data_ptr = buf;
841  b.m_data_len = to_len;
842  b.m_allocated = true;
843  }
844  else {
845  b.m_data_ptr=NULL;
846  b.m_data_len=0;
847  }
848  next_result();
849  return *this;
850  }
851 
852  pg_stream&
854  {
855  check_eof();
856  m_val_null = PQgetisnull(m_pg_res, m_row_number, m_col_number)?true:false;
857  if (!m_val_null)
858  // check for overflow??
859  s=atoi(PQgetvalue(m_pg_res, m_row_number, m_col_number));
860  else
861  s=0;
862  next_result();
863  return *this;
864  }
865 
866  pg_stream&
867  pg_stream::operator>>(unsigned short& s)
868  {
869  check_eof();
870  if (!check_null())
871  // check for overflow??
872  s=atoi(PQgetvalue(m_pg_res, m_row_number, m_col_number));
873  else
874  s=0;
875  next_result();
876  return *this;
877  }
878 
879  pg_stream&
881  {
882  check_eof();
883  if (!check_null())
884  i=atoi(PQgetvalue(m_pg_res, m_row_number, m_col_number));
885  else
886  i=0;
887  next_result();
888  return *this;
889  }
890 
891  pg_stream&
892  pg_stream::operator>>(unsigned int& i)
893  {
894  check_eof();
895  unsigned long ul=strtoul(PQgetvalue(m_pg_res, m_row_number, m_col_number),
896  NULL, 10);
897  if (!check_null())
898  i=(unsigned int)ul;
899  else
900  i=0;
901  next_result();
902  return *this;
903  }
904 
905  pg_stream&
907  {
908  check_eof();
909  if (!check_null())
910  d=atof(PQgetvalue(m_pg_res, m_row_number, m_col_number));
911  else
912  d=0.0;
913  next_result();
914  return *this;
915  }
916 
917  pg_stream&
919  {
920  check_eof();
921  if (!check_null()) {
922  const char* p=PQgetvalue(m_pg_res, m_row_number, m_col_number);
923  if (p && *p=='t')
924  b=true;
925  else
926  b=false;
927  }
928  else
929  b=false;
930  next_result();
931  return *this;
932  }
933 
934  pg_stream&
936  {
937  check_eof();
938  if (check_null())
939  *p='\0';
940  else {
941  // pretty dangerous if the buffer is not big enough, but
942  // we have no way of knowing. Better use C++ strings
943  strcpy(p, PQgetvalue(m_pg_res, m_row_number, m_col_number));
944  }
945  next_result();
946  return *this;
947  }
948 
949  pg_stream&
950  pg_stream::operator>>(std::string& s)
951  {
952  check_eof();
953  s=PQgetvalue(m_pg_res, m_row_number, m_col_number);
954  check_null();
955  next_result();
956  return *this;
957  }
958 
959 
960  pg_cnx::pg_cnx(PGconn* cnx) : m_conn(cnx)
961  {
962  m_inline_params=true;
963  m_default_prepare=false;
964  m_destructor_close = (cnx==NULL);
965  m_nested_trans=false;
966  }
967 
968 
969  void
970  pg_cnx::set_option(const char* param, bool bvalue)
971  {
972  if (!strcmp(param, "prepare_statements"))
973  m_default_prepare=bvalue;
974  else if (!strcmp(param, "bind_variables"))
975  m_inline_params=!bvalue;
976  else if (!strcmp(param, "nested_transactions"))
977  m_nested_trans=bvalue;
978  else {
979  std::string err=std::string("Unrecognized option passed to pg_cnx::set_option: ")+param;
980  throw pg_excpt("pg_cnx", err.c_str());
981  }
982  }
983 
984  bool
985  pg_cnx::get_option(const char* param) const
986  {
987  if (!strcmp(param, "prepare_statements"))
988  return m_default_prepare;
989  else if (!strcmp(param, "bind_variables"))
990  return !m_inline_params;
991  else if (!strcmp(param, "nested_transactions"))
992  return m_nested_trans;
993  else {
994  std::string err=std::string("Unrecognized option passed to pg_cnx::get_option: ")+param;
995  throw pg_excpt("pg_cnx", err.c_str());
996  }
997  }
998 
1000  {
1001  if (m_conn) {
1002  if (m_destructor_close)
1003  PQfinish(m_conn);
1004  m_conn=NULL;
1005  }
1006  }
1007 
1008  void
1009  pg_cnx::connect(const char* cnx_string)
1010  {
1011  if (m_conn)
1012  PQfinish(m_conn);
1013  m_conn = PQconnectdb(cnx_string);
1014  if (!m_conn) {
1015  throw pg_excpt("pg_cnx", "Unable to allocate a connection");
1016  }
1017  if (PQstatus(m_conn) == CONNECTION_BAD) {
1018  throw pg_excpt("connect", PQerrorMessage(m_conn));
1019  }
1020  }
1021 
1022  pg_trans::pg_trans(pg_cnx& cnx) : m_cnx(cnx), m_trans_done(false)
1023  {
1024  if (PQtransactionStatus(m_cnx.conn()) != PQTRANS_INTRANS) {
1025  pg_stmt("BEGIN", m_cnx);
1026  m_begin=true;
1027  m_savepoint=false;
1028  }
1029  else {
1030  m_begin=false;
1031  if (m_cnx.m_nested_trans) {
1032  pg_stmt("SAVEPOINT pgs_s", m_cnx);
1033  m_savepoint=true;
1034  }
1035  else
1036  m_savepoint=false;
1037  }
1038  }
1039 
1041  {
1042  if (!m_trans_done)
1043  rollback();
1044  }
1045 
1046  void
1048  {
1049  if (m_begin) {
1050  pg_stmt("COMMIT", m_cnx);
1051  }
1052  else if (m_savepoint) {
1053  pg_stmt("RELEASE SAVEPOINT pgs_s", m_cnx);
1054  }
1055  m_trans_done=true;
1056  }
1057 
1058  void
1060  {
1061  if (m_begin) {
1062  pg_stmt("ROLLBACK", m_cnx);
1063  }
1064  else if (m_savepoint) {
1065  pg_stmt("ROLLBACK TO pgs_s", m_cnx);
1066  }
1067  m_trans_done=true;
1068  }
1069 
1070  /*
1071  Build a pg_excpt instance from a PGresult; then free that result
1072  */
1073  //static
1074  pg_excpt
1075  pg_excpt::mk_excpt(PGresult *r, const char* query/*=NULL*/)
1076  {
1077  std::string errstr="";
1078  std::string ecode;
1079  if (r) {
1080  errstr = PQresultErrorMessage(r);
1081  ecode = PQresultErrorField(r, PG_DIAG_SQLSTATE);
1082  PQclear(r);
1083  }
1084  return pg_excpt(query?query:"pgstream", errstr.c_str(), ecode);
1085  }
1086 
1087  std::string
1089  {
1090  std::string txt = m_query;
1091  if (!txt.empty())
1092  txt.append("\n");
1093  txt.append(m_err_msg);
1094  if (m_err_code.empty()) {
1095  txt.append("(ERR=");
1096  txt.append(m_err_code);
1097  txt.append(")\n");
1098  }
1099  return txt;
1100  }
1101 
1102  pg_cursor::pg_cursor(unsigned int step, const char* query, pg_cnx& cnx):
1103  pg_stream(cnx)
1104  {
1105  init(query, 2, step);
1106  }
1107 
1108  pg_cursor::pg_cursor(unsigned int step, std::string query, pg_cnx& cnx):
1109  pg_stream(cnx)
1110  {
1111  init(query.c_str(), 2, step);
1112  }
1113 } // namespace pgs
1114 
1115 #ifdef _MSC_VER
1116 #pragma warning( pop)
1117 #endif