赞
踩
#0 0x00007f72020b0e87 in raise () from /opt/ASApplication/bin/./../lib/libc.so.6
#1 0x00007f72020b27f1 in abort () from /opt/ASApplication/bin/./../lib/libc.so.6
#2 0x00007f7203db4bcd in amqp_abort (fmt=0x7f7203dcb500 "Internal error: unable to determine timeout reason") at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_api.c:151
#3 0x00007f7203dc5711 in wait_frame_inner (state=0x7f71cc0049b0, decoded_frame=0x7f71d65428a0, timeout_deadline=...) at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_socket.c:797
#4 0x00007f7203dc5b65 in amqp_simple_wait_frame_noblock (state=0x7f71cc0049b0, decoded_frame=0x7f71d65428a0, timeout=0x0) at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_socket.c:930
#5 0x00007f7203db7217 in amqp_consume_message (state=0x7f71cc0049b0, envelope=0x7f71d6542960, timeout=0x0, flags=0) at /work/ClipServer/depend/rabbitmq-c/librabbitmq/amqp_consumer.c:122
#6 0x000056452272773b in MaterialClipConsume::funRun (this=0x564523ef5b40, conn=0x7f71cc0049b0) at /work/ClipServer.huawei_git/clipServer/src/as/src/ASApplication/materialClip/materialclipconsume.cpp:234
#7 0x00005645227274c0 in MaterialClipConsume::run (this=0x564523ef5b40) at /work/ClipServer.huawei_git/clipServer/src/as/src/ASApplication/materialClip/materialclipconsume.cpp:208
#8 0x00007f72036b9503 in QThreadPrivate::start(void*) () from /opt/ASApplication/bin/./../lib/libQt5Core.so.5
#9 0x00007f722d18f6db in start_thread (arg=0x7f71d654d700) at pthread_create.c:463
#10 0x00007f720219361f in clone () from /opt/ASApplication/bin/./../lib/libc.so.6
在 rabbitmq-c 中,amqp_connection_state_t_ 作为通信句柄,所以要先了解该结构体的内部定义。
定义
/* Per-connection state. The article treats this structure as the handle used
 * for all communication on a connection. */
struct amqp_connection_state_t_ {
amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
amqp_connection_state_enum state;
int channel_max;
int frame_max;
/* Heartbeat interval in seconds. If this is <= 0, then heartbeats are not
* enabled, and next_recv_heartbeat and next_send_heartbeat are set to
* infinite */
int heartbeat;
/* Heartbeat time points; wait_frame_inner compares its wait deadline against
 * both of these to decide why a receive timed out. */
amqp_time_t next_recv_heartbeat;
amqp_time_t next_send_heartbeat;
/* buffer for holding frame headers. Allows us to delay allocating
* the raw frame buffer until the type, channel, and size are all known
*/
char header_buffer[HEADER_SIZE + 1];
amqp_bytes_t inbound_buffer;
size_t inbound_offset;
size_t target_size;
amqp_bytes_t outbound_buffer;
amqp_socket_t *socket;
amqp_bytes_t sock_inbound_buffer;
size_t sock_inbound_offset;
size_t sock_inbound_limit;
/* Head and tail of the list of queued frames. amqp_simple_wait_frame_noblock
 * returns the head frame, if any, before waiting on the socket. */
amqp_link_t *first_queued_frame;
amqp_link_t *last_queued_frame;
amqp_rpc_reply_t most_recent_api_result;
amqp_table_t server_properties;
amqp_table_t client_properties;
amqp_pool_t properties_pool;
/* NOTE(review): the pointer/"internal_" pairs below presumably let the pointer
 * be NULL or refer to the embedded copy - confirm against the library. */
struct timeval *handshake_timeout;
struct timeval internal_handshake_timeout;
struct timeval *rpc_timeout;
struct timeval internal_rpc_timeout;
};
原型
amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
amqp_envelope_t *envelope,
const struct timeval *timeout,
AMQP_UNUSED int flags)
实现
/*
 * Wait for the next delivery on the connection and fill in *envelope.
 *
 * state    - connection to read from
 * envelope - zeroed on entry, then filled with the channel, consumer tag,
 *            delivery tag, redelivered flag, exchange, routing key and message
 * timeout  - passed unchanged to amqp_simple_wait_frame_noblock
 * flags    - unused (marked AMQP_UNUSED)
 *
 * Returns a reply with reply_type AMQP_RESPONSE_NORMAL on success. On failure
 * the reply is either AMQP_RESPONSE_LIBRARY_EXCEPTION with library_error set,
 * or the non-normal reply produced by amqp_read_message.
 */
amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
amqp_envelope_t *envelope,
const struct timeval *timeout,
AMQP_UNUSED int flags) {
int res;
amqp_frame_t frame;
amqp_basic_deliver_t *delivery_method;
amqp_rpc_reply_t ret;
memset(&ret, 0, sizeof(ret));
memset(envelope, 0, sizeof(*envelope));
// Fetch the next frame; the timeout is converted to a deadline inside this call
res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
if (AMQP_STATUS_OK != res) {
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = res;
goto error_out1;
}
// Anything other than a basic.deliver method frame is put back on the
// connection and reported as an unexpected state
if (AMQP_FRAME_METHOD != frame.frame_type ||
AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
amqp_put_back_frame(state, &frame);
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
goto error_out1;
}
// Copy the delivery metadata out of the decoded method into the envelope
delivery_method = frame.payload.method.decoded;
envelope->channel = frame.channel;
envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag);
envelope->delivery_tag = delivery_method->delivery_tag;
envelope->redelivered = delivery_method->redelivered;
envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange);
envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key);
// All three duplications are attempted first, then checked together
if (amqp_bytes_malloc_dup_failed(envelope->consumer_tag) ||
amqp_bytes_malloc_dup_failed(envelope->exchange) ||
amqp_bytes_malloc_dup_failed(envelope->routing_key)) {
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_NO_MEMORY;
goto error_out2;
}
// Read the message belonging to this delivery on the same channel
ret = amqp_read_message(state, envelope->channel, &envelope->message, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
goto error_out2;
}
ret.reply_type = AMQP_RESPONSE_NORMAL;
return ret;
error_out2:
// NOTE(review): the envelope fields are released here but not reset, so the
// envelope still holds the old pointers on return - callers presumably must
// not destroy the envelope after a failure; confirm against the API docs.
amqp_bytes_free(envelope->routing_key);
amqp_bytes_free(envelope->exchange);
amqp_bytes_free(envelope->consumer_tag);
error_out1:
return ret;
}
原型
int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
const struct timeval *timeout)
实现
/*
 * Return the next frame for the connection in *decoded_frame.
 *
 * The relative timeout is first converted to an absolute deadline with
 * amqp_time_from_now. If a frame is already queued on the connection it is
 * unlinked and returned immediately; otherwise wait_frame_inner is called
 * with the computed deadline.
 *
 * Returns AMQP_STATUS_OK when a queued frame was returned, the status from
 * amqp_time_from_now if the deadline could not be computed, or the status
 * from wait_frame_inner.
 */
int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
const struct timeval *timeout) {
amqp_time_t deadline;
// timeout is the caller's relative timeout
int res = amqp_time_from_now(&deadline, timeout);
// deadline is the absolute time point at which the wait expires, derived from timeout
if (AMQP_STATUS_OK != res) {
return res;
}
if (state->first_queued_frame != NULL) {
// Pop the head of the queued-frame list; clear the tail when the list becomes empty
amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data;
state->first_queued_frame = state->first_queued_frame->next;
if (state->first_queued_frame == NULL) {
state->last_queued_frame = NULL;
}
*decoded_frame = *f;
return AMQP_STATUS_OK;
} else {
// No frame is queued on this connection, so wait for one to arrive
return wait_frame_inner(state, decoded_frame, deadline);
}
}
原型
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
amqp_time_t timeout_deadline)
实现
/*
 * Loop until a complete non-heartbeat frame has been decoded, a deadline
 * expires, or a helper reports an error.
 *
 * Returns:
 *   AMQP_STATUS_OK                - *decoded_frame holds a complete frame
 *   AMQP_STATUS_TIMEOUT           - the wait expired at timeout_deadline
 *   AMQP_STATUS_HEARTBEAT_TIMEOUT - the wait expired at next_recv_heartbeat;
 *                                   the socket is force-closed first
 *   any other status              - propagated from consume_one_frame,
 *                                   amqp_time_has_past, amqp_send_frame or
 *                                   recv_with_timeout
 *
 * Calls amqp_abort when a receive timeout cannot be attributed to
 * next_recv_heartbeat, timeout_deadline or next_send_heartbeat; this is the
 * abort seen in frame #2/#3 of the backtrace the article analyses.
 */
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
amqp_time_t timeout_deadline) {
amqp_time_t deadline;
int res;
// Loop forever waiting for incoming data
for (;;) {
// amqp_data_in_buffer checks whether data is already buffered; when it
// returns non-zero the frame is decoded here without reading the socket.
// Article annotation: judging from the crash, execution did not enter this
// while loop, so the call returned 0 and the buffer held no data.
while (amqp_data_in_buffer(state)) {
res = consume_one_frame(state, decoded_frame);// decode one frame
if (AMQP_STATUS_OK != res) {
return res;
}
// Heartbeat frames are not returned to the caller; keep draining the buffer
if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
amqp_maybe_release_buffers_on_channel(state, 0);
continue;
}
if (decoded_frame->frame_type != 0) {
/* Complete frame was read. Return it. */
return AMQP_STATUS_OK;
}
}
beginrecv:
// AMQP_STATUS_TIMEOUT here means next_send_heartbeat has been reached
res = amqp_time_has_past(state->next_send_heartbeat);
if (AMQP_STATUS_TIMER_FAILURE == res) {
return res;
} else if (AMQP_STATUS_TIMEOUT == res) {
amqp_frame_t heartbeat;
heartbeat.channel = 0;
heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
res = amqp_send_frame(state, &heartbeat);// send a heartbeat frame
if (AMQP_STATUS_OK != res) {
return res;
}
}
/*
Article annotation (amqp_time_first itself is not shown in this file):
amqp_time_first(state->next_recv_heartbeat,state->next_send_heartbeat)
returns the receive-heartbeat time when it is earlier than the
send-heartbeat time, otherwise the send-heartbeat time.
amqp_time_first(timeout_deadline,...)
then compares the caller's deadline with that result; if the caller's
deadline is earlier than both heartbeat times, the caller's deadline is
returned.
*/
// Article annotation: reasoning backwards from the code below, the value of
// deadline here is timeout_deadline
deadline = amqp_time_first(timeout_deadline,
amqp_time_first(state->next_recv_heartbeat,
state->next_send_heartbeat));
/* TODO this needs to wait for a _frame_ and not anything written from the
* socket
*/
res = recv_with_timeout(state, deadline);// in the crash this returned AMQP_STATUS_TIMEOUT, so the branch below was taken
if (AMQP_STATUS_TIMEOUT == res) {
// Work out which of the three time points the expired deadline came from
if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
amqp_socket_close(state->socket, AMQP_SC_FORCE);
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (amqp_time_equal(deadline, timeout_deadline)) {
return AMQP_STATUS_TIMEOUT;
} else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
/* send heartbeat happens before we do recv_with_timeout */
goto beginrecv;
} else {
// deadline equals none of next_recv_heartbeat, timeout_deadline and
// next_send_heartbeat, so the library aborts.
// The value of deadline is therefore the key question; go back and look
// at how amqp_time_first computed it.
amqp_abort("Internal error: unable to determine timeout reason");
}
} else if (AMQP_STATUS_OK != res) {
return res;
}
}
}
amqp_time_t 表示相对于单调时钟的一个时间点。
内部以纳秒(ns)为单位存储,相对于单调时钟计时。
有两个“特殊”值:
- 0:表示“此刻”,用于超时时间为 0 的轮询(poll),即非阻塞方式
- UINT64_MAX:表示“无穷远”,用于超时时间为无限的轮询(poll)
原型
/* A point in time stored as a single nanosecond count. amqp_time_infinite
 * uses UINT64_MAX as the "infinite" value. */
typedef struct amqp_time_t_ {
uint64_t time_point_ns; /* time point in nanoseconds */
} amqp_time_t;
功能:判断两个时间值是否相等
原型
/* Return non-zero when the two time points hold the same nanosecond value,
 * zero otherwise. */
int amqp_time_equal(amqp_time_t l, amqp_time_t r) {
return l.time_point_ns == r.time_point_ns;
}
原型
int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout)
定义
/*
 * Convert a relative timeout into an absolute time point stored in *time.
 *
 * time    - output; must not be NULL (asserted)
 * timeout - relative timeout; NULL means "no timeout" and yields the
 *           infinite time point
 *
 * Returns AMQP_STATUS_OK on success, AMQP_STATUS_INVALID_PARAMETER for a
 * negative timeout field or when now + timeout wraps around, and
 * AMQP_STATUS_TIMER_FAILURE when the monotonic timestamp reads as 0.
 */
int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout) {
uint64_t now_ns;
uint64_t delta_ns;
assert(NULL != time);
if (NULL == timeout) {
// Infinite time point: the largest uint64 value (UINT64_MAX)
*time = amqp_time_infinite();
return AMQP_STATUS_OK;
}
if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
return AMQP_STATUS_INVALID_PARAMETER;
}
// The timeout expressed in nanoseconds
delta_ns = (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
(uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
// Monotonic timestamp: counts from system start and is not affected by the
// user changing the system time
now_ns = amqp_get_monotonic_timestamp();
if (0 == now_ns) {
return AMQP_STATUS_TIMER_FAILURE;
}
// current time + timeout = the time point at which the wait expires
time->time_point_ns = now_ns + delta_ns;
// If the unsigned sum wrapped around, it is smaller than one of its operands
if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) {
return AMQP_STATUS_INVALID_PARAMETER;
}
return AMQP_STATUS_OK;
}
功能:获取指定时钟(__clock_id)的当前时间,结果写入 __tp
函数原型
extern int clock_gettime (clockid_t __clock_id, struct timespec *__tp) __THROW;
参数
__clock_id : 时钟类型,posix标准定义了下面的四种基本类型,Linux系统有其他的扩充
CLOCK_REALTIME:系统实时时间,随系统实时时间改变而改变,即从UTC1970-1-1 0:0:0开始计时,中间时刻如果系统时间被用户改成其他,则对应的时间相应改变。
CLOCK_MONOTONIC:从系统启动这一刻起开始计时,不受系统时间被用户改变的影响
CLOCK_PROCESS_CPUTIME_ID:本进程运行到当前代码为止所消耗的CPU时间。需要注意的是,这不是从进程启动到当前代码所经过的时间。
CLOCK_THREAD_CPUTIME_ID:本线程运行到当前代码为止所消耗的CPU时间。需要注意的是,这不是从线程启动到当前代码所经过的时间。
原型
uint64_t amqp_get_monotonic_timestamp(void)
实现
#ifdef AMQP_POSIX_TIMER_API
#include <time.h>
/*
 * Return the current timestamp used by the library's time points.
 *
 * On HP-UX the value of gethrtime() is returned directly. Elsewhere the
 * CLOCK_MONOTONIC reading from clock_gettime is converted to nanoseconds
 * (tv_sec * AMQP_NS_PER_S + tv_nsec).
 *
 * Returns 0 when clock_gettime fails; amqp_time_from_now treats a 0 result
 * as AMQP_STATUS_TIMER_FAILURE.
 */
uint64_t amqp_get_monotonic_timestamp(void) {
#ifdef __hpux
return (uint64_t)gethrtime();
#else
struct timespec tp;
if (-1 == clock_gettime(CLOCK_MONOTONIC, &tp)) {
return 0;
}
return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec);
#endif
}
#endif /* AMQP_POSIX_TIMER_API */
通过《MQ13-源码分析-心跳参数设置》分析,得到state->next_send_heartbeat、state->next_recv_heartbeat的值均为UINT64_MAX。
timeout_deadline的值则来自amqp_simple_wait_frame_noblock函数中对amqp_time_from_now的调用,因此需要分析如下函数
原型
int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout)
实现
/*
 * Convert a relative timeout into an absolute time point stored in *time
 * (same function as quoted earlier, annotated here for the NULL-timeout case
 * seen in the backtrace).
 *
 * Returns AMQP_STATUS_OK on success, AMQP_STATUS_INVALID_PARAMETER for a
 * negative timeout field or when now + timeout wraps around, and
 * AMQP_STATUS_TIMER_FAILURE when the monotonic timestamp reads as 0.
 */
int amqp_time_from_now(amqp_time_t *time, const struct timeval *timeout) {
uint64_t now_ns;
uint64_t delta_ns;
assert(NULL != time);
// In the crash being analysed timeout is NULL (timeout=0x0 in the backtrace),
// so this branch is taken
if (NULL == timeout) {
// amqp_time_infinite returns a time point of UINT64_MAX
*time = amqp_time_infinite();
return AMQP_STATUS_OK;
}
if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
return AMQP_STATUS_INVALID_PARAMETER;
}
delta_ns = (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
(uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
now_ns = amqp_get_monotonic_timestamp();
if (0 == now_ns) {
return AMQP_STATUS_TIMER_FAILURE;
}
time->time_point_ns = now_ns + delta_ns;
// If the unsigned sum wrapped around, it is smaller than one of its operands
if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) {
return AMQP_STATUS_INVALID_PARAMETER;
}
return AMQP_STATUS_OK;
}
原型
amqp_time_t amqp_time_infinite(void)
实现
/* Return the "infinite" time point, whose time_point_ns is UINT64_MAX. */
amqp_time_t amqp_time_infinite(void) {
amqp_time_t time;
time.time_point_ns = UINT64_MAX;
return time;
}
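通过对amqp_time_from_now函数的分析可知,timeout为NULL时timeout_deadline的值为UINT64_MAX;如果state->next_send_heartbeat、state->next_recv_heartbeat也都是UINT64_MAX,那么amqp_time_first得到的deadline同样是UINT64_MAX。此时即使recv_with_timeout返回AMQP_STATUS_TIMEOUT,第一个判断amqp_time_equal(deadline, state->next_recv_heartbeat)就会成立,所以从逻辑上讲,是不可能走到amqp_abort("Internal error: unable to determine timeout reason");的。
既然实际走到了amqp_abort,说明比较时deadline与这三个值都不相等:timeout_deadline是按值传入的参数,在本次调用中不会变化,因此deadline很可能取自某个心跳时间点,而该时间点在recv_with_timeout等待期间被修改了。需要重点排查是否有其他线程同时使用同一个amqp_connection_state_t(例如在另一个线程中通过该连接发送消息),并通过日志打印出这几个变量的具体值来进行确认。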
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。