当前位置:   article > 正文

MQ12-源码分析-amqp_consume_message函数_internal error: unable to determine timeout reason

internal error: unable to determine timeout reason

报错信息

#0  0x00007f72020b0e87 in raise () from /opt/ASApplication/bin/./../lib/libc.so.6
#1  0x00007f72020b27f1 in abort () from /opt/ASApplication/bin/./../lib/libc.so.6
#2  0x00007f7203db4bcd in amqp_abort (fmt=0x7f7203dcb500 "Internal error: unable to determine timeout reason") at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_api.c:151
#3  0x00007f7203dc5711 in wait_frame_inner (state=0x7f71cc0049b0, decoded_frame=0x7f71d65428a0, timeout_deadline=...) at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_socket.c:797
#4  0x00007f7203dc5b65 in amqp_simple_wait_frame_noblock (state=0x7f71cc0049b0, decoded_frame=0x7f71d65428a0, timeout=0x0) at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_socket.c:930
#5  0x00007f7203db7217 in amqp_consume_message (state=0x7f71cc0049b0, envelope=0x7f71d6542960, timeout=0x0, flags=0) at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_consumer.c:122
#6  0x000056452272773b in MaterialClipConsume::funRun (this=0x564523ef5b40, conn=0x7f71cc0049b0) at /work/ClipServer.huawei_git/clipServer/src/as/src/ASApplication/materialClip/materialclipconsume.cpp:234
#7  0x00005645227274c0 in MaterialClipConsume::run (this=0x564523ef5b40) at /work/ClipServer.huawei_git/clipServer/src/as/src/ASApplication/materialClip/materialclipconsume.cpp:208
#8  0x00007f72036b9503 in QThreadPrivate::start(void*) () from /opt/ASApplication/bin/./../lib/libQt5Core.so.5
#9  0x00007f722d18f6db in start_thread (arg=0x7f71d654d700) at pthread_create.c:463
#10 0x00007f720219361f in clone () from /opt/ASApplication/bin/./../lib/libc.so.6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

mq中,amqp_connection_state_t_作为通信句柄,所以要了解该结构内部

amqp_connection_state_t_结构体

定义


struct amqp_connection_state_t_ {
  amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];

  amqp_connection_state_enum state;

  int channel_max;
  int frame_max;

  /* Heartbeat interval in seconds. If this is <= 0, then heartbeats are not
   * enabled, and next_recv_heartbeat and next_send_heartbeat are set to
   * infinite */
  int heartbeat;
  amqp_time_t next_recv_heartbeat;
  amqp_time_t next_send_heartbeat;

  /* buffer for holding frame headers.  Allows us to delay allocating
   * the raw frame buffer until the type, channel, and size are all known
   */
  char header_buffer[HEADER_SIZE + 1];
  amqp_bytes_t inbound_buffer;

  size_t inbound_offset;
  size_t target_size;

  amqp_bytes_t outbound_buffer;

  amqp_socket_t *socket;

  amqp_bytes_t sock_inbound_buffer;
  size_t sock_inbound_offset;
  size_t sock_inbound_limit;

  amqp_link_t *first_queued_frame;
  amqp_link_t *last_queued_frame;

  amqp_rpc_reply_t most_recent_api_result;

  amqp_table_t server_properties;
  amqp_table_t client_properties;
  amqp_pool_t properties_pool;

  struct timeval *handshake_timeout;
  struct timeval internal_handshake_timeout;
  struct timeval *rpc_timeout;
  struct timeval internal_rpc_timeout;
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

amqp_consume_message函数

原型

amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
                                      amqp_envelope_t *envelope,
                                      const struct timeval *timeout,
                                      AMQP_UNUSED int flags)
  • 1
  • 2
  • 3
  • 4

实现


amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
                                      amqp_envelope_t *envelope,
                                      const struct timeval *timeout,
                                      AMQP_UNUSED int flags) {
  int res;
  amqp_frame_t frame;
  amqp_basic_deliver_t *delivery_method;
  amqp_rpc_reply_t ret;

  memset(&ret, 0, sizeof(ret));
  memset(envelope, 0, sizeof(*envelope));
    
  //取出frame,计算超时时间
  res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
  if (AMQP_STATUS_OK != res) {
    ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
    ret.library_error = res;
    goto error_out1;
  }

  if (AMQP_FRAME_METHOD != frame.frame_type ||
      AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
    amqp_put_back_frame(state, &frame);
    ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
    ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
    goto error_out1;
  }

  delivery_method = frame.payload.method.decoded;

  envelope->channel = frame.channel;
  envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag);
  envelope->delivery_tag = delivery_method->delivery_tag;
  envelope->redelivered = delivery_method->redelivered;
  envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange);
  envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key);

  if (amqp_bytes_malloc_dup_failed(envelope->consumer_tag) ||
      amqp_bytes_malloc_dup_failed(envelope->exchange) ||
      amqp_bytes_malloc_dup_failed(envelope->routing_key)) {
    ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
    ret.library_error = AMQP_STATUS_NO_MEMORY;
    goto error_out2;
  }

  ret = amqp_read_message(state, envelope->channel, &envelope->message, 0);
  if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
    goto error_out2;
  }

  ret.reply_type = AMQP_RESPONSE_NORMAL;
  return ret;

