发新话题
打印

php 多线程实例讲解

php 多线程实例讲解

PHP语言本身是不支持多线程的. 总结了一下网上关于PHP模拟多线程的方法, 总的来说, 都是利用了PHP的好伙伴们本身所具有的多线程能力. PHP的好伙伴指的就是LINUX和APACHE啦, LAMP嘛.

  另外, 既然是模拟的, 就不是真正的多线程. 其实只是多进程. 进程和线程是两个不同的概念. 好了, 以下方法都是从网上找来的.

  1. 利用LINUX操作系统

<?php
for ($i=0;$i<10;$i++) {
  echo $i;
  sleep(5);
}
?>

  上面存成test.php, 然后写一段SHELL代码

#!/bin/bash
for i in 1 2 3 4 5 6 7 8 9 10
do
  php -q test.php &
done

  2. 利用fork子进程(其实同样是利用LINUX操作系统)

<?php
declare(ticks=1);
$bWaitFlag = FALSE; /// 是否等待进程结束
$intNum = 10;      /// 进程总数
$pids = array();    /// 进程PID数组
echo ("Startn");
for($i = 0; $i < $intNum; $i++) {
$pids[$i] = pcntl_fork();/// 产生子进程,而且从当前行之下开试运行代码,而且不继承父进程的数据信息
if(!$pids[$i]) {
  // 子进程进程代码段_Start
  $str="";
  sleep(5+$i);
  for ($j=0;$j<$i;$j++) {$str.="*";}
  echo "$i -> " . time() . " $str n";
  exit();
  // 子进程进程代码段_End
}
}
if ($bWaitFlag)
{
for($i = 0; $i < $intNum; $i++) {
  pcntl_waitpid($pids[$i], $status, WUNTRACED);
  echo "wait $i -> " . time() . "n";
}
}
echo ("Endn");
?>

  3. 利用WEB SERVER, PHP不支持多线程, APACHE可是支持的, 呵呵.

  假设我们现在运行的是a.php这个文档. 但是我在程式中又请求WEB服务器运行另一个b.php

  那么这两个文档将是同时执行的.

<?php
function runThread()
{
$fp = fsockopen('localhost', 80, $errno, $errmsg);
fputs($fp, "GET /a.php?act=brnrn");
fclose($fp);
}
function a()
{
$fp = fopen('result_a.log', 'w');
fputs($fp, 'Set in ' . Date('h:i:s', time()) . (double)microtime() . "rn");
fclose($fp);
}
function b()
{
$fp = fopen('result_b.log', 'w');
fputs($fp, 'Set in ' . Date('h:i:s', time()) . (double)microtime() . "rn");
fclose($fp);
}
if(!isset($_GET['act'])) $_GET['act'] = 'a';
if($_GET['act'] == 'a')
{
runThread();
a();
}
else if($_GET['act'] == 'b') b();
?>

  当然啦,也可以把需要多线程处理的部分交给JAVA去处理, 然后在PHP里调用, 哈哈.

<?php
system('java multiThread.java');
?>

TOP

当有人想要实现并发功能时,他们通常会想到用fork或者spawn threads,但是当他们发现php不支持多线程的时候,大概会转换思路去用一些不够好的语言,比如perl。

其实的是大多数情况下,你大可不必使用fork或者线程,并且你会得到比用fork或thread更好的性能。

假设你要建立一个服务来检查正在运行的n台服务器,以确定他们还在正常运转。你可能会写下面这样的代码:

<?php
$hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
$timeout = 15;
$status = array();
foreach ($hosts as $host) {
$errno = 0;
$errstr = "";
$s = fsockopen($host, 80, $errno, $errstr, $timeout);
if ($s) {
   $status[$host] = "Connectedn";
   fwrite($s, "HEAD / HTTP/1.0rnHost: $hostrnrn");
   do {
    $data = fread($s, 8192);
    if (strlen($data) == 0) {
    break;
    }
    $status[$host] .= $data;
   } while (true);
   fclose($s);
} else {
   $status[$host] = "Connection failed: $errno $errstrn";
}
}
print_r($status);
?>

它运行的很好,但是在fsockopen()分析完hostname并且建立一个成功的连接(或者延时$timeout秒)之前,扩充这段代码来管理大量服务器将耗费很长时间。
因此我们必须放弃这段代码;我们可以建立异步连接-不需要等待fsockopen返回连接状态。PHP仍然需要解析hostname(所以直接使用ip更加明智),不过将在打开一个连接之后立刻返回,继而我们就可以连接下一台服务器。
有两种方法可以实现;PHP5中可以使用新增的stream_socket_client()函数直接替换掉fsocketopen()。PHP5之前的版本,你需要自己动手,用sockets扩展解决问题。

