用Java实现缓冲多线程的无阻塞读取远程文件_C/C++语言_黑客防线网安服务器维护基地--Powered by WWW.RONGSEN.COM.CN

用Java实现缓冲多线程的无阻塞读取远程文件

作者:黑客防线网安C/C++教程基地 来源:黑客防线网安C/C++教程基地 浏览次数:0

本篇关键词:读取远程文件实现
黑客防线网安网讯:   我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果...

   我平时比较喜欢从网上听歌有些链接下载速度太慢了如果用HttpURLConnection类的方法打开连接然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。
 
简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。
 
经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下载缓冲的方法写出来,不足之处恳请批评指正。
 
一、HttpReader类功能:HTTP协议从指定URL读取数据
 
      /** *//**
* author by http://www.bt285.cn http://www.5a520.cn
*/
package instream;  
 
import java.io.IOException;  
import java.io.InputStream;  
import java.net.HttpURLConnection;  
import java.net.URL;  
 
public final class HttpReader {  
    public static final int MAX_RETRY = 10;  
    private static long content_length;  
    private URL url;  
    private HttpURLConnection httpConnection;  
    private InputStream in_stream;  
    private long cur_pos;           //用于决定seek方法中是否执行文件定位  
    private int connect_timeout;  
    private int read_timeout;  
      
    public HttpReader(URL u) {  
        this(u, 5000, 5000);  
    }  
      
    public HttpReader(URL u, int connect_timeout, int read_timeout) {  
        this.connect_timeout = connect_timeout;  
        this.read_timeout = read_timeout;  
        url = u;  
        if (content_length == 0) {  
            int retry = 0;  
            while (retry < HttpReader.MAX_RETRY)  
                try {  
                    this.seek(0);  
                    content_length = httpConnection.getContentLength();  
                    break;  
                } catch (Exception e) {  
                    retry++;  
                }  
        }  
    }  
      
    public static long getContentLength() {  
        return content_length;  
    }  
      
    public int read(byte[] b, int off, int len) throws IOException {  
        int r = in_stream.read(b, off, len);  
        cur_pos += r;  
        return r;  
    }  
      
    public int getData(byte[] b, int off, int len) throws IOException {  
        int r, rema = len;  
        while (rema > 0) {  
            if ((r = in_stream.read(b, off, rema)) == -1) {  
                return -1;  
            }  
            rema -= r;  
            off += r;  
            cur_pos += r;  
        }  
        return len;  
    }  
      
    public void close() {  
        if (httpConnection != null) {  
            httpConnection.disconnect();  
            httpConnection = null;  
        }  
        if (in_stream != null) {  
            try {  
                in_stream.close();  
            } catch (IOException e) {}  
            in_stream = null;  
        }  
        url = null;  
    }  
      
    /**//* 
     * 抛出异常通知再试 
     * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝 
     */ 
    public void seek(long start_pos) throws IOException {  
        if (start_pos == cur_pos && in_stream != null)  
            return;  
        if (httpConnection != null) {  
            httpConnection.disconnect();  
            httpConnection = null;  
        }  
        if (in_stream != null) {  
            in_stream.close();  
            in_stream = null;  
        }  
        httpConnection = (HttpURLConnection) url.openConnection();  
        httpConnection.setConnectTimeout(connect_timeout);  
        httpConnection.setReadTimeout(read_timeout);  
        String sProperty = "bytes=" + start_pos + "-";  
        httpConnection.setRequestProperty("Range", sProperty);  
        //httpConnection.setRequestProperty("Connection", "Keep-Alive");  
        int responseCode = httpConnection.getResponseCode();  
        if (responseCode < 200 || responseCode >= 300) {  
            try {  
                Thread.sleep(500);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
            throw new IOException("HTTP responseCode="+responseCode);  
        }  
 
        in_stream = httpConnection.getInputStream();  
        cur_pos = start_pos;  
    }  
 
}

 
二、IWriterCallBack接口功能:实现读/写通信。
 
