赞
踩
今天接到一个任务,线上的mongodb积累了大量的无用数据,导致宕机,现在对里面的数据进行批量删除。
其中库里面的一个log记录有2000w+条,他的存储字段比较少,格式如下:
{ "_id" : ObjectId("5ecb648b17bee8673ef09024"), "level" : 1, "pay" : 0, "rand" : 64090, "uid" : NumberLong(120196967) }
我们对这个表里面的删除记录就是将1年以上的数据进行清楚,根据ObjectId这个字段的生成规则,用如下的方法判断:
- Date date1 = new Date();
- System.out.println("开始统计要删除的时间 :" + date1);
- DBCursor dbCursor = collection.find();
- int count = 0;
- while(dbCursor.hasNext()) {
- DBObject object = dbCursor.next();
- String id = object.get("_id").toString();
- String time16Str = id.substring(0, 8);
- long createTime = str16To10(time16Str);
- if(createTime < lastTime) {
- count++;
- }
- }
- Date date2 = new Date();
- System.out.println("结束统计要删除的时间 :" + date2);
- System.out.println("统计花费的时间 :" + (date2.getTime()-date1.getTime()) + "ms");
- System.out.println("dbCursor count :" + dbCursor.count());
- System.out.println("need del count :" + count);

统计运行的时间如下,从下图中可以看出2000+的统计时间大约1分钟不到,这个还是还是可以接收的:
- 开始统计要删除的时间 :Tue May 26 18:38:07 CST 2020
- 结束统计要删除的时间 :Tue May 26 18:38:50 CST 2020
- 统计花费的时间 :43079ms
- dbCursor count :21595385
- need del count :16398864
然后在if中添加删除的两行代码:
- if(createTime < lastTime) {
- DBObject query = new BasicDBObject().append("_id", new ObjectId(id));
- WriteResult result = collection.remove(query); //1
- // collection.findAndModify(query, null, null, true, null, false, false); //2
- }
上面两行代码的执行删除的时间截然不同,remove5分59s删除了36956,而findAndModify5分3s删除了127090,如果是要删除16398864条数据的话,remove大约需要44h,而findAndModify需要10h。
针对这种批量删除,设计的方案如下:
- public Long clearUserRecommandOldData() {
- // 根据objectId清理两年之前的老数据
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.YEAR, -2);
- final long lastTime = calendar.getTime().getTime();
- final OpLogService service = this.opLogService;
-
- Long result = new MongoExecutor<Long>() {
- @Override
- protected Long doInMongo(MongoClient client) {
- ExecutorService executorService = Executors.newFixedThreadPool(2);
-
- DB database = client.getDB(ConfigManager.get(GeoMongodbConfig.class).getServer().getDbName());
- DBCollection collection = database.getCollection(COLLECTION_NAME);
-
- DBObject fields = new BasicDBObject();
- fields.put("_id" , true);
- // 游标每次从库里面一次拿出5w条数据,避免出现SocketTimeOut异常
- DBCursor dbCursor = collection.find(new BasicDBObject(), fields).batchSize(CloverConstants.DBCUSTOR_BATCH_SIZE);
- long count = 0;
- List<Object> objectIds = new ArrayList<>();
-
- Date date1 = new Date();
- logger.info("user_recommand开始统计, date:" + date1 + ", dbCursor count:" + dbCursor.count());
-
- while(dbCursor.hasNext()) {
- DBObject object = dbCursor.next();
- String id = object.get("_id").toString();
- String time16Str = id.substring(0, 8); // 获取objectId中的16进制的创建时间
- long createTime = Long.valueOf(time16Str, 16) * 1000;
- if (createTime < lastTime) {
- count++;
- objectIds.add(object.get("_id"));
-
- // 每次取出100条数据就另外开启一个线程来执行删除任务
- if(count % CloverConstants.DBCUSTOR_BATCH_SIZE == 0) {
- final List<Object> delObjectIds = new ArrayList<>(objectIds);
- objectIds.clear();
- threadPoolHandler(executorService, delObjectIds);
- }
- }
- }
- dbCursor.close();
- Date date2 = new Date();
- logger.info("user_recommand结束统计, date:" + date2 + ", cost time:" + (date2.getTime()-date1.getTime()) + "ms");
- threadPoolHandler(executorService, objectIds);
- return count;
- }
- }.execute();
- if(result == null) {
- result = 0l;
- }
- return result;
- }
-
- private void threadPoolHandler(final ExecutorService executorService, final List<Object> objects) {
- new MongoExecutor<Long>() {
- @Override
- protected Long doInMongo(MongoClient client) {
- DB database = client.getDB(ConfigManager.get(GeoMongodbConfig.class).getServer().getDbName());
- final DBCollection collection = database.getCollection(COLLECTION_NAME);
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- Date date1 = new Date();
- logger.info("user_recommand开始删除, date:" + date1 + ", count:" + objects.size());
- for(Object objectId : objects) {
- DBObject query = new BasicDBObject().append("_id", objectId);
- // 这个删除方法效率是remove的至少10倍
- collection.findAndModify(query, null, null, true, null, false, false);
- }
- Date date2 = new Date();
- logger.info("user_recommand结束删除, date:" + date2 + ", cost time:" + (date2.getTime()-date1.getTime()) + "ms");
- }
- });
- return 0l;
- }
- }.execute();
- }