error_out2:
  amqp_bytes_free(envelope->routing_key);
  amqp_bytes_free(envelope->exchange);
  amqp_bytes_free(envelope->consumer_tag);
error_out1:
  return ret;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

amqp_simple_wait_frame_noblock函数

原型

int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
                                   amqp_frame_t *decoded_frame,
                                   const struct timeval *timeout) 
  • 1
  • 2
  • 3

实现

int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
                                   amqp_frame_t *decoded_frame,
                                   const struct timeval *timeout) {
  amqp_time_t deadline;
  //timeout 超时时间
  int res = amqp_time_from_now(&deadline, timeout);
  //deadline为下次超时的时间点,通过timeout计算得来
  if (AMQP_STATUS_OK != res) {
    return res;
  }

  if (state->first_queued_frame != NULL) {
    amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data;
    state->first_queued_frame = state->first_queued_frame->next;
    if (state->first_queued_frame == NULL) {
      state->last_queued_frame = NULL;
    }
    *decoded_frame = *f;
    return AMQP_STATUS_OK;
  } else {
  //当前 队列连接中,没有数据接收,进入此逻辑进行等待
    return wait_frame_inner(state, decoded_frame, deadline);
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

wait_frame_inner函数

原型

static int wait_frame_inner(amqp_connection_state_t state,
                            amqp_frame_t *decoded_frame,
                            amqp_time_t timeout_deadline)
  • 1
  • 2
  • 3

实现


static int wait_frame_inner(amqp_connection_state_t state,
                            amqp_frame_t *decoded_frame,
                            amqp_time_t timeout_deadline) {
  amqp_time_t deadline;
  int res;

  //死循环等待接收数据
  for (;;) {
  //amqp_data_in_buffer函数功能:检查缓冲区中是否有数据。如果返回1,将避免在amqp_simple_wait_frame中立即阻塞读取。
  //依照报错来看,程序并没有进入while进行数据接收,所以这里为0,缓冲区没有数据
    while (amqp_data_in_buffer(state)) {
      res = consume_one_frame(state, decoded_frame);//取出一帧数据

      if (AMQP_STATUS_OK != res) {
        return res;
      }

      if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
        amqp_maybe_release_buffers_on_channel(state, 0);
        continue;
      }

      if (decoded_frame->frame_type != 0) {
        /* Complete frame was read. Return it. */
        return AMQP_STATUS_OK;
      }
    }

  beginrecv:
    res = amqp_time_has_past(state->next_send_heartbeat);
    if (AMQP_STATUS_TIMER_FAILURE == res) {
      return res;
    } else if (AMQP_STATUS_TIMEOUT == res) {
      amqp_frame_t heartbeat;
      heartbeat.channel = 0;
      heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;

      res = amqp_send_frame(state, &heartbeat);//发送一个心跳帧
      if (AMQP_STATUS_OK != res) {
        return res;
      }
    }
    /*
    amqp_time_first(state->next_recv_heartbeat,state->next_send_heartbeat)
    接收心跳<发送心跳,返回接收心跳值,反之返回发送心跳值
    amqp_time_first(timeout_deadline,...)
    拿设定超时心跳值与上诉心跳值对比,设定心跳小于任何一个(发送或接收),返回设定心跳值
    */
    //下面反推,所以这里deadline的值是timeout_deadline
    deadline = amqp_time_first(timeout_deadline,
                               amqp_time_first(state->next_recv_heartbeat,
                                               state->next_send_heartbeat));

    /* TODO this needs to wait for a _frame_ and not anything written from the
     * socket 
     TODO需要等待一个_frame_,而不是从套接字写入任何东西
     */
    res = recv_with_timeout(state, deadline);//这里返回了AMQP_STATUS_TIMEOUT,进入了下面的if判断
    
    if (AMQP_STATUS_TIMEOUT == res) {
      if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
        amqp_socket_close(state->socket, AMQP_SC_FORCE);
        return AMQP_STATUS_HEARTBEAT_TIMEOUT;
      } else if (amqp_time_equal(deadline, timeout_deadline)) {
        return AMQP_STATUS_TIMEOUT;
      } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
        /* send heartbeat happens before we do recv_with_timeout */
        goto beginrecv;
      } else {
      //deadline的值与next_recv_heartbeat,timeout_deadline,next_send_heartbeat均不想等,退出
      //所以这里的deadline值为多少是关键,需要返回去看amqp_time_first函数
        amqp_abort("Internal error: unable to determine timeout reason");
      }
    } else if (AMQP_STATUS_OK != res) {
      return res;
    }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

amqp_time_t_结构体定义

这表示一个单调时钟的时间点。

内部表示是ns,相对于单调时钟。

有两个“特殊”值:
- 0:表示“这一刻”,它的意思是投票0超时,或非阻塞选项
UINT64_MAX:表示“在无穷大”,它的平均值轮询有一个无限超时
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

原型

typedef struct amqp_time_t_ {
  uint64_t time_point_ns;
} amqp_time_t;
  • 1
  • 2
  • 3

amqp_time_equal函数

功能:判断两个时间值是否相等

原型

int amqp_time_equal(amqp_time_t l, amqp_time_t r) {
  return l.time_point_ns == r.time_point_ns;
}
  • 1
  • 2
  • 3

amqp_time_from_now函数

原型

int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout)
  • 1

