php pthreads的使用方法

php pthreads的使用方法:1、通过“pecl install pthreads”安装pthreads;2、在需要控制多个线程同一时刻只能有一个线程工作的情况下使用互斥锁。

php pthreads的使用方法

本文操作环境:windows7系统、php7.0.2版,DELL G3电脑

php多线程pthreads的安装与使用

安装Pthreads 基本上需要重新编译PHP,加上 –enable-maintainer-zts 参数,但是用这个文档很少;bug会很多很有很多意想不到的问题,生成环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用Python、C等等

以下代码大部分来自网络

一、安装

这里使用的是 php-7.0.2

./configure --prefix=/usr/local/php7 --with-config-file-path=/etc --with-config-file-scan-dir=/etc/php.d --enable-debug --enable-maintainer-zts --enable-pcntl --enable-fpm --enable-opcache --enable-embed=shared --enable-json=shared --enable-phpdbg --with-curl=shared --with-mysql=/usr/local/mysql --with-mysqli=/usr/local/mysql/bin/mysql_config --with-pdo-mysqlmake && make install

登录后复制

安装pthreads

pecl install pthreads

登录后复制

二、Thread

getThreadId()}";                                                                                  }   };$thread->start() && $thread->join();#2class workerThread extends Thread { public function __construct($i){$this->i=$i;}public function run(){while(true){echo $this->i."";sleep(1);} } }for($i=0;$istart();}?>

登录后复制

三、 Worker 与 Stackable

Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

sql = $sql;}public function run() {$dbh  = $this->worker->getConnection();$row = $dbh->query($this->sql);while($member = $row->fetch(PDO::FETCH_ASSOC)){print_r($member);}}}class ExampleWorker extends Worker {public static $dbh;public function __construct($name) {}public function run(){self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456');}public function getConnection(){return self::$dbh;}}$worker = new ExampleWorker("My Worker Thread");$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');$worker->stack($sql1);$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');$worker->stack($sql2);$worker->start();$worker->shutdown();?>

登录后复制

四、 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同

mutex = $mutex;$this->handle = fopen("/tmp/counter.txt", "w+");}public function __destruct(){fclose($this->handle);}public function run() {if($this->mutex)$locked=Mutex::lock($this->mutex);$counter = intval(fgets($this->handle));$counter++;rewind($this->handle);fputs($this->handle, $counter );printf("Thread #%lu says: %s", $this->getThreadId(),$counter);if($this->mutex)Mutex::unlock($this->mutex);}}//没有互斥锁for ($i=0;$istart();}//加入互斥锁$mutex = Mutex::create(true);for ($i=0;$istart();}Mutex::unlock($mutex);for ($i=0;$ijoin();}Mutex::destroy($mutex);?>

登录后复制

多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能

立即学习“PHP免费学习笔记(深入)”;

shmid = $shmid;}public function run() {$counter = shm_get_var( $this->shmid, 1 );$counter++;shm_put_var( $this->shmid, 1, $counter );printf("Thread #%lu says: %s", $this->getThreadId(),$counter);}}for ($i=0;$istart();}for ($i=0;$ijoin();}shm_remove( $shmid );shm_detach( $shmid );?>

登录后复制

五、 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

shmid = $shmid;}public function run() {$this->synchronized(function($thread){$thread->wait();}, $this);$counter = shm_get_var( $this->shmid, 1 );$counter++;shm_put_var( $this->shmid, 1, $counter );printf("Thread #%lu says: %s", $this->getThreadId(),$counter);}}for ($i=0;$istart();}for ($i=0;$isynchronized(function($thread){$thread->notify();}, $threads[$i]);}for ($i=0;$ijoin();}shm_remove( $shmid );shm_detach( $shmid );?>

登录后复制

六、线程池

一个Pool类