      package instream;  
 
public interface IWriterCallBack {  
    public boolean tryWriting(Writer w) throws InterruptedException;  
    public void updateBuffer(int i, int len);  
    public void updateWriterCount();  
    public void terminateWriters();  
}

 
  
 
三、Writer类:下载线程,负责向buf[]写数据。
 
        /** *//**
* http://www.bt285.cn http://www.5a520.cn
*/
package instream;  
import java.io.IOException;  
import java.net.URL;  
 
public final class Writer implements Runnable {  
    private static boolean isalive = true;  
    private byte[] buf;  
    private IWriterCallBack icb;  
    protected int index;            //buf[]内"块"索引号  
    protected long start_pos;       //index对应的文件位置(相对于文件首的偏移量)  
    protected int await_count;      //用于判断:下载速度足够就退出一个"写"线程  
    private HttpReader hr;  
      
    public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {  
        hr = new HttpReader(u);  
        if(HttpReader.getContentLength() == 0)  //实例化HttpReader对象都不成功  
            return;  
        icb = call_back;  
        buf = b;  
        Thread t = new Thread(this,"dt_"+i);  
        t.setPriority(Thread.NORM_PRIORITY + 1);  
        t.start();  
    }  
      
    public void run() {  
        int write_bytes=0, write_pos=0, rema = 0, retry = 0;  
        boolean cont = true;  
        while (cont) {  
            try {  
                // 1.等待空闲块  
                if(retry == 0) {  
                    if (icb.tryWriting(this) == false)  
                        break;  
                    write_bytes = 0;  
                    rema = BuffRandAcceURL.UNIT_LENGTH;  
                    write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS;  
                }  
                  
                // 2.定位  
                hr.seek(start_pos);  
 
                // 3.下载"一块"  
                int w;  
                while (rema > 0 && isalive) {  
                    w = (rema < 2048) ? rema : 2048; //每次读几K合适?  
                    if ((w = hr.read(buf, write_pos, w)) == -1) {  
                        cont = false;  
                        break;  
                    }  
                    rema -= w;  
                    write_pos += w;  
                    start_pos += w;  
                    write_bytes += w;  
                }  
                  
                //4.通知"读"线程  
                retry = 0;  
                icb.updateBuffer(index, write_bytes);  
            } catch (InterruptedException e) {  
                isalive = false;  
                icb.terminateWriters();  
                break;  
            } catch (IOException e) {  
                if(++retry == HttpReader.MAX_RETRY) {  
                    isalive = false;  
                    icb.terminateWriters();  
                    break;  
                }  
            }  
        }  
        icb.updateWriterCount();  
        try {  
            hr.close();  
        } catch (Exception e) {}  
        hr = null;  
        buf = null;  
        icb = null;  
    }  
 
}

 
四、IRandomAccess接口:
 
随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。
 
        package instream;  
 
public interface IRandomAccess {  
    public int read() throws Exception;  
    public int read(byte b[]) throws Exception;  
    public int read(byte b[], int off, int len) throws Exception;  
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;  
    public void seek(long pos) throws Exception;  
    public long length();  
    public long getFilePointer();  
    public void close();  
}

 
五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。
 
关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。
 
        /** *//**
* http://www.5a520.cn  http://www.bt285.cn
*/
package instream;  
 
import java.net.URL;  
import java.net.URLDecoder;  
import decode.Header;  
import tag.MP3Tag;  
import tag.TagThread;  
 
public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {  
    public static final int UNIT_LENGTH_BITS = 15;                  //32K  
    public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;  
    public static final int BUF_LENGTH = UNIT_LENGTH << 4;            //16块  
    public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;  
    public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);  
    private static final int MAX_WRITER = 8;  
    private static long file_pointer;  
    private static int read_pos;  
    private static int fill_bytes;  
    private static byte[] buf;      //同时也作读写同步锁:buf.wait()/buf.notify()  
    private static int[] buf_bytes;  
    private static int buf_index;  
    private static int alloc_pos;  
    private static URL url = null;  
    private static boolean isalive = true;  
    private static int writer_count;  
    private static int await_count;  
    private long file_length;  
    private long frame_bytes;  
      
