import io
import json
import mimetypes
import os
from contextlib import nullcontext
from pathlib import Path
from typing import Optional
import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
from src.common.components.file_system.fs_abstract_client import AbstractFileSystemClient, FileDescriptor, FileJSON, \
FileInterfaced, FileIOClass, FullFileName, FileBytes
class FileSystemClientByAmazonS3(AbstractFileSystemClient):
def __init__(self, amazon_s3_client: "AmazonS3Client"):
self.__amazon_s3_client = amazon_s3_client
def create_path(self, file_descriptor: FileDescriptor):
self.__amazon_s3_client.create_bucket(
bucket=file_descriptor.root)
    def find_one(self, file_descriptor: FileDescriptor) -> Optional[FileDescriptor]:
        # Turn a wildcard filename such as "name.*" into the literal prefix "name."
        prefix = FileDescriptor(file_descriptor.root, file_descriptor.directory,
                                file_descriptor.filename.replace('.*', '.'))
        keys = self.__amazon_s3_client.list_objects_by_prefix_and_suffix(
            bucket=file_descriptor.root,
            prefix=self.__key_join(prefix))
        if len(keys) == 1:
            key = keys[0]
            # Strip the "<directory>/" prefix from the key to recover the bare filename.
            filename = key[len(file_descriptor.directory) + 1:] if file_descriptor.directory else key
            return FileDescriptor(
                file_descriptor.root,
                file_descriptor.directory,
                filename
            )
        return None
def file_exists(self, file_descriptor: FileDescriptor) -> bool:
file_exists = self.__amazon_s3_client.object_exists(
bucket=file_descriptor.root,
key=self.__key_join(file_descriptor))
return file_exists
def read_json(self, file_descriptor: FileDescriptor, allow_not_exists: bool = False) -> FileJSON:
key = self.__key_join(file_descriptor)
file_exists, content = self.__amazon_s3_client.get_object_if_exists(
bucket=file_descriptor.root,
key=key)
if not file_exists and not allow_not_exists:
raise FileNotFoundError(key)
return FileJSON(file_descriptor, content, file_exists)
def put_json(self, file_json: FileJSON) -> FullFileName:
content = json.dumps(file_json.content_json)
key = self.__key_join(file_json.file_descriptor)
self.__amazon_s3_client.write_object(
bucket=file_json.file_descriptor.root,
key=key,
obj=content)
return key
    def read_bytes(self, file_descriptor: FileDescriptor, allow_not_exists: bool = False) -> FileBytes:
        key = self.__key_join(file_descriptor)
        try:
            return FileBytes(
                file_descriptor,
                self.__amazon_s3_client.download_stream(
                    bucket=file_descriptor.root,
                    key=key),
                True)
        except ClientError as e:
            if allow_not_exists:
                return FileBytes(file_descriptor, b"", False)
            raise FileNotFoundError(key) from e
def put_bytes(self, file_bytes: FileBytes) -> FullFileName:
key = self.__key_join(file_bytes.file_descriptor)
self.__amazon_s3_client.upload_stream(
bucket=file_bytes.file_descriptor.root,
key=key,
stream=file_bytes.content)
return key
def put_interfaced_file(self, file_interfaced: FileInterfaced) -> FullFileName:
key = self.__key_join(file_interfaced.file_descriptor)
self.__amazon_s3_client.upload_stream(
bucket=file_interfaced.file_descriptor.root,
key=key,
stream=file_interfaced.as_readable().to_bytes())
return key
def remove(self, file_descriptor: FileDescriptor):
self.__amazon_s3_client.delete_object(
bucket=file_descriptor.root,
key=self.__key_join(file_descriptor))
    def file_lock(self, file_descriptor: FileDescriptor):
        # S3 has no native file locking, so return a no-op context manager.
        return nullcontext()
@staticmethod
def __key_join(file_descriptor: FileDescriptor):
if file_descriptor.directory != "":
return f'{file_descriptor.directory}/{file_descriptor.filename}'
else:
return file_descriptor.filename
def is_same_file(self, file_descriptor_one: FileDescriptor, file_descriptor_two: FileDescriptor):
if file_descriptor_one.root != '' and file_descriptor_two.root != '':
if file_descriptor_one.root != file_descriptor_two.root:
return False
fn1 = self.__key_join(file_descriptor_one)
fn2 = self.__key_join(file_descriptor_two)
return fn1 == fn2
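
# A minimal usage sketch of the facade above (hypothetical wiring: the bucket
# name, endpoint and credentials are placeholders, and FileDescriptor/FileJSON
# are assumed to take their fields positionally, as they are used in this module):
#
#   s3 = AmazonS3Client(aws_region="us-east-1",
#                       aws_endpoint="http://localhost:9000",
#                       aws_access_key_id="minioadmin",
#                       aws_access_key_secret="minioadmin")
#   fs = FileSystemClientByAmazonS3(s3)
#   descriptor = FileDescriptor("my-bucket", "mail/42", "payload.json")
#   fs.create_path(descriptor)
#   fs.put_json(FileJSON(descriptor, {"hello": "world"}, True))
#   print(fs.read_json(descriptor).content_json)
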
class AmazonS3Client:
# add any mime type that is not automatically detected
mimetypes.add_type('application/json', '.json')
def __init__(self, s3_client=None,
aws_region: str = "", aws_endpoint: str = "",
aws_access_key_id: str = "", aws_access_key_secret: str = ""):
        # S3 requires multipart parts of at least 5 MiB (except the last one),
        # so the threshold and chunk size are expressed in MiB.
        self.config = TransferConfig(multipart_threshold=5 * 1024 * 1024,
                                     max_concurrency=10,
                                     multipart_chunksize=5 * 1024 * 1024,
                                     use_threads=True)
        # clients are generally thread-safe
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/clients.html#multithreading-or-multiprocessing-with-clients
        # The PyCharm debugger does not support instantiating a boto3.client inside
        # a thread: it hangs, must be stopped, and then resumes execution, which is
        # unreliable. Use AmazonS3Client as a single shared global object.
if s3_client is not None:
self.s3_client = s3_client
else:
self.s3_client = boto3.client("s3", region_name=aws_region,
endpoint_url=aws_endpoint,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_access_key_secret
)
    def create_bucket(self, **kwargs):
        """
        Create the bucket if it does not already exist; an existing bucket is left untouched.
        @param kwargs:
            bucket = the bucket identifier
        """
        bucket = kwargs.get('bucket')
        if not self.bucket_exists(bucket=bucket):
            self.s3_client.create_bucket(Bucket=bucket)
    def write_metadata_to_bucket(self, **kwargs):
        # TODO: check whether this is really needed for S3.
        """
        Placeholder for attaching metadata to a bucket. Currently it only ensures
        the bucket exists; the metadata parameter is not yet used.
        @param kwargs:
            bucket = the bucket identifier
            metadata = the metadata to attach (currently unused)
        """
        bucket = kwargs.get('bucket')
        metadata = kwargs.get('metadata')
        if not self.bucket_exists(bucket=bucket):
            self.s3_client.create_bucket(Bucket=bucket)
    def write_object(self, **kwargs):
        """
        Upload a single object to a bucket.
        @param kwargs:
            bucket = the bucket identifier
            key = the object key
            obj = the object body to upload
        """
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        data = kwargs.get('obj')
        # Guess the content type from the key's extension; default to JSON.
        file_mime_type, _ = mimetypes.guess_type(key)
        if file_mime_type is None:
            file_mime_type = 'application/json'
        self.s3_client.put_object(Body=data,
                                  Bucket=bucket,
                                  Key=key,
                                  ContentType=file_mime_type)
    def delete_object(self, **kwargs):
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        self.s3_client.delete_object(Bucket=bucket, Key=key)
    def create_objects(self, **kwargs):
        """
        Upload every file found under a local folder to a bucket,
        placing each one under the given key prefix.
        @param kwargs:
            bucket = the bucket identifier
            key = the key prefix under which the files are stored
            path = the local folder to walk, e.g. /data
        """
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        path = kwargs.get('path')
        for subdir, dirs, files in os.walk(path):
            for file in files:
                full_path = os.path.join(subdir, file)
                self.s3_client.upload_file(Filename=full_path, Bucket=bucket,
                                           Key=f'{key}/{file}',
                                           Config=self.config)
    def upload_stream(self, bucket: str, key: str, stream):
        """
        Upload an in-memory object to a bucket, creating the key if needed.
        @param bucket: the bucket identifier (root bucket)
        @param key: the S3 object key
        @param stream: the raw bytes to upload
        """
        content = io.BytesIO(stream)
        self.s3_client.upload_fileobj(Fileobj=content, Bucket=bucket,
                                      Key=key,
                                      Config=self.config)
    def delete_objects(self, **kwargs):
        """
        Delete every object belonging to a mail_id folder inside a tenant bucket.
        @param kwargs:
            tenant_id = the tenant bucket identifier
            mail_id = the folder whose objects are deleted
        """
        tenant_id = kwargs.get('tenant_id')
        mail_id = kwargs.get('mail_id')
        if tenant_id and mail_id:
            file_list = self.list_objects(bucket=tenant_id, key=mail_id, prefix="")
            for file in file_list:
                self.s3_client.delete_object(Bucket=tenant_id, Key=file)
    def download_objects(self, **kwargs):
        """
        Download all the files contained in a mail_id folder.
        @param kwargs:
            tenant_id = the tenant bucket identifier
            mail_id = the folder containing all attachments for that email
            prefix = only download keys starting with this prefix
            local_path = the local path where the files are stored
        """
        tenant_id = kwargs.get('tenant_id')
        mail_id = kwargs.get('mail_id')
        prefix = kwargs.get('prefix') or ''
        local_path = kwargs.get('local_path')
        found_object_list = self.list_objects(bucket=tenant_id, key=mail_id, prefix=prefix)
        for file in found_object_list:
            file_path = Path(local_path, file)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            self.s3_client.download_file(tenant_id, file,
                                         str(file_path), Config=self.config)
    def list_objects(self, **kwargs) -> list:
        """
        List object keys in a bucket, optionally filtered by prefix and by a
        substring of the key. list_objects_v2 can list at most 1000 keys per
        call, so a paginator is used to handle buckets with more objects.
        @param kwargs:
            bucket = the bucket identifier
            prefix = prefix to search for, e.g. 2023/02/12
            key = if given, only keys containing this substring are returned
        @return:
            List of matching keys, including their prefix
        """
        obj_list = []
        bucket_name = kwargs.get('bucket')
        prefix = kwargs.get('prefix') or ''
        key = kwargs.get('key')
        try:
            if bucket_name:
                paginator = self.s3_client.get_paginator("list_objects_v2")
                response = paginator.paginate(Bucket=bucket_name, PaginationConfig={"PageSize": 1000}, Prefix=prefix)
                for page in response:
                    for file_obj in page.get("Contents") or []:
                        if not key or key in file_obj['Key']:
                            obj_list.append(file_obj['Key'])
            return obj_list
        except ClientError:
            # Listing errors (e.g. a missing bucket) are treated as "no objects".
            return []
def object_exists(self, **kwargs):
bucket = kwargs.get('bucket')
key = kwargs.get('key')
try:
self.s3_client.get_object(Bucket=bucket, Key=key)
return True
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
return False
else:
raise
    def get_object_if_exists(self, **kwargs):
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        # Guess the content type from the key's extension; default to JSON.
        file_mime_type, _ = mimetypes.guess_type(key)
        if file_mime_type is None:
            file_mime_type = 'application/json'
        try:
            obj = self.s3_client.get_object(Bucket=bucket, Key=key)
            if file_mime_type == 'application/json':
                # Decode JSON payloads into Python structures.
                return True, json.loads(obj.get('Body').read().decode('utf-8'))
            return True, obj
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                # Normal behaviour: the object was simply not found.
                return False, None
            raise
    def download_stream(self, bucket: str, key: str) -> bytes:
        """
        Download an object from a bucket into memory.
        :param bucket: the bucket identifier (root bucket)
        :param key: the S3 object key
        :return: the object's content as bytes
        """
        content = io.BytesIO()
        self.s3_client.download_fileobj(Fileobj=content, Bucket=bucket, Key=key, Config=self.config)
        content.seek(0)
        return content.read()
    def read_object(self, **kwargs):
        """
        Read a JSON object from a bucket and decode it.
        @param kwargs:
            bucket = the bucket identifier
            key = the object key
        """
        bucket = kwargs.get('bucket')
        key = kwargs.get('key')
        obj = self.s3_client.get_object(Bucket=bucket, Key=key).get('Body').read().decode('utf-8')
        return json.loads(obj)
    def bucket_exists(self, **kwargs) -> bool:
        bucket = kwargs.get('bucket')
        if bucket:
            return any(obj['Name'] == bucket
                       for obj in self.s3_client.list_buckets().get('Buckets', []))
        return False
    def list_objects_by_prefix_and_suffix(self, **kwargs):
        """
        Return the keys in an S3 bucket that match an optional prefix and suffix.
        :param bucket: Name of the S3 bucket.
        :param prefix: Only fetch keys that start with this prefix (optional).
        :param suffix: Only fetch keys that end with this suffix (optional).
        """
        bucket = kwargs.get('bucket')
        prefix = kwargs.get('prefix', '')
        suffix = kwargs.get('suffix', '')
        keys = []
        paginator = self.s3_client.get_paginator("list_objects_v2")
        response = paginator.paginate(Bucket=bucket, PaginationConfig={"PageSize": 1000}, Prefix=prefix)
        for page in response:
            for obj in page.get("Contents") or []:
                key = obj['Key']
                if key.startswith(prefix) and key.endswith(suffix):
                    keys.append(key)
        return keys
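
if __name__ == "__main__":
    # A minimal smoke-test sketch, assuming an S3-compatible endpoint such as
    # MinIO running locally; the endpoint, credentials and bucket name below
    # are placeholders, not values from this codebase.
    client = AmazonS3Client(aws_region="us-east-1",
                            aws_endpoint="http://localhost:9000",
                            aws_access_key_id="minioadmin",
                            aws_access_key_secret="minioadmin")
    client.create_bucket(bucket="demo-bucket")
    client.write_object(bucket="demo-bucket", key="2023/02/12/sample.json",
                        obj=json.dumps({"status": "ok"}))
    print(client.list_objects_by_prefix_and_suffix(bucket="demo-bucket",
                                                   prefix="2023/02/12/",
                                                   suffix=".json"))
    print(client.read_object(bucket="demo-bucket", key="2023/02/12/sample.json"))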