row = $row;        $this->sql = null;    }    public function run() {if(strlen($this->row['bankno']) > 100 ){$bankno = safenet_decrypt($this->row['bankno']);}else{$error = sprintf("%s, %s",$this->row['id'], $this->row['bankno']);file_put_contents("bankno_error.log", $error, FILE_APPEND);}if( strlen($bankno) > 7 ){$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);$this->sql = $sql;}printf("%s",$this->sql);    }}class Pool {public $pool = array();public function __construct($count) {$this->count = $count;}public function push($row){if(count($this->pool) count){$this->pool[] = new Update($row);return true;}else{return false;}}public function start(){foreach ( $this->pool as $id => $worker){$this->pool[$id]->start();}}public function join(){foreach ( $this->pool as $id => $worker){               $this->pool[$id]->join();}}public function clean(){foreach ( $this->pool as $id => $worker){if(! $worker->isRunning()){            unset($this->pool[$id]);            }}}}try {$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',PDO::MYSQL_ATTR_COMPRESS => true));$sql  = "select id,bankno from members order by id desc";$row = $dbh->query($sql);$pool = new Pool(5);while($member = $row->fetch(PDO::FETCH_ASSOC)){while(true){if($pool->push($member)){ //压入任务到池中break;}else{ //如果池已经满,就开始启动线程$pool->start();$pool->join();$pool->clean();}}}$pool->start();    $pool->join();$dbh = null;} catch (Exception $e) {    echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "";}?>

登录后复制

动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

row = $row;$this->sql = null;//print_r($this->row);}public function run() {if(strlen($this->row['bankno']) > 100 ){$bankno = safenet_decrypt($this->row['bankno']);}else{$error = sprintf("%s, %s",$this->row['id'], $this->row['bankno']);file_put_contents("bankno_error.log", $error, FILE_APPEND);}if( strlen($bankno) > 7 ){$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);$this->sql = $sql;}printf("%s",$this->sql);}}try {$dbh    = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',PDO::MYSQL_ATTR_COMPRESS => true));$sql     = "select id,bankno from members order by id desc limit 50";$row = $dbh->query($sql);$pool = array();while($member = $row->fetch(PDO::FETCH_ASSOC)){$id = $member['id'];while (true){if(count($pool) start();break;}else{foreach ( $pool as $name => $worker){if(! $worker->isRunning()){unset($pool[$name]);}}}}}$dbh = null;} catch (Exception $e) {echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "";}?> 

登录后复制

pthreads Pool类

logger = $logger;}protected $loger;}class WebWork extends Stackable {public function isComplete() {return $this->complete;}public function run() {$this->worker->logger->log("%s executing in Thread #%lu",__CLASS__, $this->worker->getThreadId());$this->complete = true;}protected $complete;}class SafeLog extends Stackable {protected function log($message, $args = []) {$args = func_get_args();if (($message = array_shift($args))) {echo vsprintf("{$message}", $args);}}}$pool = new Pool(8, WebWorker::class, [new SafeLog()]);$pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown();$pool->collect(function($work){return $work->isComplete();});var_dump($pool);

登录后复制

七、多线程文件安全读写

LOCK_SH 取得共享锁定(读取的程序)

LOCK_EX 取得独占锁定(写入的程序

LOCK_UN 释放锁定(无论共享或独占)

LOCK_NB 如果不希望 flock() 在锁定时堵塞


登录后复制

八、多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

Worker 与 PDO

worker->getConnection();                $sql     = "select id,name from members order by id desc limit 50";                $row = $dbh->query($sql);                while($member = $row->fetch(PDO::FETCH_ASSOC)){                        print_r($member);                }        }}class ExampleWorker extends Worker {        public static $dbh;        public function __construct($name) {        }        /*        * The run method should just prepare the environment for the work that is coming ...        */        public function run(){                self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');        }        public function getConnection(){                return self::$dbh;        }}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?>

登录后复制

Pool 与 PDO

在线程池中链接数据库

# cat pool.phplogger = $logger;}protected $logger;}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($number) {$this->number = $number;}public function run() {                $dbhost = 'db.example.com';               // 数据库服务器                $dbuser = 'example.com';                 // 数据库用户名                $dbpw = 'password';                               // 数据库密码                $dbname = 'example_real';$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',                        PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true                        )                );$sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";#echo $sql;$row = $dbh->query($sql);$mt4_trades  = $row->fetch(PDO::FETCH_ASSOC);if($mt4_trades){$row = null;$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";$dbh->query($sql);#printf("%s",$sql);}$dbh = null;printf("runtime: %s, %s, %s", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);}}class Logging extends Stackable {protected  static $dbh;public function __construct() {$dbhost = 'db.example.com';// 数据库服务器        $dbuser = 'example.com';                 // 数据库用户名        $dbpw = 'password';                               // 数据库密码$dbname = 'example_real';// 数据库名self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',PDO::MYSQL_ATTR_COMPRESS => true));}protected function log($message, $args = []) {$args = func_get_args();if (($message = array_shift($args))) {echo vsprintf("{$message}", $args);}}protected function getConnection(){                return self::$dbh;        }}$pool = new Pool(200, ExampleWorker::class, [new Logging()]);$dbhost = 'db.example.com';                      // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                               // 数据库密码$dbname = 'db_example';$dbh    = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );$sql = "select `order`,name from accounts where deposit_time is null order by id desc";$row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){        $pool->submit(new Work($account));}$pool->shutdown();?> 

登录后复制

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

logger = $logger;#}#protected $logger;protected  static $dbh;public function __construct() {}public function run(){$dbhost = 'db.example.com';// 数据库服务器    $dbuser = 'example.com';        // 数据库用户名        $dbpw = 'password';             // 数据库密码$dbname = 'example';// 数据库名self::$dbh  = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true));}protected function getInstance(){        return self::$dbh;    }}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($data) {$this->data = $data;#print_r($data);}public function run() {#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );try {$dbh  = $this->worker->getInstance();#print_r($dbh);               $id = $this->data['id'];$mobile = safenet_decrypt($this->data['mobile']);#printf("%d, %s ", $id, $mobile);if(strlen($mobile) > 11){$mobile = substr($mobile, -11);}if($mobile == 'null'){#$sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";#printf("%s",$sql);#$dbh->query($sql);$mobile = '';$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";}else{$sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";}$sth = $dbh->prepare($sql);$sth->bindValue(':mobile', $mobile);$sth->bindValue(':id', $id);$sth->execute();#echo $sth->debugDumpParams();}catch(PDOException $e) {$error = sprintf("%s,%s", $mobile, $id );file_put_contents("mobile_error.log", $error, FILE_APPEND);}#$dbh = null;printf("runtime: %s, %s, %s, %s", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);#printf("runtime: %s, %s", date('Y-m-d H:i:s'), $this->number);}}$pool = new Pool(100, ExampleWorker::class, []);#foreach (range(0, 100) as $number) {#$pool->submit(new Work($number));#}$dbhost = 'db.example.com';                     // 数据库服务器$dbuser = 'example.com';                 // 数据库用户名$dbpw = 'password';                             // 数据库密码$dbname = 'example';$dbh    = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(                        PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES 'UTF8'',                        PDO::MYSQL_ATTR_COMPRESS => true                        )                );#print_r($dbh);#$sql = "select id, mobile from members where id prepare($sql);#$sth->bindValue(':id',300);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','272');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();$sql = "select id, mobile from members order by id asc"; // limit 1000";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){        #$order =  $account['order'];        #printf("%s",$order);        //print_r($members);        $pool->submit(new Work($members));#unset($account['order']);}$pool->shutdown();?>

登录后复制

多线程中操作数据库总结

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

 true));?>

