10亿数据如何最快插入MySQL?( 四 )

  • tableIndex 代表被分配的表名后缀 。
  • parentTaskId,即总的任务id 。
  • offset可以用来记录当前任务的进度 。
  • 10亿条数据导入数据库,切分为100个任务后,会新增100个taskId,分别处理一部分数据 , 即一个10G文件 。
  • status 状态用来区分当前任务是否在执行,执行完成 。
  • 如何把任务分配给每一个节点,可以考虑抢占方式 。每个任务节点都需要抢占任务 , 每个节点同时只能抢占1个任务 。具体如何实现呢?可以考虑 每个节点都启动一个定时任务,定期扫表,扫到待执行子任务,尝试执行该任务 。
    如何控制并发呢?可以使用redission的信号量 。key为数据库id:
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 设置1个并发度
    rSemaphore.trySetPermits(1);
    rSemaphore.tryAcquire();//申请加锁,非阻塞 。
    由任务负责定期轮训 , 抢到名额后,就开始执行任务 。将该任务状态置为Process,任务完成后或失败后,释放信号量 。
    10亿数据如何最快插入MySQL?

    文章插图
    但是使用信号量限流有个问题,如果任务忘记释放信号量,或者进程Crash无法释放信号量,如何处理呢?可以考虑给信号量增加一个超时时间 。那么如果任务执行过长,导致提前释放信号量,另一个客户单争抢到信号量 , 导致 两个客户端同时写一个任务如何处理呢?
    what,明明是将10亿数据导入数据库,怎么变成分布式锁超时的类似问题?
    实际上 Redisson的信号量并没有很好的办法解决信号量超时问题,正常思维:如果任务执行过长,导致信号量被释放,解决这个问题只需要续约就可以了,任务在执行中,只要发现信号量快过期了,就续约一段时间,始终保持信号量不过期 。但是 Redission并没有提供信号量续约的能力,怎么办?
    不妨换个思路,我们一直在尝试让多个节点争抢信号量,进而限制并发度 。可以试试选取一个主节点,通过主节点轮训任务表 。分三种情况:
    情况1:当前执行中数量小于并发度
    • 则选取id最小的待执行任务,状态置为进行中,通知发布消息 。
    • 消费到消息的进程,申请分布式锁,开始处理任务 。处理完成释放锁 。借助于Redission分布式锁续约,保证任务完成前,锁不会超时 。
    情况2:当前执行中数量等于并发度
    • 主节点尝试 get 进行中任务是否有锁 。
    • 如果没有锁,说明有任务执行失败,此时应该重新发布任务 。如果有锁,说明有任务正在执行中 。
    情况3:当前执行中数量大于并发度
    • 上报异常情况,报警,人工介入 。
    使用主节点轮训任务,可以减少任务的争抢,通过kafka发布消息,接收到消息的进程处理任务 。为了保证更多的节点参与消费,可以考虑增加Kafka分片数 。虽然每个节点可能同时处理多个任务,但是不会影响性能,因为性能瓶颈在数据库 。
    那么主节点应该如何选取呢?可以通过Zookeeper+curator 选取主节点 。可靠性比较高 。
    10亿条数据插入数据库的时间影响因素非常多 。包括数据库磁盘类型、性能 。数据库分库数量如果能切分1000个库当然性能更快 , 要根据线上实际情况决策分库和分表数量,这极大程度决定了写入的速率 。最后数据库批量插入的阈值也不是一成不变的,需要不断测试调整,以求得最佳的性能 。可以按照100,1000,10000等不断尝试批量插入的最佳阈值 。
    总结
    最后总结一下几点重要的: