HDFS Utility Class
package com.hdfs;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;
/**
* HDFS file-operation service. <br/>
* Implements Serializable because Storm otherwise throws java.io.NotSerializableException. <br/>
* Author: 师成 <br/>
* Version: 1.0 <br/>
* Created: 2013-05-20 16:27:56 <br/>
* Contact: <a href="mailTo:shichengoooo@gmail.com">师成</a>
*/
public class HDFSService implements Serializable {
private static final long serialVersionUID = 3471158538744178633L;
private static final Logger logger = Logger.getLogger(HDFSService.class);
private static final String DEFAULT_HDFS_URL = "hdfs://127.0.0.1:9000";// default HDFS address
private static String HDFS_PATH = null;
private static Configuration conf;// Hadoop configuration
private static FileSystem fs;// HDFS file system handle
private static HDFSService instance;// singleton instance
private HDFSService() {}
/**
* Singleton accessor, synchronized for thread safety (double-checked locking is not used for now).
* The connection to the server has already been established by the static initializer below.
*/
public synchronized static HDFSService getInstance() {
if (instance == null) {
instance = new HDFSService();
}
return instance;
}
/**
* Initialize the static fields.
*/
static {
try {
HDFS_PATH = CommonParam.getValue("hdfs.path");// read the hdfs.path entry from the configuration file
if (StringUtils.isEmpty(HDFS_PATH)) {
HDFS_PATH = DEFAULT_HDFS_URL;// fall back to the default address when hdfs.path is empty
}
conf = new Configuration();
// conf.set("fs.defaultFS", HDFS_PATH);// once this is set, full hdfs:// paths no longer need to appear in the code
// Bind the FileSystem to the configured address; a plain FileSystem.get(conf) returns the
// local file system (causing the "Wrong FS" exception noted below) unless the cluster's
// core-site.xml is on the classpath.
fs = FileSystem.get(URI.create(HDFS_PATH), conf);
} catch (Exception ex) {
logger.error(ex);
}
}
/**
* Upload a file to HDFS.
* @param sFilePath local file path (full path, e.g. /home/1.txt)
* @param sHdfsPath destination in the form hdfs://ip:port/destination
* @return 0|1
* 1 = file stored successfully, 0 = failure; the upload fails when either path is empty
* java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.10.36/, expected: file:///
* If the above exception occurs, copy the cluster's core-site.xml and hdfs-site.xml into the project
*/
public int uploadFile(String sFilePath,String sHdfsPath){
int iResult = 0;
try {
if (!StringUtils.isEmpty(sFilePath) && !StringUtils.isEmpty(sHdfsPath)) {
InputStream in = new BufferedInputStream(new FileInputStream(sFilePath));
OutputStream out = fs.create(new Path(sHdfsPath));
IOUtils.copyBytes(in, out, conf, true);
iResult = 1;
}
} catch (Exception ex) {
logger.error(ex);
}
return iResult;
}
/**
* Upload a file to HDFS.
* @param sFilePath local file path (full path, e.g. /home/1.txt)
* @param sHdfsPath destination directory in the form hdfs://ip:port/destination
* @param sFileName name to store the uploaded file under
* @return 0|1
* 1 = file stored successfully, 0 = failure; the upload fails when either path is empty
* java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.10.36/, expected: file:///
* If the above exception occurs, copy the cluster's core-site.xml and hdfs-site.xml into the project
*/
public int uploadFile(String sFilePath,String sHdfsPath, String sFileName){
int iResult = 0;
try {
if (!StringUtils.isEmpty(sFilePath) && !StringUtils.isEmpty(sHdfsPath)) {
InputStream in = new BufferedInputStream(new FileInputStream(sFilePath));
// OutputStream out = fs.create(new Path(sHdfsPath + sFileName), new Progressable() {
// public void progress() {
// System.out.print("."); // prints a dot to show upload progress
// }
// });
OutputStream out = fs.create(new Path(sHdfsPath + sFileName));
IOUtils.copyBytes(in, out, conf, true);
iResult = 1;
}
} catch (Exception ex) {
logger.error(ex);
}
return iResult;
}
/**
* Append to a file on HDFS.
* @param sFilePath file that already exists on the server
* @param sAppendFilePath local file whose contents are to be appended
* @return 0|1 the append only happens when the file already exists on the server;
* if it does not exist, the local file is uploaded to the server as a new file
* 1 = append succeeded, 0 = failure; the append fails when either path is empty
* Note: HDFS was not originally designed to support appends; fs.append() requires a
* Hadoop version/configuration that enables it (e.g. dfs.support.append)
*/
public int appendFile(String sFilePath, String sAppendFilePath) {
int iResult = 0;
try {
if (!StringUtils.isEmpty(sFilePath) && !StringUtils.isEmpty(sAppendFilePath)) {
if (fs.exists(new Path(sFilePath))) {
InputStream in = new BufferedInputStream(new FileInputStream(sAppendFilePath));// stream over the local file to append
OutputStream out = fs.append(new Path(sFilePath));// open the existing HDFS file for append
IOUtils.copyBytes(in, out, 4096, true);
iResult = 1;
} else {
iResult = uploadFile(sAppendFilePath, sFilePath);// target missing: upload the local file as a new file
}
}
} catch (Exception ex) {
logger.error(ex);
}
return iResult;
}
/**
* Download a file.
* @param sHdfsPath file path on the server
* @param sDownPath local path to download to
*/
public void downFile(String sHdfsPath, String sDownPath) {
try {
if (!StringUtils.isEmpty(sHdfsPath) && !StringUtils.isEmpty(sDownPath)) {
if (fs.exists(new Path(sHdfsPath))) {
InputStream inputStream = fs.open(new Path(sHdfsPath));
OutputStream outputStream = new FileOutputStream(sDownPath);
IOUtils.copyBytes(inputStream, outputStream, conf, true);
}
}
} catch (Exception ex) {
logger.error(ex);
}
}
/**
* Delete a file.
* @param sDelPath file path
* @return 0|1 1 = deleted successfully, 0 = failure
* If a directory path is passed in, the whole directory is deleted recursively
*/
public int deleteFile(String sDelPath) {
int iResult = 0;
try {
if (!StringUtils.isEmpty(sDelPath)) {
if (fs.exists(new Path(sDelPath))) {
fs.delete(new Path(sDelPath), true);
iResult = 1;
}
}
} catch (Exception ex) {
logger.error(ex);
}
return iResult;
}
/**
* Delete multiple files.
* @param sDelPath directory containing the files to delete
* @param files the files to delete
* @return number of files actually deleted
*/
public int deleteManyFile(String sDelPath, Object... files) {
int iResult = 0;
try {
if (!StringUtils.isEmpty(sDelPath) && files != null) {
if (fs.exists(new Path(sDelPath))) {
if (!sDelPath.endsWith("/")) {// HDFS paths always use '/', regardless of the local OS separator
sDelPath = sDelPath + "/";
}
for (Object obj : files) {
String sFileName = String.valueOf(obj);
if (sFileName.startsWith("/")) {
sFileName = sFileName.substring(1);
}
iResult += deleteFile(sDelPath + sFileName);
}
}
}
} catch (Exception ex) {
logger.error(ex);
}
return iResult;
}
/**
* Create a directory.
* @param sFolderName directory name
* @return 0|1 1 = created successfully, 0 = failure (including when the directory already exists)
*/
public int mkdirFolder(String sFolderName) {
int iResult = 0;
try {
if (!StringUtils.isEmpty(sFolderName)) {
if (!fs.exists(new Path(sFolderName))) {
fs.mkdirs(new Path(sFolderName));
iResult = 1;
}
}
} catch (Exception ex) {
logger.error(ex);
}
return iResult;
}
/**
* Get an image as a data stream.
* @param sFilePath HDFS path of the image file
* @return image data stream (null if the path is empty or the file cannot be opened)
*/
public FSDataInputStream getImageStream(String sFilePath) {
FSDataInputStream dataInputStream = null;
try {
if (!StringUtils.isEmpty(sFilePath)) {
Path path = new Path(sFilePath);
dataInputStream = fs.open(path);
}
} catch (Exception ex) {
logger.error(ex);
}
return dataInputStream;
}
}
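A minimal usage sketch of the class above (the paths are hypothetical; it assumes hdfs.path points at a reachable NameNode such as hdfs://master:9000 and that the local file exists):
import com.hdfs.HDFSService;

public class HDFSServiceDemo {
    public static void main(String[] args) {
        HDFSService service = HDFSService.getInstance();
        // Upload /home/1.txt into the /picture directory on HDFS (returns 1 on success, 0 on failure).
        int uploaded = service.uploadFile("/home/1.txt", "hdfs://master:9000/picture/", "1.txt");
        System.out.println("upload result: " + uploaded);
        // Download the file back to the local file system.
        service.downFile("hdfs://master:9000/picture/1.txt", "/tmp/1.txt");
        // Delete it from HDFS when done.
        service.deleteFile("hdfs://master:9000/picture/1.txt");
    }
}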
Servlet that displays a file stored on the server (the data is streamed directly into the response)
import java.io.IOException;
import java.io.OutputStream;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.fs.FSDataInputStream;
import com.hdfs.HDFSService;
@WebServlet("/ImageServlet")
public class ImageServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
public ImageServlet() {}
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doPost(request, response);
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.reset();
response.setContentType("image/jpeg");
HDFSService dataService = HDFSService.getInstance();
FSDataInputStream inputStream = dataService.getImageStream("hdfs://master:9000/picture/1.jpg");
if (inputStream == null) {// the file could not be opened on HDFS
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
// Use getOutputStream() only; calling getWriter() on the same response would throw IllegalStateException.
OutputStream os = response.getOutputStream();
byte[] buffer = new byte[4096];
int length = 0;
while ((length = inputStream.read(buffer)) > 0) {
os.write(buffer, 0, length);
}
os.flush();
os.close();
inputStream.close();
}
}
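The servlet can then be referenced directly as the src of an <img> tag in a JSP page. For a quick sanity check outside the browser, a small client like the sketch below works (the deployment URL is hypothetical; adjust host, port, and context path to your container):
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class ImageServletClient {
    public static void main(String[] args) throws Exception {
        // Hypothetical deployment address; the @WebServlet mapping above is /ImageServlet.
        URL url = new URL("http://localhost:8080/hdfs-demo/ImageServlet");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        System.out.println("HTTP " + conn.getResponseCode() + " " + conn.getContentType());
        try (InputStream in = conn.getInputStream()) {
            // Save the returned JPEG locally for inspection.
            Files.copy(in, Paths.get("/tmp/1.jpg"), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}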