Ali Canal?https://github.com/alibaba/canal
Hive 的数据源可以设置为 HBase
canal.deployer/conf/canal.properties canal.deployer/conf/instanceA/instance.properties canal.deployer/conf/instanceB/instance.properties
# slaveId 不能与 my.cnf 中的 server-id 项重复 canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 12⑦0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 订阅实例中所有的数据库和表 canal.instance.filter.regex = .*\\..*
Loading properties file from class path resource [canal.properties] Loading properties file from class path resource [example/instance.properties] start CannalInstance for 1-example [destination = example , address = /12⑦0.0.1:3306 , EventParser] prepare to find start position just show master status
CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("12⑦0.0.1", 11111), "example", "", ""); connector.connect(); connector.subscribe(".*\\..*"); while (true) { Message message = connector.getWithoutAck(100); long batchId = message.getId(); if (batchId == -1 || message.getEntries().isEmpty()) { Thread.sleep(3000); } else { printEntries(message.getEntries()); connector.ack(batchId); } }
// printEntries RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList()) { if (rowChange.getEventType() == EventType.INSERT) { printColumns(rowData.getAfterCollumnList()); } }
// printColumns String line = columns.stream() .map(column -> column.getName() ◆ "=" ◆ column.getValue()) .collect(Collectors.joining(",")); System.out.println(line);
以上就是土嘎嘎小编为大家整理的使用 Binlog 和 Canal 从 MySQL 抽取数据相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!