网页主动探测工具-增加Netty模式
接前文
http://blog.itpub.net/29254281/viewspace-1344706/
http://blog.itpub.net/29254281/viewspace-1347985/
http://blog.itpub.net/29254281/viewspace-2134876/
http://blog.itpub.net/29254281/viewspace-2135131/
还是那个程序,在之前的基础上,改用Netty作为客户端.
也不知道用的到底对不对,先记录一下,以后慢慢学习.
http://blog.itpub.net/29254281/viewspace-1344706/
http://blog.itpub.net/29254281/viewspace-1347985/
http://blog.itpub.net/29254281/viewspace-2134876/
http://blog.itpub.net/29254281/viewspace-2135131/
还是那个程序,在之前的基础上,改用Netty作为客户端.
也不知道用的到底对不对,先记录一下,以后慢慢学习.
- import java.io.IOException;
- import java.nio.channels.Selector;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- import java.sql.Timestamp;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.LineBasedFrameDecoder;
- import io.netty.handler.codec.string.StringDecoder;
- class Reactor implements Runnable {
- public static int GETCOUNT() {
- return COUNT.get();
- }
- public static int getQueueSize() {
- return QUEUE.size();
- }
- private static final AtomicInteger COUNT = new AtomicInteger();
- private static final AtomicInteger TASKCOUNT = new AtomicInteger();
- public int startTask() {
- return TASKCOUNT.incrementAndGet();
- }
- public int finishTask() {
- return TASKCOUNT.decrementAndGet();
- }
- public int incrementAndGet() {
- return COUNT.incrementAndGet();
- }
- public final Selector selector;
- private static BlockingQueue QUEUE = new LinkedBlockingQueue();
- public void addTask(Task task) {
- try {
- QUEUE.put(task);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public Reactor() throws IOException {
- selector = Selector.open();
- }
- @Override
- public void run() {
- EventLoopGroup group = new NioEventLoopGroup(3);
- final Reactor reactor = this;
- while (!Thread.interrupted()) {
- int maxClient = 500;
- Task task = null;
- if (TASKCOUNT.get() < maxClient) {
- try {
- while ((task = (Task) QUEUE.take()) != null) {
- final Task t = task;
- reactor.startTask();
- Bootstrap boot = new Bootstrap();
- boot.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(new LineBasedFrameDecoder(409600));
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast(new HttpClientInboundHandler(reactor, t));
- }
- });
- boot.connect(task.getHost(), task.getPort());
- if (TASKCOUNT.get() > maxClient) {
- break;
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- //如果已经连接了500个网页,则主线程休眠一段时间.
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- group.shutdownGracefully();
- }
- }
- class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
- private Task task;
- private Reactor reactor;
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- ctx.channel().closeFuture();
- ctx.close();
- this.reactor.finishTask();
- task.setEndtime(System.currentTimeMillis());
- this.reactor.incrementAndGet();
- new ParseHandler(reactor, task).run();
- }
- public HttpClientInboundHandler(Reactor reactor, Task task) {
- this.task = task;
- this.reactor = reactor;
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- task.setStarttime(System.currentTimeMillis());
- StringBuilder sb = new StringBuilder();
- sb.append("GET " + task.getCurrentPath() + " HTTP/1.0\r\n");
- sb.append("HOST:" + task.getHost() + "\r\n");
- sb.append("Accept:*/*\r\n");
- sb.append("\r\n");
- ByteBuf bb = Unpooled.copiedBuffer(sb.toString().getBytes("utf8"));
- ctx.writeAndFlush(bb);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- String content = (String) msg;
- task.getContent().append(content);
- task.getContent().append("\n");
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
- public class Probe {
- public static void main(String[] args) throws IOException, InterruptedException {
- for (int i = 0; i < 1; i++) {
- Reactor reactor = new Reactor();
- reactor.addTask(new Task("news.163.com", 80, "/index.html"));
- new Thread(reactor, "ReactorThread_" + i).start();
- }
- long start = System.currentTimeMillis();
- while (true) {
- Thread.sleep(1000);
- long end = System.currentTimeMillis();
- float interval = ((end - start) / 1000);
- int connectTotal = Reactor.GETCOUNT();
- int persistenceTotal = PersistenceHandler.GETCOUNT();
- int connectps = Math.round(connectTotal / interval);
- int persistenceps = Math.round(persistenceTotal / interval);
- System.out.print("\r连接总数:" + connectTotal + " \t每秒连接:" + connectps + "\t连接队列剩余:" + Reactor.getQueueSize()
- + " \t持久化总数:" + persistenceTotal + " \t每秒持久化:" + persistenceps + "\t持久化队列剩余:"
- + PersistenceHandler.getInstance().getSize());
- }
- }
- }
- class Task {
- private String host;
- private int port;
- private String currentPath;
- private long starttime;
- private long endtime;
- private String type;
- private StringBuilder content = new StringBuilder(2400);
- private int state;
- private boolean isValid = true;
- public Task() {
- }
- public Task(String host, int port, String path) {
- init(host, port, path);
- }
- public void init(String host, int port, String path) {
- this.setCurrentPath(path);
- this.host = host;
- this.port = port;
- }
- public long getStarttime() {
- return starttime;
- }
- public void setStarttime(long starttime) {
- this.starttime = starttime;
- }
- public long getEndtime() {
- return endtime;
- }
- public void setEndtime(long endtime) {
- this.endtime = endtime;
- }
- public boolean isValid() {
- return isValid;
- }
- public void setValid(boolean isValid) {
- this.isValid = isValid;
- }
- public int getState() {
- return state;
- }
- public void setState(int state) {
- this.state = state;
- }
- public String getCurrentPath() {
- return currentPath;
- }
- public void setCurrentPath(String currentPath) {
- this.currentPath = currentPath;
- int i = 0;
- if (currentPath.indexOf("?") != -1) {
- i = currentPath.indexOf("?");
- } else {
- if (currentPath.indexOf("#") != -1) {
- i = currentPath.indexOf("#");
- } else {
- i = currentPath.length();
- }
- }
- this.type = currentPath.substring(currentPath.indexOf(".") + 1, i);
- }
- public long getTaskTime() {
- return getEndtime() - getStarttime();
- }
- public String getType() {
- return type;
- }
- public void setType(String type) {
- this.type = type;
- }
- public String getHost() {
- return host;
- }
- public int getPort() {
- return port;
- }
- public StringBuilder getContent() {
- return content;
- }
- public void setContent(StringBuilder content) {
- this.content = content;
- }
- }
- class ParseHandler implements Runnable {
- private static final Set SET = new HashSet();
- PersistenceHandler persistencehandler = PersistenceHandler.getInstance();
- List domainlist = new ArrayList();
- Task task;
- private interface Filter {
- void doFilter(Task fatherTask, Task newTask, String path, Filter chain);
- }
- private class FilterChain implements Filter {
- private List list = new ArrayList();
- {
- addFilter(new TwoLevel());
- addFilter(new OneLevel());
- addFilter(new FullPath());
- addFilter(new Root());
- addFilter(new Default());
- }
- private void addFilter(Filter filter) {
- list.add(filter);
- }
- private Iterator it = list.iterator();
- @Override
- public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
- if (it.hasNext()) {
- ((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);
- }
- }
- }
- private class TwoLevel implements Filter {
- @Override
- public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
- if (path.startsWith("../../")) {
- String prefix = getPrefix(fatherTask.getCurrentPath(), 3);
- newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix));
- } else {
- chain.doFilter(fatherTask, newTask, path, chain);
- }
- }
- }
- private class OneLevel implements Filter {
- @Override
- public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
- if (path.startsWith("../")) {
- String prefix = getPrefix(fatherTask.getCurrentPath(), 2);
- newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix));
- } else {
- chain.doFilter(fatherTask, newTask, path, chain);
- }
- }
- }
- private class FullPath implements Filter {
- @Override
- public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
- if (path.startsWith("http://")) {
- Iterator it = domainlist.iterator();
- boolean flag = false;
- while (it.hasNext()) {
- String domain = (String) it.next();
- if (path.startsWith("http://" + domain + "/")) {
- newTask.init(domain, fatherTask.getPort(), path.replace("http://" + domain + "/", "/"));
- flag = true;
- break;
- }
- }
- if (!flag) {
- newTask.setValid(false);
- }
- } else {
- chain.doFilter(fatherTask, newTask, path, chain);
- }
- }
- }
- private class Root implements Filter {
- @Override
- public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
- if (path.startsWith("/")) {
- newTask.init(fatherTask.getHost(), fatherTask.getPort(), path);
- } else {
- chain.doFilter(fatherTask, newTask, path, chain);
- }
- }
- }
- private class Default implements Filter {
- @Override
- public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {
- if (path.contains(":")) {
- newTask.setValid(false);
- return;
- }
- String prefix = getPrefix(fatherTask.getCurrentPath(), 1);
- newTask.init(fatherTask.getHost(), fatherTask.getPort(), prefix + "/" + path);
- }
- }
- public ParseHandler(Reactor reactor, Task task) {
- this.reactor = reactor;
- this.task = task;
- // 增加白名单
- this.domainlist.add("news.163.com");
- }
- private Reactor reactor;
- private Pattern pattern = Pattern.compile("\"[^\"]+\\.htm[^\"]*\"");
- private void parseTaskState(Task task) {
- if (task.getContent().toString().startsWith("HTTP/1.1")) {
- task.setState(Integer.parseInt(task.getContent().substring(9, 12)));
- } else {
- try {
- task.setState(Integer.parseInt(task.getContent().substring(9, 12)));
- } catch (Exception ex) {
- ex.printStackTrace();
- System.out.println(task.getContent());
- }
- }
- }
- /**
- * @param fatherTask
- * @param path
- * @throws Exception
- */
- private void createNewTask(Task fatherTask, String path) throws Exception {
- Task newTask = new Task();
- FilterChain filterchain = new FilterChain();
- filterchain.doFilter(fatherTask, newTask, path, filterchain);
- if (newTask.isValid()) {
- synchronized (SET) {
- if (SET.contains(newTask.getHost() + newTask.getCurrentPath())) {
- return;
- }
- SET.add(newTask.getHost() + newTask.getCurrentPath());
- }
- reactor.addTask(newTask);
- }
- }
-
private String getPrefix(String s,
本文名称:网页主动探测工具-增加Netty模式
本文URL:http://scyanting.com/article/jhpshc.html