当前位置:   article > 正文

envoy 代码阅读记录 - - 过滤器_envoy过滤器

envoy过滤器

本文基于 envoy 1.21.0 (latest)
1 过滤器介绍
在这里插入图片描述

从 socket 中收取的请求先经过 listener_filters 处理,然后再由 filter_chains 处理,前者包含的 filter 称为 listener filter,后者包含的 filter 称为 network filter。因为 listener_filters 先起作用,因此它可以修改请求的信息,从而影响 filter_chains 的匹配。
1.1 过滤器简述

  • 监听器过滤器
    可以在加入 listener 的 listener_filter 字段中的 listener filter。 这些 filter 的主要作用是检测协议、解析协议,通过它们解析出的信息被用于匹配 filter_chains 中的 filter。
    例如:
    envoy.listener.http_inspector:判断应用层的数据是否使用 HTTP 协议,如果是,续继判断 HTTP 协议的版本号(HTTP 1.0、HTTP 1.1、HTTP 2)。
    envoy.listener.original_src:用于透明代理,让 uptream 看到的是请求端的 IP,双方均感知不到 envoy 的存在。
    envoy.listener.original_dst: 用来读取 socket 的配置项 SO_ORIGINAL_DST,用该 filter 获取报文的原始目地地址。
    envoy.listener.proxy_protocol:解析代理协议,用该 filter 可以解析出真实的源 IP。
    envoy.listener.tls_inspector: 用来判断是否使用 TLS 协议,如果是 TLS 协议,解析出 Server Name、Negotiation 信息,解析出来的信息用于 FilterChain 的匹配。

  • network 过滤器
    操作原始字节。允许混合不同的过滤器组合,并匹配和附加到给定的监听器。
    网络过滤器在一个有序的列表中被链起来(过滤器栈),称为过滤器链。每个监听器都有多个过滤器链和一个可选默认过滤器链。如果找不到最佳匹配的过滤链,将选择默认的过滤链来处理请求。如果没有提供默认的过滤链,连接将被关闭。
    参考:https://www.envoyproxy.io/docs/envoy/latest/configuration/listeners/network_filters/network_filters
    例如:
    envoy.filters.network.mysql_proxy:能够解析 mysql 的通信协议,需要和 [TCP proxy][] 一起使用。
    配置文件
    filter_chains:

  • filters:
    • name: envoy.filters.network.mysql_proxy
      typed_config:
      “@type”: type.googleapis.com/envoy.config.filter.network.mysql_proxy.v1alpha1.MySQLProxy
      stat_prefix: mysql
    • name: envoy.tcp_proxy
      typed_config:
      “@type”: type.googleapis.com/envoy.config.filter.network.tcp_proxy.v2.TcpProxy
      stat_prefix: tcp
      cluster: …

envoy.http_connection_manager:专门处理 http 协议的 network filter,内部又实现了 http filter。因为 http 最经常使用的协议,对代理功能需求也非常多样, HTTP connection manager 本身是一个比较复杂的 network filter,在 envoy 文档中被单独列出:HTTP connection 。 proto message 字段参考 连接管理器.proto 。

  • http 过滤器
    在连接管理器中支持HTTP级别的过滤器栈。
    envoy.filters.http.router :路由器过滤器,执行高级路由任务。参考
    HTTP 转发是用路由过滤器实现的。该过滤器的主要职能就是执行路由表中的指令。在重定向和转发这两个主要任务之外,路由过滤器还需要处理重试、统计之类的任务。

1.2 network filter chain 配置
说明:envoy 中每一个资源对象的参数配置都是通过 protobuf 来定义的。
监听器配置

过滤链配置定义,这里指 网络过滤器链,
listener.FilterChain
{
“filter_chain_match”: “{…}”,
“tls_context”: “{…}”,
“filters”: [],
“use_proxy_proto”: “{…}”,
“transport_socket”: “{…}”
}
过滤器链包裹着一组匹配判据,一个选项TLS上下文,一组过滤器,以及其他各种参数。
具体字段的说明:
在这里插入图片描述

