00001 typedef struct dblib_buffer_row {
00003 TDSRESULTINFO *resinfo;
00005 unsigned char *row_data;
00007 DBINT row;
00009 TDS_INT *sizes;
00010 } DBLIB_BUFFER_ROW;
00011
00012 static void buffer_struct_print(const DBPROC_ROWBUF *buf);
00013 static int buffer_save_row(DBPROCESS *dbproc);
00014 static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
00015
#if ENABLE_EXTRA_CHECKS
/**
 * Assert the structural invariants of a row buffer.
 *
 * Only compiled when ENABLE_EXTRA_CHECKS is set; otherwise BUFFER_CHECK()
 * expands to an empty statement.
 *
 * \param buf buffer to verify
 */
static void buffer_check(const DBPROC_ROWBUF *buf)
{
	int slot;

	/* capacity 0 or 1: head must stay at 0 and tail is either 0 or 1 */
	if (buf->capacity == 0 || buf->capacity == 1) {
		assert(buf->head == 0);
		assert(buf->tail == 0 || buf->tail == 1);
		assert(buf->capacity == 1 || buf->rows == NULL);
		return;
	}

	/* basic range checks on the indexes */
	assert(buf->capacity > 0);
	assert(buf->head >= 0);
	assert(buf->tail >= 0);
	assert(buf->head < buf->capacity);
	assert(buf->tail <= buf->capacity);

	/* tail == capacity is the "empty" state: every slot must be blank */
	if (buf->tail == buf->capacity) {
		assert(buf->head == 0);
		if (buf->rows) {
			for (slot = 0; slot < buf->capacity; ++slot) {
				assert(buf->rows[slot].resinfo == NULL);
				assert(buf->rows[slot].row_data == NULL);
				assert(buf->rows[slot].sizes == NULL);
				assert(buf->rows[slot].row == 0);
			}
		}
		return;
	}

	/* nothing more can be verified without the slot array */
	if (buf->rows == NULL)
		return;

	/* occupied region: walk from tail (wrapping) until head is reached */
	slot = buf->tail;
	do {
		assert(slot >= 0 && slot < buf->capacity);
		assert(buf->rows[slot].resinfo != NULL);
		assert(buf->rows[slot].row > 0);
		assert(buf->rows[slot].row <= buf->received);
		slot = (slot + 1 == buf->capacity) ? 0 : slot + 1;
	} while (slot != buf->head);

	/* free region (absent when head == tail): walk from head back to tail */
	if (buf->head != buf->tail) {
		slot = buf->head;
		do {
			assert(slot >= 0 && slot < buf->capacity);
			assert(buf->rows[slot].resinfo == NULL);
			assert(buf->rows[slot].row_data == NULL);
			assert(buf->rows[slot].sizes == NULL);
			assert(buf->rows[slot].row == 0);
			slot = (slot + 1 == buf->capacity) ? 0 : slot + 1;
		} while (slot != buf->tail);
	}
}
#define BUFFER_CHECK(buf) buffer_check(buf)
#else
#define BUFFER_CHECK(buf) do {} while(0)
#endif
00081
00110 static int
00111 buffer_count(const DBPROC_ROWBUF *buf)
00112 {
00113 BUFFER_CHECK(buf);
00114 return (buf->head > buf->tail) ?
00115 buf->head - buf->tail :
00116 buf->capacity - (buf->tail - buf->head);
00117 }
00118
00122 static int
00123 buffer_is_full(const DBPROC_ROWBUF *buf)
00124 {
00125 BUFFER_CHECK(buf);
00126 return buf->capacity == buffer_count(buf) && buf->capacity > 1;
00127 }
00128
00129 #ifndef NDEBUG
00130 static int
00131 buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
00132 {
00133 BUFFER_CHECK(buf);
00134 if (buf->tail <= buf->head)
00135 if (buf->head <= idx && idx <= buf->tail)
00136 return 1;
00137
00138 if (0 <= idx && idx <= buf->head)
00139 return 1;
00140
00141 if (buf->tail <= idx && idx < buf->capacity)
00142 return 1;
00143 #if 0
00144 printf("buffer_index_valid: idx = %d\n", idx);
00145 buffer_struct_print(buf);
00146 #endif
00147 return 0;
00148 }
00149 #endif
00150
00151 static void
00152 buffer_free_row(DBLIB_BUFFER_ROW *row)
00153 {
00154 if (row->sizes)
00155 TDS_ZERO_FREE(row->sizes);
00156 if (row->row_data) {
00157 tds_free_row(row->resinfo, row->row_data);
00158 row->row_data = NULL;
00159 }
00160 tds_free_results(row->resinfo);
00161 row->resinfo = NULL;
00162 row->row = 0;
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175 static void
00176 buffer_free(DBPROC_ROWBUF *buf)
00177 {
00178 BUFFER_CHECK(buf);
00179 if (buf->rows != NULL) {
00180 int i;
00181 for (i = 0; i < buf->capacity; ++i)
00182 buffer_free_row(&buf->rows[i]);
00183 TDS_ZERO_FREE(buf->rows);
00184 }
00185 BUFFER_CHECK(buf);
00186 }
00187
00188
00189
00190
00191
00192 static void
00193 buffer_reset(DBPROC_ROWBUF *buf)
00194 {
00195 buf->head = 0;
00196 buf->current = buf->tail = buf->capacity;
00197 BUFFER_CHECK(buf);
00198 }
00199
00200 static int
00201 buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
00202 {
00203 if (++idx >= buf->capacity) {
00204 idx = 0;
00205 }
00206 return idx;
00207 }
00208
00213 static DBLIB_BUFFER_ROW*
00214 buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
00215 {
00216 BUFFER_CHECK(buf);
00217 if (idx < 0 || idx >= buf->capacity) {
00218 printf("idx is %d:\n", idx);
00219 buffer_struct_print(buf);
00220 return NULL;
00221 }
00222
00223 return &(buf->rows[idx]);
00224 }
00225
00229 static DBINT
00230 buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
00231 {
00232 BUFFER_CHECK(buf);
00233 return buffer_row_address(buf, idx)->row;
00234 }
00235
00239 static int
00240 buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
00241 {
00242 int i, ii, idx = -1;
00243
00244 BUFFER_CHECK(buf);
00245 if (buf->tail == buf->capacity) {
00246 assert (buf->head == 0);
00247 return -1;
00248 }
00249
00250
00251
00252
00253
00254 for (ii=0, i = buf->tail; i != buf->head || ii == 0; i = buffer_idx_increment(buf, i)) {
00255 if( buffer_idx2row(buf, i) == row_number) {
00256 idx = i;
00257 break;
00258 }
00259 assert(ii++ < buf->capacity);
00260 }
00261
00262 return idx;
00263 }
00264
00269 static void
00270 buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
00271 {
00272 int i;
00273
00274 BUFFER_CHECK(buf);
00275 if (count < 0 || count > buffer_count(buf)) {
00276 count = buffer_count(buf);
00277 }
00278
00279 for (i=0; i < count; i++) {
00280 if (buf->tail < buf->capacity)
00281 buffer_free_row(&buf->rows[buf->tail]);
00282 buf->tail = buffer_idx_increment(buf, buf->tail);
00283
00284
00285
00286
00287 if (buf->tail == buf->head) {
00288 buffer_reset(buf);
00289 break;
00290 }
00291 }
00292 #if 0
00293 buffer_struct_print(buf);
00294 #endif
00295 BUFFER_CHECK(buf);
00296 }
00297
00298 static void
00299 buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
00300 {
00301 int i;
00302 int srctype, desttype;
00303 BYTE *src;
00304 const DBLIB_BUFFER_ROW *row;
00305
00306 tdsdump_log(TDS_DBG_FUNC, "buffer_transfer_bound_data(%p %d %d %p %d)\n", buf, res_type, compute_id, dbproc, idx);
00307 BUFFER_CHECK(buf);
00308 assert(buffer_index_valid(buf, idx));
00309
00310 row = buffer_row_address(buf, idx);
00311 assert(row->resinfo);
00312
00313 for (i = 0; i < row->resinfo->num_cols; i++) {
00314 DBINT srclen;
00315 TDSCOLUMN *curcol = row->resinfo->columns[i];
00316
00317 if (row->sizes)
00318 curcol->column_cur_size = row->sizes[i];
00319
00320 if (curcol->column_nullbind) {
00321 if (curcol->column_cur_size < 0) {
00322 *(DBINT *)(curcol->column_nullbind) = -1;
00323 } else {
00324 *(DBINT *)(curcol->column_nullbind) = 0;
00325 }
00326 }
00327 if (!curcol->column_varaddr)
00328 continue;
00329
00330 if (row->row_data)
00331 src = &row->row_data[curcol->column_data - row->resinfo->current_row];
00332 else
00333 src = curcol->column_data;
00334 srclen = curcol->column_cur_size;
00335 if (is_blob_col(curcol))
00336 src = (BYTE *) ((TDSBLOB *) src)->textvalue;
00337 desttype = _db_get_server_type(curcol->column_bindtype);
00338 srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
00339
00340 if (srclen <= 0) {
00341 if (srclen == 0 || !curcol->column_nullbind)
00342 dbgetnull(dbproc, curcol->column_bindtype, curcol->column_bindlen,
00343 (BYTE *) curcol->column_varaddr);
00344 } else {
00345 copy_data_to_host_var(dbproc, srctype, src, srclen, desttype,
00346 (BYTE *) curcol->column_varaddr, curcol->column_bindlen,
00347 curcol->column_bindtype, (DBINT*) curcol->column_nullbind);
00348 }
00349 }
00350
00351
00352
00353
00354
00355
00356
00357
00358 buf->current = buffer_idx_increment(buf, buf->current);
00359
00360 }
00361
00362 static void
00363 buffer_struct_print(const DBPROC_ROWBUF *buf)
00364 {
00365 assert(buf);
00366
00367 printf("\t%d rows in buffer\n", buffer_count(buf));
00368
00369 printf("\thead = %d\t", buf->head);
00370 printf("\ttail = %d\t", buf->tail);
00371 printf("\tcurrent = %d\n", buf->current);
00372 printf("\tcapacity = %d\t", buf->capacity);
00373 printf("\thead row number = %d\n", buf->received);
00374 }
00375
00376
00377
00394 static int
00395 buffer_current_index(const DBPROCESS *dbproc)
00396 {
00397 const DBPROC_ROWBUF *buf = &dbproc->row_buf;
00398 #if 0
00399 buffer_struct_print(buf);
00400 #endif
00401 if (buf->capacity <= 1)
00402 return -1;
00403 if (buf->current == buf->head || buf->current == buf->capacity)
00404 return -1;
00405
00406 assert(buf->current >= 0);
00407 assert(buf->current < buf->capacity);
00408
00409 if( buf->tail < buf->head) {
00410 assert(buf->tail < buf->current);
00411 assert(buf->current < buf->head);
00412 } else {
00413 if (buf->current > buf->head)
00414 assert(buf->current > buf->tail);
00415 }
00416 return buf->current;
00417 }
00418
00419
00420
00421
00422
00423 static void
00424 buffer_set_capacity(DBPROCESS *dbproc, int nrows)
00425 {
00426 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00427
00428 buffer_free(buf);
00429
00430 memset(buf, 0, sizeof(DBPROC_ROWBUF));
00431
00432 if (0 == nrows) {
00433 buf->capacity = 1;
00434 BUFFER_CHECK(buf);
00435 return;
00436 }
00437
00438 assert(0 < nrows);
00439
00440 buf->capacity = nrows;
00441 BUFFER_CHECK(buf);
00442 }
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452 static void
00453 buffer_alloc(DBPROCESS *dbproc)
00454 {
00455 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00456
00457
00458
00459 assert(buf);
00460 assert(buf->capacity > 0);
00461 assert(buf->rows == NULL);
00462
00463 buf->rows = (DBLIB_BUFFER_ROW *) calloc(buf->capacity, sizeof(DBLIB_BUFFER_ROW));
00464
00465 assert(buf->rows);
00466
00467 buffer_reset(buf);
00468
00469 buf->received = 0;
00470 }
00471
00476 static int
00477 buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
00478 {
00479 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00480 DBLIB_BUFFER_ROW *row;
00481 int i;
00482
00483 assert(buf->capacity >= 0);
00484
00485 if (buffer_is_full(buf))
00486 return -1;
00487
00488 row = buffer_row_address(buf, buf->head);
00489
00490
00491 if (row->resinfo) {
00492 tds_free_row(row->resinfo, row->row_data);
00493 tds_free_results(row->resinfo);
00494 }
00495 row->row = ++buf->received;
00496 ++resinfo->ref_count;
00497 row->resinfo = resinfo;
00498 row->row_data = NULL;
00499 if (row->sizes)
00500 free(row->sizes);
00501 row->sizes = (TDS_INT *) calloc(resinfo->num_cols, sizeof(TDS_INT));
00502 for (i = 0; i < resinfo->num_cols; ++i)
00503 row->sizes[i] = resinfo->columns[i]->column_cur_size;
00504
00505
00506 if (buf->tail == buf->capacity) {
00507
00508 assert(buf->head == 0);
00509 buf->tail = 0;
00510 }
00511
00512
00513 buf->current = buf->head;
00514 buf->head = buffer_idx_increment(buf, buf->head);
00515
00516 return buf->current;
00517 }
00518
00519 static int
00520 buffer_save_row(DBPROCESS *dbproc)
00521 {
00522 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00523 DBLIB_BUFFER_ROW *row;
00524 int idx = buf->head - 1;
00525
00526 if (buf->capacity <= 1)
00527 return SUCCEED;
00528
00529 if (idx < 0)
00530 idx = buf->capacity - 1;
00531 if (idx >= 0 && idx < buf->capacity) {
00532 row = &buf->rows[idx];
00533
00534 if (row->resinfo && !row->row_data) {
00535 row->row_data = row->resinfo->current_row;
00536 tds_alloc_row(row->resinfo);
00537 }
00538 }
00539
00540 return SUCCEED;
00541 }
00542