登录后复制

推荐学习:《PHP视频教程》

以上就是php pthreads的使用方法的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2108328.html

(0)
上一篇 2025年2月24日 15:17:18
下一篇 2025年2月23日 23:40:21

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • php分页有几种实现方法

    php分页有3种实现方法,分别是:1、通过“mysql_query”等函数将sql查询进行分页;2、使用ajax的方法实现分页;3、通过“function viewpage(p){…}”脚本实现分页。 本文操作环境:window…

    2025年2月24日
    200
  • php怎么实现英文转小写

    php实现英文转小写的方法:1、使用lcfirst()函数将第一个英文单词首字母变成小写;2、通过strtolower()函数将所有字母变成小写。 本文操作环境:windows7系统、PHP7.1版,DELL G3电脑 php怎么实现英文转…

    2025年2月24日
    200
  • bcadd php是函数还是扩展

    bcadd是php中的一个函数,作用是计算两个任意精度数字的加法,该函数的使用语法如“bcadd(string $num1, string $num2, ?int $scale = null): string”。 本文操作环境:window…

    2025年2月24日
    200
  • 如何解决php is_writable 失败问题

    php is_writable失败的解决办法:1、打开命令窗口;2、执行“chcon -R -t httpd_sys_content_rw_t /var/www/html”即可。 本文操作环境:centos7系统、PHP7版,DELL G3…

    2025年2月24日
    200
  • php zip中文乱码怎么办

    php zip中文乱码的解决办法:1、打开相应的PHP代码文件;2、用PHP的zip封装协议解压去解决ZipArchive乱码问题即可。 本文操作环境:windows7系统、PHP7.1版,DELL G3电脑 php zip中文乱码怎么办?…

    2025年2月24日
    200
  • php怎么修改文件的名字

    在php中,可以利用rename()函数来修改文件的名字,该函数可以重命名一个文件或者目录,语法“rename(要修改的文件名, 新的文件名)”;如果修改成功则返回TRUE,如果修改失败则返回FALSE。 本教程操作环境:windows7系…

    2025年2月24日 编程技术
    200
  • php mysql_query()怎么用

    php mysql_query()函数用于执行sql语句,在查询、更新、添加数据时,可以写好sql语句,然后使用mysql_query()函数执行该sql语句对指定数据库进行操作;语法“mysql_query($sql [,$con])”。…

    2025年2月24日
    200
  • php怎么去掉小数位

    php去掉小数位的方法:1、使用intval()函数获取变量的整数值部分,语法“intval($number)”;2、使用floor()函数向下取整,语法“floor($number)”。 本教程操作环境:windows7系统、PHP7.1…

    2025年2月24日 编程技术
    200
  • php文件怎样可以读取却不能写入数据

    php文件实现读取又不能写入数据的方法:1、使用“fopen(‘文件路径’, ‘r’)”语句以只读的方式来打开文件;2、使用fgetc()、fgets()、fgetss()等函数读取数据即可。…

    2025年2月24日
    200
  • php怎么复制文件后改名

    在php中,可以利用сoру()函数来实现复制文件后修改文件名,该函数可以将一个文件复制(拷贝)到指定目录中,语法“copy($file, $newfile)”;如果执行成功则返回TRUE,如果执行失败则返回FALSE。 本教程操作环境:w…

    2025年2月24日 编程技术
    200

发表回复

登录后才能评论