php如何发送数据到kafka?
发布于 作者:苏南大叔 来源:程序如此灵动~ 我们相信:世界是美好的,你是我也是。平行空间的世界里面,不同版本的生活也在继续...
在苏南大叔给大家带来的大数据套装里面,kafka
仅仅是个小小的纽带。经常用于数据的发送及转移。在kafka
官方的例子中,其实并没有php
的相关实现版本。现在网上流传的kafka
的相关php
库,都是些编程爱好者们自己写的类库,所以就肯定不会有太统一的接口标准了。
下面以某个类库为例,展示相关的kafka
的php
扩展库使用。综合比较了几家kafka
的php
库,苏南大叔觉得下面的这个开源类库,nmred/kafka-php
,比较简洁方便一些。
composer
安装nmred/kafka-php
先要安装composer
的类库nmred/kafka-php
。composer
的基本使用问题,大家可以查看苏南大叔的composer
相关文章。
composer
安装nmred/kafka-php
的命令,如下:
composer require "nmred/kafka-php" -vvv
当然,你也可以使用镜像加速下载,加速下载版的composer.json
如下图所示。
{
"config": {
"secure-http": false,
"preferred-install": "dist",
"sort-packages": true
},
"repositories": {
"packagist": {
"type": "composer",
"url": "https://packagist.phpcomposer.com"
}
},
"require": {
"nmred/kafka-php": "v0.2.0.7"
}
}
确定端口及topic
,查看kafka
版本号
我选择本地的端口是9092
,topic
是test1
,同时查看本地的kafka
版本是0.11.0.0
。这些在代码中都是要用到的。
生产者代码(异步)
<?php
require 'vendor/autoload.php';
date_default_timezone_set('PRC');
// use Monolog\Logger;
// use Monolog\Handler\StdoutHandler;
// // Create the logger
// $logger = new Logger('my_logger');
// // Now add some handlers
// $logger->pushHandler(new StdoutHandler());
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('0.11.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function() {
return array(
array(
'topic' => 'test1', //注意对应topic
'key' => 'testkey',
'value' => 'test....message.',
),
);
});
// $producer->setLogger($logger);
$producer->success(function($result) {
var_dump($result);
});
$producer->error(function($errorCode) {
var_dump($errorCode);
});
$producer->send(true);
生产者代码(同步)
<?php
require 'vendor/autoload.php';
date_default_timezone_set('PRC');
// use Monolog\Logger;
// use Monolog\Handler\StdoutHandler;
// Create the logger
// $logger = new Logger('my_logger');
// Now add some handlers
// $logger->pushHandler(new StdoutHandler());
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('0.11.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
// $producer->setLogger($logger);
for($i = 0; $i < 100; $i++) {
$result = $producer->send(array(
array(
'topic' => 'test1',
'value' => 'test1....message.',
'key' => 'key'.$i,
),
));
var_dump($result);
}
这两个生产者的代码,可以用下列shell命令接收。
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
消费者代码
require 'vendor/autoload.php';
date_default_timezone_set('PRC');
// use Monolog\Logger;
// use Monolog\Handler\StdoutHandler;
// Create the logger
// $logger = new Logger('my_logger');
// Now add some handlers
// $logger->pushHandler(new StdoutHandler());
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('0.11.0.0');
$config->setTopics(array('test1'));
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
//$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
这个消费者代码,可以通过下面的shell命令发送数据。
kafka-console-producer --broker-list localhost:9092 --topic test1
值得特殊说明的是,这个消费者的代码,在网页里面执行也是可以的。页面会实时显示相关数据。估计php端是个无限长的endless状态。
相关链接
这个开源类库在github
上面叫做weiboad/kafka-php
,似乎是国人作品,所以有个中文文档。
小结
当然,大家要记得开启zookeeper
和kafka
,才能做这些实验。相关kafka
的安装问题,请点击这里查看:
如果本文对您有帮助,或者节约了您的时间,欢迎打赏瓶饮料,建立下友谊关系。
本博客不欢迎:各种镜像采集行为。请尊重原创文章内容,转载请保留作者链接。
本博客不欢迎:各种镜像采集行为。请尊重原创文章内容,转载请保留作者链接。
你好,请问下monolog在这里是什么作用?
不错
原版带的这个monolog,个人觉得没用,所以注释掉了啊。一种日志工具罢了~