apache的Hadoop项目提供一类api可以通过java工程操作hdfs中的文件,包括:文件打开,读写,删除等、目录的创建,删除,读取目录中所有文件等.
/**
*
*/
package org.jrs.wlh;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
* @PutMeger.java
* java操作hdfs 往 hdfs中上传数据
* @version $Revision$/br
* @version $Revision$/br
* @version $Revision$/br
* update: $Date$
public class PutMeger {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fileS= FileSystem.get(conf);
FileSystem localFile = FileSystem.getLocal(conf); //得到一个本地的FileSystem对象
Path input = new Path(str[0]); //设定文件输入保存路径
Path out = new Path(str[1]); //文件到hdfs输出路径
try{
FileStatus[] inputFile = localFile.listStatus(input); //listStatus得到输入文件路径的文件列表
FSDataOutputStream outStream = fileS.create(out); //创建输出流
for (int i = 0; i inputFile.length; i++) {
System.out.println(inputFile[i].getPath().getName());
FSDataInputStream in = localFile.open(inputFile[i].getPath());
int bytesRead = 0;
while((bytesRead = in.read(buffer))0){ //按照字节读取数据
System.out.println(buffer);
outStream.write(buffer,0,bytesRead);
}
in.close();
}catch(Exception e){
e.printStackTrace();
//流读入和写入
InputStream in=null;
//获取HDFS的conf
//读取HDFS上的文件系统
FileSystem hdfs=FileSystem.get(conf);
//使用缓冲流,进行按行读取的功能
BufferedReader buff=null;
//获取日志文件的根目录
FileStatus stats[]=hdfs.listStatus(listf);
//自定义j,方便查看插入信息
int j=0;
for(int i = 0; i stats.length; i++){
//获取子目录下的文件路径
FileStatus temp[]=hdfs.listStatus(new Path(stats[i].getPath().toString()));
for(int k = 0; k temp.length;k++){
System.out.println("文件路径名:"+temp[k].getPath().toString());
//获取Path
Path p=new Path(temp[k].getPath().toString());
//打开文件流
in=hdfs.open(p);
//BufferedReader包装一个流
buff=new BufferedReader(new InputStreamReader(in));
String str=null;
while((str=buff.readLine())!=null){
System.out.println(str);
buff.close();
存入HDFS有好几种数据格式,我这里给你列出一种格式的存储,sequence的
public?class?SeqWrite?{
public?static?void?main(String[]?args)?throws?IOException,?Exception?{
Configuration?configuration?=?new?Configuration();
//这里是你主机的地址
//这个是存储的路径
Path?path?=?new?Path("/tmp/test1.seq");
Option?option?=?SequenceFile.Writer.file(path);
Option?optKey?=?SequenceFile.Writer.keyClass(IntWritable.class);
Option?optValue?=?SequenceFile.Writer.valueClass(Text.class);
SequenceFile.Writer?writer?=?null;
IntWritable?key?=?new?IntWritable(10);
Text?value?=?new?Text();
writer?=?SequenceFile.createWriter(configuration,?option,?optKey,?optValue);
for?(int?i?=?0;?i?data.length;?i++)?{
key.set(i);
value.set(data[i]);
writer.append(key,?value);
writer.hsync();
Thread.sleep(10000L);
IOUtils.closeStream(writer);
以上就是土嘎嘎小编为大家整理的java代码操作hdfs相关主题介绍,如果您觉得小编更新的文章只要能对粉丝们有用,就是我们最大的鼓励和动力,不要忘记讲本站分享给您身边的朋友哦!!