这个方案的重要一点这这一行代码:
DBCursor dbCursor = collection.find(newBasicDBObject(), fields).batchSize(CloverConstants.DBCUSTOR_BATCH_SIZE);
find()中的两个参数,第一个表示查询条件,我这里是什么查询条件也没有,第二个表示查询返回的字段,这里一定要写,如果你只需要几个查询字段,一定要带上这个,上面的一条log记录它的json结构很小,如果你的json结构很大的话,每次查询返回一个很大的DBObject,会造成系统频繁的gc。
batchSize()中是这个数字我写的是100,表示游标每次从数据库一次拿100条数据到内存,这个值,不能设置的太大,也不要设置太小。
batchSize设置太大,会出现SocketTimeOut超时问题,设置太小,游标每次从数据库拿一条效率也很低。
现在的mongodb启动时设置的SocketTimeOut时间时5s,batchSize时100,这个也是测试设置了好几种才决定的。
代码里面还有一个很重要的点,就是下面这行:
// 每次取出100条数据就另外开启一个线程来执行删除任务
if(count % CloverConstants.DBCUSTOR_BATCH_SIZE == 0) {
final List<Object> delObjectIds = new ArrayList<>(objectIds);
objectIds.clear();
threadPoolHandler(executorService, delObjectIds);
}
每收集100条可删除的数据,然后就放到线程池里面去执行,线程池初始化设置了2个,
当前条件改成删除两年前的老数据,库里面一共时2100+的数据,删除数据有500+万条数据,耗时15min左右就删干净了,看了一下数据库里面增加的不是很频繁,在系统里面添加了一个定时器,每两个星期清理一次Mongodb中的数据。
补充:
上面这个方案弊端是一边遍历一边删除,dbCustor的游标一直在不停的变化,很可能在删除的时候会出现超时的异常,具体如下图:
上面这个就是某些数据删除过慢,在日志中打印出来的,之前用remove()做删除这个时间消耗的有2s-20s左右的都有。但是如果删除不是那么频繁的话就不会出现这个问题。
我们项目中需求是定时清理过期的数据,这张表现在有2100w+的记录,删除当前时间两年前的数据大约有500w+条,线上一个星期大约增长了10w+条数据,之前在本次测试23分钟大约删除了18w条,现在我做的删除具体代码如下:
- public Long clearUserRecommandOldData(final boolean auto) {
- final TwoTuple<Long, Long> executeTime = opLogService.getExecuteTime(COLLECTION_NAME, auto);
- // 判断当前的任务是否在可执行的指定时间片中
- if (executeTime.getFirst() == 0l || executeTime.getSecond() == 0l) {
- return 0l;
- }
- long now = System.currentTimeMillis();
- if (now < executeTime.getFirst() || now > executeTime.getSecond()) {
- return 0l;
- }
-
- // 根据objectId清理两年之前的老数据
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.YEAR, -2);
- final long twoYearsAgo = calendar.getTime().getTime();
- final OpLogService service = this.opLogService;
-
- // 返回字段
- DBObject fields = new BasicDBObject();
- fields.put("_id", true);
-
- // 排序条件
- DBObject orderBy = new BasicDBObject();
- orderBy.put("_id", 1); // 时间升序
- long count = 0;
- try {
- MongoClient client = MongoManager.getInstance().getClient();
- DB database = client.getDB(ConfigManager.get(GeoMongodbConfig.class).getServer().getDbName());
- DBCollection collection = database.getCollection(COLLECTION_NAME);
-
- // 游标每次从库里面一次拿出100条数据,避免出现SocketTimeOut异常,
- // 按_id升序排列,时间大于lastTime就立马返回,不用遍历所有
- DBCursor dbCursor = collection.find(new BasicDBObject(), fields).sort(orderBy).batchSize(CloverConstants.DBCUSTOR_BATCH_SIZE);
- Date date1 = new Date();
- logger.error("user_recommand开始统计, date:" + date1 + ", dbCursor count:" + dbCursor.count());
-
- List<Object> objectIds = new ArrayList<>();
- while (dbCursor.hasNext()) {
- DBObject object = dbCursor.next();
- String id = object.get("_id").toString();
- String time16Str = id.substring(0, 8); // 获取objectId中的16进制的创建时间
- long createTime = Long.valueOf(time16Str, 16) * 1000;
- if (createTime > twoYearsAgo)
- break;
-
- count++;
- objectIds.add(object.get("_id"));
-
- // 每次取出100条数据就来执行删除任务
- if (count % CloverConstants.DBCUSTOR_BATCH_SIZE == 0) {
- final List<Object> delObjectIds = new ArrayList<>(objectIds);
- objectIds.clear();
- // 避免删除频繁,让出一些空闲时间给主程序调用mongodb
- try {
- Thread.sleep(500);
- } catch (Exception e) {
- logger.error(COLLECTION_NAME + " clear sleep error" + e);
- }
- // 继续删除
- service.delOldData(collection, delObjectIds, COLLECTION_NAME, 0);
- }
-
- // 如果划分的时间片用完,就结束执行,保证别的日志也能被清理到
- if (System.currentTimeMillis() >= executeTime.getSecond()) {
- logger.error(COLLECTION_NAME + " time period over");
- break;
- }
- }
- dbCursor.close();
- Date date2 = new Date();
- logger.error("user_recommand结束统计, date:" + date2 + ", cost time:" + (date2.getTime() - date1.getTime()) + "ms, del count:" + count);
- service.delOldData(collection, objectIds, COLLECTION_NAME, 0);
-
- return count;
- } catch (Exception e) {
- logger.error(COLLECTION_NAME + "execute error:" + e.getMessage());
- count += clearUserRecommandOldData(auto);
- }
- return count;
- }
-
- public void delOldData(DBCollection collection, List<Object> objectIds, String colName, int startIndex) {
- if (objectIds.isEmpty())
- return;
- if (startIndex >= objectIds.size())
- return;
-
- //Date date1 = new Date();
- //logger.info(colName + "开始删除, date:" + date1 + ", count:" + objectIds.size());
-
- try {
- for (int i = startIndex; i < objectIds.size(); i++) {
- DBObject query = new BasicDBObject().append("_id", objectIds.get(startIndex));
- // 比remove删除效率高10+倍
- collection.findAndModify(query, null, null, true, null, false, false);
- startIndex++;
- //collection.remove(query);
- }
- } catch (MongoException exception) {
- logger.error("MongoException error, _id:" + objectIds.get(startIndex) + "error msg:" + exception.getMessage());
- startIndex++;
- delOldData(collection, objectIds, colName, startIndex);
- }
- //Date date2 = new Date();
- //logger.info(colName + "结束删除, date:" + date2 + ", cost time:" + (date2.getTime() - date1.getTime()) + "ms");
-
- }