定义

int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout) {
  uint64_t now_ns;
  uint64_t delta_ns;

  assert(NULL != time);

  if (NULL == timeout) {
    //值为uint最大值
    *time = amqp_time_infinite();
    return AMQP_STATUS_OK;
  }

  if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    return AMQP_STATUS_INVALID_PARAMETER;
  }
  
  //timeout 的ns
  delta_ns = (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
             (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;

  //获取从系统启动这一刻起开始计时,不受系统时间被用户改变的影响
  now_ns = amqp_get_monotonic_timestamp();
  if (0 == now_ns) {
    return AMQP_STATUS_TIMER_FAILURE;
  }

    //当前的时间+超时时间=下次发送的时间
  time->time_point_ns = now_ns + delta_ns;
  if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) {
    return AMQP_STATUS_INVALID_PARAMETER;
  }

  return AMQP_STATUS_OK;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

amqp_get_monotonic_timestamp函数

功能:

函数原型

extern int clock_gettime (clockid_t __clock_id, struct timespec *__tp) __THROW;

参数
__clock_id : 时钟类型,posix标准定义了下面的四种基本类型,Linux系统有其他的扩充
CLOCK_REALTIME:系统实时时间,随系统实时时间改变而改变,即从UTC1970-1-1 0:0:0开始计时,中间时刻如果系统时间被用户改成其他,则对应的时间相应改变。
CLOCK_MONOTONIC:从系统启动这一刻起开始计时,不受系统时间被用户改变的影响
CLOCK_PROCESS_CPUTIME_ID:本进程到当前代码系统CPU花费的时间。需要注意是不是进程开始到当前代码的时间。
CLOCK_THREAD_CPUTIME_ID:本线程到当前代码系统CPU花费的时间。需要注意是不是线程开始到当前代码的时间。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

原型

uint64_t amqp_get_monotonic_timestamp(void)
  • 1

实现

#ifdef AMQP_POSIX_TIMER_API
#include <time.h>

uint64_t amqp_get_monotonic_timestamp(void) {
#ifdef __hpux
  return (uint64_t)gethrtime();
#else
  struct timespec tp;
  if (-1 == clock_gettime(CLOCK_MONOTONIC, &tp)) {
    return 0;
  }

  return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec);
#endif
}
#endif /* AMQP_POSIX_TIMER_API */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

分析

  1. 在函数wait_frame_inner中,关键的是deadline值与state->next_send_heartbeat,state->next_recv_heartbeat,timeout_deadline值得对比,deadline与任何一个值都不相等的话,则崩溃,问题就出在这里
  2. 所以,在recv_with_timeout函数前后,打印出deadline的值得变化,用来分析

通过《MQ13-源码分析-心跳参数设置》分析,获取到了state->next_send_heartbeat,state->next_recv_heartbeat的值均为uint64。

timeout_deadline的值则是来自于amqp_simple_wait_frame_noblock函数,则需要分析如下函数

amqp_time_from_now函数

原型

int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout)
  • 1

实现

int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout) {
  uint64_t now_ns;
  uint64_t delta_ns;

  assert(NULL != time);
//timeout 传入NULL,进入if
  if (NULL == timeout) {
  //amqp_time_infinite 返回uint64
    *time = amqp_time_infinite();
    return AMQP_STATUS_OK;
  }

  if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    return AMQP_STATUS_INVALID_PARAMETER;
  }

  delta_ns = (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
             (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;

  now_ns = amqp_get_monotonic_timestamp();
  if (0 == now_ns) {
    return AMQP_STATUS_TIMER_FAILURE;
  }

  time->time_point_ns = now_ns + delta_ns;
  if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) {
    return AMQP_STATUS_INVALID_PARAMETER;
  }

  return AMQP_STATUS_OK;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

amqp_time_infinite函数

原型

amqp_time_t amqp_time_infinite(void)
  • 1

实现

amqp_time_t amqp_time_infinite(void) {
  amqp_time_t time;
  time.time_point_ns = UINT64_MAX;
  return time;
}
  • 1
  • 2
  • 3
  • 4
  • 5

分析

通过对amqp_time_from_now函数分析,可以得到deadline值为UINT64_MAX,那么deadline值与state->next_send_heartbeat,state->next_recv_heartbeat,timeout_deadline值均为UINT64_MAX,所以从逻辑上讲,是不可能走到amqp_abort("Internal error: unable to determine timeout reason");的,所以还是要通过日志打印出具体值来进行查看分析

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/994114?site
推荐阅读
相关标签
  

闽ICP备14008679号