FilterChainMatch 配置定义
指定为 Listener 选择特定过滤链的匹配判据。
为了选择一个过滤链,它的所有判据必须被传入的连接满足,其属性由网络堆栈和/或监听器过滤器设置。
以下顺序适用:

  • 目的地端口。
  • 目的地IP地址。
  • 服务器名称(例如:TLS协议的SNI)。
  • 传输协议。
  • 应用协议(例如:TLS协议的ALPN)。
  • 直接连接的源IP地址(这只有在使用覆盖源地址的监听器过滤器时才会与源IP地址不同,如代理协议监听器过滤器)。
  • 源类型(例如,任何,本地或外部网络)。
  • 源IP地址。
  • 源端口
    proto message中各个字段说明
    字段
    格式
    说明
    destination_port
    google.protobuf.UInt32Value
    当监听器上设置use_original_dst时,在确定过滤链匹配时要考虑的可选目标端口。
    prefix_ranges
    core.v3.CidrRange[]
    如果非空,则是一个IP地址和前缀长度,以便在监听器被绑定到0.0.0.0/::或指定use_original_dst时匹配地址。
    direct_source_prefix_ranges
    core.v3.CidrRange[]
    如果下游连接的直接连接源IP地址包含在至少一个指定的子网中,则满足该条件。如果没有指定参数或者列表为空,直接连接的源IP地址将被忽略。
    source_type
    ConnectionSourceType
    指定连接源IP匹配类型。可以是任何,本地或外部网络。
    source_prefix_ranges
    core.v3.CidrRange[]
    如果下游连接的源IP地址包含在至少一个指定的子网中,则满足该条件。如果没有指定参数或列表为空,则源IP地址被忽略。
    source_ports
    uint32[]
    如果下游连接的源端口包含在至少一个指定的端口中,则满足该条件。如果没有指定该参数,源端口将被忽略。
    server_names
    string[]
    如果非空,则是一个服务器名称的列表(例如TLS协议的SNI),在确定过滤器链匹配时要考虑。这些值将与新连接的服务器名称进行比较,当被一个监听器过滤器检测到时。

服务器名称将与所有通配符域名进行匹配,即www.example.com,然后是*.example.com,然后是*.com。

注意,不支持部分通配符,像*w.example.com这样的值是无效的。
transport_protocol
string
如果非空,在确定过滤器链匹配时要考虑的传输协议。这个值将与新连接的传输协议进行比较,当它被一个监听器过滤器检测到时。

建议的值包括:

  • raw_buffer - 默认值,在没有检测到传输协议时使用。

  • tls - 当检测到TLS协议时由envoy.filters.listener.tls_inspector设置。
    application_protocols
    string[]
    如果非空,则是一个应用协议的列表(例如,TLS协议的ALPN),在确定过滤器链的匹配时要考虑。这些值将与新连接的应用协议进行比较,当被一个监听器过滤器检测到时。

建议的值包括。

  • http/1.1 - 由 envoy. filters.listener.tls_inspector 设置。

  • h2 - 由 envoy.filters.listener.tls_inspector 设置。

  1. listener 初始化 和启动 源码概览
    2.1 初始化中 解析配置文件的 listeners
    核心函数:InstanceImpl::initialize
  • 调用 MainImpl::initialize
    初始化listeners配置
    // source/server/server.cc envoy 初始化
    void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_address,
    ComponentFactory& component_factory) {
    //…
    config_.initialize(bootstrap_, *this, *cluster_manager_factory_);
    //…
    }
    // source/server/configuration_impl.cc 解析 yaml配置文件,初始化 listeners
    void MainImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
    Instance& server,
    Upstream::ClusterManagerFactory& cluster_manager_factory) {
    //…
    const auto& listeners = bootstrap.static_resources().listeners();
    ENVOY_LOG(info, “loading {} listener(s)”, listeners.size());
    for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, “listener #{}:”, i);
    server.listenerManager().addOrUpdateListener(listeners[i], “”, false);
    }
    //…
    }