具体删除策略如下:
线上的connections连接数配置的是5,所以采用的单线程删除策略;
开启一个定时器,每天3点-7点执行,每张表规定一个时间段删除,保证有限的时间内每张表都能够有时间删到;
保证你的时间段删除的数量比它每天的增长量要多,不然你怎么都删不完就尴尬了;
如果你的删除数据任务不是要求立马删完,可以删除一些数据之后,让线程休眠一段时间再继续删除,不然频繁删除不仅会一直占用着连接,线上写数据可能会有影响,而且会报SocketTimeOut异常;
- public TwoTuple<Long, Long> getExecuteTime(String colName, boolean auto) {
- long startTime = 0l;
- long endTime = 0l;
- Calendar c = Calendar.getInstance();
-
- int nowHour = c.get(Calendar.HOUR_OF_DAY);
- long now = c.getTimeInMillis();
-
- if (auto) {
- // 定时器触发 3-7点之间,3-5执行user_recommend, 5-6执行op_log, 6-7执行op_result
- int endHour = CloverConstants.CLEAR_END_HOUR;
- if (colName.equals(FriendRecService.COLLECTION_NAME)) {
- endHour = 5;
- } else if (colName.equals(COL_OP_LOG)) {
- endHour = 6;
- }
-
- startTime = now;
- c.set(c.get(Calendar.YEAR), c.get(Calendar.MONTH), c.get(Calendar.DAY_OF_MONTH), endHour, 0, 0);
- endTime = c.getTimeInMillis();
- } else {
- // 手动触发,如果碰上定时器的执行时间段则不执行
- startTime = now;
-
- long endTime1 = 0l;
- if (nowHour < CloverConstants.CLEAR_START_HOUR) {
- c.set(c.get(Calendar.YEAR), c.get(Calendar.MONTH), c.get(Calendar.DAY_OF_MONTH), CloverConstants.CLEAR_START_HOUR, 0, 0);
- endTime1 = c.getTimeInMillis() - 5 * HeConsts.MILLISECONDS_PER_MINUTE;
- if (startTime > endTime1) {
- return Tuple.tuple(0l, 0l);
- }
- } else if (nowHour >= CloverConstants.CLEAR_END_HOUR) {
- c.add(Calendar.DAY_OF_MONTH, 1);
- c.set(c.get(Calendar.YEAR), c.get(Calendar.MONTH), c.get(Calendar.DAY_OF_MONTH), CloverConstants.CLEAR_START_HOUR, 0, 0);
- endTime1 = c.getTimeInMillis() - 5 * HeConsts.MILLISECONDS_PER_MINUTE;
- } else {
- return Tuple.tuple(0l, 0l);
- }
- // 手动触发,每个表删除时间分配2小时,保证手动执行每个表也有时间去删到
- endTime = startTime + HeConsts.MILLISECONDS_PER_HOUR * 2;
- //endTime = startTime + 5*60*1000; // for test
- if (endTime > endTime1) {
- endTime = endTime1;
- }
- }
- return Tuple.tuple(startTime, endTime);
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。