下面是PHP5中的解决方法:
<?php
$hosts = array("host1.sample.com", "host2.sample.com", "host3.sample.com");
$timeout = 15;
$status = array();
$sockets = array();
/* Initiate connections to all the hosts simultaneously */
foreach ($hosts as $id => $host) {
$s = stream_socket_client("$host:80", $errno, $errstr, $timeout,
   STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
if ($s) {
   $sockets[$id] = $s;
   $status[$id] = "in progress";
} else {
   $status[$id] = "failed, $errno $errstr";
}
}
/* Now, wait for the results to come back in */
while (count($sockets)) {
$read = $write = $sockets;
/* This is the magic function - explained below */
$n = stream_select($read, $write, $e = null, $timeout);
if ($n > 0) {
   /* readable sockets either have data for us, or are failed
   * connection attempts */
   foreach ($read as $r) {
      $id = array_search($r, $sockets);
      $data = fread($r, 8192);
      if (strlen($data) == 0) {
    if ($status[$id] == "in progress") {
     $status[$id] = "failed to connect";
    }
    fclose($r);
    unset($sockets[$id]);
      } else {
    $status[$id] .= $data;
      }
   }
   /* writeable sockets can accept an HTTP request */
   foreach ($write as $w) {
    $id = array_search($w, $sockets);
    fwrite($w, "HEAD / HTTP/1.0rnHost: "
     . $hosts[$id] . "rnrn");
    $status[$id] = "waiting for response";
   }
} else {
   /* timed out waiting; assume that all hosts associated
   * with $sockets are faulty */
   foreach ($sockets as $id => $s) {
    $status[$id] = "timed out " . $status[$id];
   }
   break;
}
}
foreach ($hosts as $id => $host) {
echo "Host: $hostn";
echo "Status: " . $status[$id] . "nn";
}

?>

我们用stream_select()等待sockets打开的连接事件。stream_select()调用系统的select(2)函数来工作:前面三个参数是你要使用的streams的数组;你可以对其读取,写入和获取异常(分别针对三个参数)。stream_select()可以通过设置$timeout(秒)参数来等待事件发生-事件发生时,相应的sockets数据将写入你传入的参数。

下面是PHP4.1.0之后版本的实现,如果你已经在编译PHP时包含了sockets(ext/sockets)支持,你可以使用根上面类似的代码,只是需要将上面的streams/filesystem函数的功能用ext/sockets函数实现。主要的不同在于我们用下面的函数代替stream_socket_client()来建立连接:
<?php
// This value is correct for Linux, other systems have other values
define('EINPROGRESS', 115);
function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) {
$ip = gethostbyname($host);
$s = socket_create(AF_INET, SOCK_STREAM, 0);
if (socket_set_nonblock($s)) {
   $r = @socket_connect($s, $ip, $port);
   if ($r || socket_last_error() == EINPROGRESS) {
    $errno = EINPROGRESS;
    return $s;
   }
}
$errno = socket_last_error($s);
$errstr = socket_strerror($errno);
socket_close($s);
return false;
}
?>

现在用socket_select()替换掉stream_select(),用socket_read()替换掉fread(),用socket_write()替换掉fwrite(),用socket_close()替换掉fclose()就可以执行脚本了!
PHP5的先进之处在于,你可以用stream_select()处理几乎所有的stream-例如你可以通过include STDIN用它接收键盘输入并保存进数组,你还可以接收通过proc_open()打开的管道中的数据。
如果你想让PHP4.3.x自身拥有处理streams的功能,我已经为你准备了一个让fsockopen可以异步工作的patch。不赞成使用该补丁,该补丁不会出现在官方发布的PHP版本中,我在补丁中附带了stream_socket_client()函数的实现,通过它,你可以让你的脚本兼容PHP5。
附件:
documentation for stream_select()
documentation for socket_select()
patch for PHP 4.3.2 and script to emulate stream_socket_client(). (might work with later 4.3.x versions).

------------------------------------

经测试,确实为多线程,弄了整个下午,终于弄好了~~~


$request = array("http://10.1.30.218/test/server.php","http://10.1.30.28/server.php");

foreach($request as $r) {
$temp = parse_url($r);
$scheme[] = $temp['scheme'];
$hosts[] = $temp['host'];
$paths[] = isset($temp['path']) ? $temp['path'] : "" ;
}

//$hosts = array("www.bit.edu.cn");

$timeout = 5;
$status = array();
$sockets = array();
// Initiate connections to all the hosts simultaneously
foreach ($hosts as $id => $host) {
$s = stream_socket_client("$host:80", $errno, $errstr, $timeout, STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
if ($s) {
   $sockets[$id] = $s;
   $status[$id] = "in progress";
} else {
   $status[$id] = "failed, $errno $errstr";
}
}
//print_r($sockets);print_r($status);
//*
// Now, wait for the results to come back in
while (count($sockets)) {
$read = $write = $sockets;
// This is the magic function - explained below
$ret = stream_select($read, $write, $e = null, $timeout);
if ($ret > 0) {
   // readable sockets either have data for us, or are failed connection attempts
   foreach ($read as $r) {
      $id = array_search($r, $sockets);
    $data = fread($r, 8192);
    if (strlen($data) == 0) {
     if ($status[$id] == "in progress") {
      $status[$id] = "failed to connect";
     }
     fclose($r);
     unset($sockets[$id]);
    } else {
     if ($status[$id] == "in progress") {
      $status[$id] = $data;
     } else {
      $status[$id] .= $data;
     }
    }
   }
   // writeable sockets can accept an HTTP request
   foreach ($write as $w) {
    $id = array_search($w, $sockets);
    //fwrite($w, "HEAD / HTTP/1.0\r\nHost: " . $hosts[$id] . "\r\n\r\n");
    fwrite($w, "GET /".$paths[$id]." HTTP/1.0\r\nHost: " . $hosts[$id] . "\r\n\r\n");
    //$status[$id] = "waiting for response";
   }

} else {
   // timed out waiting; assume that all hosts associated with $sockets are faulty
   foreach ($sockets as $id => $s) {
    $status[$id] = "timed out\r\n\r\n" . $status[$id];
   }
   break;
}
}
foreach ($hosts as $id => $host) {
echo "Host: $host\n";
echo '<pre>'.$status[$id].'</pre>';
/*
$pos = strpos($status[$id],"\r\n\r\n");
$content[$id] = substr($status[$id],$pos);
$status[$id] = substr($status[$id],0,$pos);
//echo "Status: " . $status[$id] . "\n\n";
echo $content[$id] . "\n\n" ;
*/
}

TOP

发新话题