public class PictureTransferJob extends QuartzJobBean { protected void executeInternal(JobExecutionContext arg0) throws JobExecutionException { //实际的FTP配置是读取配置文件获取的 //FTP地址 String hostName =""; //FTP端口 int port = 2001; /FTP账号 String userName = "test1"; //ftp密码 String password = "test1"; //ftp文件存储目录 String ftpDowload = "/"; //文件本地存储路径 String path = this.getClass().getResource("/").getPath(); //图片地址文件存储目录 String addrPath=path.substring(1, path.indexOf("WEB-INF/classes"))+"picAddr"; //实际下载的图片存储目录 String picPath=path.substring(1, path.indexOf("WEB-INF/classes"))+"pic"; addrPath = addrPath.replace("%20"," "); picPath = picPath.replace("%20"," "); try { //创建存储图片地址的文件 creatFile(addrPath); //创建存储实际图片的文件 creatFile(picPath); String oldAddrPath = addrPath; String oldPicPath = picPath; //创建FTP连接 FtpUtil2 ftpUtil2 = new FtpUtil2(hostName, port,userName, password, ftpDowload, true); //遍历FTP目录下的文件 String[] files = ftpUtil2.ListAllFiles(); //本地数据库会有一个表记录下载过的文件,这里会查询数据库和ftp列出的文件名比较,如果已经下载过的文件就不会下载,避免重复下载。 //下面省略比较的过程,循环files数组,在本地创建文件 for(int i=0;i服务器存储文件的地址,addrPath是本地存储文件的地址 //这里一个返回状态判断文件是否下载成功 boolean downloadInvestorFlag = ftpUtil2.downloadFile(ftpDowload, addrPath); //文件下载成功后调读取文件的方法,将需要下载的图片地址存入容器 boolean entityState = setPictureDetail(addrPath,picPath,fileNameDate); } } catch (Exception e) { e.printStackTrace(); //调记录错误日志的业务类用于发送下载文件出错的短信 } } //这里开始读图片地址 private boolean setPictureDetail(String addrPath,String picPath,String synDate) { System.out.println("----------进入setPictureDetail方法-----------"); BufferedReader br = null; try { br=new BufferedReader(new InputStreamReader(new FileInputStream(addrPath),"UTF-8")); String row; int count=0; //map中存储每行读取到的图片名称和URL地址 Map addrMap=new HashMap (); while ((row=br.readLine())!=null) { try { count++; if (count==1) { continue; } String[] column = row.split("\\|\\|", -1); addrMap.put(column[0].trim(), column[1].trim()); } catch (Exception e) { e.printStackTrace(); } } System.out.println(new Date()); //这里调用压缩方法,压缩方法中会调用执行下载图片的方法 
zipPic(picPath,synDate,addrMap); System.out.println(new Date()); System.out.println("----------完成--------------"); return true; } catch (Exception e) { e.printStackTrace(); //调用记录错误日志的业务类 return false; }finally { try { if (null != br) br.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 根据url地址下载图片 * @throws IOException */ private boolean downPic(String picPath,List > addrList,List picList)throws IOException{ InputStream is=null; FileOutputStream fos=null; URL url=null; String fileName=null; String picAddr=null; File pic=null; try { for(Map.Entry addrEntry:addrList) { fileName=addrEntry.getKey(); picAddr=addrEntry.getValue(); //创建Url对象 url=new URL(picAddr); is=url.openStream(); //URLConnection获取到的流通过InputStream直接写入字节数组会缺失数据,导致下载的图片不完整,使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决 byte[] bytes=IOUtils.toByteArray(is);//new byte[is.available()];获取的字节 //流中数据读入字节数组,读入后,流中数据清空 pic=new File(picPath+fileName+".jpg"); fos=new FileOutputStream(pic); fos.write(bytes); //将下载的图片存入List,待图片全部下载完成后传入zip方法进行压缩 picList.add(pic); fos.flush(); fos.close(); is.close(); } return true; } catch (Exception e) { e.printStackTrace(); return false; } finally{ if (null!=fos) { fos.close(); } if (null!=is) { is.close(); } } } //这里是压缩文件的伪代码 private void zipPic(picPath,synDate,addrMap);{ //传入需要压缩的文件列表和压缩文件名 ZipUtil.zipByStream(picList,new File(picPath+synDate+".zip")); } /** * 创建文件 * @param path */ private void creatFile(String path) { File file = new File(path); if(!file.exists()) { file.mkdirs(); } } }
/** * 将下载的图片按天压缩 * @throws IOException */ private boolean zipPic(String picPath,String synDate,MapaddrMap) throws IOException{ //这里由于是多线程存储图片流,所以需要使用线程安全的map,因此使用ConcurrentHashMap Map pictureList=new ConcurrentHashMap (); //这里定义每个线程下载的图片个数 int count=400; //存储需要下载的图片地址 List > addrList=new ArrayList >(addrMap.entrySet()); //线程数,加一是因为要创建一个线程下载最后不足400个的图片 int nThreads=(addrList.size()/count)+1; //CountDownLatch countDownLatch = new CountDownLatch(nThreads); try { boolean downPic=false; //执行多线程下载图片 downPic=downPic(picPath,addrList,picList,pictureList,nThreads,count); if (downPic) { ZipUtil.zipByArray(picList,new File(picPath+synDate+".zip")); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
/** * 根据url地址下载图片 * @throws InterruptedException */ private boolean downPic(String picPath,List> addrList,Map picList,Map pictureList,int nThreads,int count)throws IOException, InterruptedException{ ExecutorService threadPool=Executors.newFixedThreadPool(nThreads); // 创建两个个计数器 CountDownLatch begin=new CountDownLatch(0); CountDownLatch end=new CountDownLatch(nThreads); // 循环创建线程 for (int i = 0; i < nThreads; i++) { List >subAddrList=null; // 计算每个线程执行的数据 if ((i + 1) == nThreads) { int startIndex = (i * count); int endIndex = addrList.size(); subAddrList = addrList.subList(startIndex, endIndex); } else { int startIndex = (i * count); int endIndex = (i + 1) * count; subAddrList = addrList.subList(startIndex, endIndex); } // 线程类 PicDownload mythead = new PicDownload(picPath,subAddrList,picList,pictureList); // 这里执行线程的方式是调用线程池里的threadPool.execute(mythead)方法。 try { threadPool.execute(mythead); } catch (Exception e) { //记录错误日志 return false; } } begin.countDown(); end.await(); // 执行完关闭线程池 threadPool.shutdown(); //这里一定要循环直到线程池中所有线程都结束才能往下走,测试时由于没有这一步导致子线程下载图片还没完成,而主线程已经往下走了,导致压缩包内没有图片 //也可以使用CountDownLatch实现 /*while (true) { if (threadPool.isTerminated()) { System.out.println("所有子线程已结束!"); break; } }*/ return true; }
class PicDownload implements Runnable{ //下载图片的地址列表 List> addrList; //装载下载成功的图片列表 Map picList; Map pictureList; //图片本地存储路径 String picPath; CountDownLatch begin,end; public PicDownload(String picPath,List > addrList,Map picList,CountDownLatch begin,CountDownLatch end){ this.addrList=addrList; this.picList=picList; this.picPath=picPath; this.begin=begin; this.end=end; } @Override public void run() { try { System.out.println(Thread.currentThread().getName()+"------"+Thread.currentThread().getId()); downPicture(addrList); //System.out.println(countDownLatch.getCount()); begin.await(); } catch (Exception e) { e.printStackTrace(); }finally{ end.countDown(); //countDownLatch.countDown(); } } public boolean downPicture(List > addrList) throws Exception{ InputStream is=null; FileOutputStream fos=null; URL url=null; String fileName=null; String picAddr=null; File pic=null; try { for(Map.Entry addrEntry:addrList) { fileName=addrEntry.getKey(); picAddr=addrEntry.getValue(); //创建Url对象 url=new URL(picAddr); is=url.openStream(); //URLConnection获取到的流通过InputStream直接写入字节数组会缺失数据,导致下载的图片不完整,使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决 //byte[] bytes=IOUtils.toByteArray(is);//new byte[is.available()];获取的字节 //流中数据读入字节数组,读入后,流中数据清空 picList.put(fileName+".jpg", is); //这时候由于没有把流写入文件,一定不能关闭流,否则流中的数据就会丢失 //is.close(); } return true; } catch (Exception e) { e.printStackTrace(); return false; } finally{ //不能关闭流 /*if (null!=is) { is.close(); }*/ } } }
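使用流来压缩(即把尚未读取的 InputStream 存入容器,等到压缩时才由 ZipUtil.zipByStream 读取)又遇到了另一个问题:在压缩文件时会出现 java.net.SocketException: Connection reset。这通常是因为每张图片的连接都要一直保持打开、直到压缩阶段才被读取,而服务器端可能已经先把长时间空闲的连接重置了。解决办法是在下载时立即把流读成字节数组并关闭流,容器中只保存字节数组,如下: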
/** *使用容器存储下载的图片字节数组 */ public boolean downPicture(List> addrList) throws Exception{ InputStream is=null; FileOutputStream fos=null; URL url=null; String fileName=null; String picAddr=null; File pic=null; try { for(Map.Entry addrEntry:addrList) { fileName=addrEntry.getKey(); picAddr=addrEntry.getValue(); //创建Url对象 url=new URL(picAddr); //打开连接,创建java.net.URLConnection对象,该对象没有关闭连接的方法,可以转为它的子类HttpURLConnection调用disconnect方法关闭连接。 //java.net.URLConnection和java.net.HttpURLConnection都有设置超时时间的方法关闭连接 //HttpURLConnection uc=(HttpURLConnection)url.openConnection(); is=uc.getInputStream(); //URLConnection获取到的流通过InputStream直接写入字节数组会缺失数据,导致下载的图片不完整,使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决 byte[] bytes=IOUtils.toByteArray(is);//new byte[is.available()];获取的字节 //流中数据读入字节数组,读入后,流中数据清空 //is.read(bytes); picList.put(fileName+".jpg",bytes); is.close(); } return true; } catch (Exception e) { e.printStackTrace(); return false; } finally{ if (null!=is) { is.close(); } } }
import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPClientConfig; import org.apache.commons.net.ftp.FTPConnectionClosedException; import org.apache.commons.net.ftp.FTPReply; public class FtpUtil2 { private FTPClient ftpClient = null; // ftp服务器地址 private String hostName; // ftp服务器默认端口 public static int defaultport = 21; // 登录名 private String userName; // 登录密码 private String password; // 需要访问的远程目录 private String remoteDir; /** * @param hostName * 主机地址 * @param port * 端口号 * @param userName * 用户名 * @param password * 密码 * @param remoteDir * 默认工作目录 * @param is_zhTimeZone * 是否是中文FTP Server端 * @return * @return */ /** * 新增方法 */ public FtpUtil2() { PropConfig config = PropConfig.loadConfig("system.properties"); String hostName = config.getConfig("ftpAddress"); String port = config.getConfig("ftpPort"); String userName = config.getConfig("ftpUserName"); String password = config.getConfig("ftpPassword"); String remoteDir = config.getConfig("remoteFilePath"); boolean is_zhTimeZone= true; this.hostName = hostName; this.userName = userName; this.password = password; this.remoteDir = remoteDir == null ? "" : remoteDir; this.ftpClient = new FTPClient(); if (is_zhTimeZone) { this.ftpClient.configure(FtpUtil2.Config()); this.ftpClient.setControlEncoding("GBK"); } // 登录 this.login(); // 切换目录 this.changeDir(this.remoteDir); this.setFileType(FTPClient.BINARY_FILE_TYPE); ftpClient.setDefaultPort(Integer.parseInt(port)); } public FtpUtil2(String hostName, int port, String userName, String password, String remoteDir, boolean is_zhTimeZone) { this.hostName = hostName; this.userName = userName; this.password = password; defaultport=port; this.remoteDir = remoteDir == null ? 
"" : remoteDir; this.ftpClient = new FTPClient(); if (is_zhTimeZone) { this.ftpClient.configure(FtpUtil2.Config()); this.ftpClient.setControlEncoding("GBK"); } // 登录 this.login(); // 切换目录 this.changeDir(this.remoteDir); this.setFileType(FTPClient.ASCII_FILE_TYPE); ftpClient.setDefaultPort(port); } /** * 登录FTP服务器 */ public boolean login() { boolean success = false; try { ftpClient.connect(this.hostName,defaultport); ftpClient.login(this.userName, this.password); int reply; reply = ftpClient.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftpClient.disconnect(); return success; } } catch (FTPConnectionClosedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } success = true; System.out.println("连接到ftp服务器:" + this.hostName + " 成功..开始登录"); return success; } private static FTPClientConfig Config() { FTPClientConfig conf = new FTPClientConfig(FTPClientConfig.SYST_UNIX); conf.setRecentDateFormatStr("MM月dd日 HH:mm"); // conf.setRecentDateFormatStr("(YYYY年)?MM月dd日( HH:mm)?"); return conf; } /** * 变更工作目录 * * @param remoteDir * */ public void changeDir(String remoteDir) { try { this.remoteDir = remoteDir; ftpClient.changeWorkingDirectory(remoteDir); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("变更工作目录为:" + remoteDir); } /** * 返回上一级目录(父目录) */ public void toParentDir() { try { ftpClient.changeToParentDirectory(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 列出当前工作目录下所有文件 */ public String[] ListAllFiles() { String[] names = this.ListFiles("*"); return this.sort(names); } /** * 列出指定工作目录下的匹配文件 * * @param dir * exp: /cim/ * @param file_regEx * 通配符为* */ public String[] ListAllFiles(String dir, String file_regEx) { String[] names = this.ListFiles(dir + file_regEx); return this.sort(names); } /** * 列出匹配文件 * * @param file_regEx * 匹配字符,通配符为* */ public String[] 
ListFiles(String file_regEx) { try { /** * FTPFile[] remoteFiles = ftpClient.listFiles(file_regEx); * //System.out.println(remoteFiles.length); String[] name = new * String[remoteFiles.length]; if(remoteFiles != null) { for(int * i=0;i
(注:FtpUtil2 的代码清单在此处被截断,ListFiles 方法的其余部分以及该类后面的方法在原文提取时丢失。)
下面是ZIP工具类:
import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; import java.util.zip.ZipOutputStream; import org.apache.commons.io.IOUtils; import com.ibatis.common.logging.Log; import com.ibatis.common.logging.LogFactory; public class ZipUtil { private static final Log log = LogFactory.getLog(ZipUtil.class); /** * 压缩文件 * * @param srcfile File[] 需要压缩的文件列表 * @param zipfile File 压缩后的文件 */ public static OutputStream zipFiles(Listsrcfile, OutputStream outputStream) { byte[] buf = new byte[1024]; try { // Create the ZIP file ZipOutputStream out = new ZipOutputStream(outputStream); // Compress the files for (int i = 0; i < srcfile.size(); i++) { File file = srcfile.get(i); FileInputStream in = new FileInputStream(file); // Add ZIP entry to output stream. out.putNextEntry(new ZipEntry(file.getName())); // Transfer bytes from the file to the ZIP file int len; while ((len = in.read(buf)) > 0) { //System.out.println(len+"=============="); out.write(buf, 0, len); } // Complete the entry out.closeEntry(); in.close(); } // Complete the ZIP file out.close(); } catch (IOException e) { log.error("ZipUtil zipFiles exception:"+e); } return outputStream; } /** * 压缩文件 * * @param srcfile File[] 需要压缩的文件列表 * @param zipfile File 压缩后的文件 */ public static void zipFiles(List srcfile, File zipfile) { byte[] buf = new byte[1024]; try { // Create the ZIP file ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipfile)); // Compress the files for (int i = 0; i < srcfile.size(); i++) { File file = srcfile.get(i); FileInputStream in = new FileInputStream(file); // Add ZIP entry to output stream. 
out.putNextEntry(new ZipEntry(file.getName())); // Transfer bytes from the file to the ZIP file int len; while ((len = in.read(buf)) > 0) { out.write(buf, 0, len); } // Complete the entry out.closeEntry(); in.close(); } // Complete the ZIP file out.close(); } catch (IOException e) { log.error("ZipUtil zipFiles exception:"+e); } } /** * 压缩文件 * srcfile:key:文件名,value:文件对应的输入流 * @param srcfile * @param zipfile * @see */ public static void zipByStream(Map srcfile, File zipfile) { try { // Create the ZIP file ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipfile)); // Compress the files System.out.println(srcfile.entrySet().size()); for (Map.Entry fileEntry:srcfile.entrySet()) { InputStream in = fileEntry.getValue(); // Add ZIP entry to output stream. System.out.println(in.available()); out.putNextEntry(new ZipEntry(fileEntry.getKey())); // Transfer bytes from the file to the ZIP file byte[] bytes=IOUtils.toByteArray(in); out.write(bytes); out.closeEntry(); in.close(); } // Complete the ZIP file out.close(); } catch (IOException e) { log.error("ZipUtil zipFiles exception:"+e); System.out.println(e.getMessage()); } } /** * 压缩文件 * srcfile:key:文件名,value:文件对应的字节数组 * @param srcfile * @param zipfile * @see */ public static void zipByArray(Map srcfile, File zipfile) { byte[] buf = new byte[1024]; try { // Create the ZIP file ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipfile)); // Compress the files System.out.println(srcfile.entrySet().size()); for (Map.Entry fileEntry:srcfile.entrySet()) { //InputStream in = fileEntry.getValue(); // Add ZIP entry to output stream. 
out.putNextEntry(new ZipEntry(fileEntry.getKey())); // Transfer bytes from the file to the ZIP file byte[] bytes=fileEntry.getValue();//IOUtils.toByteArray(in); out.write(bytes); out.closeEntry(); //in.close(); } // Complete the ZIP file out.close(); } catch (IOException e) { log.error("ZipUtil zipFiles exception:"+e); System.out.println(e.getMessage()); } } /** * 解压缩 * * @param zipfile File 需要解压缩的文件 * @param descDir String 解压后的目标目录 */ public static void unZipFiles(File zipfile, String descDir) { try { // Open the ZIP file ZipFile zf = new ZipFile(zipfile); for (Enumeration entries = zf.entries(); entries.hasMoreElements();) { // Get the entry name ZipEntry entry = ((ZipEntry) entries.nextElement()); String zipEntryName = entry.getName(); InputStream in = zf.getInputStream(entry); // System.out.println(zipEntryName); OutputStream out = new FileOutputStream(descDir + zipEntryName); byte[] buf1 = new byte[1024]; int len; while ((len = in.read(buf1)) > 0) { out.write(buf1, 0, len); } // Close the file and stream in.close(); out.close(); } } catch (IOException e) { log.error("ZipUtil unZipFiles exception:"+e); } } /** * Main * * @param args */ public static void main(String[] args) { List srcfile=new ArrayList (); srcfile.add(new File("d:\\1.jpg")); srcfile.add(new File("d:\\2.jpg")); srcfile.add(new File("d:\\3.jpg")); srcfile.add(new File("d:\\4.jpg")); File zipfile = new File("d:\\pic.zip"); ZipUtil.zipFiles(srcfile, zipfile); } }