// source/server/listener_manager_impl.cc 创建 ListenerImpl,ListenerImpl 做 bind 操作
bool ListenerManagerImpl::addOrUpdateListener(const envoy::config::listener::v3::Listener& config,
const std::string& version_info, bool added_via_api) {
//…
return addOrUpdateListenerInternal(config, version_info, added_via_api, name);
//…
}
bool ListenerManagerImpl::addOrUpdateListenerInternal(
const envoy::config::listener::v3::Listener& config, const std::string& version_info,
bool added_via_api, const std::string& name) {
//…
ListenerImplPtr new_listener = nullptr;
//…
ENVOY_LOG(debug, “use full listener update path for listener name={} hash={}”, name, hash);
new_listener =
std::make_unique(config, version_info, *this, name, added_via_api,
workers_started_, hash, server_.options().concurrency());
//…
}
主要目的:解析配置文件中的 listeners , 以构造对应的 ListenerImpl 。

  • 配置文件举例
    envoy配置文件
    static_resources:
    listeners:
    • address:
      socket_address:
      address: 0.0.0.0
      port_value: 8080
      filter_chains:
      • filters:
        • name: envoy.filters.network.http_connection_manager
          typed_config:
          “@type”: type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          codec_type: AUTO
          stat_prefix: ingress_http
          route_config:
          name: local_route
          virtual_hosts:
          - name: backend
          domains:
          - “*”
          routes:
          - match:
          prefix: “/service/1”
          route:
          cluster: service1
          - match:
          prefix: “/service/2”
          route:
          cluster: service2
          http_filters:
          • name: envoy.filters.http.router
  • proto 定义message(.proto文件 → .pb.h 和 .pb.cc文件, 将 message 转成 class)
    proto中的message定义
    message Bootstrap {
    message StaticResources {
    repeated listener.v3.Listener listeners = 1;
    repeated cluster.v3.Cluster clusters = 2;
    repeated envoy.extensions.transport_sockets.tls.v3.Secret secrets = 3;
    }
    StaticResources static_resources = 2;
    // …
    }

