在DataNode的本地存储空间上,与存储服务密切相关的,比如创建数据块,恢复数据块,数据块校验等相关的代码都在org.apache.hadoop.hdfs.server.datanode.fsdataset包下(代码版本CDH5.1)

    首先说下org.apache.hadoop.hdfs.server.datanode.fsdataset下的主要接口,FsDatasetSpi接口的方法比较多,主要分三类。第一类主要是和数据块相关的,如创建rbw状态和temporary状态的block,追加block,恢复block,提交block,缓存block,打开block的输出流等。第二类主要是目录管理方面的,获取volume列表,获取block对应的volume,trash目录相关的管理等。第三类主要是FsDataset自身的健康检查,资源回收等。FsDataSetSpi对应cdh3x版本中的FsDataSet接口。

    FsVolumeSpi接口主要是用于volume管理,方法比较少,代码如下

public interface FsVolumeSpi {  //获取volume下的存储UUID  public String getStorageID();  //获取BlockPool的列表  public String[] getBlockPoolList();  //获取可用的存储空间大小  public long getAvailable() throws IOException;  //获取volume的基本路径(current目录的父目录路径)  public String getBasePath();  //获取volume的绝对路径  public String getPath(String bpid) throws IOException;  //获取block的提交目录  public File getFinalizedDir(String bpid) throws IOException;    //获取存储类型  public StorageType getStorageType();}

    RollingLogs接口用于对DataNode上的EditLog进行管理,内部包含两个重要的接口。LineIterator用于遍历EditLog的内容,Appender接口用于追加内容到EditLog。

    VolumeChoosingPolicy接口提供写入block副本的volume选择策略,实现类有两个,和接口在同一个包下,分别是RoundRobinVolumeChoosingPolicy和AvailableSpaceVolumeChoosingPolicy。

    org.apache.hadoop.hdfs.server.datanode.fsdataset.impl主要包括相关接口的实现类和一些工具类。FsDataSet将存储空间分为三个级别,LDir(cdh3x的FSDir),FsVolumeImpl(cdh3x的FsVolume)和FsVolumeList(cdh3x的FsVolumeSet)。LDir代表current目录下的子目录,FsVolumeImpl代表${dfs.datanode.data.dir}中的一项,DataNode可以有多个数据目录,FsVolumeList负责管理FsVolumeImpl对象。

    以创建数据块为入口,详细分析下代码的执行流程,在HDFS支持Append特性之前,一个block副本在DataNode的状态要么是finalized,要么是temporary。temporary状态的block会在DataNode重启过程中被删除。但支持了Append之后,HDFS必须为正在构建中的block提供更强的可持久化支持,一些temporary的block副本必须在DataNode重启过程中持久存在。所以在后续的版本中代码的变动比较大。详情参考。

    在cdh3x中,FsDataSet中的writeToBlock方法用于选择block写入的volume,并创建写入的输出流,新版本中将这个方法拆分为三个方法,分别是createRbw,createTemporary和append。详情见。rbw状态代表一个block副本刚刚被创建或者被Append,temporary则代表DataNode间block的备份和reblance。下面详细分析下客户端写入文件创建block输出流的过程。首先看一段cdh3x的代码以做对比

BlockReceiver(Block block, DataInputStream in, String inAddr,                String myAddr, boolean isRecovery, String clientName,                 DatanodeInfo srcDataNode, DataNode datanode) throws IOException {            //通过调用writeToBlock方法创建流对象      streams = datanode.data.writeToBlock(block, isRecovery,                              clientName == null || clientName.length() == 0);      this.finalized = false;      if (streams != null) {        //获取输出流对象        this.out = streams.dataOut;        //获取校验信息输出流对象        this.checksumOut = new DataOutputStream(new BufferedOutputStream(streams.checksumOut, SMALL_BUFFER_SIZE));      }}

    cdh3x中通过调用writeToBlock来创建对应的输出流,而在新代码中获取输出流对象的逻辑相对复杂一些。代码如下

      //如果是DataNode节点发起的数据块复制或者移动      if (isDatanode) {         //创建temporary状态的replica        replicaInfo = datanode.data.createTemporary(block);      } else {        //如果是客户端发起的,根据所处的阶段执行不同的操作        switch (stage) {        //pipeline启动创建        case PIPELINE_SETUP_CREATE:          //创建rbw状态的replicaInfo          replicaInfo = datanode.data.createRbw(block);          //向NameNode已经创建block replica的消息          datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());          break;        case PIPELINE_SETUP_STREAMING_RECOVERY:          //获取rbw恢复的replicaInfo          replicaInfo = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd);          //设置block新版本号          block.setGenerationStamp(newGs);          break;        case PIPELINE_SETUP_APPEND:          //block追加操作          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);          if (datanode.blockScanner != null) {            //通过blockScanner删除旧的block            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),block.getLocalBlock());          }          //设置block的新版本号          block.setGenerationStamp(newGs);          //向NameNode发送变更消息          datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());          break;        case PIPELINE_SETUP_APPEND_RECOVERY:          //获取append恢复的replicaInfo          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);          if (datanode.blockScanner != null) {             //通过blockScanner删除旧的block            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),                block.getLocalBlock());          }          //设置block的新版本号          block.setGenerationStamp(newGs);          //向NameNode发送变更消息          datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());          break;        case TRANSFER_RBW:        case TRANSFER_FINALIZED:          //DataNode之间传输创建的replicaInfo          replicaInfo = datanode.data.createTemporary(block);          break;        default: throw new IOException("Unsupported stage " + stage +               " while receiving block " + block + " from " + inAddr);        }      }      this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?        datanode.getDnConf().dropCacheBehindWrites :          cachingStrategy.getDropBehind();      this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;            final boolean isCreate = isDatanode || isTransfer           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;      //创建输出流对象      streams = replicaInfo.createStreams(isCreate, requestedChecksum);      //获取校验相关的信息      this.clientChecksum = requestedChecksum;      this.diskChecksum = streams.getChecksum();      this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);      this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();      this.checksumSize = diskChecksum.getChecksumSize();      //获取数据输出流      this.out = streams.getDataOut();

    由于支持了Append操作,block副本所处的状态更多了,代码实现上也更加复杂。下面详细分析下FsDatasetImpl类下的createRbw方法

public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)      throws IOException {    //从volumeMap中获取replicaInfo,因为是新创建的,如果在volumeMap存在,则不能继续创建    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),         b.getBlockId());    if (replicaInfo != null) {      throw new ReplicaAlreadyExistsException("Block " + b +      " already exists in state " + replicaInfo.getState() +      " and thus cannot be created.");    }    //根据相应的规则获取要写入的volume    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());    //在得到的volume下创建rbw文件    File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());    //创建ReplicaBeingWritten对象,此对象继承自ReplicaInfo    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),         b.getGenerationStamp(), v, f.getParentFile());    //将新创建的replicaInfo加入到volumeMap中    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);    return newReplicaInfo;  }

    在选择写入的volume的时候,用户可以根据dfs.datanode.fsdataset.volume.choosing.policy属性来选择使用何种策略来选择volume。默认提供两种策略,分别是根据volume可用空间和轮询的方法来选择volume,对应的实现类分别是AvailableSpaceVolumeChoosingPolicy和RoundRobinVolumeChoosingPolicy,它们都继承自VolumeChoosingPolicy接口,用户也可以根据自己的需求来自定义选择策略。