    public BuffRandAcceURL(String sURL) throws Exception {  
        this(sURL,MAX_WRITER);  
    }  
      
    public BuffRandAcceURL(String sURL, int download_threads) throws Exception {  
        buf = new byte[BUF_LENGTH];  
        buf_bytes = new int[UNIT_COUNT];  
        url = new URL(sURL);  
          
        //创建线程以异步方式解析ID3  
        new TagThread(url);  
          
        //打印当前文件名  
        try {  
            String s = URLDecoder.decode(sURL, "GBK");  
            System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1));  
            s = null;  
        } catch (Exception e) {  
            System.out.println("start>> " + sURL);  
        }  
          
        //创建"写"线程  
        for(int i = 0; i < download_threads; i++)  
            new Writer(this, url, buf, i+1);  
        frame_bytes = file_length = HttpReader.getContentLength();  
        if(file_length == 0) {  
            Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。";  
            throw new Exception("retry " + HttpReader.MAX_RETRY);  
        }  
        writer_count = download_threads;  
          
        //缓冲  
        try_cache();  
          
        //跳过ID3 v2  
        MP3Tag mP3Tag = new MP3Tag();  
        int v2_size = mP3Tag.checkID3V2(buf,0);  
        if (v2_size > 0) {  
            frame_bytes -= v2_size;  
            //seek(v2_size):  
            fill_bytes -= v2_size;  
            file_pointer = v2_size;  
            read_pos = v2_size;  
            read_pos &= BUF_LENGTH_MASK;  
            int units = v2_size >> UNIT_LENGTH_BITS;  
            for(int i = 0; i < units; i++) {  
                buf_bytes[i] = 0;  
                this.notifyWriter();  
            }  
            buf_bytes[units] -= v2_size;  
            this.notifyWriter();  
        }  
        mP3Tag = null;  
    }  
      
    private void try_cache() throws InterruptedException {  
        int cache_size = BUF_LENGTH;  
        if(cache_size > (int)file_length - alloc_pos)  
            cache_size = (int)file_length - alloc_pos;  
        cache_size -= UNIT_LENGTH;  
          
        //等待填写当前正在读的那"一块"缓冲区  
        /**//*if(fill_bytes >= cache_size && writer_count > 0) { 
            synchronized (buf) { 
                buf.wait(); 
            } 
            return; 
        }*/ 
          
        //等待填满缓冲区  
        while (fill_bytes < cache_size) {  
            if (writer_count == 0 || isalive == false)  
                return;  
            if(BUF_LENGTH > (int)file_length - alloc_pos)  
                cache_size = (int)file_length - alloc_pos - UNIT_LENGTH;  
            System.out.printf(" [缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);  
            synchronized (buf) {  
                buf.wait();  
            }  
        }  
        System.out.printf(" ");  
    }  
      
    private int try_reading(int i, int len) throws Exception {  
        int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);  
        int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);  
        while (r < len) {  
            if (writer_count == 0 || isalive == false)  
                return r;  
            try_cache();  
            r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);  
        }  
          
        return len;  
    }  
      
    /**//* 
     * 各个"写"线程互斥等待空闲块 
     */ 
    public synchronized boolean tryWriting(Writer w) throws InterruptedException {  
        await_count++;  
        while (buf_bytes[buf_index] != 0 && isalive) {  
            this.wait();  
        }  
          
        //下载速度足够就结束一个"写"线程  
        if(writer_count > 1 && w.await_count >= await_count &&  
                w.await_count >= writer_count)  
            return false;  
          
        if(alloc_pos >= file_length)  
            return false;  
        w.await_count = await_count;  
        await_count--;  
        w.start_pos = alloc_pos;  
        w.index = buf_index;  
        alloc_pos += UNIT_LENGTH;  
        buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;  
        return isalive;  
    }  
      
    public void updateBuffer(int i, int len) {  
        synchronized (buf) {  
            buf_bytes[i] = len;  
            fill_bytes += len;  
            buf.notify();  
        }  
    }  
      
    public void updateWriterCount() {  
        synchronized (buf) {  
            writer_count--;  
            buf.notify();  
        }  
    }  
      
    public synchronized void notifyWriter() {  
        this.notifyAll();  
    }  
      
    public void terminateWriters() {  
        synchronized (buf) {  
            if (isalive) {  
                isalive = false;  
                Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY  
                        + " 次后放弃,请您稍后再试。";  
            }  
            buf.notify();  
        }  
          
        notifyWriter();       
    }  
      
    public int read() throws Exception {  
        int iret = -1;  
        int i = read_pos >> UNIT_LENGTH_BITS;  
        // 1."等待"有1字节可读  
        while (buf_bytes[i] < 1) {  
            try_cache();  
            if (writer_count == 0)  
                return -1;  
        }  
        if(isalive == false)  
            return -1;  
 
        // 2.读取  
        iret = buf[read_pos] & 0xff;  
        fill_bytes--;  
        file_pointer++;  
        read_pos++;  
        read_pos &= BUF_LENGTH_MASK;  
        if (--buf_bytes[i] == 0)  
            notifyWriter();     // 3.通知  
 
        return iret;  
    }  
      
    public int read(byte b[]) throws Exception {  
        return read(b, 0, b.length);  
    }  
 
    public int read(byte[] b, int off, int len) throws Exception {  
        if(len > UNIT_LENGTH)  
            len = UNIT_LENGTH;  
        int i = read_pos >> UNIT_LENGTH_BITS;  
          
        // 1."等待"有足够内容可读  
        if(try_reading(i, len) < len || isalive == false)  
            return -1;  
 
        // 2.读取  
        int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH  
        if (tail_len < len) {  
            System.arraycopy(buf, read_pos, b, off, tail_len);  
            System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);  
        } else 
            System.arraycopy(buf, read_pos, b, off, len);  
 
        fill_bytes -= len;  
        file_pointer += len;  
        read_pos += len;  
        read_pos &= BUF_LENGTH_MASK;  
        buf_bytes[i] -= len;  
        if (buf_bytes[i] < 0) {  
            int ni = read_pos >> UNIT_LENGTH_BITS;  
            buf_bytes[ni] += buf_bytes[i];  
            buf_bytes[i] = 0;  
            notifyWriter();  
        } else if (buf_bytes[i] == 0)  
            notifyWriter();  
          
        return len;  
    }  
      
    /**//* 
     * 从src_off位置复制,不移动文件"指针" 
     */ 
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {  
        int rpos = read_pos + src_off;  
        if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false)  
            return -1;  
        int tail_len = BUF_LENGTH - rpos;  
        if (tail_len < len) {  
            System.arraycopy(buf, rpos, b, dst_off, tail_len);  
            System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);  
        } else 
            System.arraycopy(buf, rpos, b, dst_off, len);  
        // 不发信号  
 
        return len;  
    }  
      
    public long length() {  
        return file_length;  
    }  
      
    public long getFilePointer() {  
        return file_pointer;  
    }  
 
    public void close() {  
        //  
    }  
      
    //  
    public void seek(long pos) throws Exception {  
        //  
    }  
      
}

    黑客防线网安服务器维护方案本篇连接:http://www.rongsen.com.cn/show-15338-1.html
网站维护教程更新时间:2012-04-04 22:53:16  【打印此页】  【关闭
我要申请本站N点 | 黑客防线官网 |  
专业服务器维护及网站维护手工安全搭建环境,网站安全加固服务。黑客防线网安服务器维护基地招商进行中!QQ:29769479

footer  footer  footer  footer