
Integrating the Milvus vector database SDK (milvus-sdk-java) with Spring Boot, with connection pool management via Apache commons-pool2

milvus-sdk-java

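Preface

The official milvus-sdk-java does not provide connection pool management, so I built a basic Milvus connection pool on top of apache-commons-pool2 and integrated it with Spring Boot so that it slots easily into a normal development workflow. It is offered for reference only.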

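Dependency versions

  • Milvus 2.4.0, the latest version at the time of writing (2024/05/17)
  • milvus-sdk-java 2.4.1; the SDK version must match the Milvus server version as described in the official documentation
  • Spring Boot 2.7.11
  • Apache commons-pool2 2.12.0
  • The project is built with Maven; the relevant part of the pom follows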
        <dependency>
            <groupId>io.milvus</groupId>
            <artifactId>milvus-sdk-java</artifactId>
            <version>2.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.12.0</version>
        </dependency>

Implementation and approach

1. Connection pool implementation

  • Official Milvus connection example
// Connect to the Milvus server through the MilvusServiceClient class.
ConnectParam connectParam = ConnectParam.newBuilder()
        .withHost("localhost")
        .withPort(19530)
        .withAuthorization("root","Milvus")
        .build();
RetryParam retryParam = RetryParam.newBuilder()
        .withMaxRetryTimes(3)
        .build();
MilvusClient milvusClient = new MilvusServiceClient(connectParam).withRetry(retryParam);
  • Next comes the code that manages MilvusServiceClient instances through a connection pool
  • Factory implementation: MilvusConnectPoolFactory.java
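import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.CheckHealthResponse;
import io.milvus.param.ConnectParam;
import io.milvus.param.LogLevel;
import io.milvus.param.R;
import io.milvus.param.RetryParam;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;

@Slf4j
public class MilvusConnectPoolFactory implements PooledObjectFactory<MilvusClient> {

    private final String username;
    private final String password;
    private final String host;
    private final Integer port;

    public MilvusConnectPoolFactory(String username, String password, String host, Integer port) {
        this.username = username;
        this.password = password;
        this.host = host;
        this.port = port;
    }

    @Override
    public void activateObject(PooledObject<MilvusClient> p) throws Exception {
        log.info("Called each time a MilvusClient instance is borrowed");
    }

    @Override
    public void destroyObject(PooledObject<MilvusClient> p) throws Exception {
        log.info("Called when a MilvusClient instance is destroyed; MilvusClient.close() closes the connection");
        p.getObject().close();
    }

    @Override
    public PooledObject<MilvusClient> makeObject() throws Exception {
        log.info("Creating a MilvusClient instance");
        try {
            // Connection parameters
            ConnectParam connectParam = ConnectParam.newBuilder()
                    .withHost(host)
                    .withPort(port)
                    .withAuthorization(username,password)
                    .build();
            // Retry parameters
            RetryParam retryParam = RetryParam.newBuilder()
                    .withMaxRetryTimes(3)
                    .build();
            MilvusClient milvusClient = new MilvusServiceClient(connectParam).withRetry(retryParam);
            milvusClient.setLogLevel(LogLevel.Error);
            return new DefaultPooledObject<>(milvusClient);
        } catch (Exception e) {
            throw new RuntimeException("Unable to create a Milvus database connection", e);
        }
    }

    @Override
    public void passivateObject(PooledObject<MilvusClient> p) throws Exception {
        log.info("Called when a MilvusClient instance is returned");
    }

    @Override
    public boolean validateObject(PooledObject<MilvusClient> p) {
        log.info("Checking the MilvusClient instance state via MilvusClient.checkHealth()");
        try {
            R<CheckHealthResponse> health = p.getObject().checkHealth();
            // A successful status only means the RPC completed; the payload reports whether the server is healthy
            if (health.getStatus() == R.Status.Success.getCode()
                    && health.getData() != null
                    && health.getData().getIsHealthy()) {
                return true;
            }
            log.error("Connection state abnormal, details: {}", health.getMessage());
            return false;
        } catch (Exception e) {
            // Treat any failure during the health check as an invalid connection
            log.error("Health check failed", e);
            return false;
        }
    }
}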
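To make the order of the factory callbacks concrete, here is a toy model written against the JDK only. It is not commons-pool2: `TinyFactory`, `TinyPool` and `LifecycleDemo` are hypothetical names, and the pool is single-threaded. It mimics the order in which, to my understanding, `GenericObjectPool` calls a factory when `testOnBorrow` is enabled: create if nothing is idle, then activate, then validate on borrow; passivate on return; destroy on close.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Drives the toy pool and records every callback in call order. */
final class LifecycleDemo {
    static List<String> run() {
        final List<String> calls = new ArrayList<>();
        TinyPool<String> pool = new TinyPool<>(new TinyFactory<String>() {
            public String make() { calls.add("make"); return "client"; }
            public void activate(String o) { calls.add("activate"); }
            public boolean validate(String o) { calls.add("validate"); return true; }
            public void passivate(String o) { calls.add("passivate"); }
            public void destroy(String o) { calls.add("destroy"); }
        });
        pool.giveBack(pool.borrow());  // first borrow has to create the instance
        pool.giveBack(pool.borrow());  // second borrow reuses the idle instance
        pool.close();                  // closing destroys whatever is idle
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

/** Hypothetical five-callback factory, shaped like PooledObjectFactory. */
interface TinyFactory<T> {
    T make();
    void activate(T obj);
    boolean validate(T obj);
    void passivate(T obj);
    void destroy(T obj);
}

/** Toy single-threaded pool; it illustrates callback order and nothing else. */
final class TinyPool<T> {
    private final TinyFactory<T> factory;
    private final Deque<T> idle = new ArrayDeque<>();

    TinyPool(TinyFactory<T> factory) {
        this.factory = factory;
    }

    T borrow() {
        while (true) {
            boolean created = idle.isEmpty();
            T obj = created ? factory.make() : idle.pop();
            factory.activate(obj);
            if (factory.validate(obj)) {   // models testOnBorrow=true
                return obj;
            }
            factory.destroy(obj);          // discard the invalid instance
            if (created) {
                throw new IllegalStateException("new instance failed validation");
            }
        }
    }

    void giveBack(T obj) {
        factory.passivate(obj);
        idle.push(obj);
    }

    void close() {
        while (!idle.isEmpty()) {
            factory.destroy(idle.pop());
        }
    }
}
```

Running `main` prints `[make, activate, validate, passivate, activate, validate, passivate, destroy]`: the instance is created once and reused on the second borrow.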
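  • Pool declaration MilvusConnectPool.java, used to register the bean
import io.milvus.client.MilvusClient;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.AbandonedConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class MilvusConnectPool extends GenericObjectPool<MilvusClient> {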

    /**
     * Creates a new {@code GenericObjectPool} using defaults from
     * {@link GenericObjectPoolConfig}.
     *
     * @param factory The object factory to be used to create object instances
     *                used by this pool
     */
    public MilvusConnectPool(PooledObjectFactory<MilvusClient> factory) {
        super(factory);
    }

    /**
     * Creates a new {@code GenericObjectPool} using a specific
     * configuration.
     *
     * @param factory The object factory to be used to create object instances
     *                used by this pool
     * @param config  The configuration to use for this pool instance. The
     *                configuration is used by value. Subsequent changes to
     *                the configuration object will not be reflected in the
     *                pool.
     */
    public MilvusConnectPool(PooledObjectFactory<MilvusClient> factory, GenericObjectPoolConfig<MilvusClient> config) {
        super(factory, config);
    }

    /**
     * Creates a new {@code GenericObjectPool} that tracks and destroys
     * objects that are checked out, but never returned to the pool.
     *
     * @param factory         The object factory to be used to create object instances
     *                        used by this pool
     * @param config          The base pool configuration to use for this pool instance.
     *                        The configuration is used by value. Subsequent changes to
     *                        the configuration object will not be reflected in the
     *                        pool.
     * @param abandonedConfig Configuration for abandoned object identification
     *                        and removal.  The configuration is used by value.
     */
    public MilvusConnectPool(PooledObjectFactory<MilvusClient> factory, GenericObjectPoolConfig<MilvusClient> config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }

    /**
     * Borrows a MilvusClient instance from the pool.
     */
    public MilvusClient getMilvusClient() throws Exception {
        return super.borrowObject();
    }
    
    /**
     * Returns a MilvusClient instance to the pool.
     */
    public void releaseMilvusClient(MilvusClient milvusClient) {
        if (milvusClient!= null) {
            super.returnObject(milvusClient);
        }
    }
}
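  • Pool registration: MilvusConnectPoolConfig.java
import io.milvus.client.MilvusClient;
import java.time.Duration;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class MilvusConnectPoolConfig {

    private static MilvusConnectPool pool;

    @Value("${spring.datasource.milvus-connect-pool.milvus.username}")
    private String username;

    @Value("${spring.datasource.milvus-connect-pool.milvus.password}")
    private String password;
   
    @Value("${spring.datasource.milvus-connect-pool.milvus.host}")
    private String host;

    @Value("${spring.datasource.milvus-connect-pool.milvus.port}")
    private Integer port;

    /** Maximum number of idle connections */
    @Value("${spring.datasource.milvus-connect-pool.max-idle}")
    private Integer maxIdle;

    /** Minimum number of idle connections */
    @Value("${spring.datasource.milvus-connect-pool.min-idle}")
    private Integer minIdle;

    /** Maximum total number of connections */
    @Value("${spring.datasource.milvus-connect-pool.max-total}")
    private Integer maxTotal;

    @Bean("milvusConnectPool")
    public MilvusConnectPool milvusConnectPool(){
        // Configure the pool parameters
        GenericObjectPoolConfig<MilvusClient> config = new GenericObjectPoolConfig<>();
        config.setMaxTotal(maxTotal); // maximum number of connections in the pool
        config.setMaxIdle(maxIdle); // maximum number of idle connections
        config.setMinIdle(minIdle); // minimum number of idle connections
        config.setMinEvictableIdleTime(Duration.ofMinutes(30));// minimum idle time before a connection may be evicted; default 1800000 ms (30 minutes)
        config.setTimeBetweenEvictionRuns(Duration.ofMinutes(30));// interval between evictor runs that destroy unused objects; default -1 (no evictor runs)
        config.setTestOnBorrow(true);// validate objects when they are borrowed; default false
        config.setTestOnReturn(false);// validate objects when they are returned; default false
        config.setTestWhileIdle(false);// validate objects while they sit idle; default false
        config.setMaxWait(Duration.ofSeconds(1));// maximum time to wait for a connection; default -1, meaning wait indefinitely
        config.setLifo(true);// last in, first out; default true
        config.setBlockWhenExhausted(true);// when exhausted: false throws immediately, true blocks until the wait times out; default true
        config.setNumTestsPerEvictionRun(3);// maximum number of objects examined per evictor run; default 3
        // Disabling JMX, or calling config.setJmxNameBase(), is recommended here, because the default JMX registration may conflict with other pool-based beans already present in the project
        config.setJmxEnabled(false);

        // Create the connection factory
        MilvusConnectPoolFactory factory = new MilvusConnectPoolFactory(username, password, host, port);

        // Initialize the pool
        pool = new MilvusConnectPool(factory, config);

        // Pre-create connections, using the minimum idle count as the initial size
        if(minIdle > 0){
            for (int i = 0; i < minIdle; i++) {
                try {
                    pool.addObject();
                }catch (Exception e){
                    log.error("Failed to add an initial connection", e);
                }

            }
        }
        return pool;
    }

    /**
     * Closes the connection pool
     */
    @PreDestroy
    public static void close() {
        if (pool != null) {
            pool.close();
        }
    }
}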
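The interplay of `maxTotal`, `blockWhenExhausted` and `maxWait` can be modelled with a plain `java.util.concurrent.Semaphore`. This is a JDK-only stand-in, not commons-pool2 code, and `BoundedBorrowDemo` with its methods is a hypothetical name. It only illustrates the idea that a borrow waits for a free slot up to `maxWait` and then fails; the sketch signals the failure with `NoSuchElementException`, which is also what `GenericObjectPool` throws when a borrow times out.

```java
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Models a pool with a hard cap (maxTotal) and a bounded wait (maxWait). */
final class BoundedBorrowDemo {
    private final Semaphore permits;
    private final Duration maxWait;

    BoundedBorrowDemo(int maxTotal, Duration maxWait) {
        this.permits = new Semaphore(maxTotal, true);
        this.maxWait = maxWait;
    }

    /** Waits up to maxWait for a free slot, then gives up. */
    void borrow() {
        try {
            if (!permits.tryAcquire(maxWait.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new NoSuchElementException("timed out waiting for an idle object");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Frees one slot, like returning an object to the pool. */
    void giveBack() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        BoundedBorrowDemo pool = new BoundedBorrowDemo(2, Duration.ofMillis(100));
        pool.borrow();
        pool.borrow();                 // both slots are now taken
        try {
            pool.borrow();             // blocks for roughly 100 ms, then fails
        } catch (NoSuchElementException e) {
            System.out.println("exhausted: " + e.getMessage());
        }
        pool.giveBack();
        pool.borrow();                 // succeeds again once a slot is free
        System.out.println("available=" + pool.available());
    }
}
```

With the one-second `maxWait` configured above, a request that cannot get a connection fails after about a second instead of hanging, so callers should be prepared to handle that exception.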
  • Relevant application.yml configuration
spring:
  datasource:
    milvus-connect-pool:
      max-idle: 5
      min-idle: 2
      max-total: 10
      milvus:
        username: root
        password: Milvus
        host: your-milvus-server-address
        port: 19530
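A small sanity check on the three sizing values can catch a misconfigured file at startup. The sketch below is JDK-only, and `PoolSizing` is a hypothetical helper that is not part of the original project. It enforces `0 <= min-idle <= max-idle <= max-total`, a convention chosen here for illustration that is stricter than what commons-pool2 itself requires.

```java
/** Hypothetical holder that rejects inconsistent pool sizing values. */
final class PoolSizing {
    final int minIdle;
    final int maxIdle;
    final int maxTotal;

    PoolSizing(int minIdle, int maxIdle, int maxTotal) {
        if (minIdle < 0) {
            throw new IllegalArgumentException("min-idle must be >= 0");
        }
        if (maxIdle < minIdle) {
            throw new IllegalArgumentException("max-idle must be >= min-idle");
        }
        if (maxTotal < maxIdle) {
            throw new IllegalArgumentException("max-total must be >= max-idle");
        }
        this.minIdle = minIdle;
        this.maxIdle = maxIdle;
        this.maxTotal = maxTotal;
    }

    public static void main(String[] args) {
        // Values taken from the application.yml in this article
        PoolSizing ok = new PoolSizing(2, 5, 10);
        System.out.println("accepted: " + ok.minIdle + "/" + ok.maxIdle + "/" + ok.maxTotal);
        try {
            new PoolSizing(6, 5, 10);  // min-idle larger than max-idle
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```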
  • Example of obtaining a MilvusClient
    /**
     * Example of obtaining a MilvusClient. Always remember to return the connection.
     */
    @GetMapping("/demo")
    public ResponseEntity<String> demo() {
        // Declare
        MilvusClient milvusClient= null;
        try {
            // Borrow
            milvusClient = milvusConnectPool.getMilvusClient();
            // Business logic using the client...
            return ResponseEntity.ok("Client obtained successfully");
        }catch (Exception e){
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to obtain client");
        }finally {
            // Return
            milvusConnectPool.releaseMilvusClient(milvusClient);
        }
    }
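Because forgetting the `finally` block leaks a connection, one option is to let try-with-resources do the returning. The sketch below is JDK-only and uses a hypothetical `PooledLease` wrapper that is not part of the original project. It takes the borrowed object plus a callback that returns it, so in this project it could presumably be built as `new PooledLease<>(pool.getMilvusClient(), pool::releaseMilvusClient)`; a plain `Deque` stands in for the pool here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical helper: holds a borrowed object and hands it back on close(). */
final class PooledLease<T> implements AutoCloseable {
    private final T resource;
    private final Consumer<T> releaser;
    private boolean released;

    PooledLease(T resource, Consumer<T> releaser) {
        this.resource = resource;
        this.releaser = releaser;
    }

    T get() {
        return resource;
    }

    /** Returns the object exactly once, even if close() is called again. */
    @Override
    public void close() {
        if (!released) {
            released = true;
            releaser.accept(resource);
        }
    }

    public static void main(String[] args) {
        // A deque of strings stands in for the real pool of clients
        Deque<String> idle = new ArrayDeque<>();
        idle.push("client-1");

        try (PooledLease<String> lease = new PooledLease<>(idle.pop(), idle::push)) {
            System.out.println("in use: " + lease.get() + ", idle=" + idle.size());
        }
        System.out.println("after close, idle=" + idle.size());
    }
}
```

The object goes back to the pool when the `try` block exits, whether normally or through an exception.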

2. milvus-sdk-java usage example. The code here depends on the milvus-sdk-java version and is largely copied from the official documentation.

    private static final String COLLECTION_NAME = "java_sdk_example_simple";
    private static final String ID_FIELD = "book_id";
    private static final String VECTOR_FIELD = "book_intro";
    private static final String TITLE_FIELD = "book_title";
    private static final Integer VECTOR_DIM = 4;
    
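    public void demo() {
        // Declared as MilvusClient, the type that the pool hands out
        MilvusClient milvusClient = null;
        try {
            // Borrow a MilvusClient from the pool
            milvusClient = milvusConnectPool.getMilvusClient();
            
            // Define the fields of the collection: a primary key field (ID_FIELD), a vector field (VECTOR_FIELD) and a varchar field (TITLE_FIELD).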

            List<FieldType> fieldsSchema = Arrays.asList(
                    FieldType.newBuilder()
                            .withName(ID_FIELD)
                            .withDataType(DataType.Int64)
                            .withPrimaryKey(true)
                            .withAutoID(false)
                            .build(),
                    FieldType.newBuilder()
                            .withName(VECTOR_FIELD)
                            .withDataType(DataType.FloatVector)
                            .withDimension(VECTOR_DIM)
                            .build(),
                    FieldType.newBuilder()
                            .withName(TITLE_FIELD)
                            .withDataType(DataType.VarChar)
                            .withMaxLength(64)
                            .build()
            );
            
            // Create the collection: call createCollection() with the collection name and the field types.
            R<RpcStatus> ret = milvusClient.createCollection(CreateCollectionParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .withFieldTypes(fieldsSchema)
                    .build());
            if (ret.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Failed to create collection! Error: " + ret.getMessage());
            }

            // Index the vector field: call createIndex() on the vector field, here with the FLAT index type and the L2 distance metric.
            ret = milvusClient.createIndex(CreateIndexParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .withFieldName(VECTOR_FIELD)
                    .withIndexType(IndexType.FLAT)
                    .withMetricType(MetricType.L2)
                    .build());
            if (ret.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Failed to create index on vector field! Error: " + ret.getMessage());
            }

            // Index the varchar field: call createIndex() on the varchar field, here with the TRIE index type.
            ret = milvusClient.createIndex(CreateIndexParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .withFieldName(TITLE_FIELD)
                    .withIndexType(IndexType.TRIE)
                    .build());
            if (ret.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Failed to create index on varchar field! Error: " + ret.getMessage());
            }

            // Load the collection: call loadCollection() so that its data is loaded and available for search.
            milvusClient.loadCollection(LoadCollectionParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .build());

            System.out.println("Collection created");

            // Insert data: insert 10 records into the collection, each with a unique ID, a vector and a title.
            List<JSONObject> rows = new ArrayList<>();
            for (long i = 1L; i <= 10; ++i) {
                JSONObject row = new JSONObject();
                row.put(ID_FIELD, i);
                List<Float> vector = Arrays.asList((float)i, (float)i, (float)i, (float)i);
                row.put(VECTOR_FIELD, vector);
                row.put(TITLE_FIELD, "Tom and Jerry " + i);
                rows.add(row);
            }

            R<MutationResult> insertRet = milvusClient.insert(InsertParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .withRows(rows)
                    .build());
            if (insertRet.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Failed to insert! Error: " + insertRet.getMessage());
            }

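            // Flush: call flush() to make sure the inserted records are consumed by the Milvus server so they can be searched immediately.
            // This is a special step for this example only. In practice you do not need to call flush() frequently.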
            milvusClient.flush(FlushParam.newBuilder()
                    .addCollectionName(COLLECTION_NAME)
                    .build());
            System.out.println("10 entities inserted");

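            // Search: call search() to find the top 5 records most similar to the given vector and return their titles.
            // This vector equals record No. 3, so we expect record No. 3 to be the most similar.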
            List<Float> vector = Arrays.asList(3.0f, 3.0f, 3.0f, 3.0f);
            R<SearchResults> searchRet = milvusClient.search(SearchParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .withMetricType(MetricType.L2)
                    .withTopK(5)
                    .withFloatVectors(Arrays.asList(vector))
                    .withVectorFieldName(VECTOR_FIELD)
                    .withParams("{}")
                    .addOutField(TITLE_FIELD)
                    .build());
            if (searchRet.getStatus() != R.Status.Success.getCode()) {
                throw new RuntimeException("Failed to search! Error: " + searchRet.getMessage());
            }

            // Print the results. Only one target vector was searched, so fetch and print the result for target vector No. 0.
            SearchResultsWrapper resultsWrapper = new SearchResultsWrapper(searchRet.getData().getResults());
            List<SearchResultsWrapper.IDScore> scores = resultsWrapper.getIDScore(0);
            System.out.println("The result of No.0 target vector:");
            for (SearchResultsWrapper.IDScore score:scores) {
                System.out.println(score);
            }

            // Drop the collection: call dropCollection() once the collection is no longer needed.
            milvusClient.dropCollection(DropCollectionParam.newBuilder()
                    .withCollectionName(COLLECTION_NAME)
                    .build());
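        }catch (Exception e){
            // Report the failure instead of silently swallowing it
            System.err.println("Milvus demo failed: " + e.getMessage());
        }finally {
            // Return the client to the pool
            milvusConnectPool.releaseMilvusClient(milvusClient);
        }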
        }
    }
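To see why record No. 3 is expected to rank first, the search can be reproduced by brute force without Milvus. The sketch below is JDK-only, and `FlatL2Demo` is a hypothetical name. It builds the same ten vectors as the example, computes the squared Euclidean distance to the query and sorts. It only models what an exhaustive FLAT search with the L2 metric does conceptually; breaking ties by the smaller ID is an assumption made here, not a statement about how Milvus orders equal distances.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Brute-force nearest-neighbour search over the example's ten vectors. */
final class FlatL2Demo {
    /** Squared Euclidean distance between two vectors of equal length. */
    static float squaredL2(float[] a, float[] b) {
        float sum = 0f;
        for (int i = 0; i < a.length; i++) {
            float d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Same data as the insert loop: record i holds the vector [i, i, i, i]. */
    static List<float[]> sampleVectors() {
        List<float[]> rows = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            rows.add(new float[] {i, i, i, i});
        }
        return rows;
    }

    /** Returns the 1-based ids of the topK nearest vectors, ties broken by smaller id. */
    static List<Long> topK(List<float[]> vectors, float[] query, int topK) {
        List<Long> ids = new ArrayList<>();
        for (long id = 1; id <= vectors.size(); id++) {
            ids.add(id);
        }
        ids.sort(Comparator
                .comparingDouble((Long id) -> squaredL2(vectors.get((int) (id - 1)), query))
                .thenComparingLong(Long::longValue));
        return new ArrayList<>(ids.subList(0, Math.min(topK, ids.size())));
    }

    public static void main(String[] args) {
        float[] query = {3f, 3f, 3f, 3f};
        System.out.println(topK(sampleVectors(), query, 5));
    }
}
```

Record 3 is at distance 0, records 2 and 4 at squared distance 4, and records 1 and 5 at squared distance 16, so the sketch prints `[3, 2, 4, 1, 5]`.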

3. Quick integration via a spring-boot-starter

  • To be updated later…

References

  • https://milvus.io/api-reference/java/v2.4.x/About.md
  • https://github.com/milvus-io/milvus-sdk-java
  • https://blog.csdn.net/dndndnnffj/article/details/127352952