message Listener {
string name = 1;
core.v3.Address address = 2 [(validate.rules).message = {required: true}];
repeated FilterChain filter_chains = 3;
FilterChain default_filter_chain = 25;
repeated ListenerFilter listener_filters = 9;
// …
}
参考 https://cloudnative.to/envoy/api-v3/config/listener/v3/listener.proto.html
各种过滤器:https://cloudnative.to/envoy/api-v3/config/filter/filter.html#
HttpConnectionManager参考https://cloudnative.to/envoy/api-v3/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto.html

  • ListenerImpl 构造函数
    (1)调用 createListenerFilterFactories 来获取 listener_filters ,赋值给 其成员变量 listener_filter_factories_
    解析 listener_fileters 配置
    void ListenerImpl::createListenerFilterFactories(Network::Socket::Type socket_type) {
    if (!config_.listener_filters().empty()) {
    switch (socket_type) {
    case Network::Socket::Type::Datagram:
    udp_listener_filter_factories_ = parent_.factory_.createUdpListenerFilterFactoryList(
    config_.listener_filters(), *listener_factory_context_);
    break;
    case Network::Socket::Type::Stream:
    // 注意下面这一行, 这里的 config_ 是 const envoy::config::listener::v3::Listener
    listener_filter_factories_ = parent_.factory_.createListenerFilterFactoryList(
    config_.listener_filters(), *listener_factory_context_);
    break;
    default:
    NOT_REACHED_GCOVR_EXCL_LINE;
    }
    }
    }
    // createListenerFilterFactoryList 调用下面这个函数
    std::vectorNetwork::ListenerFilterFactoryCb
    ProdListenerComponentFactory::createListenerFilterFactoryList_(
    const Protobuf::RepeatedPtrFieldenvoy::config::listener::v3::ListenerFilter& filters,
    Configuration::ListenerFactoryContext& context) {
    std::vectorNetwork::ListenerFilterFactoryCb ret;
    for (ssize_t i = 0; i < filters.size(); i++) {
    const auto& proto_config = filters[i];
    ENVOY_LOG(debug, " filter #{}:“, i);
    ENVOY_LOG(debug, " name: {}”, proto_config.name());
    ENVOY_LOG(debug, " config: {}",
    MessageUtil::getJsonStringFromMessageOrError(
    static_cast<const Protobuf::Message&>(proto_config.typed_config())));
    // Now see if there is a factory that will accept the config.
    auto& factory =
    Config::Utility::getAndCheckFactoryConfiguration::NamedListenerFilterConfigFactory(
    proto_config);
    auto message = Config::Utility::translateToFactoryConfig(
    proto_config, context.messageValidationVisitor(), factory);
    ret.push_back(factory.createListenerFilterFactoryFromProto(
    *message, createListenerFilterMatcher(proto_config), context));
    }
    return ret;
    }
    (2)调用了buildFilterChains 来获取 filter_chains。
    filter_chain_manager_.addFilterChains 中 检查是否有默认过滤链, 已经把所有过滤链及其匹配条件 filter_chain_match 保存到fc_contexts_。
    FilterChainManagerImpl::addFilterChains 如下
    解析网络过滤链 filter_chains 配置
    void ListenerImpl::buildFilterChains() {
    transport_factory_context_->setInitManager(*dynamic_init_manager_);
    ListenerFilterChainFactoryBuilder builder(*this, *transport_factory_context_);
    filter_chain_manager_.addFilterChains(
    config_.filter_chains(),
    config_.has_default_filter_chain() ? &config_.default_filter_chain() : nullptr, builder,
    filter_chain_manager_);
    }

void FilterChainManagerImpl::addFilterChains(
absl::Span<const envoy::config::listener::v3::FilterChain* const> filter_chain_span,
const envoy::config::listener::v3::FilterChain* default_filter_chain,
FilterChainFactoryBuilder& filter_chain_factory_builder,
FilterChainFactoryContextCreator& context_creator) {
Cleanup cleanup(this { origin_ = absl::nullopt; });
absl::node_hash_map<envoy::config::listener::v3::FilterChainMatch, std::string, MessageUtil,
MessageUtil>
filter_chains;
uint32_t new_filter_chain_size = 0;
for (const auto& filter_chain : filter_chain_span) {
const auto& filter_chain_match = filter_chain->filter_chain_match();
if (!filter_chain_match.address_suffix().empty() || filter_chain_match.has_suffix_len()) {
throw EnvoyException(fmt::format("error adding listener ‘{}’: filter chain ‘{}’ contains "
“unimplemented fields”,
address_->asString(), filter_chain->name()));
}
const auto& matching_iter = filter_chains.find(filter_chain_match);
if (matching_iter != filter_chains.end()) {
throw EnvoyException(fmt::format("error adding listener ‘{}’: filter chain ‘{}’ has "
“the same matching rules defined as ‘{}’”,
address_->asString(), filter_chain->name(),
matching_iter->second));
}
filter_chains.insert({filter_chain_match, filter_chain->name()});

auto createAddressVector = [](const auto& prefix_ranges) -> std::vector<std::string> {
  std::vector<std::string> ips;
  ips.reserve(prefix_ranges.size());
  for (const auto& ip : prefix_ranges) {
    const auto& cidr_range = Network::Address::CidrRange::create(ip);
    ips.push_back(cidr_range.asString());
  }
  return ips;
};

// Validate IP addresses.
std::vector<std::string> destination_ips =
    createAddressVector(filter_chain_match.prefix_ranges());
std::vector<std::string> source_ips =
    createAddressVector(filter_chain_match.source_prefix_ranges());
std::vector<std::string> direct_source_ips =
    createAddressVector(filter_chain_match.direct_source_prefix_ranges());

std::vector<std::string> server_names;
// Reject partial wildcards, we don't match on them.
for (const auto& server_name : filter_chain_match.server_names()) {
  if (server_name.find('*') != std::string::npos && !isWildcardServerName(server_name)) {
    throw EnvoyException(
        fmt::format("error adding listener '{}': partial wildcards are not supported in "
                    "\"server_names\"",
                    address_->asString()));
  }
  server_names.push_back(absl::AsciiStrToLower(server_name));
}

// Reuse created filter chain if possible.
// FilterChainManager maintains the lifetime of FilterChainFactoryContext
// ListenerImpl maintains the dependencies of FilterChainFactoryContext
auto filter_chain_impl = findExistingFilterChain(*filter_chain);
if (filter_chain_impl == nullptr) {
  filter_chain_impl =
      filter_chain_factory_builder.buildFilterChain(*filter_chain, context_creator);
  ++new_filter_chain_size;
}

addFilterChainForDestinationPorts(
    destination_ports_map_,
    PROTOBUF_GET_WRAPPED_OR_DEFAULT(filter_chain_match, destination_port, 0), destination_ips,
    server_names, filter_chain_match.transport_protocol(),
    filter_chain_match.application_protocols(), direct_source_ips,
    filter_chain_match.source_type(), source_ips, filter_chain_match.source_ports(),
    filter_chain_impl);

fc_contexts_[*filter_chain] = filter_chain_impl;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

}
convertIPsToTries();
copyOrRebuildDefaultFilterChain(default_filter_chain, filter_chain_factory_builder,
context_creator);
ENVOY_LOG(debug, “new fc_contexts has {} filter chains, including {} newly built”,
fc_contexts_.size(), new_filter_chain_size);
}

  • 另外初始化函数InstanceImpl::initialize 中也加载了配置文件中线程数的设置,并且
    woker的初始化
    void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_address,
    ComponentFactory& component_factory) {
    //…
    // Workers get created first so they register for thread local updates.
    listener_manager_ =
    std::make_unique(*this, listener_component_factory_, worker_factory_,
    bootstrap_.enable_dispatcher_stats(), quic_stat_names_);
    //…
    }
    // ListenerManagerImpl构造函数中初始化 woker 数组 wokers_
    ListenerManagerImpl::ListenerManagerImpl(Instance& server,
    ListenerComponentFactory& listener_factory,
    WorkerFactory& worker_factory,
    bool enable_dispatcher_stats,
    Quic::QuicStatNames& quic_stat_names)
    : server_(server), factory_(listener_factory),
    scope_(server.stats().createScope(“listener_manager.”)), stats_(generateStats(*scope_)),
    config_tracker_entry_(server.admin().getConfigTracker().add(
    “listeners”,
    [this](const Matchers::StringMatcher& name_matcher) {
    return dumpListenerConfigs(name_matcher);
    })),
    enable_dispatcher_stats_(enable_dispatcher_stats), quic_stat_names_(quic_stat_names) {
    for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    workers_.emplace_back(
    worker_factory.createWorker(i, server.overloadManager(), absl::StrCat(“worker_”, i)));
    }
    }
    // 创建WorkerImpl实例时 先 构造ConnectionHandlerImpl, 并赋值给 WorkerImpl 的成员变量 handler_
    WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
    const std::string& worker_name) {
    Event::DispatcherPtr dispatcher(
    api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
    auto conn_handler = std::make_unique(*dispatcher, index);
    return std::make_unique(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
    overload_manager, api_, stat_names_);
    }
    2.2 启动
    InstanceImpl::run()
    核心:调用 ListenerManagerImpl::startWorkers,线程绑定 listener 和 启动 woker。
    每个线程都绑定了所有的监听器,监听配置的端口,而没有任何分片。创建 socket ,注册事件循环,内核决定新到来的连接 分配给哪个woker线程。一旦 woker accept了一个连接,那么这个连接永远不会离开这个worker 。
    启动 woker
    void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function<void()> callback) {
    // …
    // 成员变量 active_listeners_ 在初始化调用 MainImpl::initialize函数中就被配置文件中的listeners 赋值了
    // 下面 两重循环将所有的 listener 绑定到所有的 woker 上。
    for (auto listener_it = active_listeners_.begin(); listener_it != active_listeners_.end()
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/907847
推荐阅读
相关标签