1
0

pqueue.c 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>

#include "pqueue.h"
  7. #ifdef DEBUG_PQUEUE
  8. #define DEBUG_ON 1
  9. #else
  10. #define DEBUG_ON 0
  11. #endif
  12. #define DEBUG_CMD(_a) if (DEBUG_ON) { _a }
  13. #define MIN_CAPACITY 128 /* min allocated buffer for a packet */
  14. static int pqueue_alloc (int seq, unsigned char *packet, int packlen, pqueue_t **new);
  15. int packet_timeout_usecs = DEFAULT_PACKET_TIMEOUT * 1000000;
  16. static pqueue_t *pq_head = NULL, *pq_tail = NULL;
  17. /* contains a list of free queue elements.*/
  18. static pqueue_t *pq_freelist_head = NULL;
  19. static int pqueue_alloc(int seq, unsigned char *packet, int packlen, pqueue_t **new) {
  20. pqueue_t *newent;
  21. DEBUG_CMD(syslog(LOG_DEBUG, "seq=%d, packlen=%d", seq, packlen););
  22. /* search the freelist for one that has sufficient space */
  23. if (pq_freelist_head) {
  24. for (newent = pq_freelist_head; newent; newent = newent->next) {
  25. if (newent->capacity >= packlen) {
  26. /* unlink from freelist */
  27. if (pq_freelist_head == newent)
  28. pq_freelist_head = newent->next;
  29. if (newent->prev)
  30. newent->prev->next = newent->next;
  31. if (newent->next)
  32. newent->next->prev = newent->prev;
  33. if (pq_freelist_head)
  34. pq_freelist_head->prev = NULL;
  35. break;
  36. } /* end if capacity >= packlen */
  37. } /* end for */
  38. /* nothing found? Take first and reallocate it */
  39. if (NULL == newent) {
  40. newent = pq_freelist_head;
  41. pq_freelist_head = pq_freelist_head->next;
  42. if (pq_freelist_head)
  43. pq_freelist_head->prev = NULL;
  44. DEBUG_CMD(syslog(LOG_DEBUG, "realloc capacity %d to %d",newent->capacity, packlen););
  45. newent->packet = (unsigned char *)realloc(newent->packet, packlen);
  46. if (!newent->packet) {
  47. syslog(LOG_WARNING, "error reallocating packet: %s", strerror(errno));
  48. return -1;
  49. }
  50. newent->capacity = packlen;
  51. }
  52. DEBUG_CMD(syslog(LOG_DEBUG, "Recycle entry from freelist. Capacity: %d", newent->capacity););
  53. } else {
  54. /* allocate a new one */
  55. newent = (pqueue_t *)malloc( sizeof(pqueue_t) );
  56. if (!newent) {
  57. syslog(LOG_WARNING, "error allocating newent: %s", strerror(errno));
  58. return -1;
  59. }
  60. newent->capacity = 0;
  61. DEBUG_CMD(syslog(LOG_DEBUG, "Alloc new queue entry"););
  62. }
  63. if ( ! newent->capacity ) {
  64. /* a new queue entry was allocated. Allocate the packet buffer */
  65. int size = packlen < MIN_CAPACITY ? MIN_CAPACITY : packlen;
  66. /* Allocate at least MIN_CAPACITY */
  67. DEBUG_CMD(syslog(LOG_DEBUG, "allocating for packet size %d", size););
  68. newent->packet = (unsigned char *)malloc(size);
  69. if (!newent->packet) {
  70. syslog(LOG_WARNING, "error allocating packet: %s", strerror(errno));
  71. return -1;
  72. }
  73. newent->capacity = size;
  74. } /* endif ! capacity */
  75. assert( newent->capacity >= packlen );
  76. /* store the contents into the buffer */
  77. memcpy(newent->packet, packet, packlen);
  78. newent->next = newent->prev = NULL;
  79. newent->seq = seq;
  80. newent->packlen = packlen;
  81. gettimeofday(&newent->expires, NULL);
  82. newent->expires.tv_usec += packet_timeout_usecs;
  83. newent->expires.tv_sec += (newent->expires.tv_usec / 1000000);
  84. newent->expires.tv_usec %= 1000000;
  85. *new = newent;
  86. return 0;
  87. }
  88. int pqueue_add (int seq, unsigned char *packet, int packlen) {
  89. pqueue_t *newent, *point;
  90. /* get a new entry */
  91. if ( 0 != pqueue_alloc(seq, packet, packlen, &newent) ) {
  92. return -1;
  93. }
  94. for (point = pq_head; point != NULL; point = point->next) {
  95. if (point->seq == seq) {
  96. // queue already contains this packet
  97. syslog(LOG_WARNING, "discarding duplicate packet %d", seq);
  98. pqueue_del(newent);
  99. return -1;
  100. }
  101. if (point->seq > seq) {
  102. // gone too far: point->seq > seq and point->prev->seq < seq
  103. if (point->prev) {
  104. // insert between point->prev and point
  105. DEBUG_CMD(syslog(LOG_DEBUG, "adding %d between %d and %d",
  106. seq, point->prev->seq, point->seq););
  107. point->prev->next = newent;
  108. } else {
  109. // insert at head of queue, before point
  110. DEBUG_CMD(syslog(LOG_DEBUG, "adding %d before %d", seq, point->seq););
  111. pq_head = newent;
  112. }
  113. newent->prev = point->prev; // will be NULL, at head of queue
  114. newent->next = point;
  115. point->prev = newent;
  116. return 0;
  117. }
  118. }
  119. /* We didn't find anywhere to insert the packet,
  120. * so there are no packets in the queue with higher sequences than this one,
  121. * so all the packets in the queue have lower sequences,
  122. * so this packet belongs at the end of the queue (which might be empty)
  123. */
  124. if (pq_head == NULL) {
  125. DEBUG_CMD(syslog(LOG_DEBUG, "adding %d to empty queue", seq););
  126. pq_head = newent;
  127. } else {
  128. DEBUG_CMD(syslog(LOG_DEBUG, "adding %d as tail, after %d", seq, pq_tail->seq););
  129. pq_tail->next = newent;
  130. }
  131. newent->prev = pq_tail;
  132. pq_tail = newent;
  133. return 0;
  134. }
  135. int pqueue_del (pqueue_t *point) {
  136. DEBUG_CMD(syslog(LOG_DEBUG, "Move seq %d to freelist", point->seq););
  137. /* unlink from pq */
  138. if (pq_head == point) pq_head = point->next;
  139. if (pq_tail == point) pq_tail = point->prev;
  140. if (point->prev) point->prev->next = point->next;
  141. if (point->next) point->next->prev = point->prev;
  142. /* add point to the freelist */
  143. point->next = pq_freelist_head;
  144. point->prev = NULL;
  145. if (point->next)
  146. point->next->prev = point;
  147. pq_freelist_head = point;
  148. DEBUG_CMD(
  149. int pq_count = 0;
  150. int pq_freelist_count = 0;
  151. pqueue_t *point;
  152. for ( point = pq_head; point ; point = point->next) {
  153. ++pq_count;
  154. }
  155. for ( point = pq_freelist_head; point ; point = point->next) {
  156. ++pq_freelist_count;
  157. }
  158. syslog(LOG_DEBUG, "queue length is %d, freelist length is %d", pq_count, pq_freelist_count);
  159. );
  160. return 0;
  161. }
  162. pqueue_t *pqueue_head () {
  163. return pq_head;
  164. }
  165. int pqueue_expiry_time (pqueue_t *entry) {
  166. struct timeval tv;
  167. int expiry_time;
  168. gettimeofday(&tv, NULL);
  169. expiry_time = (entry->expires.tv_sec - tv.tv_sec) * 1000000;
  170. expiry_time += (entry->expires.tv_usec - tv.tv_usec);
  171. return expiry_time;
  172. }